# Names exported by ``from <this module> import *``: the detailer classes
# defined below.
__all__ = (
"BaseDetailer",
"ZeroRotDetailer",
"Comcam90rotDetailer",
"Rottep2RotspDesiredDetailer",
"CloseAltDetailer",
"TakeAsPairsDetailer",
"TwilightTripleDetailer",
"FlushForSchedDetailer",
"FilterNexp",
"BandNexp",
"FixedSkyAngleDetailer",
"ParallacticRotationDetailer",
"FlushByDetailer",
"RandomFilterDetailer",
"RandomBandDetailer",
"TrackingInfoDetailer",
"AltAz2RaDecDetailer",
"StartFieldSequenceDetailer",
"BandToFilterDetailer",
)
import copy
import warnings
import numpy as np
import rubin_scheduler.scheduler.features as features
from rubin_scheduler.scheduler.utils import IntRounded, ObservationArray
from rubin_scheduler.utils import (
DEFAULT_NSIDE,
_angular_separation,
_approx_alt_az2_ra_dec,
_approx_altaz2pa,
_approx_ra_dec2_alt_az,
pseudo_parallactic_angle,
rotation_converter,
)
[docs]
class BaseDetailer:
    """Base class for objects that add "details" to proposed observations.

    A Detailer takes a list of proposed observations and fills in or
    adjusts extra information on them. The Markov Decision Process does
    an excellent job selecting RA,Dec,band combinations, but additional
    logic may be wanted for things like the camera rotation angle or the
    exposure time. A detailer may also change the order of the proposed
    observations. For Deep Drilling Fields, a detailer could be used to
    compute dither positions and modify the exact RA,Dec positions.
    """

    def __init__(self, nside=DEFAULT_NSIDE):
        """
        Parameters
        ----------
        nside : `int`
            Value stored as ``self.nside`` (presumably the HEALPix
            resolution used by tracked features).
        """
        self.nside = nside
        # Features this detailer keeps up to date, keyed by name.
        self.survey_features = {}

    def add_observations_array(self, observations_array, observations_hpid):
        """Bulk version of `add_observation`: pass a whole array of
        observations on to every tracked feature."""
        for name in self.survey_features:
            tracked = self.survey_features[name]
            tracked.add_observations_array(observations_array, observations_hpid)

    def add_observation(self, observation, indx=None):
        """Pass a single observation on to every tracked feature.

        Parameters
        ----------
        observation : `np.array`
            An array with information about the input observation
        indx : `np.array`
            The indices of the healpix map that the observation overlaps
            with
        """
        for name in self.survey_features:
            tracked = self.survey_features[name]
            tracked.add_observation(observation, indx=indx)

    def __call__(self, observation_list, conditions):
        """Detail the observations; the base class returns an empty array.

        Parameters
        ----------
        observation_list : `list` of observations
            The observations to detail.
        conditions : `rubin_scheduler.scheduler.conditions` object

        Returns
        -------
        ObservationArray
        """
        empty = ObservationArray()
        return empty
[docs]
class TrackingInfoDetailer(BaseDetailer):
    """Fill in the tracking strings of an observation.

    Entries that are already set (non-empty) are never clobbered, and
    tracking values that were not supplied (left as None) are skipped
    rather than written into the array.

    Parameters
    ----------
    target_name : `str` or None
        Value for empty "target_name" entries. Default None (leave as is).
    science_program : `str` or None
        Value for empty "science_program" entries. Default None.
    observation_reason : `str` or None
        Value for empty "observation_reason" entries. Default None.
    """

    def __init__(self, target_name=None, science_program=None, observation_reason=None):
        super().__init__()
        self.target_name = target_name
        self.science_program = science_program
        self.observation_reason = observation_reason
        # Columns handled by __call__; each matches an attribute name.
        self.keys = ["science_program", "target_name", "observation_reason"]

    def __call__(self, observation_array, conditions):
        """Fill empty tracking columns in place and return the array.

        Parameters
        ----------
        observation_array : ObservationArray
            Observations to update; modified in place.
        conditions : conditions object
            Unused.
        """
        for key in self.keys:
            fill_value = getattr(self, key)
            if fill_value is None:
                # Assigning None to a string column would store the
                # literal text "None"; leave the column untouched instead.
                continue
            # The former `array is None` test was an identity check on the
            # whole array and therefore always False; only empty strings
            # ever counted as "unset".
            indx = np.where(observation_array[key] == "")[0]
            observation_array[key][indx] = fill_value
        return observation_array
