Source code for rubin_scheduler.scheduler.basis_functions.basis_functions

__all__ = (
    "BaseBasisFunction",
    "HealpixLimitedBasisFunctionMixin",
    "ConstantBasisFunction",
    "SimpleArrayBasisFunction",
    "DelayStartBasisFunction",
    "AvoidFastRevisitsBasisFunction",
    "VisitRepeatBasisFunction",
    "M5DiffBasisFunction",
    "M5DiffAtHpixBasisFunction",
    "StrictFilterBasisFunction",
    "FilterChangeBasisFunction",
    "SlewtimeBasisFunction",
    "CadenceEnhanceBasisFunction",
    "CadenceEnhanceTrapezoidBasisFunction",
    "AzimuthBasisFunction",
    "AzModuloBasisFunction",
    "DecModuloBasisFunction",
    "MapModuloBasisFunction",
    "SeasonCoverageBasisFunction",
    "NObsPerYearBasisFunction",
    "CadenceInSeasonBasisFunction",
    "NearSunHighAirmassBasisFunction",
    "NObsHighAmBasisFunction",
    "GoodSeeingBasisFunction",
    "EclipticBasisFunction",
    "VisitGap",
    "NGoodSeeingBasisFunction",
    "AvoidDirectWind",
    "BalanceVisits",
    "RewardNObsSequence",
    "FilterDistBasisFunction",
    "RewardRisingBasisFunction",
    "send_unused_deprecation_warning",
)

import warnings

import healpy as hp
import numpy as np
from astropy import units as u
from astropy.coordinates import SkyCoord

from rubin_scheduler.scheduler import features, utils
from rubin_scheduler.scheduler.utils import IntRounded, get_current_footprint
from rubin_scheduler.skybrightness_pre import dark_m5
from rubin_scheduler.utils import DEFAULT_NSIDE, SURVEY_START_MJD, _hpid2_ra_dec


def send_unused_deprecation_warning(name):
    message = (
        f"The basis function {name} is not in use by the current "
        "baseline scheduler and may be deprecated shortly. "
        "Please contact the rubin_scheduler maintainers if "
        "this is in use elsewhere."
    )
    warnings.warn(message, FutureWarning)


