Source code for rubin_scheduler.scheduler.detailers.detailer

__all__ = (
    "BaseDetailer",
    "ZeroRotDetailer",
    "Comcam90rotDetailer",
    "Rottep2RotspDesiredDetailer",
    "CloseAltDetailer",
    "TakeAsPairsDetailer",
    "TwilightTripleDetailer",
    "FlushForSchedDetailer",
    "FilterNexp",
    "BandNexp",
    "FixedSkyAngleDetailer",
    "ParallacticRotationDetailer",
    "FlushByDetailer",
    "RandomFilterDetailer",
    "RandomBandDetailer",
    "TrackingInfoDetailer",
    "AltAz2RaDecDetailer",
    "StartFieldSequenceDetailer",
    "BandToFilterDetailer",
)

import copy
import warnings

import numpy as np

import rubin_scheduler.scheduler.features as features
from rubin_scheduler.scheduler.utils import IntRounded, ObservationArray
from rubin_scheduler.utils import (
    DEFAULT_NSIDE,
    _angular_separation,
    _approx_alt_az2_ra_dec,
    _approx_altaz2pa,
    _approx_ra_dec2_alt_az,
    pseudo_parallactic_angle,
    rotation_converter,
)


[docs] class BaseDetailer: """ A Detailer is an object that takes a list of proposed observations and adds "details" to them. The primary purpose is that the Markov Decision Process does an excelent job selecting RA,Dec,band combinations, but we may want to add additional logic such as what to set the camera rotation angle to, or what to use for an exposure time. We could also modify the order of the proposed observations. For Deep Drilling Fields, a detailer could be useful for computing dither positions and modifying the exact RA,Dec positions. """ def __init__(self, nside=DEFAULT_NSIDE): """""" # Dict to hold all the features we want to track self.survey_features = {} self.nside = nside
[docs] def add_observations_array(self, observations_array, observations_hpid): """Like add_observation, but for loading a whole array of observations at a time""" for feature in self.survey_features: self.survey_features[feature].add_observations_array(observations_array, observations_hpid)
[docs] def add_observation(self, observation, indx=None): """ Parameters ---------- observation : `np.array` An array with information about the input observation indx : `np.array` The indices of the healpix map that the observation overlaps with """ for feature in self.survey_features: self.survey_features[feature].add_observation(observation, indx=indx)
[docs] def __call__(self, observation_list, conditions): """ Parameters ---------- observation_list : `list` of observations The observations to detail. conditions : `rubin_scheduler.scheduler.conditions` object Returns ------- ObservationArray """ return ObservationArray()
[docs] class TrackingInfoDetailer(BaseDetailer): """Fill in lots of the different tracking strings for an observation. Does not clobber information that has already been set""" def __init__(self, target_name=None, science_program=None, observation_reason=None): self.survey_features = {} self.target_name = target_name self.science_program = science_program self.observation_reason = observation_reason self.keys = ["science_program", "target_name", "observation_reason"] def __call__(self, observation_array, conditions): for key in self.keys: indx = np.where((observation_array[key] == "") | (observation_array[key] is None))[0] observation_array[key][indx] = getattr(self, key) return observation_array
[docs] class FlushByDetailer(BaseDetailer): """Set the MJD an observation should be flushed from the scheduler queue if not yet completed. Parameters ---------- flush_time : float The time to flush after the current MJD. Default 60 minutes """ def __init__(self, flush_time=60, nside=DEFAULT_NSIDE): self.survey_features = {} self.nside = nside self.flush_time = flush_time / 60 / 24.0 def __call__(self, observation_array, conditions): observation_array["flush_by_mjd"] = conditions.mjd + self.flush_time return observation_array
[docs] class BandToFilterDetailer(BaseDetailer): """If we want to fill in the physical filter to request rather than just the band. Parameters ---------- band_to_filter_dict : `dict` A dict that maps band name (usually ugrizy) to specific filter names. Default value of None will set the filter name to the same as the band name. """ def __init__(self, band_to_filter_dict=None): self.survey_features = {} if band_to_filter_dict is None: self.band_to_filter_dict = {} for bandname in "ugrizy": self.band_to_filter_dict[bandname] = bandname else: self.band_to_filter_dict = band_to_filter_dict def __call__(self, observation_array, conditions): u_bands = np.unique(observation_array["band"]) for band in u_bands: indx = np.where(observation_array["band"] == band)[0] if band not in self.band_to_filter_dict.keys(): warnings.warn( "No mapping of band %s to a filter name. Using %s as filter name" % (band, band) ) observation_array[indx]["filter"] = band observation_array[indx]["filter"] = self.band_to_filter_dict[band] return observation_array
[docs] class AltAz2RaDecDetailer(BaseDetailer): """Set RA,dec for an observation that only has alt,az""" def __call__(self, observation_array, conditions): ra, dec = _approx_alt_az2_ra_dec( observation_array["alt"], observation_array["az"], conditions.site.latitude_rad, conditions.site.longitude_rad, conditions.mjd, ) observation_array["RA"] = ra observation_array["dec"] = dec return observation_array
[docs] class RandomFilterDetailer(BaseDetailer): """Deprecated in favor of RandomBandDetailer""" def __init__(self, filters="riz", nights_to_prep=10000, seed=42, fallback_order="rizgyu"): warnings.warn("Deprecated in favor of RandomBandDetailer", FutureWarning) super().__init__( bands=filters, nights_to_prep=nights_to_prep, seed=seed, fallback_order=fallback_order )
[docs] class RandomBandDetailer(BaseDetailer): """Pick a random band for the observations Parameters ---------- bands : `str` The bands to randomize. Default 'riz' nights_to_prep : `int` The number of nights to generate random bands for. Default 10000. seed : number Seed for RNG. Defaut 42 fallback_order : `str` If the desired band is not mounted, goes through `fallback_order` and uses the first band that is available """ def __init__(self, bands="riz", nights_to_prep=10000, seed=42, fallback_order="rizgyu"): self.survey_features = [] self.fallback_order = fallback_order rng = np.random.default_rng(seed) self.night2band_int = rng.integers(low=0, high=len(bands), size=nights_to_prep) self.band_dict = {} for i, bandname in enumerate(bands): self.band_dict[i] = bandname def __call__(self, observation_array, conditions): band_to_use = self.band_dict[self.night2band_int[conditions.night]] # Band not available if band_to_use not in conditions.mounted_bands: is_mounted = [bandname in conditions.mounted_bands for bandname in self.fallback_order] indx = np.min(np.where(is_mounted)) band_to_use = self.fallback_order[indx] observation_array["band"] = band_to_use return observation_array
[docs] class ParallacticRotationDetailer(BaseDetailer): """Set the rotator to near the parallactic angle""" def __init__(self, telescope="rubin"): self.rc = rotation_converter(telescope=telescope) self.survey_features = {} def __call__(self, observation_array, conditions, limits=[-270, 270]): limits = np.radians(limits) alt, az = _approx_ra_dec2_alt_az( observation_array["RA"], observation_array["dec"], conditions.site.latitude_rad, conditions.site.longitude_rad, conditions.mjd, ) obs_pa = _approx_altaz2pa(alt, az, conditions.site.latitude_rad) observation_array["rotSkyPos_desired"] = obs_pa resulting_rot_tel_pos = self.rc._rotskypos2rottelpos(observation_array["rotSkyPos_desired"], obs_pa) indx = np.where(resulting_rot_tel_pos > np.max(limits))[0] resulting_rot_tel_pos[indx] -= 2 * np.pi indx = np.where(resulting_rot_tel_pos < np.min(limits)) resulting_rot_tel_pos[indx] += 2 * np.pi # If those corrections still leave us bad, just pull it back 180. indx = np.where(resulting_rot_tel_pos > np.max(limits)) resulting_rot_tel_pos[indx] -= np.pi # The rotTelPos overides everything else. observation_array["rotTelPos"] = resulting_rot_tel_pos # if the rotSkyPos_desired isn't possible, fall back to this. observation_array["rotTelPos_backup"] = 0 return observation_array
[docs] class Rottep2RotspDesiredDetailer(BaseDetailer): """Convert all the rotTelPos values to rotSkyPos_desired""" def __init__(self, telescope="rubin"): self.rc = rotation_converter(telescope=telescope) self.survey_features = {} def __call__(self, obs_array, conditions): alt, az = _approx_ra_dec2_alt_az( obs_array["RA"], obs_array["dec"], conditions.site.latitude_rad, conditions.site.longitude_rad, conditions.mjd, ) obs_pa = _approx_altaz2pa(alt, az, conditions.site.latitude_rad) rot_sky_pos_desired = self.rc._rotskypos2rottelpos(obs_array["rotTelPos"], obs_pa) obs_array["rotTelPos_backup"] = obs_array["rotTelPos"] + 0 obs_array["rotTelPos"] = np.nan obs_array["rotSkyPos"] = np.nan obs_array["rotSkyPos_desired"] = rot_sky_pos_desired return obs_array
[docs] class ZeroRotDetailer(BaseDetailer): """ Detailer to set the camera rotation to be apporximately zero in rotTelPos. Parameters ---------- telescope : `str` Which telescope convention to use for setting the conversion between rotTelPos and rotSkyPos. Default "rubin". """ def __init__(self, telescope="rubin", nside=DEFAULT_NSIDE): self.rc = rotation_converter(telescope=telescope) self.survey_features = {} def __call__(self, observation_array, conditions): obs_pa, alt, az = pseudo_parallactic_angle( observation_array["RA"], observation_array["dec"], conditions.mjd, np.degrees(conditions.site.longitude_rad), np.degrees(conditions.site.latitude_rad), ) observation_array["rotSkyPos"] = np.radians(self.rc.rottelpos2rotskypos(0.0, obs_pa)) return observation_array
[docs] class Comcam90rotDetailer(BaseDetailer): """ Detailer to set the camera rotation so rotSkyPos is 0, 90, 180, or 270 degrees. Whatever is closest to rotTelPos of zero. """ def __init__(self, telescope="rubin", nside=DEFAULT_NSIDE): self.rc = rotation_converter(telescope=telescope) self.survey_features = {} def __call__(self, obs_array, conditions): favored_rot_sky_pos = np.radians([0.0, 90.0, 180.0, 270.0, 360.0]).reshape(5, 1) parallactic_angle, alt, az = np.radians( pseudo_parallactic_angle( obs_array["RA"], obs_array["dec"], conditions.mjd, np.degrees(conditions.site.longitude_rad), np.degrees(conditions.site.latitude_rad), ) ) # need to find the ang_diff = np.abs(self.rc._rotskypos2rottelpos(favored_rot_sky_pos, np.radians(parallactic_angle))) min_indxs = np.argmin(ang_diff, axis=0) # can swap 360 and zero if needed? obs_array["rotSkyPos"] = favored_rot_sky_pos[min_indxs].ravel() return obs_array
[docs] class StartFieldSequenceDetailer(BaseDetailer): """Prepend a sequence of observations to the start of an array Parameters ---------- sequence_obs : `ObservationArray` ObservationArray object. The observations should have "scheduler_note" and/or "science_program" set. ang_distance_match : `float` How close should an observation be on the sky to be considered matching (degrees). time_match_hours : `float` How close in time to demand an observation be matching (hours). science_program : `str` or None The science_program to match against. Default None. scheduler_note : `str` or None The scheduler_note to match observations against. Default "starting_sequence". ra : `float` RA to match against. Default 0 (degrees). Ignored if ang_distance_match is None. dec : `float` Dec to match observations against. Default 0 (degrees). Ignored if ang_distance_match is None. """ def __init__( self, sequence_obs, ang_distance_match=3.5, time_match_hours=5, science_program=None, scheduler_note="starting_sequence", ra=0, dec=0, ): super().__init__() self.survey_features["last_matching"] = features.LastObservedMatching( ang_distance_match=3.5, science_program=science_program, scheduler_note=scheduler_note, ra=ra, dec=dec, ) self.ang_distance_match = np.radians(ang_distance_match) self.time_match = time_match_hours / 24.0 self.science_program = science_program self.scheduler_note = scheduler_note # Make backwards compatible if someone sent in a list if isinstance(sequence_obs, list): warnings.warn("sequence_obs should be ObsArray, not list of ObsArray. Concatenating") sequence_obs = np.concatenate(sequence_obs) self.sequence_obs = sequence_obs self.sequence_obs["science_program"] = self.science_program self.sequence_obs["scheduler_note"] = self.scheduler_note # Check that things are sensibly set u_scip = np.unique(sequence_obs["science_program"]) u_sn = np.unique(sequence_obs["scheduler_note"]) if (np.size(u_scip) > 1) | (np.size(u_sn) > 1): msg = ( "The science_program and/or scheduler_note " "values in sequence_obs_list should be the same." ) raise ValueError(msg) if science_program is not None: if self.science_program != u_scip: ValueError("science_program kwarg not equal to science_programs from sequence_obs_list") if scheduler_note is not None: if self.scheduler_note != u_sn: ValueError("scheduler_note kwarg not equal to scheduler_notes from sequence_obs_list") def __call__(self, observation_array, conditions): # Do we need to add the opening sequence? if (conditions.mjd - self.survey_features["last_matching"].feature["mjd"]) >= self.time_match: observation_array = np.concatenate([self.sequence_obs, observation_array]) return observation_array
[docs] class FixedSkyAngleDetailer(BaseDetailer): """Detailer to force a specific sky angle. Parameters ---------- sky_angle : `float`, optional Desired sky angle (default = 0, in degrees). """ def __init__(self, sky_angle=0.0, nside=DEFAULT_NSIDE): super().__init__(nside=nside) self.sky_angle = np.radians(sky_angle) def __call__(self, observation_array, conditions): observation_array["rotSkyPos"] = self.sky_angle return observation_array
[docs] class CloseAltDetailer(BaseDetailer): """ re-order a list of observations so that the closest in altitude to the current pointing is first. Parameters ---------- alt_band : `float` (10) The altitude band to try and stay in (degrees) """ def __init__(self, alt_band=10.0): super(CloseAltDetailer, self).__init__() self.alt_band = IntRounded(np.radians(alt_band)) def __call__(self, obs_array, conditions): alt, az = _approx_ra_dec2_alt_az( obs_array["RA"], obs_array["dec"], conditions.site.latitude_rad, conditions.site.longitude_rad, conditions.mjd, ) alt_diff = np.abs(alt - conditions.tel_alt) in_band = np.where(IntRounded(alt_diff) <= self.alt_band)[0] if in_band.size == 0: in_band = np.arange(alt.size) # Find the closest in angular distance of the points that are in band ang_dist = _angular_separation(az[in_band], alt[in_band], conditions.tel_az, conditions.tel_alt) if np.size(ang_dist) == 1: good = 0 else: good = np.min(np.where(ang_dist == ang_dist.min())[0]) indx = in_band[good] result = np.concatenate([obs_array[indx:], obs_array[:indx]]) return result
[docs] class FlushForSchedDetailer(BaseDetailer): """Update the flush-by MJD to be before any scheduled observations Parameters ---------- tol : `float` How much before to flush (minutes) """ def __init__(self, tol=2.5): super(FlushForSchedDetailer, self).__init__() self.tol = tol / 24.0 / 60.0 # To days def __call__(self, observation_array, conditions): if np.size(conditions.scheduled_observations) > 0: new_flush = np.min(conditions.scheduled_observations) - self.tol indx = np.where(observation_array["flush_by_mjd"] > new_flush)[0] observation_array[indx]["flush_by_mjd"] = new_flush return observation_array
[docs] class BandNexp(BaseDetailer): """Demand one band always be taken as a certain number of exposures""" def __init__(self, bandname="u", nexp=1, exptime=None): super(BandNexp, self).__init__() self.bandname = bandname self.nexp = nexp self.exptime = exptime def __call__(self, observation_array, conditions): indx = np.where(observation_array["band"] == self.bandname)[0] observation_array["nexp"][indx] = self.nexp if self.exptime is not None: observation_array["exptime"][indx] = self.exptime return observation_array
[docs] class FilterNexp(BandNexp): """Deprecated in favor of BandNexp""" def __init__(self, filtername="u", nexp=1, exptime=None): warnings.warn("Deprecated in favor of BandNexp", FutureWarning) super().__init__(bandname=filtername, nexp=nexp, exptime=exptime)
[docs] class TakeAsPairsDetailer(BaseDetailer): def __init__(self, bandname="r", exptime=None, nexp_dict=None, filtername=None): if filtername is not None: warnings.warn("filtername deprecated in favor of bandname", FutureWarning) bandname = filtername super(TakeAsPairsDetailer, self).__init__() self.bandname = bandname self.exptime = exptime self.nexp_dict = nexp_dict def __call__(self, observation_array, conditions): paired = copy.deepcopy(observation_array) if self.exptime is not None: paired["exptime"] = self.exptime paired["band"] = self.bandname if self.nexp_dict is not None: paired["nexp"] = self.nexp_dict[self.bandname] if conditions.current_band == self.bandname: tags = ["a", "b"] else: tags = ["b", "a"] paired["scheduler_note"] = np.char.add(paired["scheduler_note"], ", %s" % tags[0]) observation_array["scheduler_note"] = np.char.add( observation_array["scheduler_note"], ", %s" % tags[1] ) # Try to avoid extra filter changes and have "a" tag # come first. if conditions.current_band == self.bandname: result = np.concatenate([paired, observation_array]) else: result = np.concatenate([observation_array, paired]) return result
[docs] class TwilightTripleDetailer(BaseDetailer): def __init__(self, slew_estimate=5.0, n_repeat=3, update_note=True): super(TwilightTripleDetailer, self).__init__() self.slew_estimate = slew_estimate self.n_repeat = n_repeat self.update_note = update_note def __call__(self, obs_array, conditions): # Estimate how much time is left in the twilgiht block potential_times = np.array( [ conditions.sun_n18_setting - conditions.mjd, conditions.sun_n12_rising - conditions.mjd, ] ) potential_times = np.min(potential_times[np.where(potential_times > 0)]) * 24.0 * 3600.0 # How long will observations take? cumulative_slew = np.arange(obs_array.size) * self.slew_estimate cumulative_expt = np.cumsum(obs_array["exptime"]) cumulative_time = cumulative_slew + cumulative_expt # If we are way over, truncate the list before doing the triple if np.max(cumulative_time) > potential_times: max_indx = np.where(cumulative_time / self.n_repeat <= potential_times)[0] if np.size(max_indx) == 0: # Very bad magic number fudge max_indx = 3 else: max_indx = np.max(max_indx) if max_indx == 0: max_indx += 1 obs_array = obs_array[0:max_indx] # Repeat the observations n times out_obs = [] for i in range(self.n_repeat): sub_arr = copy.deepcopy(obs_array) if self.update_note: sub_arr["scheduler_note"] = np.char.add(sub_arr["scheduler_note"], ", %i" % i) out_obs.append(sub_arr) return np.concatenate(out_obs)