[docs]
class FlushByDetailer(BaseDetailer):
    """Stamp each observation with the MJD at which it should be flushed
    from the scheduler queue if it has not been completed.

    Parameters
    ----------
    flush_time : float
        How long after the current MJD to flush, in minutes.
        Default 60 minutes.
    nside : `int`
        Stored as ``self.nside``.
    """

    def __init__(self, flush_time=60, nside=DEFAULT_NSIDE):
        self.nside = nside
        self.survey_features = {}
        # Stored in days so it can be added directly to an MJD.
        self.flush_time = flush_time / 60 / 24.0

    def __call__(self, observation_array, conditions):
        """Set "flush_by_mjd" on every observation and return the array."""
        flush_mjd = conditions.mjd + self.flush_time
        observation_array["flush_by_mjd"] = flush_mjd
        return observation_array
[docs]
class BandToFilterDetailer(BaseDetailer):
    """Fill in the physical filter to request rather than just the band.

    Parameters
    ----------
    band_to_filter_dict : `dict`
        A dict that maps band name (usually ugrizy) to
        specific filter names. Default value of None
        will set the filter name to the same as the band name.
    """

    def __init__(self, band_to_filter_dict=None):
        self.survey_features = {}
        if band_to_filter_dict is None:
            band_to_filter_dict = {bandname: bandname for bandname in "ugrizy"}
        self.band_to_filter_dict = band_to_filter_dict

    def __call__(self, observation_array, conditions):
        """Set the "filter" column from the "band" column, in place.

        A band with no entry in the mapping triggers a warning and uses
        the band name itself as the filter name.

        Parameters
        ----------
        observation_array : ObservationArray
            Observations to update; modified in place.
        conditions : conditions object
            Unused.
        """
        for band in np.unique(observation_array["band"]):
            indx = np.where(observation_array["band"] == band)[0]
            if band in self.band_to_filter_dict:
                filtername = self.band_to_filter_dict[band]
            else:
                warnings.warn(
                    "No mapping of band %s to a filter name. Using %s as filter name" % (band, band)
                )
                filtername = band
            # Select the column first and the rows second: indexing rows
            # with an integer array first yields a copy, so an assignment
            # made through it would never reach observation_array.
            observation_array["filter"][indx] = filtername
        return observation_array
[docs]
class AltAz2RaDecDetailer(BaseDetailer):
    """Fill in RA,dec for observations that only carry alt,az."""

    def __call__(self, observation_array, conditions):
        """Compute RA,dec from alt,az at the current MJD and site, write
        them to the "RA" and "dec" columns, and return the array."""
        pointing_alt = observation_array["alt"]
        pointing_az = observation_array["az"]
        site = conditions.site
        ra, dec = _approx_alt_az2_ra_dec(
            pointing_alt, pointing_az, site.latitude_rad, site.longitude_rad, conditions.mjd
        )
        observation_array["RA"] = ra
        observation_array["dec"] = dec
        return observation_array
[docs]
class RandomBandDetailer(BaseDetailer):
    """Pick a random band for the observations

    One band is drawn per night up front, so every call during the same
    night uses the same band.

    Parameters
    ----------
    bands : `str`
        The bands to randomize. Default 'riz'
    nights_to_prep : `int`
        The number of nights to generate random bands for.
        Default 10000.
    seed : number
        Seed for RNG. Default 42
    fallback_order : `str`
        If the desired band is not mounted, goes through
        `fallback_order` and uses the first band that is
        available
    """

    def __init__(self, bands="riz", nights_to_prep=10000, seed=42, fallback_order="rizgyu"):
        # A dict, like every other detailer, so the feature-update methods
        # inherited from BaseDetailer can look features up by key.
        self.survey_features = {}
        self.fallback_order = fallback_order
        rng = np.random.default_rng(seed)
        # Index into `bands` for each night.
        self.night2band_int = rng.integers(low=0, high=len(bands), size=nights_to_prep)
        self.band_dict = dict(enumerate(bands))

    def __call__(self, observation_array, conditions):
        """Set the "band" column to tonight's band and return the array.

        Raises
        ------
        ValueError
            If tonight's band is not mounted and no band in
            `fallback_order` is mounted either.
        """
        band_to_use = self.band_dict[self.night2band_int[conditions.night]]
        # Band not available
        if band_to_use not in conditions.mounted_bands:
            band_to_use = next(
                (bandname for bandname in self.fallback_order if bandname in conditions.mounted_bands),
                None,
            )
            if band_to_use is None:
                raise ValueError(
                    "None of the fallback bands %s are mounted" % self.fallback_order
                )
        observation_array["band"] = band_to_use
        return observation_array