[docs] class BaseBasisFunction: """Class that takes features and computes a reward function when called.""" def __init__(self, nside=DEFAULT_NSIDE, filtername=None, **kwargs): # Set if basis function needs to be recalculated if there is a new # observation self.update_on_newobs = True # Set if basis function needs to be recalculated if conditions # change self.update_on_mjd = True # Dict to hold all the features we want to track self.survey_features = {} # Keep track of the last time the basis function was called. # If mjd doesn't change, use cached value self.mjd_last = None self.value = 0 # list the attributes to compare to check if basis functions # are equal. self.attrs_to_compare = [] # Do we need to recalculate the basis function self.recalc = True # Basis functions don't technically all need an nside, but so # many do might as well set it here if nside is None: self.nside = utils.set_default_nside() else: self.nside = nside self.filtername = filtername
[docs] def add_observations_array(self, observations_array, observations_hpid): """Similar to add_observation, but for loading a whole array of observations at a time. Parameters ---------- observations_array_in : `np.array` An array of completed observations (with columns like rubin_scheduler.scheduler.utils.ObservationArray). Should be sorted by MJD. observations_hpid_in : `np.array` Same as observations_array_in, but larger and with an additional column for HEALpix id. Each observation is listed multiple times, once for every HEALpix it overlaps. """ for feature in self.survey_features: self.survey_features[feature].add_observations_array(observations_array, observations_hpid) if self.update_on_newobs: self.recalc = True
[docs] def add_observation(self, observation, indx=None): """ Parameters ---------- observation : `np.array` An array with information about the input observation indx : `np.array` The indices of the healpix map that the observation overlaps with """ for feature in self.survey_features: self.survey_features[feature].add_observation(observation, indx=indx) if self.update_on_newobs: self.recalc = True
[docs] def check_feasibility(self, conditions): """If there is logic to decide if something is feasible (e.g., only if moon is down), it can be calculated here. Helps prevent full __call__ from being called more than needed. """ return True
def _calc_value(self, conditions, **kwargs): self.value = 0 # Update the last time we had an mjd self.mjd_last = conditions.mjd + 0 self.recalc = False return self.value def __eq__(self): # XXX--to work on if we need to make a registry of basis functions. pass def __ne__(self): pass
[docs] def __call__(self, conditions, **kwargs): """ Parameters ---------- conditions : `rubin_scheduler.scheduler.features.conditions` object Object that has attributes for all the current conditions. Return a reward healpix map or a reward scalar. """ # If we are not feasible, return -inf if not self.check_feasibility(conditions): return -np.inf if self.recalc: self.value = self._calc_value(conditions, **kwargs) if self.update_on_mjd: if conditions.mjd != self.mjd_last: self.value = self._calc_value(conditions, **kwargs) return self.value
[docs] def label(self): """Create a label for this basis function. Returns ------- label : `str` A string suitable for labeling the basis function in a plot or table. """ label = self.__class__.__name__.replace("BasisFunction", "") if self.filtername is not None: label += f" {self.filtername}" label += f" @{id(self)}" return label
[docs] class HealpixLimitedBasisFunctionMixin: """A mixin to limit a basis function to a set of Healpix pixels.""" def __init__(self, hpid, *args, **kwargs): super().__init__(*args, **kwargs) self.hpid = hpid
[docs] def check_feasibility(self, conditions): """Check the feasibility of the current set of conditions. Parameters ---------- conditions : `rubin_scheduler.scheduler.features.Conditions` The conditions for which to test feasibility. Returns ------- feasibility : `bool` True if the current set of conditions is feasible, False otherwise. """ if super().check_feasibility(conditions): if self.recalc or (self.update_on_mjd and conditions.mjd != self.mjd_last): value = self._calc_value(conditions) else: value = super().value feasibility = np.nanmax(value) > -np.inf else: feasibility = False return feasibility
def _calc_value(self, conditions, all_sky=False, **kwargs): all_sky_value = super()._calc_value(conditions, **kwargs) if all_sky: return all_sky_value if np.isscalar(all_sky_value): value = all_sky_value else: assert len(all_sky_value) == hp.nside2npix(self.nside) value = all_sky_value[self.hpid] return value
[docs] class ConstantBasisFunction(BaseBasisFunction): """Just add a constant""" def __call__(self, conditions, **kwargs): return 1
[docs] class SimpleArrayBasisFunction(BaseBasisFunction): def __init__(self, value, *args, **kwargs): self.assigned_value = value super().__init__(*args, **kwargs) def _calc_value(self, conditions, **kwargs): self.value = self.assigned_value return self.value
[docs] class DelayStartBasisFunction(BaseBasisFunction): """Force things to not run before a given night. Parameters ---------- nights_delay : `float`, optional Return False until conditions.night >= nights_delay. """ def __init__(self, nights_delay=365.25 * 5): super().__init__() self.nights_delay = nights_delay
[docs] def check_feasibility(self, conditions): result = True if conditions.night < self.nights_delay: result = False return result
[docs] class FilterDistBasisFunction(BaseBasisFunction): """Track filter distribution, increase reward as fraction of observations in specified filter drops. """ def __init__(self, filtername="r"): super(FilterDistBasisFunction, self).__init__(filtername=filtername) self.survey_features = {} # Count of all the observations self.survey_features["n_obs_count_all"] = features.NObsCount(filtername=None) # Count in filter self.survey_features["n_obs_count_in_filt"] = features.NObsCount(filtername=filtername) def _calc_value(self, conditions, indx=None): result = self.survey_features["n_obs_count_all"].feature / ( self.survey_features["n_obs_count_in_filt"].feature + 1 ) return result
[docs] class NObsPerYearBasisFunction(BaseBasisFunction): """Reward areas that have not been observed N-times in the last year Parameters ---------- filtername : `str` ('r') The filter to track footprint : `np.array` Should be a HEALpix map. Values of 0 or np.nan will be ignored. n_obs : `int` (3) The number of observations to demand season : `float` (300) The amount of time to allow pass before marking a region as "behind". Default 365.25 (days). season_start_hour : `float` (-2) When to start the season relative to RA 180 degrees away from the sun (hours) season_end_hour : `float` (2) When to consider a season ending, the RA relative to the sun + 180 degrees. (hours) night_max : float (365) Set value to zero after night_max is reached (days) """ def __init__( self, filtername="r", nside=DEFAULT_NSIDE, footprint=None, n_obs=3, season=300, season_start_hour=-4.0, season_end_hour=2.0, night_max=365, ): super(NObsPerYearBasisFunction, self).__init__(nside=nside, filtername=filtername) self.footprint = footprint self.n_obs = n_obs self.season = season self.season_start_hour = (season_start_hour) * np.pi / 12.0 # To radians self.season_end_hour = season_end_hour * np.pi / 12.0 # To radians self.survey_features["last_n_mjds"] = features.LastNObsTimes( nside=nside, filtername=filtername, n_obs=n_obs ) self.result = np.zeros(hp.nside2npix(self.nside), dtype=float) self.out_footprint = np.where((footprint == 0) | np.isnan(footprint)) self.night_max = night_max def _calc_value(self, conditions, indx=None): if conditions.night > self.night_max: return 0 result = self.result.copy() behind_pix = np.where((conditions.mjd - self.survey_features["last_n_mjds"].feature[0]) > self.season) result[behind_pix] = 1 # let's ramp up the weight depending on how far into the # observing season the healpix is mid_season_ra = (conditions.sun_ra + np.pi) % (2.0 * np.pi) # relative RA relative_ra = (conditions.ra - mid_season_ra) % (2.0 * np.pi) relative_ra = (self.season_end_hour - relative_ra) % (2.0 * np.pi) # ok, now relative_ra[ np.where(IntRounded(relative_ra) > IntRounded(self.season_end_hour - self.season_start_hour)) ] = 0 weight = relative_ra / (self.season_end_hour - self.season_start_hour) result *= weight # mask off anything outside the footprint result[self.out_footprint] = 0 return result
[docs] class NGoodSeeingBasisFunction(BaseBasisFunction): """Try to get N "good seeing" images each observing season. Parameters ---------- filtername : `str` Bandpass in which to count images. Default r. nside : `int` The nside of the map for the basis function. Should match survey and scheduler nside. Default None uses `set_default_nside`. seeing_fwhm_max : `float` Value to consider as "good" threshold (arcsec). Default of 0.8 arcseconds. m5_penalty_max : `float` The maximum depth loss that is considered acceptable (magnitudes), compared to the dark-sky map in this filter. Default 0.5 magnitudes. n_obs_desired : `int` Number of good seeing observations to collect per season. Default 3. mjd_start : `float` The starting MJD of the survey. Default None uses `rubin_scheduler.utils.SURVEY_START_MJD`. footprint : `np.array`, (N,) Only use area where footprint > 0. Should be a HEALpix map. Default None calls `get_current_footprint()`. """ def __init__( self, filtername="r", nside=DEFAULT_NSIDE, seeing_fwhm_max=0.8, m5_penalty_max=0.5, n_obs_desired=3, mjd_start=None, footprint=None, ): super().__init__(nside=nside, filtername=filtername) self.seeing_fwhm_max = seeing_fwhm_max self.m5_penalty_max = m5_penalty_max self.n_obs_desired = n_obs_desired if mjd_start is None: mjd_start = SURVEY_START_MJD self.mjd_start = mjd_start self.survey_features["N_good_seeing"] = features.NObservationsCurrentSeason( filtername=self.filtername, mjd_start=self.mjd_start, seeing_fwhm_max=self.seeing_fwhm_max, m5_penalty_max=self.m5_penalty_max, nside=self.nside, ) # Set footprint to current survey footprint class if undefined. if footprint is None: footprints, labels = get_current_footprint(self.nside) footprint = footprints[self.filtername] self.footprint = footprint self.result = np.zeros(hp.nside2npix(self.nside)) self.dark_map = None def _calc_value(self, conditions, indx=None): if self.filtername is not None: if self.dark_map is None: self.dark_map = dark_m5( conditions.dec, self.filtername, conditions.site.latitude_rad, fiducial_FWHMEff=0.7 ) # Return the same kind of array (not float) regardless # of result result = self.result.copy() # Update the feature to the current time. self.survey_features["N_good_seeing"].season_update(conditions=conditions) m5_penalty = self.dark_map - conditions.m5_depth[self.filtername] potential_pixels = np.where( (m5_penalty <= self.m5_penalty_max) & (conditions.fwhm_eff[self.filtername] <= self.seeing_fwhm_max) & (self.survey_features["N_good_seeing"].feature < self.n_obs_desired) & (self.footprint > 0) )[0] result[potential_pixels] = 1 return result
def az_rel_point(azs, point_az): az_rel_moon = (azs - point_az) % (2.0 * np.pi) if isinstance(azs, np.ndarray): over = np.where(az_rel_moon > np.pi) az_rel_moon[over] = 2.0 * np.pi - az_rel_moon[over] else: if az_rel_moon > np.pi: az_rel_moon = 2.0 * np.pi - az_rel_moon return az_rel_moon
[docs] class NObsHighAmBasisFunction(BaseBasisFunction): """Reward only reward/count observations at high airmass.""" def __init__( self, nside=DEFAULT_NSIDE, filtername="r", footprint=None, n_obs=3, season=300.0, am_limits=[1.5, 2.2], out_of_bounds_val=np.nan, ): send_unused_deprecation_warning("NObsHighAmBasisFunction") return super(NObsHighAmBasisFunction, self).__init__(nside=nside, filtername=filtername) if footprint is None: footprints, labels = get_current_footprint(self.nside) footprint = footprints[self.filtername] self.footprint = footprint self.out_footprint = np.where((footprint == 0) | np.isnan(footprint)) self.am_limits = am_limits self.season = season self.survey_features["last_n_mjds"] = features.Last_n_obs_times( nside=nside, filtername=filtername, n_obs=n_obs ) self.result = np.zeros(hp.nside2npix(self.nside), dtype=float) + out_of_bounds_val self.out_of_bounds_val = out_of_bounds_val
[docs] def add_observation(self, observation, indx=None): # Only count the observations if they are at the airmass limits if (observation["airmass"] > np.min(self.am_limits)) & ( observation["airmass"] < np.max(self.am_limits) ): for feature in self.survey_features: self.survey_features[feature].add_observation(observation, indx=indx) if self.update_on_newobs: self.recalc = True
[docs] def check_feasibility(self, conditions): result = True reward = self._calc_value(conditions) # If there are no non-NaN values, we're not feasible now if True not in np.isfinite(reward): result = False return result
def _calc_value(self, conditions, indx=None): result = self.result.copy() behind_pix = np.where( ( IntRounded(conditions.mjd - self.survey_features["last_n_mjds"].feature[0]) > IntRounded(self.season) ) & (IntRounded(conditions.airmass) > IntRounded(np.min(self.am_limits))) & (IntRounded(conditions.airmass) < IntRounded(np.max(self.am_limits))) ) result[behind_pix] = 1 result[self.out_footprint] = self.out_of_bounds_val # Update the last time we had an mjd self.mjd_last = conditions.mjd + 0 self.recalc = False self.value = result return result
[docs] class EclipticBasisFunction(BaseBasisFunction): """Mark the area around the ecliptic""" def __init__(self, nside=DEFAULT_NSIDE, distance_to_eclip=25.0): super(EclipticBasisFunction, self).__init__(nside=nside) self.distance_to_eclip = np.radians(distance_to_eclip) ra, dec = _hpid2_ra_dec(nside, np.arange(hp.nside2npix(self.nside))) self.result = np.zeros(ra.size) coord = SkyCoord(ra=ra * u.rad, dec=dec * u.rad) eclip_lat = coord.barycentrictrueecliptic.lat.radian good = np.where(np.abs(eclip_lat) < self.distance_to_eclip) self.result[good] += 1 def __call__(self, conditions, indx=None): return self.result
[docs] class CadenceInSeasonBasisFunction(BaseBasisFunction): """Drive observations at least every N days in a given area Parameters ---------- drive_map : `np.ndarray`, (N,) A HEALpix map with values of 1 where the cadence should be driven. filtername : `str` The filters that can count. season_span : `float` How long to consider a spot "in_season" (hours). cadence : `float` How long to wait before activating the basis function (days). """ def __init__(self, drive_map, filtername="griz", season_span=2.5, cadence=2.5, nside=DEFAULT_NSIDE): super(CadenceInSeasonBasisFunction, self).__init__(nside=nside, filtername=filtername) self.drive_map = drive_map self.season_span = season_span / 12.0 * np.pi # To radians self.cadence = cadence self.survey_features["last_observed"] = features.LastObserved(nside=nside, filtername=filtername) self.result = np.zeros(hp.nside2npix(self.nside), dtype=float) def _calc_value(self, conditions, indx=None): result = self.result.copy() ra_mid_season = (conditions.sunRA + np.pi) % (2.0 * np.pi) angle_to_mid_season = np.abs(conditions.ra - ra_mid_season) over = np.where(IntRounded(angle_to_mid_season) > IntRounded(np.pi)) angle_to_mid_season[over] = 2.0 * np.pi - angle_to_mid_season[over] days_lag = conditions.mjd - self.survey_features["last_observed"].feature active_pix = np.where( (IntRounded(days_lag) >= IntRounded(self.cadence)) & (self.drive_map == 1) & (IntRounded(angle_to_mid_season) < IntRounded(self.season_span)) ) result[active_pix] = 1.0 return result
[docs] class SeasonCoverageBasisFunction(BaseBasisFunction): """Basis function to encourage N observations per observing season. Parameters ---------- filtername : `str`, optional Count observations in this filter. Default 'r'. nside : `int`, optional Nside for the healpix map to use for the feature. This should match the nside of the survey and scheduler. footprint : `np.array` (N,), optional Healpix map of the footprint where one should demand coverage every season. Default None will call `get_current_footprint()`. n_per_season : `int`, optional The number of observations to attempt to gather every season. Default of 3 is suitable for first year template building. mjd_start : `float`, optional The mjd of the start of the survey (days). Default None uses `rubin_scheduler.utils.SURVEY_START_MJD`. season_frac_start : `float` Only start trying to gather observations after a season is fractionally this far along. Seasons start when the apparent position of sun is at the RA of the pixel (0) and finish when the sun returns again to this RA. The default of 0.5 means that the basis function will not start returning values until the RA reaches the peak of its season. """ def __init__( self, filtername="r", nside=DEFAULT_NSIDE, footprint=None, n_per_season=3, mjd_start=None, season_frac_start=0.5, ): send_unused_deprecation_warning("SeasonCoverageBasisFunction") super().__init__(nside=nside, filtername=filtername) if footprint is None: footprints, labels = get_current_footprint(self.nside) footprint = footprints[self.filtername] self.footprint = footprint # Calculate the RA values for each spot on the footprint ra, dec = _hpid2_ra_dec(nside, np.arange(hp.nside2npix(nside))) self.ra_deg = np.degrees(ra) self.n_per_season = n_per_season if mjd_start is None: mjd_start = SURVEY_START_MJD self.mjd_start = mjd_start self.season_frac_start = season_frac_start # Track how many observations have been taken at each RA/Dec # in the current observing season (for that point on the sky). self.survey_features["n_obs_season"] = features.NObservationsCurrentSeason( filtername=filtername, nside=nside, mjd_start=mjd_start ) self.result = np.zeros(hp.nside2npix(self.nside), dtype=float) def _calc_value(self, conditions, indx=None): result = self.result.copy() # Update the feature to the current time self.survey_features["n_obs_season"].season_update(conditions=conditions) feature = self.survey_features["n_obs_season"].feature # Get the season from the conditions object. season = conditions.season # Evaluate where there are not yet enough observations and also # that season is far enough along to require more weight. not_enough = np.where( (self.footprint > 0) & (feature < self.n_per_season) & ((IntRounded(season - np.floor(season)) > IntRounded(self.season_frac_start))) ) result[not_enough] = 1 return result
[docs] class AvoidFastRevisitsBasisFunction(BaseBasisFunction): """Marks targets as unseen if they are in a specified time window in order to avoid fast revisits. Parameters ---------- filtername : `str` or None The name of the filter for this target map. Using None will match visits in any filter. gap_min : `float` Minimum time for the gap (minutes). nside: `int` or None The healpix resolution. penalty_val : `float` The reward value to use for regions to penalize. Will be masked if set to np.nan (default). """ def __init__(self, filtername="r", nside=DEFAULT_NSIDE, gap_min=25.0, penalty_val=np.nan): super().__init__(nside=nside, filtername=filtername) self.filtername = filtername self.penalty_val = penalty_val self.gap_min = IntRounded(gap_min / 60.0 / 24.0) self.nside = nside self.survey_features = dict() self.survey_features["Last_observed"] = features.LastObserved( filtername=filtername, nside=nside, fill=0 ) def _calc_value(self, conditions, indx=None): result = np.ones(hp.nside2npix(self.nside), dtype=float) diff = IntRounded(conditions.mjd - self.survey_features["Last_observed"].feature) bad = np.where(diff < self.gap_min)[0] result[bad] = self.penalty_val return result
[docs] class NearSunHighAirmassBasisFunction(BaseBasisFunction): """Reward areas on the sky at high airmass, within 90 degrees azimuth of the Sun, such as suitable for the near-sun twilight microsurvey for near- or interior-to earth asteroids. Parameters ---------- nside : `int`, optional Nside for the basis function. If None, uses `set_default_nside()`. max_airmass : `float`, oprionl The maximum airmass to try and observe (unitless). penalty : `float`, optional The value to fill in non-rewarded parts of the sky. Default np.nan, which serves to mask regions exceeding the airmass limit and more than 90 degrees azimuth toward the sun. """ def __init__(self, nside=DEFAULT_NSIDE, max_airmass=2.5, penalty=np.nan): super().__init__(nside=nside) self.max_airmass = IntRounded(max_airmass) self.result = np.empty(hp.nside2npix(self.nside)) self.result.fill(penalty) def _calc_value(self, conditions, indx=None): result = self.result.copy() valid_airmass = np.where(np.isfinite(conditions.airmass) == True)[0] good_pix = np.where( (conditions.airmass[valid_airmass] >= 1.0) & (IntRounded(conditions.airmass[valid_airmass]) < self.max_airmass) & (IntRounded(np.abs(conditions.az_to_sun[valid_airmass])) < IntRounded(np.pi / 2.0)) ) result[valid_airmass[good_pix]] = ( conditions.airmass[valid_airmass][good_pix] / self.max_airmass.initial ) return result
[docs] class VisitRepeatBasisFunction(BaseBasisFunction): """ Basis function to reward re-visiting an area on the sky. Looking for Solar System objects. Parameters ---------- gap_min : `float` (15.) Minimum time for the gap (minutes) gap_max : `float` (45.) Maximum time for a gap filtername : `str` ('r') The filter(s) to count with pairs npairs : `int` (1) The number of pairs of observations to attempt to gather """ def __init__(self, gap_min=25.0, gap_max=45.0, filtername="r", nside=DEFAULT_NSIDE, npairs=1): super(VisitRepeatBasisFunction, self).__init__(nside=nside, filtername=filtername) self.gap_min = IntRounded(gap_min / 60.0 / 24.0) self.gap_max = IntRounded(gap_max / 60.0 / 24.0) self.npairs = npairs self.survey_features = {} # Track the number of pairs that have been taken in a night self.survey_features["Pair_in_night"] = features.PairInNight( filtername=filtername, gap_min=gap_min, gap_max=gap_max, nside=nside ) # When was it last observed # XXX--since this feature is also in Pair_in_night, I should just # access that one! self.survey_features["Last_observed"] = features.LastObserved(filtername=filtername, nside=nside) def _calc_value(self, conditions, indx=None): result = np.zeros(hp.nside2npix(self.nside), dtype=float) if indx is None: indx = np.arange(result.size) diff = conditions.mjd - self.survey_features["Last_observed"].feature[indx] mask = np.isnan(diff) # remove NaNs from diff, but save mask so we exclude those values # later. diff[mask] = 0.0 ir_diff = IntRounded(diff) good = np.where( (ir_diff >= self.gap_min) & (ir_diff <= self.gap_max) & (self.survey_features["Pair_in_night"].feature[indx] < self.npairs) & (~mask) )[0] result[indx[good]] += 1.0 return result
[docs] class M5DiffBasisFunction(BaseBasisFunction): """Basis function based on the 5-sigma depth. Look up the best depth a healpixel achieves, and compute the limiting depth difference given current conditions Parameters ---------- filtername : `str`, optional The filter to consider for visits. fiducial_FWHMEff : `float`, optional The zenith seeing to assume for "good" conditions. While the dark sky depth map simply scales with this value, picking a reasonable fiducial_FWHMEff is important because this effects the overall value and scale of the reward from the basis function. nside : `int`, optional The nside for the basis function. Default None uses `set_default_nside()`. """ def __init__(self, filtername="r", fiducial_FWHMEff=0.7, nside=DEFAULT_NSIDE): super().__init__(nside=nside, filtername=filtername) # The dark sky surface brightness values self.dark_map = None self.fiducial_FWHMEff = fiducial_FWHMEff self.filtername = filtername def _calc_value(self, conditions, indx=None): if self.dark_map is None: self.dark_map = dark_m5( conditions.dec, self.filtername, conditions.site.latitude_rad, self.fiducial_FWHMEff ) # No way to get the sign on this right the first time. result = conditions.m5_depth[self.filtername] - self.dark_map return result
[docs] class M5DiffAtHpixBasisFunction(HealpixLimitedBasisFunctionMixin, M5DiffBasisFunction): pass
[docs] class StrictFilterBasisFunction(BaseBasisFunction): """Remove the bonus for staying in the same filter if certain conditions are met. If the moon rises/sets or twilight starts/ends, it makes a lot of sense to consider a filter change. This basis function rewards if it matches the current filter, the moon rises or sets, twilight starts or stops, or there has been a large gap since the last observation. Parameters ---------- time_lag : `float` (10.) If there is a gap between observations longer than this, let the filter change (minutes) twi_change : `float` (-18.) The sun altitude to consider twilight starting/ending (degrees) note_free : `str` ('DD') No penalty for changing filters if the last observation note field includes `note_free` string. Useful for giving a free filter change after deep drilling sequence """ def __init__(self, time_lag=10.0, filtername="r", twi_change=-18.0, note_free="DD"): super(StrictFilterBasisFunction, self).__init__(filtername=filtername) self.time_lag = time_lag / 60.0 / 24.0 # Convert to days self.twi_change = np.radians(twi_change) self.survey_features = {} self.survey_features["Last_observation"] = features.LastObservation() self.note_free = note_free def _calc_value(self, conditions, **kwargs): # Did the moon set or rise since last observation? moon_changed = conditions.moon_alt * self.survey_features["Last_observation"].feature["moonAlt"] < 0 # Are we already in the filter (or at start of night)? in_filter = (conditions.current_filter == self.filtername) | (conditions.current_filter is None) # Has enough time past? time_past = IntRounded( conditions.mjd - self.survey_features["Last_observation"].feature["mjd"] ) > IntRounded(self.time_lag) # Did twilight start/end? twi_changed = (conditions.sun_alt - self.twi_change) * ( self.survey_features["Last_observation"].feature["sunAlt"] - self.twi_change ) < 0 # Did we just finish a DD sequence was_dd = self.note_free in self.survey_features["Last_observation"].feature["scheduler_note"] # Is the filter mounted? mounted = self.filtername in conditions.mounted_filters if (moon_changed | in_filter | time_past | twi_changed | was_dd) & mounted: result = 1.0 else: result = 0.0 return result
[docs] class FilterChangeBasisFunction(BaseBasisFunction): """Reward staying in the current filter.""" def __init__(self, filtername="r"): super(FilterChangeBasisFunction, self).__init__(filtername=filtername) def _calc_value(self, conditions, **kwargs): if (conditions.current_filter == self.filtername) | (conditions.current_filter is None): result = 1.0 else: result = 0.0 return result
[docs] class SlewtimeBasisFunction(BaseBasisFunction): """Reward slews that take little time Parameters ---------- max_time : `float` The estimated maximum slewtime (seconds). Used to normalize so the basis function spans ~ -1-0 in reward units. Default 135 seconds corresponds to just slightly less than a filter change. filtername : `str`, optional The filter to check for pre-post slewtime estimates. If a slew includes a filter change, other basis functions will decide on the reward, so the result here can be 0. nside : `int`, optional Nside for the basis function. Default None will use `set_default_nside()`. """ def __init__(self, max_time=135.0, filtername="r", nside=DEFAULT_NSIDE): super(SlewtimeBasisFunction, self).__init__(nside=nside, filtername=filtername) self.maxtime = max_time self.nside = nside self.filtername = filtername def _calc_value(self, conditions, indx=None): # If we are in a different filter, the # FilterChangeBasisFunction will take it # But we can still use the MASK returned by # the slewtime map to remove inaccessible parts of the sky if conditions.current_filter != self.filtername: if np.size(conditions.slewtime) > 1: result = np.where(np.isfinite(conditions.slewtime), 0, np.nan) else: result = 0 else: # Need to make sure smaller slewtime is larger reward. if np.size(conditions.slewtime) > 1: # Slewtime map can contain nans and/or # infs - mask these with nans result = np.where( np.isfinite(conditions.slewtime), -conditions.slewtime / self.maxtime, np.nan, ) else: result = -conditions.slewtime / self.maxtime return result
[docs] class CadenceEnhanceBasisFunction(BaseBasisFunction): """Drive a certain cadence Parameters ---------- filtername : `str` ('gri') The filter(s) that should be grouped together supress_window : `list` of `float` The start and stop window for when observations should be repressed (days) apply_area : healpix map The area over which to try and drive the cadence. Good values as 1, no cadence drive 0. Probably works as a bool array too. """ def __init__( self, filtername="gri", nside=DEFAULT_NSIDE, supress_window=[0, 1.8], supress_val=-0.5, enhance_window=[2.1, 3.2], enhance_val=1.0, apply_area=None, ): super(CadenceEnhanceBasisFunction, self).__init__(nside=nside, filtername=filtername) self.supress_window = np.sort(supress_window) self.supress_val = supress_val self.enhance_window = np.sort(enhance_window) self.enhance_val = enhance_val self.survey_features = {} self.survey_features["last_observed"] = features.LastObserved(filtername=filtername) self.empty = np.zeros(hp.nside2npix(self.nside), dtype=float) # No map, try to drive the whole area if apply_area is None: self.apply_indx = np.arange(self.empty.size) else: self.apply_indx = np.where(apply_area != 0)[0] def _calc_value(self, conditions, indx=None): # copy an empty array result = self.empty.copy() if indx is not None: ind = np.intersect1d(indx, self.apply_indx) else: ind = self.apply_indx if np.size(ind) == 0: result = 0 else: mjd_diff = conditions.mjd - self.survey_features["last_observed"].feature[ind] to_supress = np.where( (IntRounded(mjd_diff) > IntRounded(self.supress_window[0])) & (IntRounded(mjd_diff) < IntRounded(self.supress_window[1])) ) result[ind[to_supress]] = self.supress_val to_enhance = np.where( (IntRounded(mjd_diff) > IntRounded(self.enhance_window[0])) & (IntRounded(mjd_diff) < IntRounded(self.enhance_window[1])) ) result[ind[to_enhance]] = self.enhance_val return result
# https://docs.astropy.org/en/stable/_modules/astropy/modeling # functional_models.html#Trapezoid1D def trapezoid(x, amplitude, x_0, width, slope): """One dimensional Trapezoid model function""" # Compute the four points where the trapezoid changes slope # x1 <= x2 <= x3 <= x4 x2 = x_0 - width / 2.0 x3 = x_0 + width / 2.0 x1 = x2 - amplitude / slope x4 = x3 + amplitude / slope result = x * 0 # Compute model values in pieces between the change points range_a = np.logical_and(x >= x1, x < x2) range_b = np.logical_and(x >= x2, x < x3) range_c = np.logical_and(x >= x3, x < x4) result[range_a] = slope * (x[range_a] - x1) result[range_b] = amplitude result[range_c] = slope * (x4 - x[range_c]) return result
[docs] class CadenceEnhanceTrapezoidBasisFunction(BaseBasisFunction): """Drive a certain cadence, like CadenceEnhanceBasisFunction but with smooth transitions Parameters ---------- filtername : `str` ('gri') The filter(s) that should be grouped together XXX--fill out doc string! """ def __init__( self, filtername="gri", nside=DEFAULT_NSIDE, delay_width=2, delay_slope=2.0, delay_peak=0, delay_amp=0.5, enhance_width=3.0, enhance_slope=2.0, enhance_peak=4.0, enhance_amp=1.0, apply_area=None, season_limit=None, ): super(CadenceEnhanceTrapezoidBasisFunction, self).__init__(nside=nside, filtername=filtername) self.delay_width = delay_width self.delay_slope = delay_slope self.delay_peak = delay_peak self.delay_amp = delay_amp self.enhance_width = enhance_width self.enhance_slope = enhance_slope self.enhance_peak = enhance_peak self.enhance_amp = enhance_amp self.season_limit = season_limit / 12 * np.pi # To radians self.survey_features = {} self.survey_features["last_observed"] = features.LastObserved(filtername=filtername) self.empty = np.zeros(hp.nside2npix(self.nside), dtype=float) # No map, try to drive the whole area if apply_area is None: self.apply_indx = np.arange(self.empty.size) else: self.apply_indx = np.where(apply_area != 0)[0] def suppress_enhance(self, x): result = x * 0 result -= trapezoid(x, self.delay_amp, self.delay_peak, self.delay_width, self.delay_slope) result += trapezoid( x, self.enhance_amp, self.enhance_peak, self.enhance_width, self.enhance_slope, ) return result def season_len(self, conditions): ra_mid_season = (conditions.sunRA + np.pi) % (2.0 * np.pi) angle_to_mid_season = np.abs(conditions.ra - ra_mid_season) over = np.where(IntRounded(angle_to_mid_season) > IntRounded(np.pi)) angle_to_mid_season[over] = 2.0 * np.pi - angle_to_mid_season[over] return angle_to_mid_season def _calc_value(self, conditions, indx=None): # copy an empty array result = self.empty.copy() if indx is not None: ind = np.intersect1d(indx, self.apply_indx) else: ind = self.apply_indx if np.size(ind) == 0: result = 0 else: mjd_diff = conditions.mjd - self.survey_features["last_observed"].feature[ind] result[ind] += self.suppress_enhance(mjd_diff) if self.season_limit is not None: radians_to_midseason = self.season_len(conditions) outside_season = np.where(radians_to_midseason > self.season_limit) result[outside_season] = 0 return result
[docs] class AzimuthBasisFunction(BaseBasisFunction): """Reward staying in the same azimuth range. Possibly better than using slewtime, especially when selecting a large area of sky. """ def __init__(self, nside=DEFAULT_NSIDE): send_unused_deprecation_warning("AzimuthBasisFunction") super(AzimuthBasisFunction, self).__init__(nside=nside) def _calc_value(self, conditions, indx=None): az_dist = conditions.az - conditions.tel_az az_dist = az_dist % (2.0 * np.pi) over = np.where(az_dist > np.pi) az_dist[over] = 2.0 * np.pi - az_dist[over] # Normalize sp between 0 and 1 result = az_dist / np.pi return result
[docs] class AzModuloBasisFunction(BaseBasisFunction): """Try to replicate the Rothchild et al cadence forcing by only observing on limited az ranges per night. Parameters ---------- az_limits : `list` of `float` pairs (None) The azimuth limits (degrees) to use. """ def __init__(self, nside=DEFAULT_NSIDE, az_limits=None, out_of_bounds_val=-1.0): super(AzModuloBasisFunction, self).__init__(nside=nside) self.result = np.ones(hp.nside2npix(self.nside)) if az_limits is None: spread = 100.0 / 2.0 self.az_limits = np.radians( [ [360 - spread, spread], [90.0 - spread, 90.0 + spread], [180.0 - spread, 180.0 + spread], ] ) else: self.az_limits = np.radians(az_limits) self.mod_val = len(self.az_limits) self.out_of_bounds_val = out_of_bounds_val def _calc_value(self, conditions, indx=None): result = self.result.copy() az_lim = self.az_limits[np.max(conditions.night) % self.mod_val] if az_lim[0] < az_lim[1]: out_pix = np.where( (IntRounded(conditions.az) < IntRounded(az_lim[0])) | (IntRounded(conditions.az) > IntRounded(az_lim[1])) ) else: out_pix = np.where( (IntRounded(conditions.az) < IntRounded(az_lim[0])) | (IntRounded(conditions.az) > IntRounded(az_lim[1])) )[0] result[out_pix] = self.out_of_bounds_val return result
[docs] class DecModuloBasisFunction(BaseBasisFunction): """Emphasize dec bands on a nightly varying basis Parameters ---------- dec_limits : `list` of `float` pairs (None) The azimuth limits (degrees) to use. """ def __init__(self, nside=DEFAULT_NSIDE, dec_limits=None, out_of_bounds_val=-1.0): super(DecModuloBasisFunction, self).__init__(nside=nside) npix = hp.nside2npix(nside) hpids = np.arange(npix) ra, dec = _hpid2_ra_dec(nside, hpids) self.results = [] if dec_limits is None: self.dec_limits = np.radians([[-90.0, -32.8], [-32.8, -12.0], [-12.0, 35.0]]) else: self.dec_limits = np.radians(dec_limits) self.mod_val = len(self.dec_limits) self.out_of_bounds_val = out_of_bounds_val for limits in self.dec_limits: good = np.where((dec >= limits[0]) & (dec < limits[1]))[0] tmp = np.zeros(npix) tmp[good] = 1 self.results.append(tmp) def _calc_value(self, conditions, indx=None): night_index = np.max(conditions.night % self.mod_val) result = self.results[night_index] return result
[docs] class MapModuloBasisFunction(BaseBasisFunction): """Similar to Dec_modulo, but now use input masks Parameters ---------- inmaps : `list` of hp arrays """ def __init__(self, inmaps): nside = hp.npix2nside(np.size(inmaps[0])) super(MapModuloBasisFunction, self).__init__(nside=nside) self.maps = inmaps self.mod_val = len(inmaps) def _calc_value(self, conditions, indx=None): indx = np.max(conditions.night % self.mod_val) result = self.maps[indx] return result
[docs] class GoodSeeingBasisFunction(BaseBasisFunction): """Drive observations in good seeing conditions""" def __init__( self, nside=DEFAULT_NSIDE, filtername="r", footprint=None, fwhm_eff_limit=0.8, mag_diff=0.75, ): send_unused_deprecation_warning("GoodSeeingBasisFunction") return super(GoodSeeingBasisFunction, self).__init__(nside=nside) self.filtername = filtername self.fwhm_eff_limit = IntRounded(fwhm_eff_limit) if footprint is None: footprints, labels = get_current_footprint(nside=self.nside) fp = footprints[self.filtername] else: fp = footprint self.out_of_bounds = np.where(fp == 0)[0] self.result = fp * 0 self.mag_diff = IntRounded(mag_diff) self.survey_features = {} self.survey_features["coadd_depth_all"] = features.CoaddedDepth( filtername=self.filtername, nside=self.nside ) self.survey_features["coadd_depth_good"] = features.CoaddedDepth( filtername=self.filtername, nside=self.nside, fwhm_eff_limit=fwhm_eff_limit ) def _calc_value(self, conditions, **kwargs): # Seeing is "bad" if IntRounded(conditions.FWHMeff[self.filtername].min()) > self.fwhm_eff_limit: return 0 result = self.result.copy() diff = ( self.survey_features["coadd_depth_all"].feature - self.survey_features["coadd_depth_good"].feature ) # Where are there things we want to observe? good_pix = np.where( (IntRounded(diff) > self.mag_diff) & (IntRounded(conditions.FWHMeff[self.filtername]) <= self.fwhm_eff_limit) ) # Hm, should this scale by the mag differences? Probably. result[good_pix] = diff[good_pix] result[self.out_of_bounds] = 0 return result
[docs] class VisitGap(BaseBasisFunction): """Basis function to create a visit gap based on the survey note field. Parameters ---------- note : `str` Value of the observation "scheduler_note" field to be masked. filter_names : list [str], optional List of filter names that will be considered when evaluating if the gap has passed. gap_min : float (optional) Time gap (default=25, in minutes). penalty_val : float or np.nan Value of the penalty to apply (default is np.nan). Notes ----- When a list of filters is provided, all filters must be observed before the gap requirement will be activated, and once activated, only observations in these filters will be evaluated in context of whether the last observation was at least gap in the past. """ def __init__(self, note, filter_names=None, gap_min=25.0, penalty_val=np.nan): super().__init__() self.penalty_val = penalty_val self.gap = gap_min / 60.0 / 24.0 self.filter_names = filter_names self.survey_features = dict() if self.filter_names is not None: for filtername in self.filter_names: self.survey_features[f"LastObservationMjd::{filtername}"] = features.LastObservationMjd( note=note, filtername=filtername ) else: self.survey_features["LastObservationMjd"] = features.LastObservationMjd(scheduler_note=note)
[docs] def check_feasibility(self, conditions): notes_last_observed = [last_observed.feature for last_observed in self.survey_features.values()] if any([last_observed is None for last_observed in notes_last_observed]): return True after_gap = [conditions.mjd - last_observed > self.gap for last_observed in notes_last_observed] return all(after_gap)
def _calc_value(self, conditions, indx=None): return 1.0 if self.check_feasibility(conditions) else self.penalty_val
[docs] class AvoidDirectWind(BaseBasisFunction): """Mask the sky where the wind pressure exceeds `wind_speed_maximum`. Parameters ---------- wind_speed_maximum : `float`, optional Wind speed to mark regions as unobservable (in m/s). nside : `int`, optional The nside for the basis function. Default None uses `set_default_nside()`. """ def __init__(self, wind_speed_maximum=20.0, nside=DEFAULT_NSIDE): super().__init__(nside=nside) self.wind_speed_maximum = wind_speed_maximum def _calc_value(self, conditions, indx=None): reward_map = np.zeros(hp.nside2npix(self.nside)) if conditions.wind_speed is None or conditions.wind_direction is None: return reward_map wind_pressure = conditions.wind_speed * np.cos(conditions.az - conditions.wind_direction) reward_map -= wind_pressure**2.0 mask = wind_pressure > self.wind_speed_maximum reward_map[mask] = np.nan return reward_map
[docs] class BalanceVisits(BaseBasisFunction): """Balance visits across multiple surveys. Parameters ---------- nobs_reference : `int` Expected number of observations across all interested surveys. note_survey : `str` Note value for the current survey. note_interest : `str` Substring with the name of interested surveys to be accounted. nside : `int` Healpix map resolution. Notes ----- This basis function is designed to balance the reward of a group of surveys, such that the group get a reward boost based on the required collective number of observations. For example, if you have 3 surveys (e.g. SURVEY_A_REGION_1, SURVEY_A_REGION_2, SURVEY_A_REGION_3), when one of them is observed once (SURVEY_A_REGION_1) they all get a small reward boost proportional to the collective number of observations (`nobs_reference`). Further observations of SURVEY_A_REGION_1 would now cause the other surveys to gain a reward boost in relative to it. """ def __init__(self, nobs_reference, note_survey, note_interest, nside=DEFAULT_NSIDE): super().__init__(nside=nside) self.nobs_reference = nobs_reference self.survey_features = {} self.survey_features["n_obs_survey"] = features.NObsCount(scheduler_note=note_survey) self.survey_features["n_obs_survey_interest"] = features.NObsCount(scheduler_note=note_interest) def _calc_value(self, conditions, indx=None): return (1 + np.floor(self.survey_features["n_obs_survey_interest"].feature / self.nobs_reference)) / ( self.survey_features["n_obs_survey"].feature if self.survey_features["n_obs_survey"].feature > 0 else 1 )
[docs] class RewardNObsSequence(BaseBasisFunction): """Reward taking a sequence of observations. Parameters ---------- n_obs_survey : `int` Number of observations to reward. note_survey : `str` The value of the observation note, to take into account. nside : `int`, optional Healpix map resolution (ignored). Notes ----- This basis function is useful when a survey is composed of more than one observation (e.g. in different filters) and one wants to make sure they are all taken together. """ def __init__(self, n_obs_survey, note_survey, nside=DEFAULT_NSIDE): super().__init__(nside=nside) self.n_obs_survey = n_obs_survey self.survey_features = {} self.survey_features["n_obs_survey"] = features.NObsCount(scheduler_note=note_survey) def _calc_value(self, conditions, indx=None): return self.survey_features["n_obs_survey"].feature % self.n_obs_survey
[docs] class RewardRisingBasisFunction(BaseBasisFunction): """Reward parts of the sky that are rising. Optionally, mask out parts of the sky that are not rising. This produces a reward that increases as the field rises toward zenith, then abruptly falls as the field passes zenith. Negative hour angles (or hour angles > 180 degrees) indicate a rising point on the sky. Parameters ---------- slope : `float` Sets the 'slope' of how fast the basis function value changes with hour angle. penalty_val : `float` or `np.nan`, optional Sets the value for the part of the sky which is not rising (hour angle between 0 and 180). Using a value of np.nan will mask this region of sky, a value of 0 will just make this non-rewarding. nside : `int` or None, optional Nside for the healpix map, default of None uses scheduler default. """ def __init__(self, slope=0.1, penalty_val=0, nside=DEFAULT_NSIDE): super().__init__(nside=nside) self.slope = slope self.penalty_val = penalty_val # Probably not needed
[docs] def check_feasibility(self, conditions): return True
def _calc_value(self, conditions, indx=None): # HA should be available in the conditions object value = self.slope * conditions.HA past_zenith = np.where(conditions.HA < np.pi) value[past_zenith] = self.penalty_val return value