class RandomFilterDetailer(RandomBandDetailer):
    """Deprecated in favor of RandomBandDetailer"""

    def __init__(self, filters="riz", nights_to_prep=10000, seed=42, fallback_order="rizgyu"):
        warnings.warn("Deprecated in favor of RandomBandDetailer", FutureWarning)
        # Inherits from RandomBandDetailer (defined above) so these keyword
        # arguments reach a constructor that accepts them.
        super().__init__(
            bands=filters, nights_to_prep=nights_to_prep, seed=seed, fallback_order=fallback_order
        )
[docs]
class ParallacticRotationDetailer(BaseDetailer):
    """Point the rotator close to the parallactic angle.

    Parameters
    ----------
    telescope : `str`
        Passed to `rotation_converter`. Default "rubin".
    """

    def __init__(self, telescope="rubin"):
        self.survey_features = {}
        self.rc = rotation_converter(telescope=telescope)

    def __call__(self, observation_array, conditions, limits=[-270, 270]):
        """Set "rotSkyPos_desired", "rotTelPos" and "rotTelPos_backup".

        Parameters
        ----------
        observation_array : ObservationArray
            Observations to update; modified in place.
        conditions : conditions object
            Supplies the site location and current MJD.
        limits : `list` of `float`
            Rotator range in degrees; only its minimum and maximum are
            used.
        """
        limits = np.radians(limits)
        upper = np.max(limits)
        lower = np.min(limits)

        site = conditions.site
        alt, az = _approx_ra_dec2_alt_az(
            observation_array["RA"],
            observation_array["dec"],
            site.latitude_rad,
            site.longitude_rad,
            conditions.mjd,
        )
        obs_pa = _approx_altaz2pa(alt, az, site.latitude_rad)
        observation_array["rotSkyPos_desired"] = obs_pa

        rot_tel_pos = self.rc._rotskypos2rottelpos(observation_array["rotSkyPos_desired"], obs_pa)

        # Wrap by a full turn to bring values back inside the limits.
        rot_tel_pos[rot_tel_pos > upper] -= 2 * np.pi
        rot_tel_pos[rot_tel_pos < lower] += 2 * np.pi

        # If those corrections still leave us bad, just pull it back 180.
        rot_tel_pos[rot_tel_pos > upper] -= np.pi

        # The rotTelPos overrides everything else.
        observation_array["rotTelPos"] = rot_tel_pos
        # If the rotSkyPos_desired isn't possible, fall back to this.
        observation_array["rotTelPos_backup"] = 0
        return observation_array
[docs]
class Rottep2RotspDesiredDetailer(BaseDetailer):
    """Turn the rotTelPos values into rotSkyPos_desired values.

    Parameters
    ----------
    telescope : `str`
        Passed to `rotation_converter`. Default "rubin".
    """

    def __init__(self, telescope="rubin"):
        self.survey_features = {}
        self.rc = rotation_converter(telescope=telescope)

    def __call__(self, obs_array, conditions):
        """Move rotTelPos to rotTelPos_backup, blank rotTelPos and
        rotSkyPos with NaN, and fill rotSkyPos_desired."""
        site = conditions.site
        alt, az = _approx_ra_dec2_alt_az(
            obs_array["RA"], obs_array["dec"], site.latitude_rad, site.longitude_rad, conditions.mjd
        )
        parallactic = _approx_altaz2pa(alt, az, site.latitude_rad)

        # NOTE(review): this goes from rotTelPos to a sky angle but calls
        # the rotskypos->rottelpos converter; confirm the conversion is
        # symmetric for the configured telescope.
        desired = self.rc._rotskypos2rottelpos(obs_array["rotTelPos"], parallactic)

        # "+ 0" produces a new array, so the backup keeps the old values
        # once rotTelPos is overwritten below.
        obs_array["rotTelPos_backup"] = obs_array["rotTelPos"] + 0
        obs_array["rotTelPos"] = np.nan
        obs_array["rotSkyPos"] = np.nan
        obs_array["rotSkyPos_desired"] = desired
        return obs_array
[docs]
class ZeroRotDetailer(BaseDetailer):
    """
    Detailer to set the camera rotation to be approximately zero in
    rotTelPos.

    Parameters
    ----------
    telescope : `str`
        Which telescope convention to use for setting the conversion
        between rotTelPos and rotSkyPos. Default "rubin".
    nside : `int`
        Stored as ``self.nside``.
    """

    def __init__(self, telescope="rubin", nside=DEFAULT_NSIDE):
        # Let the base class store nside (it was previously accepted but
        # dropped) and create the survey_features dict.
        super().__init__(nside=nside)
        self.rc = rotation_converter(telescope=telescope)

    def __call__(self, observation_array, conditions):
        """Set "rotSkyPos" so that rotTelPos is about zero; return the array.

        Parameters
        ----------
        observation_array : ObservationArray
            Observations to update; modified in place.
        conditions : conditions object
            Supplies the current MJD and site location.
        """
        # This call works in degrees: the site longitude/latitude are
        # converted and its result feeds the degree-based converter below.
        # RA/dec are handled as radians elsewhere in this module, so they
        # are converted as well.
        obs_pa, _alt, _az = pseudo_parallactic_angle(
            np.degrees(observation_array["RA"]),
            np.degrees(observation_array["dec"]),
            conditions.mjd,
            np.degrees(conditions.site.longitude_rad),
            np.degrees(conditions.site.latitude_rad),
        )
        # The converter output is converted to radians before storing.
        observation_array["rotSkyPos"] = np.radians(self.rc.rottelpos2rotskypos(0.0, obs_pa))
        return observation_array
[docs]
class Comcam90rotDetailer(BaseDetailer):
    """
    Detailer to set the camera rotation so rotSkyPos is 0, 90, 180, or
    270 degrees. Whatever is closest to rotTelPos of zero.

    Parameters
    ----------
    telescope : `str`
        Passed to `rotation_converter`. Default "rubin".
    nside : `int`
        Stored as ``self.nside``.
    """

    def __init__(self, telescope="rubin", nside=DEFAULT_NSIDE):
        # Let the base class store nside (it was previously accepted but
        # dropped) and create the survey_features dict.
        super().__init__(nside=nside)
        self.rc = rotation_converter(telescope=telescope)

    def __call__(self, obs_array, conditions):
        """Set "rotSkyPos" to the favored angle with the smallest
        resulting |rotTelPos| and return the array.

        Parameters
        ----------
        obs_array : ObservationArray
            Observations to update; modified in place.
        conditions : conditions object
            Supplies the current MJD and site location.
        """
        # One row per candidate sky angle so the converter broadcasts
        # against the per-observation parallactic angles.
        favored_rot_sky_pos = np.radians([0.0, 90.0, 180.0, 270.0, 360.0]).reshape(5, 1)

        # The call below works in degrees (see the longitude/latitude
        # arguments); RA/dec are handled as radians elsewhere in this
        # module, so convert them on the way in.
        parallactic_angle_deg, _alt, _az = pseudo_parallactic_angle(
            np.degrees(obs_array["RA"]),
            np.degrees(obs_array["dec"]),
            conditions.mjd,
            np.degrees(conditions.site.longitude_rad),
            np.degrees(conditions.site.latitude_rad),
        )
        # Convert to radians exactly once; the previous code applied
        # np.radians a second time before calling the converter.
        parallactic_angle = np.radians(parallactic_angle_deg)

        ang_diff = np.abs(self.rc._rotskypos2rottelpos(favored_rot_sky_pos, parallactic_angle))
        min_indxs = np.argmin(ang_diff, axis=0)
        # can swap 360 and zero if needed?
        obs_array["rotSkyPos"] = favored_rot_sky_pos[min_indxs].ravel()
        return obs_array
[docs]
class StartFieldSequenceDetailer(BaseDetailer):
    """Prepend a sequence of observations to the start of an array

    The sequence is prepended whenever the last matching observation is
    at least `time_match_hours` old.

    Parameters
    ----------
    sequence_obs : `ObservationArray`
        ObservationArray object. The observations should
        have "scheduler_note" and/or "science_program" set.
        When `science_program` / `scheduler_note` are given, they are
        written onto these observations (in place).
    ang_distance_match : `float`
        How close should an observation be on the sky to be considered
        matching (degrees).
    time_match_hours : `float`
        How close in time to demand an observation be matching (hours).
    science_program : `str` or None
        The science_program to match against. Default None.
    scheduler_note : `str` or None
        The scheduler_note to match observations against.
        Default "starting_sequence".
    ra : `float`
        RA to match against. Default 0 (degrees). Ignored
        if ang_distance_match is None.
    dec : `float`
        Dec to match observations against. Default 0 (degrees).
        Ignored if ang_distance_match is None.

    Raises
    ------
    ValueError
        If the sequence observations carry more than one science_program
        or scheduler_note, or if those values differ from the supplied
        keyword values.
    """

    def __init__(
        self,
        sequence_obs,
        ang_distance_match=3.5,
        time_match_hours=5,
        science_program=None,
        scheduler_note="starting_sequence",
        ra=0,
        dec=0,
    ):
        super().__init__()
        # Pass the caller's matching radius through; it used to be pinned
        # to the default 3.5 regardless of the argument.
        self.survey_features["last_matching"] = features.LastObservedMatching(
            ang_distance_match=ang_distance_match,
            science_program=science_program,
            scheduler_note=scheduler_note,
            ra=ra,
            dec=dec,
        )
        # None is a documented value (no positional matching), so it must
        # not be fed to np.radians.
        if ang_distance_match is None:
            self.ang_distance_match = None
        else:
            self.ang_distance_match = np.radians(ang_distance_match)
        self.time_match = time_match_hours / 24.0
        self.science_program = science_program
        self.scheduler_note = scheduler_note

        # Make backwards compatible if someone sent in a list
        if isinstance(sequence_obs, list):
            warnings.warn("sequence_obs should be ObsArray, not list of ObsArray. Concatenating")
            sequence_obs = np.concatenate(sequence_obs)
        self.sequence_obs = sequence_obs

        # Stamp the requested tags on the sequence. A tag left as None is
        # skipped so the observations keep their own value instead of
        # receiving the literal text "None".
        if self.science_program is not None:
            self.sequence_obs["science_program"] = self.science_program
        if self.scheduler_note is not None:
            self.sequence_obs["scheduler_note"] = self.scheduler_note

        # Check that things are sensibly set
        u_scip = np.unique(sequence_obs["science_program"])
        u_sn = np.unique(sequence_obs["scheduler_note"])
        if (np.size(u_scip) > 1) | (np.size(u_sn) > 1):
            msg = (
                "The science_program and/or scheduler_note " "values in sequence_obs_list should be the same."
            )
            raise ValueError(msg)

        # These errors used to be constructed but never raised. A mismatch
        # here means the stored value is not the requested one.
        if science_program is not None and np.any(u_scip != self.science_program):
            raise ValueError("science_program kwarg not equal to science_programs from sequence_obs_list")
        if scheduler_note is not None and np.any(u_sn != self.scheduler_note):
            raise ValueError("scheduler_note kwarg not equal to scheduler_notes from sequence_obs_list")

    def __call__(self, observation_array, conditions):
        """Return the observations, with the sequence prepended when the
        last matching observation is at least `time_match_hours` old."""
        # Do we need to add the opening sequence?
        last_mjd = self.survey_features["last_matching"].feature["mjd"]
        if (conditions.mjd - last_mjd) >= self.time_match:
            observation_array = np.concatenate([self.sequence_obs, observation_array])
        return observation_array
[docs]
class FixedSkyAngleDetailer(BaseDetailer):
    """Force every observation to one specific sky angle.

    Parameters
    ----------
    sky_angle : `float`, optional
        Desired sky angle (default = 0, in degrees).
    nside : `int`, optional
        Forwarded to the base class.
    """

    def __init__(self, sky_angle=0.0, nside=DEFAULT_NSIDE):
        super().__init__(nside=nside)
        # Kept in radians; written to "rotSkyPos" as is.
        angle_rad = np.radians(sky_angle)
        self.sky_angle = angle_rad

    def __call__(self, observation_array, conditions):
        """Write the fixed angle to "rotSkyPos" and return the array."""
        observation_array["rotSkyPos"] = self.sky_angle
        return observation_array
[docs]
class CloseAltDetailer(BaseDetailer):
    """
    Rotate a list of observations so that the one closest to the current
    pointing, among those near the current altitude, comes first.

    Parameters
    ----------
    alt_band : `float` (10)
        The altitude band to try and stay in (degrees)
    """

    def __init__(self, alt_band=10.0):
        super().__init__()
        band_rad = np.radians(alt_band)
        self.alt_band = IntRounded(band_rad)

    def __call__(self, obs_array, conditions):
        """Return the observations cyclically shifted so the chosen one is
        first; relative order is otherwise kept."""
        site = conditions.site
        alt, az = _approx_ra_dec2_alt_az(
            obs_array["RA"], obs_array["dec"], site.latitude_rad, site.longitude_rad, conditions.mjd
        )

        alt_offset = np.abs(alt - conditions.tel_alt)
        candidates = np.where(IntRounded(alt_offset) <= self.alt_band)[0]
        if candidates.size == 0:
            # Nothing inside the band: consider every observation.
            candidates = np.arange(alt.size)

        # Angular distance from the current pointing, candidates only.
        separations = _angular_separation(
            az[candidates], alt[candidates], conditions.tel_az, conditions.tel_alt
        )
        if np.size(separations) == 1:
            nearest = 0
        else:
            # First candidate that attains the minimum distance.
            nearest = np.min(np.where(separations == separations.min())[0])

        start = candidates[nearest]
        return np.concatenate([obs_array[start:], obs_array[:start]])
[docs]
class FlushForSchedDetailer(BaseDetailer):
    """Update the flush-by MJD to be before any scheduled observations

    Parameters
    ----------
    tol : `float`
        How much before to flush (minutes)
    """

    def __init__(self, tol=2.5):
        super().__init__()
        self.tol = tol / 24.0 / 60.0  # To days

    def __call__(self, observation_array, conditions):
        """Cap "flush_by_mjd" at `tol` before the earliest scheduled
        observation, in place, and return the array.

        Nothing changes when there are no scheduled observations.
        """
        if np.size(conditions.scheduled_observations) > 0:
            new_flush = np.min(conditions.scheduled_observations) - self.tol
            indx = np.where(observation_array["flush_by_mjd"] > new_flush)[0]
            # Select the column first and the rows second: indexing rows
            # with an integer array first yields a copy, so the old form
            # never changed observation_array.
            observation_array["flush_by_mjd"][indx] = new_flush
        return observation_array
[docs]
class BandNexp(BaseDetailer):
    """Force observations in one band to use a set number of exposures.

    Parameters
    ----------
    bandname : `str`
        Band whose observations are modified. Default "u".
    nexp : `int`
        Value written to "nexp" for that band. Default 1.
    exptime : `float` or None
        If not None, also written to "exptime" for that band.
    """

    def __init__(self, bandname="u", nexp=1, exptime=None):
        super().__init__()
        self.bandname = bandname
        self.nexp = nexp
        self.exptime = exptime

    def __call__(self, observation_array, conditions):
        """Update the matching rows in place and return the array."""
        in_band = np.where(observation_array["band"] == self.bandname)[0]
        observation_array["nexp"][in_band] = self.nexp
        if self.exptime is None:
            return observation_array
        observation_array["exptime"][in_band] = self.exptime
        return observation_array
[docs]
class FilterNexp(BandNexp):
    """Deprecated alias of BandNexp that takes `filtername` instead of
    `bandname`; emits a FutureWarning when constructed."""

    def __init__(self, filtername="u", nexp=1, exptime=None):
        warnings.warn("Deprecated in favor of BandNexp", FutureWarning)
        # The old keyword maps straight onto BandNexp's bandname.
        super().__init__(
            bandname=filtername,
            nexp=nexp,
            exptime=exptime,
        )
[docs]
class TakeAsPairsDetailer(BaseDetailer):
    """Duplicate the observations in a second band so they are taken as
    pairs.

    Parameters
    ----------
    bandname : `str`
        Band used for the duplicated observations. Default "r".
    exptime : `float` or None
        If not None, exposure time set on the duplicated observations.
    nexp_dict : `dict` or None
        If not None, ``nexp_dict[bandname]`` is written to "nexp" on the
        duplicated observations.
    filtername : `str` or None
        Deprecated spelling of `bandname`; overrides it when given.
    """

    def __init__(self, bandname="r", exptime=None, nexp_dict=None, filtername=None):
        if filtername is not None:
            warnings.warn("filtername deprecated in favor of bandname", FutureWarning)
            bandname = filtername
        super().__init__()
        self.bandname = bandname
        self.exptime = exptime
        self.nexp_dict = nexp_dict

    def __call__(self, observation_array, conditions):
        """Return the input observations plus their copies in `bandname`.

        ", a" / ", b" is appended to "scheduler_note" (on the input array
        too). The half that matches the currently loaded band goes first
        and gets the "a" tag when the current band is `bandname`.
        """
        paired = copy.deepcopy(observation_array)
        paired["band"] = self.bandname
        if self.exptime is not None:
            paired["exptime"] = self.exptime
        if self.nexp_dict is not None:
            paired["nexp"] = self.nexp_dict[self.bandname]

        # Try to avoid extra filter changes and have the "a" tag come
        # first.
        already_in_band = conditions.current_band == self.bandname
        if already_in_band:
            paired_tag, original_tag = "a", "b"
        else:
            paired_tag, original_tag = "b", "a"

        paired["scheduler_note"] = np.char.add(paired["scheduler_note"], ", %s" % paired_tag)
        observation_array["scheduler_note"] = np.char.add(
            observation_array["scheduler_note"], ", %s" % original_tag
        )

        if already_in_band:
            ordered = [paired, observation_array]
        else:
            ordered = [observation_array, paired]
        return np.concatenate(ordered)
[docs]
class TwilightTripleDetailer(BaseDetailer):
    """Repeat a block of observations several times, shortening it first
    if it does not fit in the time left in twilight.

    Parameters
    ----------
    slew_estimate : `float`
        Time charged per slew between observations. Default 5.0
        (presumably seconds, as it is added to "exptime" and compared
        with a time in seconds -- TODO confirm).
    n_repeat : `int`
        How many times the block is repeated. Default 3.
    update_note : `bool`
        If True, append ", <repeat index>" to "scheduler_note" in each
        repeat. Default True.
    """

    def __init__(self, slew_estimate=5.0, n_repeat=3, update_note=True):
        super().__init__()
        self.slew_estimate = slew_estimate
        self.n_repeat = n_repeat
        self.update_note = update_note

    def __call__(self, obs_array, conditions):
        """Return `n_repeat` concatenated copies of the (possibly
        truncated) observations."""
        # Estimate how much time is left in the twilight block: the
        # nearest upcoming boundary, converted from days to seconds.
        time_to_limits = np.array(
            [
                conditions.sun_n18_setting - conditions.mjd,
                conditions.sun_n12_rising - conditions.mjd,
            ]
        )
        upcoming = time_to_limits[np.where(time_to_limits > 0)]
        seconds_left = np.min(upcoming) * 24.0 * 3600.0

        # How long will observations take?
        slew_total = np.arange(obs_array.size) * self.slew_estimate
        exposure_total = np.cumsum(obs_array["exptime"])
        elapsed = slew_total + exposure_total

        # If we are way over, truncate the list before doing the repeats
        if np.max(elapsed) > seconds_left:
            fits = np.where(elapsed / self.n_repeat <= seconds_left)[0]
            if np.size(fits) == 0:
                # Very bad magic number fudge
                n_keep = 3
            else:
                n_keep = np.max(fits)
                if n_keep == 0:
                    # Always keep at least one observation.
                    n_keep += 1
            obs_array = obs_array[0:n_keep]

        # Repeat the observations n times
        repeats = []
        for i in range(self.n_repeat):
            block = copy.deepcopy(obs_array)
            if self.update_note:
                block["scheduler_note"] = np.char.add(block["scheduler_note"], ", %i" % i)
            repeats.append(block)
        return np.concatenate(repeats)