__all__ = (
"BaseBasisFunction",
"HealpixLimitedBasisFunctionMixin",
"ConstantBasisFunction",
"SimpleArrayBasisFunction",
"DelayStartBasisFunction",
"TargetMapBasisFunction",
"AvoidLongGapsBasisFunction",
"AvoidFastRevisits",
"AvoidFastRevisitsBasisFunction",
"VisitRepeatBasisFunction",
"M5DiffBasisFunction",
"M5DiffAtHpixBasisFunction",
"StrictFilterBasisFunction",
"GoalStrictFilterBasisFunction",
"FilterChangeBasisFunction",
"SlewtimeBasisFunction",
"AggressiveSlewtimeBasisFunction",
"SkybrightnessLimitAtHpixBasisFunction",
"SkybrightnessLimitBasisFunction",
"CablewrapUnwrapBasisFunction",
"CadenceEnhanceBasisFunction",
"CadenceEnhanceTrapezoidBasisFunction",
"AzimuthBasisFunction",
"AzModuloBasisFunction",
"DecModuloBasisFunction",
"MapModuloBasisFunction",
"TemplateGenerateBasisFunction",
"FootprintNvisBasisFunction",
"ThirdObservationBasisFunction",
"SeasonCoverageBasisFunction",
"NObsPerYearBasisFunction",
"CadenceInSeasonBasisFunction",
"NearSunTwilightBasisFunction",
"NearSunHighAirmassBasisFunction",
"NObsHighAmBasisFunction",
"GoodSeeingBasisFunction",
"ObservedTwiceBasisFunction",
"EclipticBasisFunction",
"LimitRepeatBasisFunction",
"VisitGap",
"NGoodSeeingBasisFunction",
"AvoidDirectWind",
"BalanceVisits",
"RewardNObsSequence",
"FilterDistBasisFunction",
"RewardRisingBasisFunction",
"send_unused_deprecation_warning",
)
import warnings
import healpy as hp
import numpy as np
from astropy import units as u
from astropy.coordinates import SkyCoord
from rubin_scheduler.scheduler import features, utils
from rubin_scheduler.scheduler.utils import IntRounded, get_current_footprint
from rubin_scheduler.skybrightness_pre import dark_m5
from rubin_scheduler.utils import _hpid2_ra_dec, survey_start_mjd
def send_unused_deprecation_warning(name):
message = (
f"The basis function {name} is not in use by the current "
"baseline scheduler and may be deprecated shortly. "
"Please contact the rubin_scheduler maintainers if "
"this is in use elsewhere."
)
warnings.warn(message, FutureWarning)
[docs]
class BaseBasisFunction:
"""Class that takes features and computes a reward function when
called."""
def __init__(self, nside=None, filtername=None, **kwargs):
# Set if basis function needs to be recalculated if there is a new
# observation
self.update_on_newobs = True
# Set if basis function needs to be recalculated if conditions
# change
self.update_on_mjd = True
# Dict to hold all the features we want to track
self.survey_features = {}
# Keep track of the last time the basis function was called.
# If mjd doesn't change, use cached value
self.mjd_last = None
self.value = 0
# list the attributes to compare to check if basis functions
# are equal.
self.attrs_to_compare = []
# Do we need to recalculate the basis function
self.recalc = True
# Basis functions don't technically all need an nside, but so
# many do might as well set it here
if nside is None:
self.nside = utils.set_default_nside()
else:
self.nside = nside
self.filtername = filtername
[docs]
def add_observations_array(self, observations_array, observations_hpid):
"""Similar to add_observation, but for loading a whole
array of observations at a time.
Parameters
----------
observations_array_in : `np.array`
An array of completed observations (with columns like
rubin_scheduler.scheduler.utils.ObservationArray).
Should be sorted by MJD.
observations_hpid_in : `np.array`
Same as observations_array_in, but larger and with an
additional column for HEALpix id.
Each observation is listed multiple times,
once for every HEALpix it overlaps.
"""
for feature in self.survey_features:
self.survey_features[feature].add_observations_array(observations_array, observations_hpid)
if self.update_on_newobs:
self.recalc = True
[docs]
def add_observation(self, observation, indx=None):
"""
Parameters
----------
observation : `np.array`
An array with information about the input observation
indx : `np.array`
The indices of the healpix map that the observation overlaps
with
"""
for feature in self.survey_features:
self.survey_features[feature].add_observation(observation, indx=indx)
if self.update_on_newobs:
self.recalc = True
[docs]
def check_feasibility(self, conditions):
"""If there is logic to decide if something is feasible
(e.g., only if moon is down), it can be calculated here.
Helps prevent full __call__ from being called more than needed.
"""
return True
def _calc_value(self, conditions, **kwargs):
self.value = 0
# Update the last time we had an mjd
self.mjd_last = conditions.mjd + 0
self.recalc = False
return self.value
def __eq__(self):
# XXX--to work on if we need to make a registry of basis functions.
pass
def __ne__(self):
pass
[docs]
def __call__(self, conditions, **kwargs):
"""
Parameters
----------
conditions : `rubin_scheduler.scheduler.features.conditions` object
Object that has attributes for all the current conditions.
Return a reward healpix map or a reward scalar.
"""
# If we are not feasible, return -inf
if not self.check_feasibility(conditions):
return -np.inf
if self.recalc:
self.value = self._calc_value(conditions, **kwargs)
if self.update_on_mjd:
if conditions.mjd != self.mjd_last:
self.value = self._calc_value(conditions, **kwargs)
return self.value
[docs]
def label(self):
"""Create a label for this basis function.
Returns
-------
label : `str`
A string suitable for labeling the basis function in a
plot or table.
"""
label = self.__class__.__name__.replace("BasisFunction", "")
if self.filtername is not None:
label += f" {self.filtername}"
label += f" @{id(self)}"
return label
[docs]
class HealpixLimitedBasisFunctionMixin:
"""A mixin to limit a basis function to a set of Healpix pixels."""
def __init__(self, hpid, *args, **kwargs):
super().__init__(*args, **kwargs)
self.hpid = hpid
[docs]
def check_feasibility(self, conditions):
"""Check the feasibility of the current set of conditions.
Parameters
----------
conditions : `rubin_scheduler.scheduler.features.Conditions`
The conditions for which to test feasibility.
Returns
-------
feasibility : `bool`
True if the current set of conditions is feasible, False
otherwise.
"""
if super().check_feasibility(conditions):
if self.recalc or (self.update_on_mjd and conditions.mjd != self.mjd_last):
value = self._calc_value(conditions)
else:
value = super().value
feasibility = np.nanmax(value) > -np.inf
else:
feasibility = False
return feasibility
def _calc_value(self, conditions, all_sky=False, **kwargs):
all_sky_value = super()._calc_value(conditions, **kwargs)
if all_sky:
return all_sky_value
if np.isscalar(all_sky_value):
value = all_sky_value
else:
assert len(all_sky_value) == hp.nside2npix(self.nside)
value = all_sky_value[self.hpid]
return value
[docs]
class ConstantBasisFunction(BaseBasisFunction):
"""Just add a constant"""
def __call__(self, conditions, **kwargs):
return 1
[docs]
class SimpleArrayBasisFunction(BaseBasisFunction):
def __init__(self, value, *args, **kwargs):
self.assigned_value = value
super().__init__(*args, **kwargs)
def _calc_value(self, conditions, **kwargs):
self.value = self.assigned_value
return self.value
[docs]
class DelayStartBasisFunction(BaseBasisFunction):
"""Force things to not run before a given night.
Parameters
----------
nights_delay : `float`, optional
Return False until conditions.night >= nights_delay.
"""
def __init__(self, nights_delay=365.25 * 5):
super().__init__()
self.nights_delay = nights_delay
[docs]
def check_feasibility(self, conditions):
result = True
if conditions.night < self.nights_delay:
result = False
return result
[docs]
class FilterDistBasisFunction(BaseBasisFunction):
"""Track filter distribution, increase reward as fraction of observations
in specified filter drops.
"""
def __init__(self, filtername="r"):
super(FilterDistBasisFunction, self).__init__(filtername=filtername)
self.survey_features = {}
# Count of all the observations
self.survey_features["n_obs_count_all"] = features.NObsCount(filtername=None)
# Count in filter
self.survey_features["n_obs_count_in_filt"] = features.NObsCount(filtername=filtername)
def _calc_value(self, conditions, indx=None):
result = self.survey_features["n_obs_count_all"].feature / (
self.survey_features["n_obs_count_in_filt"].feature + 1
)
return result
[docs]
class NObsPerYearBasisFunction(BaseBasisFunction):
"""Reward areas that have not been observed N-times in the last year
Parameters
----------
filtername : `str` ('r')
The filter to track
footprint : `np.array`
Should be a HEALpix map. Values of 0 or np.nan will be ignored.
n_obs : `int` (3)
The number of observations to demand
season : `float` (300)
The amount of time to allow pass before marking a region as "behind".
Default 365.25 (days).
season_start_hour : `float` (-2)
When to start the season relative to RA 180 degrees away
from the sun (hours)
season_end_hour : `float` (2)
When to consider a season ending, the RA relative to the sun + 180
degrees. (hours)
night_max : float (365)
Set value to zero after night_max is reached (days)
"""
def __init__(
self,
filtername="r",
nside=None,
footprint=None,
n_obs=3,
season=300,
season_start_hour=-4.0,
season_end_hour=2.0,
night_max=365,
):
super(NObsPerYearBasisFunction, self).__init__(nside=nside, filtername=filtername)
self.footprint = footprint
self.n_obs = n_obs
self.season = season
self.season_start_hour = (season_start_hour) * np.pi / 12.0 # To radians
self.season_end_hour = season_end_hour * np.pi / 12.0 # To radians
self.survey_features["last_n_mjds"] = features.LastNObsTimes(
nside=nside, filtername=filtername, n_obs=n_obs
)
self.result = np.zeros(hp.nside2npix(self.nside), dtype=float)
self.out_footprint = np.where((footprint == 0) | np.isnan(footprint))
self.night_max = night_max
def _calc_value(self, conditions, indx=None):
if conditions.night > self.night_max:
return 0
result = self.result.copy()
behind_pix = np.where((conditions.mjd - self.survey_features["last_n_mjds"].feature[0]) > self.season)
result[behind_pix] = 1
# let's ramp up the weight depending on how far into the
# observing season the healpix is
mid_season_ra = (conditions.sun_ra + np.pi) % (2.0 * np.pi)
# relative RA
relative_ra = (conditions.ra - mid_season_ra) % (2.0 * np.pi)
relative_ra = (self.season_end_hour - relative_ra) % (2.0 * np.pi)
# ok, now
relative_ra[
np.where(IntRounded(relative_ra) > IntRounded(self.season_end_hour - self.season_start_hour))
] = 0
weight = relative_ra / (self.season_end_hour - self.season_start_hour)
result *= weight
# mask off anything outside the footprint
result[self.out_footprint] = 0
return result
[docs]
class NGoodSeeingBasisFunction(BaseBasisFunction):
"""Try to get N "good seeing" images each observing season.
Parameters
----------
filtername : `str`
Bandpass in which to count images. Default r.
nside : `int`
The nside of the map for the basis function. Should match
survey and scheduler nside.
Default None uses `set_default_nside`.
seeing_fwhm_max : `float`
Value to consider as "good" threshold (arcsec).
Default of 0.8 arcseconds.
m5_penalty_max : `float`
The maximum depth loss that is considered acceptable (magnitudes),
compared to the dark-sky map in this filter.
Default 0.5 magnitudes.
n_obs_desired : `int`
Number of good seeing observations to collect per season.
Default 3.
mjd_start : `float`
The starting MJD of the survey.
Default None uses `rubin_scheduler.utils.survey_start_mjd()`.
footprint : `np.array`, (N,)
Only use area where footprint > 0. Should be a HEALpix map.
Default None calls `get_current_footprint()`.
"""
def __init__(
self,
filtername="r",
nside=None,
seeing_fwhm_max=0.8,
m5_penalty_max=0.5,
n_obs_desired=3,
mjd_start=None,
footprint=None,
):
super().__init__(nside=nside, filtername=filtername)
self.seeing_fwhm_max = seeing_fwhm_max
self.m5_penalty_max = m5_penalty_max
self.n_obs_desired = n_obs_desired
if mjd_start is None:
mjd_start = survey_start_mjd()
self.mjd_start = mjd_start
self.survey_features["N_good_seeing"] = features.NObservationsCurrentSeason(
filtername=self.filtername,
mjd_start=self.mjd_start,
seeing_fwhm_max=self.seeing_fwhm_max,
m5_penalty_max=self.m5_penalty_max,
nside=self.nside,
)
# Set footprint to current survey footprint class if undefined.
if footprint is None:
footprints, labels = get_current_footprint(self.nside)
footprint = footprints[self.filtername]
self.footprint = footprint
self.result = np.zeros(hp.nside2npix(self.nside))
self.dark_map = None
def _calc_value(self, conditions, indx=None):
if self.filtername is not None:
if self.dark_map is None:
self.dark_map = dark_m5(
conditions.dec, self.filtername, conditions.site.latitude_rad, fiducial_FWHMEff=0.7
)
# Return the same kind of array (not float) regardless
# of result
result = self.result.copy()
# Update the feature to the current time.
self.survey_features["N_good_seeing"].season_update(conditions=conditions)
m5_penalty = self.dark_map - conditions.m5_depth[self.filtername]
potential_pixels = np.where(
(m5_penalty <= self.m5_penalty_max)
& (conditions.fwhm_eff[self.filtername] <= self.seeing_fwhm_max)
& (self.survey_features["N_good_seeing"].feature < self.n_obs_desired)
& (self.footprint > 0)
)[0]
result[potential_pixels] = 1
return result
[docs]
class AvoidLongGapsBasisFunction(BaseBasisFunction):
"""Boost the reward on parts of the survey that haven't been
observed for a while.
Parameters
----------
filtername : `str`, optional
The filter to consider when tracking visits.
nside : `int`, optional
The nside to use for the basis function.
Default None uses `set_default_nside()`.
footprint : `np.ndarray`, (N,)
The footprint to consider when tracking visits.
Default None uses `get_current_footprint()`.
min_gap : `float`, optional
The minimum gap of time before boosting (in days).
max_gap : `float`, optional
The maximum gap of time before stopping boosting (in days).
ha_limit : `float, optional
Only boost visits at parts of the sky with HA < ha_limit
(in hours).
"""
def __init__(
self,
filtername=None,
nside=None,
footprint=None,
min_gap=4.0,
max_gap=40.0,
ha_limit=3.5,
):
super(AvoidLongGapsBasisFunction, self).__init__(nside=nside, filtername=filtername)
self.min_gap = min_gap
self.max_gap = max_gap
self.filtername = filtername
if footprint is None:
footprints, labels = get_current_footprint(self.nside)
footprint = footprints[self.filtername]
self.footprint = footprint
self.ha_limit = 2.0 * np.pi * ha_limit / 24.0 # To radians
self.survey_features = {}
self.survey_features["last_observed"] = features.LastObserved(nside=nside, filtername=filtername)
self.result = np.zeros(hp.nside2npix(self.nside))
send_unused_deprecation_warning(self.__class__.__name__)
def _calc_value(self, conditions, indx=None):
result = self.result.copy()
gap = conditions.mjd - self.survey_features["last_observed"].feature
in_range = np.where((gap > self.min_gap) & (gap < self.max_gap) & (self.footprint > 0))
result[in_range] = 1
# mask out areas beyond the hour angle limit.
out_ha = np.where((conditions.HA > self.ha_limit) & (conditions.HA < (2.0 * np.pi - self.ha_limit)))[
0
]
result[out_ha] = 0
return result
[docs]
class TargetMapBasisFunction(BaseBasisFunction):
"""Basis function that tracks number of observations and tries
to match a specified spatial distribution.
In general, this is deprecated in favor of
`FootprintBasisFunction`.
Parameters
----------
filtername: `str` ('r')
The name of the filter for this target map.
nside: `int` (default_nside)
The healpix resolution.
target_map : `np.array` (None)
A healpix map showing the ratio of observations desired
for all points on the sky
norm_factor : `float` (0.00010519)
for converting target map to number of observations.
Should be the area of the camera divided by the area of a healpixel
divided by the sum of all your goal maps.
Default value assumes LSST foV has 1.75 degree radius and the
standard goal maps.
If using multiple filters, see
rubin_scheduler.utils.calc_norm_factor
for a utility that computes norm_factor.
out_of_bounds_val : `float` (-10.)
Reward value to give regions where there are no
observations requested (unitless).
"""
def __init__(
self,
filtername="r",
nside=None,
target_map=None,
norm_factor=None,
out_of_bounds_val=-10.0,
):
super(TargetMapBasisFunction, self).__init__(nside=nside, filtername=filtername)
if norm_factor is None:
warnings.warn("No norm_factor set, use utils.calc_norm_factor if using multiple filters.")
self.norm_factor = 0.00010519
else:
self.norm_factor = norm_factor
self.survey_features = {}
# Map of the number of observations in filter
self.survey_features["n_obs"] = features.NObservations(filtername=filtername, nside=self.nside)
# Count of all the observations
self.survey_features["n_obs_count_all"] = features.NObsCount(filtername=None)
if target_map is None:
target_maps, labels = utils.get_current_footprint(self.nside)
self.target_map = target_maps[filtername]
else:
self.target_map = target_map
self.out_of_bounds_area = np.where(self.target_map == 0)[0]
self.out_of_bounds_val = out_of_bounds_val
self.result = np.zeros(hp.nside2npix(self.nside), dtype=float)
self.all_indx = np.arange(self.result.size)
# As of 4/2024,
# this is used in the ts_fbs_utils maintel "anytime" test survey.
# It is also used in unit tests.
# send_unused_deprecation_warning(self.__class__.__name__)
# In general, it is deprecated in favor of FootprintBasisFunction
def _calc_value(self, conditions, indx=None):
result = self.result.copy()
if indx is None:
indx = self.all_indx
# Find out how many observations we want now at those points
goal_n = self.target_map[indx] * self.survey_features["n_obs_count_all"].feature * self.norm_factor
result[indx] = goal_n - self.survey_features["n_obs"].feature[indx]
result[self.out_of_bounds_area] = self.out_of_bounds_val
return result
def az_rel_point(azs, point_az):
az_rel_moon = (azs - point_az) % (2.0 * np.pi)
if isinstance(azs, np.ndarray):
over = np.where(az_rel_moon > np.pi)
az_rel_moon[over] = 2.0 * np.pi - az_rel_moon[over]
else:
if az_rel_moon > np.pi:
az_rel_moon = 2.0 * np.pi - az_rel_moon
return az_rel_moon
[docs]
class NObsHighAmBasisFunction(BaseBasisFunction):
"""Reward only reward/count observations at high airmass."""
def __init__(
self,
nside=None,
filtername="r",
footprint=None,
n_obs=3,
season=300.0,
am_limits=[1.5, 2.2],
out_of_bounds_val=np.nan,
):
super(NObsHighAmBasisFunction, self).__init__(nside=nside, filtername=filtername)
if footprint is None:
footprints, labels = get_current_footprint(self.nside)
footprint = footprints[self.filtername]
self.footprint = footprint
self.out_footprint = np.where((footprint == 0) | np.isnan(footprint))
self.am_limits = am_limits
self.season = season
self.survey_features["last_n_mjds"] = features.Last_n_obs_times(
nside=nside, filtername=filtername, n_obs=n_obs
)
self.result = np.zeros(hp.nside2npix(self.nside), dtype=float) + out_of_bounds_val
self.out_of_bounds_val = out_of_bounds_val
[docs]
def add_observation(self, observation, indx=None):
# Only count the observations if they are at the airmass limits
if (observation["airmass"] > np.min(self.am_limits)) & (
observation["airmass"] < np.max(self.am_limits)
):
for feature in self.survey_features:
self.survey_features[feature].add_observation(observation, indx=indx)
if self.update_on_newobs:
self.recalc = True
[docs]
def check_feasibility(self, conditions):
result = True
reward = self._calc_value(conditions)
# If there are no non-NaN values, we're not feasible now
if True not in np.isfinite(reward):
result = False
return result
def _calc_value(self, conditions, indx=None):
result = self.result.copy()
behind_pix = np.where(
(
IntRounded(conditions.mjd - self.survey_features["last_n_mjds"].feature[0])
> IntRounded(self.season)
)
& (IntRounded(conditions.airmass) > IntRounded(np.min(self.am_limits)))
& (IntRounded(conditions.airmass) < IntRounded(np.max(self.am_limits)))
)
result[behind_pix] = 1
result[self.out_footprint] = self.out_of_bounds_val
# Update the last time we had an mjd
self.mjd_last = conditions.mjd + 0
self.recalc = False
self.value = result
return result
[docs]
class EclipticBasisFunction(BaseBasisFunction):
"""Mark the area around the ecliptic"""
def __init__(self, nside=None, distance_to_eclip=25.0):
super(EclipticBasisFunction, self).__init__(nside=nside)
self.distance_to_eclip = np.radians(distance_to_eclip)
ra, dec = _hpid2_ra_dec(nside, np.arange(hp.nside2npix(self.nside)))
self.result = np.zeros(ra.size)
coord = SkyCoord(ra=ra * u.rad, dec=dec * u.rad)
eclip_lat = coord.barycentrictrueecliptic.lat.radian
good = np.where(np.abs(eclip_lat) < self.distance_to_eclip)
self.result[good] += 1
def __call__(self, conditions, indx=None):
return self.result
[docs]
class CadenceInSeasonBasisFunction(BaseBasisFunction):
"""Drive observations at least every N days in a given area
Parameters
----------
drive_map : `np.ndarray`, (N,)
A HEALpix map with values of 1 where the cadence should be driven.
filtername : `str`
The filters that can count.
season_span : `float`
How long to consider a spot "in_season" (hours).
cadence : `float`
How long to wait before activating the basis function (days).
"""
def __init__(self, drive_map, filtername="griz", season_span=2.5, cadence=2.5, nside=None):
super(CadenceInSeasonBasisFunction, self).__init__(nside=nside, filtername=filtername)
self.drive_map = drive_map
self.season_span = season_span / 12.0 * np.pi # To radians
self.cadence = cadence
self.survey_features["last_observed"] = features.LastObserved(nside=nside, filtername=filtername)
self.result = np.zeros(hp.nside2npix(self.nside), dtype=float)
def _calc_value(self, conditions, indx=None):
result = self.result.copy()
ra_mid_season = (conditions.sunRA + np.pi) % (2.0 * np.pi)
angle_to_mid_season = np.abs(conditions.ra - ra_mid_season)
over = np.where(IntRounded(angle_to_mid_season) > IntRounded(np.pi))
angle_to_mid_season[over] = 2.0 * np.pi - angle_to_mid_season[over]
days_lag = conditions.mjd - self.survey_features["last_observed"].feature
active_pix = np.where(
(IntRounded(days_lag) >= IntRounded(self.cadence))
& (self.drive_map == 1)
& (IntRounded(angle_to_mid_season) < IntRounded(self.season_span))
)
result[active_pix] = 1.0
return result
[docs]
class SeasonCoverageBasisFunction(BaseBasisFunction):
"""Basis function to encourage N observations per observing season.
Parameters
----------
filtername : `str`, optional
Count observations in this filter. Default 'r'.
nside : `int`, optional
Nside for the healpix map to use for the feature.
This should match the nside of the survey and scheduler.
footprint : `np.array` (N,), optional
Healpix map of the footprint where one should demand coverage
every season. Default None will call `get_current_footprint()`.
n_per_season : `int`, optional
The number of observations to attempt to gather every season.
Default of 3 is suitable for first year template building.
mjd_start : `float`, optional
The mjd of the start of the survey (days).
Default None uses `rubin_scheduler.utils.survey_start_mjd()`.
season_frac_start : `float`
Only start trying to gather observations after a season
is fractionally this far along.
Seasons start when the apparent position of sun is at the RA of
the pixel (0) and finish when the sun returns again to this RA.
The default of 0.5 means that the basis function will not
start returning values until the RA reaches the peak of its season.
"""
def __init__(
self,
filtername="r",
nside=None,
footprint=None,
n_per_season=3,
mjd_start=None,
season_frac_start=0.5,
):
super().__init__(nside=nside, filtername=filtername)
if footprint is None:
footprints, labels = get_current_footprint(self.nside)
footprint = footprints[self.filtername]
self.footprint = footprint
# Calculate the RA values for each spot on the footprint
ra, dec = _hpid2_ra_dec(nside, np.arange(hp.nside2npix(nside)))
self.ra_deg = np.degrees(ra)
self.n_per_season = n_per_season
if mjd_start is None:
mjd_start = survey_start_mjd()
self.mjd_start = mjd_start
self.season_frac_start = season_frac_start
# Track how many observations have been taken at each RA/Dec
# in the current observing season (for that point on the sky).
self.survey_features["n_obs_season"] = features.NObservationsCurrentSeason(
filtername=filtername, nside=nside, mjd_start=mjd_start
)
self.result = np.zeros(hp.nside2npix(self.nside), dtype=float)
def _calc_value(self, conditions, indx=None):
result = self.result.copy()
# Update the feature to the current time
self.survey_features["n_obs_season"].season_update(conditions)
feature = self.survey_features["n_obs_season"].feature
# Get the season from the conditions object.
season = conditions.season
# Evaluate where there are not yet enough observations and also
# that season is far enough along to require more weight.
not_enough = np.where(
(self.footprint > 0)
& (feature < self.n_per_season)
& ((IntRounded(season - np.floor(season)) > IntRounded(self.season_frac_start)))
)
result[not_enough] = 1
return result
[docs]
class ThirdObservationBasisFunction(BaseBasisFunction):
"""If there have been observations in two filters long enough ago,
go for a third
Parameters
----------
gap_min : `float` (40.)
The minimum time gap to consider a pixel good (minutes)
gap_max : `float` (120)
The maximum time to consider going for a pair (minutes)
"""
def __init__(self, nside=32, filtername1="r", filtername2="z", gap_min=40.0, gap_max=120.0):
super(ThirdObservationBasisFunction, self).__init__(nside=nside)
self.filtername1 = filtername1
self.filtername2 = filtername2
self.gap_min = IntRounded(gap_min / 60.0 / 24.0)
self.gap_max = IntRounded(gap_max / 60.0 / 24.0)
self.survey_features = {}
self.survey_features["last_obs_f1"] = features.LastObserved(filtername=filtername1, nside=nside)
self.survey_features["last_obs_f2"] = features.LastObserved(filtername=filtername2, nside=nside)
self.result = np.empty(hp.nside2npix(self.nside))
self.result.fill(np.nan)
send_unused_deprecation_warning(self.__class__.__name__)
def _calc_value(self, conditions, indx=None):
result = self.result.copy()
d1 = IntRounded(conditions.mjd - self.survey_features["last_obs_f1"].feature)
d2 = IntRounded(conditions.mjd - self.survey_features["last_obs_f2"].feature)
good = np.where((d1 > self.gap_min) & (d1 < self.gap_max) & (d2 > self.gap_min) & (d2 < self.gap_max))
result[good] = 1
return result
[docs]
class AvoidFastRevisitsBasisFunction(BaseBasisFunction):
"""Marks targets as unseen if they are in a specified time window
in order to avoid fast revisits.
Parameters
----------
filtername: `str` or None
The name of the filter for this target map.
Using None will match visits in any filter.
gap_min : `float`
Minimum time for the gap (minutes).
nside: `int` or None
The healpix resolution.
penalty_val : `float`
The reward value to use for regions to penalize.
Will be masked if set to np.nan (default).
"""
def __init__(self, filtername="r", nside=None, gap_min=25.0, penalty_val=np.nan):
super().__init__(nside=nside, filtername=filtername)
self.filtername = filtername
self.penalty_val = penalty_val
self.gap_min = IntRounded(gap_min / 60.0 / 24.0)
self.nside = nside
self.survey_features = dict()
self.survey_features["Last_observed"] = features.LastObserved(
filtername=filtername, nside=nside, fill=0
)
def _calc_value(self, conditions, indx=None):
result = np.ones(hp.nside2npix(self.nside), dtype=float)
diff = IntRounded(conditions.mjd - self.survey_features["Last_observed"].feature)
bad = np.where(diff < self.gap_min)[0]
result[bad] = self.penalty_val
return result
[docs]
class AvoidFastRevisits(AvoidFastRevisitsBasisFunction):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
warnings.warn("Class has been renamed AvoidFastRevisitsBasisFunction", DeprecationWarning, 2)
[docs]
class NearSunHighAirmassBasisFunction(BaseBasisFunction):
"""Reward areas on the sky at high airmass, within 90 degrees azimuth
of the Sun, such as suitable for the near-sun twilight microsurvey for
near- or interior-to earth asteroids.
Parameters
----------
nside : `int`, optional
Nside for the basis function. If None, uses `set_default_nside()`.
max_airmass : `float`, oprionl
The maximum airmass to try and observe (unitless).
penalty : `float`, optional
The value to fill in non-rewarded parts of the sky.
Default np.nan, which serves to mask regions exceeding the airmass
limit and more than 90 degrees azimuth toward the sun.
"""
def __init__(self, nside=None, max_airmass=2.5, penalty=np.nan):
super().__init__(nside=nside)
self.max_airmass = IntRounded(max_airmass)
self.result = np.empty(hp.nside2npix(self.nside))
self.result.fill(penalty)
def _calc_value(self, conditions, indx=None):
result = self.result.copy()
valid_airmass = np.where(np.isfinite(conditions.airmass) == True)[0]
good_pix = np.where(
(conditions.airmass[valid_airmass] >= 1.0)
& (IntRounded(conditions.airmass[valid_airmass]) < self.max_airmass)
& (IntRounded(np.abs(conditions.az_to_sun[valid_airmass])) < IntRounded(np.pi / 2.0))
)
result[valid_airmass[good_pix]] = (
conditions.airmass[valid_airmass][good_pix] / self.max_airmass.initial
)
return result
[docs]
class NearSunTwilightBasisFunction(NearSunHighAirmassBasisFunction):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
warnings.warn("Class has been renamed NearSunHighAirmassBasisFunction", DeprecationWarning, 2)
[docs]
class VisitRepeatBasisFunction(BaseBasisFunction):
"""
Basis function to reward re-visiting an area on the sky.
Looking for Solar System objects.
Parameters
----------
gap_min : `float` (15.)
Minimum time for the gap (minutes)
gap_max : `float` (45.)
Maximum time for a gap
filtername : `str` ('r')
The filter(s) to count with pairs
npairs : `int` (1)
The number of pairs of observations to attempt to gather
"""
def __init__(self, gap_min=25.0, gap_max=45.0, filtername="r", nside=None, npairs=1):
super(VisitRepeatBasisFunction, self).__init__(nside=nside, filtername=filtername)
self.gap_min = IntRounded(gap_min / 60.0 / 24.0)
self.gap_max = IntRounded(gap_max / 60.0 / 24.0)
self.npairs = npairs
self.survey_features = {}
# Track the number of pairs that have been taken in a night
self.survey_features["Pair_in_night"] = features.PairInNight(
filtername=filtername, gap_min=gap_min, gap_max=gap_max, nside=nside
)
# When was it last observed
# XXX--since this feature is also in Pair_in_night, I should just
# access that one!
self.survey_features["Last_observed"] = features.LastObserved(filtername=filtername, nside=nside)
def _calc_value(self, conditions, indx=None):
result = np.zeros(hp.nside2npix(self.nside), dtype=float)
if indx is None:
indx = np.arange(result.size)
diff = conditions.mjd - self.survey_features["Last_observed"].feature[indx]
mask = np.isnan(diff)
# remove NaNs from diff, but save mask so we exclude those values
# later.
diff[mask] = 0.0
ir_diff = IntRounded(diff)
good = np.where(
(ir_diff >= self.gap_min)
& (ir_diff <= self.gap_max)
& (self.survey_features["Pair_in_night"].feature[indx] < self.npairs)
& (~mask)
)[0]
result[indx[good]] += 1.0
return result
[docs]
class M5DiffBasisFunction(BaseBasisFunction):
"""Basis function based on the 5-sigma depth.
Look up the best depth a healpixel achieves, and compute
the limiting depth difference given current conditions
Parameters
----------
filtername : `str`, optional
The filter to consider for visits.
fiducial_FWHMEff : `float`, optional
The zenith seeing to assume for "good" conditions.
While the dark sky depth map simply scales with this value,
picking a reasonable fiducial_FWHMEff is important because
this effects the overall value and scale of the reward
from the basis function.
nside : `int`, optional
The nside for the basis function.
Default None uses `set_default_nside()`.
"""
def __init__(self, filtername="r", fiducial_FWHMEff=0.7, nside=None):
super().__init__(nside=nside, filtername=filtername)
# The dark sky surface brightness values
self.dark_map = None
self.fiducial_FWHMEff = fiducial_FWHMEff
self.filtername = filtername
def _calc_value(self, conditions, indx=None):
if self.dark_map is None:
self.dark_map = dark_m5(
conditions.dec, self.filtername, conditions.site.latitude_rad, self.fiducial_FWHMEff
)
# No way to get the sign on this right the first time.
result = conditions.m5_depth[self.filtername] - self.dark_map
return result
[docs]
class M5DiffAtHpixBasisFunction(HealpixLimitedBasisFunctionMixin, M5DiffBasisFunction):
pass
[docs]
class StrictFilterBasisFunction(BaseBasisFunction):
"""Remove the bonus for staying in the same filter
if certain conditions are met.
If the moon rises/sets or twilight starts/ends, it makes a lot of sense
to consider a filter change. This basis function rewards if it matches
the current filter, the moon rises or sets, twilight starts or stops,
or there has been a large gap since the last observation.
Parameters
----------
time_lag : `float` (10.)
If there is a gap between observations longer than this,
let the filter change (minutes)
twi_change : `float` (-18.)
The sun altitude to consider twilight starting/ending (degrees)
note_free : `str` ('DD')
No penalty for changing filters if the last observation note field
includes `note_free` string.
Useful for giving a free filter change after deep drilling sequence
"""
def __init__(self, time_lag=10.0, filtername="r", twi_change=-18.0, note_free="DD"):
super(StrictFilterBasisFunction, self).__init__(filtername=filtername)
self.time_lag = time_lag / 60.0 / 24.0 # Convert to days
self.twi_change = np.radians(twi_change)
self.survey_features = {}
self.survey_features["Last_observation"] = features.LastObservation()
self.note_free = note_free
def _calc_value(self, conditions, **kwargs):
# Did the moon set or rise since last observation?
moon_changed = conditions.moon_alt * self.survey_features["Last_observation"].feature["moonAlt"] < 0
# Are we already in the filter (or at start of night)?
in_filter = (conditions.current_filter == self.filtername) | (conditions.current_filter is None)
# Has enough time past?
time_past = IntRounded(
conditions.mjd - self.survey_features["Last_observation"].feature["mjd"]
) > IntRounded(self.time_lag)
# Did twilight start/end?
twi_changed = (conditions.sun_alt - self.twi_change) * (
self.survey_features["Last_observation"].feature["sunAlt"] - self.twi_change
) < 0
# Did we just finish a DD sequence
was_dd = self.note_free in self.survey_features["Last_observation"].feature["scheduler_note"]
# Is the filter mounted?
mounted = self.filtername in conditions.mounted_filters
if (moon_changed | in_filter | time_past | twi_changed | was_dd) & mounted:
result = 1.0
else:
result = 0.0
return result
[docs]
class GoalStrictFilterBasisFunction(BaseBasisFunction):
"""Remove the bonus for staying in the same filter
if certain conditions are met.
If the moon rises/sets or twilight starts/ends, it makes a lot of sense
to consider a filter change. This basis function rewards if it matches
the current filter, the moon rises or sets, twilight starts or stops,
or there has been a large gap since the last observation.
Parameters
----------
time_lag_min : `float`
Minimum time after a filter change for which a new filter change
will receive zero reward, or be denied at all (see unseen_before_lag).
time_lag_max : `float`
Time after a filter change where the reward for changing filters
achieve its maximum.
time_lag_boost : `float`
Time after a filter change to apply a boost on the reward.
boost_gain : `float`
A multiplier factor for the reward after time_lag_boost.
unseen_before_lag : `bool`
If True will make it impossible to switch filter before time_lag
has passed.
filtername : `str`
The filter for which this basis function will be used.
tag: `str` or None
When using filter proportion use only regions with this tag to
count for observations.
twi_change : `float`
Switch reward on when twilight changes.
proportion : `float`
The expected filter proportion distribution.
aways_available: `bool`
If this is true the basis function will aways be computed
regardless of the feasibility.
If False a more detailed feasibility check is performed.
When set to False, it may speed up the computation process by
avoiding the computation of the reward functions paired with this bf,
when observation is not feasible.
"""
def __init__(
self,
time_lag_min=10.0,
time_lag_max=30.0,
time_lag_boost=60.0,
boost_gain=2.0,
unseen_before_lag=False,
filtername="r",
tag=None,
twi_change=-18.0,
proportion=1.0,
aways_available=False,
):
super(GoalStrictFilterBasisFunction, self).__init__(filtername=filtername)
self.time_lag_min = time_lag_min / 60.0 / 24.0 # Convert to days
self.time_lag_max = time_lag_max / 60.0 / 24.0 # Convert to days
self.time_lag_boost = time_lag_boost / 60.0 / 24.0
self.boost_gain = boost_gain
self.unseen_before_lag = unseen_before_lag
self.twi_change = np.radians(twi_change)
self.proportion = proportion
self.aways_available = aways_available
self.survey_features = {}
self.survey_features["Last_observation"] = features.LastObservation()
self.survey_features["Last_filter_change"] = features.LastFilterChange()
self.survey_features["n_obs_all"] = features.NObsCount(filtername=None)
# Tag is not actually supported at observation level.
self.survey_features["n_obs"] = features.NObsCount(filtername=filtername, tag=tag)
send_unused_deprecation_warning(self.__class__.__name__)
def filter_change_bonus(self, time):
lag_min = self.time_lag_min
lag_max = self.time_lag_max
a = 1.0 / (lag_max - lag_min)
b = -a * lag_min
bonus = a * time + b
# How far behind we are with respect to proportion?
nobs = self.survey_features["n_obs"].feature
nobs_all = self.survey_features["n_obs_all"].feature
goal = self.proportion
# need = 1. - nobs / nobs_all + goal if nobs_all > 0 else 1. + goal
need = goal / nobs * nobs_all if nobs > 0 else 1.0
# need /= goal
if hasattr(time, "__iter__"):
before_lag = np.where(time <= lag_min)
bonus[before_lag] = -np.inf if self.unseen_before_lag else 0.0
after_lag = np.where(time >= lag_max)
bonus[after_lag] = 1.0 if time < self.time_lag_boost else self.boost_gain
elif IntRounded(time) <= IntRounded(lag_min):
return -np.inf if self.unseen_before_lag else 0.0
elif IntRounded(time) >= IntRounded(lag_max):
return 1.0 if IntRounded(time) < IntRounded(self.time_lag_boost) else self.boost_gain
return bonus * need
[docs]
def check_feasibility(self, conditions):
"""
This method makes a pre-check of the feasibility of this
basis function. If a basis function returns False on the
feasibility check, it won't computed at all.
Returns
-------
feasibility : `bool`
"""
# Make a quick check about the feasibility of this basis function.
# If current filter is none, telescope is parked and we could,
# in principle, switch to any filter. If this basis function
# computes reward for the current filter, then it is also feasible.
# At last we check for an "aways_available" flag. Meaning, we
# force this basis function to be aways be computed.
if (
conditions.current_filter is None
or conditions.current_filter == self.filtername
or self.aways_available
):
return True
# If we arrive here,
# we make some extra checks to make sure this bf is
# feasible and should be computed.
# Did the moon set or rise since last observation?
moon_changed = conditions.moon_alt * self.survey_features["Last_observation"].feature["moonAlt"] < 0
# Are we already in the filter (or at start of night)?
not_in_filter = conditions.current_filter != self.filtername
# Has enough time past?
lag = conditions.mjd - self.survey_features["Last_filter_change"].feature["mjd"]
time_past = IntRounded(lag) > IntRounded(self.time_lag_min)
# Did twilight start/end?
twi_changed = (conditions.sun_alt - self.twi_change) * (
self.survey_features["Last_observation"].feature["sun_alt"] - self.twi_change
) < 0
# Did we just finish a DD sequence
was_dd = self.survey_features["Last_observation"].feature["scheduler_note"] == "DD"
# Is the filter mounted?
mounted = self.filtername in conditions.mounted_filters
if (moon_changed | time_past | twi_changed | was_dd) & mounted & not_in_filter:
return True
else:
return False
def _calc_value(self, conditions, **kwargs):
if conditions.current_filter is None:
return 0.0 # no bonus if no filter is mounted
# Did the moon set or rise since last observation?
moon_changed = conditions.moon_alt * self.survey_features["Last_observation"].feature["moonAlt"] < 0
# Has enough time past?
lag = conditions.mjd - self.survey_features["Last_filter_change"].feature["mjd"]
time_past = lag > self.time_lag_min
# Did twilight start/end?
twi_changed = (conditions.sun_alt - self.twi_change) * (
self.survey_features["Last_observation"].feature["sun_alt"] - self.twi_change
) < 0
# Did we just finish a DD sequence
was_dd = self.survey_features["Last_observation"].feature["scheduler_note"] == "DD"
# Is the filter mounted?
mounted = self.filtername in conditions.mounted_filters
if (moon_changed | time_past | twi_changed | was_dd) & mounted:
result = self.filter_change_bonus(lag) if time_past else 0.0
else:
result = -100.0 if self.unseen_before_lag else 0.0
return result
[docs]
class FilterChangeBasisFunction(BaseBasisFunction):
"""Reward staying in the current filter."""
def __init__(self, filtername="r"):
super(FilterChangeBasisFunction, self).__init__(filtername=filtername)
def _calc_value(self, conditions, **kwargs):
if (conditions.current_filter == self.filtername) | (conditions.current_filter is None):
result = 1.0
else:
result = 0.0
return result
[docs]
class SlewtimeBasisFunction(BaseBasisFunction):
"""Reward slews that take little time
Parameters
----------
max_time : `float`
The estimated maximum slewtime (seconds).
Used to normalize so the basis function spans ~ -1-0
in reward units. Default 135 seconds corresponds to just
slightly less than a filter change.
filtername : `str`, optional
The filter to check for pre-post slewtime estimates.
If a slew includes a filter change, other basis functions will
decide on the reward, so the result here can be 0.
nside : `int`, optional
Nside for the basis function.
Default None will use `set_default_nside()`.
"""
def __init__(self, max_time=135.0, filtername="r", nside=None):
super(SlewtimeBasisFunction, self).__init__(nside=nside, filtername=filtername)
self.maxtime = max_time
self.nside = nside
self.filtername = filtername
[docs]
def add_observation(self, observation, indx=None):
# No tracking of observations in this basis function.
# Purely based on conditions.
pass
def _calc_value(self, conditions, indx=None):
# If we are in a different filter, the
# FilterChangeBasisFunction will take it
# But we can still use the MASK returned by
# the slewtime map to remove inaccessible parts of the sky
if conditions.current_filter != self.filtername:
if np.size(conditions.slewtime) > 1:
result = np.where(np.isfinite(conditions.slewtime), 0, np.nan)
else:
result = 0
else:
# Need to make sure smaller slewtime is larger reward.
if np.size(conditions.slewtime) > 1:
# Slewtime map can contain nans and/or
# infs - mask these with nans
result = np.where(
np.isfinite(conditions.slewtime),
-conditions.slewtime / self.maxtime,
np.nan,
)
else:
result = -conditions.slewtime / self.maxtime
return result
[docs]
class AggressiveSlewtimeBasisFunction(BaseBasisFunction):
"""Reward slews that take little time
XXX--not sure how this is different from SlewtimeBasisFunction?
Looks like it's checking the slewtime to the field position
rather than the healpix maybe?
"""
def __init__(self, max_time=135.0, order=1.0, hard_max=None, filtername="r", nside=None):
super(AggressiveSlewtimeBasisFunction, self).__init__(nside=nside, filtername=filtername)
self.maxtime = max_time
self.hard_max = hard_max
self.order = order
self.result = np.zeros(hp.nside2npix(self.nside), dtype=float)
send_unused_deprecation_warning(self.__class__.__name__)
def _calc_value(self, conditions, indx=None):
# If we are in a different filter, the
# FilterChangeBasisFunction will take it
if conditions.current_filter != self.filtername:
result = 0.0
else:
# Need to make sure smaller slewtime is larger reward.
if np.size(self.condition_features["slewtime"].feature) > 1:
result = self.result.copy()
result.fill(np.nan)
good = np.where(np.bitwise_and(conditions.slewtime > 0.0, conditions.slewtime < self.maxtime))
result[good] = ((self.maxtime - conditions.slewtime[good]) / self.maxtime) ** self.order
if self.hard_max is not None:
not_so_good = np.where(conditions.slewtime > self.hard_max)
result[not_so_good] -= 10.0
fields = np.unique(conditions.hp2fields[good])
for field in fields:
hp_indx = np.where(conditions.hp2fields == field)
result[hp_indx] = np.min(result[hp_indx])
else:
result = (self.maxtime - conditions.slewtime) / self.maxtime
return result
[docs]
class SkybrightnessLimitBasisFunction(BaseBasisFunction):
"""Mask regions that are outside a sky brightness limit.
Parameters
----------
nside : `int`, optional
The nside for the basis function. Default None uses
`set_default_nside()`.
filtername : `str`, optional
The filter to consider for the skybrightness pre values.
sbmin : `float`, optional
The minimum (brightest) skybrightness to consider (mags).
Default of 20 will cut out some times of night or parts of the
sky.
sbmax : `float`, optional
The maximum (faintest) skybrightness to consider (mags).
Default of 30 will pass all skybrightness values.
"""
def __init__(self, nside=None, filtername="r", sbmin=20.0, sbmax=30.0):
super(SkybrightnessLimitBasisFunction, self).__init__(nside=nside, filtername=filtername)
self.min = IntRounded(sbmin)
self.max = IntRounded(sbmax)
self.result = np.empty(hp.nside2npix(self.nside), dtype=float)
self.result.fill(np.nan)
send_unused_deprecation_warning(self.__class__.__name__)
def _calc_value(self, conditions, indx=None):
result = self.result.copy()
# Replace non-finite values so IntRounded works, then set
# the result to nan.
sky_brightness = conditions.skybrightness[self.filtername].copy()
not_finite = np.where(~np.isfinite(sky_brightness))
sky_brightness[not_finite] = 0
rounded_sky_brightness = IntRounded(sky_brightness)
good = np.where(
np.bitwise_and(
rounded_sky_brightness > self.min,
rounded_sky_brightness < self.max,
)
)
result[not_finite] = np.nan
result[good] = 1.0
return result
[docs]
class SkybrightnessLimitAtHpixBasisFunction(
HealpixLimitedBasisFunctionMixin, SkybrightnessLimitBasisFunction
):
pass
[docs]
class CablewrapUnwrapBasisFunction(BaseBasisFunction):
"""
Parameters
----------
min_az : `float` (20.)
The minimum azimuth to activate bf (degrees)
max_az : `float` (82.)
The maximum azimuth to activate bf (degrees)
unwrap_until: `float` (90.)
The window in which the bf is activated (degrees)
"""
def __init__(
self,
nside=None,
min_az=-270.0,
max_az=270.0,
min_alt=20.0,
max_alt=82.0,
activate_tol=20.0,
delta_unwrap=1.2,
unwrap_until=70.0,
max_duration=30.0,
):
super(CablewrapUnwrapBasisFunction, self).__init__(nside=nside)
self.min_az = np.radians(min_az)
self.max_az = np.radians(max_az)
self.activate_tol = np.radians(activate_tol)
self.delta_unwrap = np.radians(delta_unwrap)
self.unwrap_until = np.radians(unwrap_until)
self.min_alt = np.radians(min_alt)
self.max_alt = np.radians(max_alt)
# Convert to half-width for convienence
self.nside = nside
self.active = False
self.unwrap_direction = 0.0 # either -1., 0., 1.
self.max_duration = max_duration / 60.0 / 24.0 # Convert to days
self.activation_time = None
self.result = np.zeros(hp.nside2npix(self.nside), dtype=float)
send_unused_deprecation_warning(self.__class__.__name__)
def _calc_value(self, conditions, indx=None):
result = self.result.copy()
current_abs_rad = np.radians(conditions.az)
unseen = np.where(np.bitwise_or(conditions.alt < self.min_alt, conditions.alt > self.max_alt))
result[unseen] = np.nan
if (
self.min_az + self.activate_tol < current_abs_rad < self.max_az - self.activate_tol
) and not self.active:
return result
elif self.active and self.unwrap_direction == 1 and current_abs_rad > self.min_az + self.unwrap_until:
self.active = False
self.unwrap_direction = 0.0
self.activation_time = None
return result
elif (
self.active and self.unwrap_direction == -1 and current_abs_rad < self.max_az - self.unwrap_until
):
self.active = False
self.unwrap_direction = 0.0
self.activation_time = None
return result
elif self.activation_time is not None and conditions.mjd - self.activation_time > self.max_duration:
self.active = False
self.unwrap_direction = 0.0
self.activation_time = None
return result
if not self.active:
self.activation_time = conditions.mjd
if current_abs_rad < 0.0:
self.unwrap_direction = 1 # clock-wise unwrap
else:
self.unwrap_direction = -1 # counter-clock-wise unwrap
self.active = True
max_abs_rad = self.max_az
min_abs_rad = self.min_az
TWOPI = 2.0 * np.pi
# Compute distance and accumulated az.
norm_az_rad = np.divmod(conditions.az - min_abs_rad, TWOPI)[1] + min_abs_rad
distance_rad = divmod(norm_az_rad - current_abs_rad, TWOPI)[1]
get_shorter = np.where(distance_rad > np.pi)
distance_rad[get_shorter] -= TWOPI
accum_abs_rad = current_abs_rad + distance_rad
# Compute wrap regions and fix distances
mask_max = np.where(accum_abs_rad > max_abs_rad)
distance_rad[mask_max] -= TWOPI
mask_min = np.where(accum_abs_rad < min_abs_rad)
distance_rad[mask_min] += TWOPI
# Step-2: Repeat but now with compute reward to unwrap
# using specified delta_unwrap
unwrap_current_abs_rad = current_abs_rad - (
np.abs(self.delta_unwrap) if self.unwrap_direction > 0 else -np.abs(self.delta_unwrap)
)
unwrap_distance_rad = divmod(norm_az_rad - unwrap_current_abs_rad, TWOPI)[1]
unwrap_get_shorter = np.where(unwrap_distance_rad > np.pi)
unwrap_distance_rad[unwrap_get_shorter] -= TWOPI
unwrap_distance_rad = np.abs(unwrap_distance_rad)
if self.unwrap_direction < 0:
mask = np.where(accum_abs_rad > unwrap_current_abs_rad)
else:
mask = np.where(accum_abs_rad < unwrap_current_abs_rad)
# Finally build reward map
result = (1.0 - unwrap_distance_rad / np.max(unwrap_distance_rad)) ** 2.0
result[mask] = 0.0
result[unseen] = np.nan
return result
[docs]
class CadenceEnhanceBasisFunction(BaseBasisFunction):
"""Drive a certain cadence
Parameters
----------
filtername : `str` ('gri')
The filter(s) that should be grouped together
supress_window : `list` of `float`
The start and stop window for when observations should be repressed
(days)
apply_area : healpix map
The area over which to try and drive the cadence.
Good values as 1, no candece drive 0.
Probably works as a bool array too.
"""
def __init__(
self,
filtername="gri",
nside=None,
supress_window=[0, 1.8],
supress_val=-0.5,
enhance_window=[2.1, 3.2],
enhance_val=1.0,
apply_area=None,
):
super(CadenceEnhanceBasisFunction, self).__init__(nside=nside, filtername=filtername)
self.supress_window = np.sort(supress_window)
self.supress_val = supress_val
self.enhance_window = np.sort(enhance_window)
self.enhance_val = enhance_val
self.survey_features = {}
self.survey_features["last_observed"] = features.LastObserved(filtername=filtername)
self.empty = np.zeros(hp.nside2npix(self.nside), dtype=float)
# No map, try to drive the whole area
if apply_area is None:
self.apply_indx = np.arange(self.empty.size)
else:
self.apply_indx = np.where(apply_area != 0)[0]
def _calc_value(self, conditions, indx=None):
# copy an empty array
result = self.empty.copy()
if indx is not None:
ind = np.intersect1d(indx, self.apply_indx)
else:
ind = self.apply_indx
if np.size(ind) == 0:
result = 0
else:
mjd_diff = conditions.mjd - self.survey_features["last_observed"].feature[ind]
to_supress = np.where(
(IntRounded(mjd_diff) > IntRounded(self.supress_window[0]))
& (IntRounded(mjd_diff) < IntRounded(self.supress_window[1]))
)
result[ind[to_supress]] = self.supress_val
to_enhance = np.where(
(IntRounded(mjd_diff) > IntRounded(self.enhance_window[0]))
& (IntRounded(mjd_diff) < IntRounded(self.enhance_window[1]))
)
result[ind[to_enhance]] = self.enhance_val
return result
# https://docs.astropy.org/en/stable/_modules/astropy/modeling
# functional_models.html#Trapezoid1D
def trapezoid(x, amplitude, x_0, width, slope):
"""One dimensional Trapezoid model function"""
# Compute the four points where the trapezoid changes slope
# x1 <= x2 <= x3 <= x4
x2 = x_0 - width / 2.0
x3 = x_0 + width / 2.0
x1 = x2 - amplitude / slope
x4 = x3 + amplitude / slope
result = x * 0
# Compute model values in pieces between the change points
range_a = np.logical_and(x >= x1, x < x2)
range_b = np.logical_and(x >= x2, x < x3)
range_c = np.logical_and(x >= x3, x < x4)
result[range_a] = slope * (x[range_a] - x1)
result[range_b] = amplitude
result[range_c] = slope * (x4 - x[range_c])
return result
[docs]
class CadenceEnhanceTrapezoidBasisFunction(BaseBasisFunction):
"""Drive a certain cadence, like CadenceEnhanceBasisFunction
but with smooth transitions
Parameters
----------
filtername : `str` ('gri')
The filter(s) that should be grouped together
XXX--fill out doc string!
"""
def __init__(
self,
filtername="gri",
nside=None,
delay_width=2,
delay_slope=2.0,
delay_peak=0,
delay_amp=0.5,
enhance_width=3.0,
enhance_slope=2.0,
enhance_peak=4.0,
enhance_amp=1.0,
apply_area=None,
season_limit=None,
):
super(CadenceEnhanceTrapezoidBasisFunction, self).__init__(nside=nside, filtername=filtername)
self.delay_width = delay_width
self.delay_slope = delay_slope
self.delay_peak = delay_peak
self.delay_amp = delay_amp
self.enhance_width = enhance_width
self.enhance_slope = enhance_slope
self.enhance_peak = enhance_peak
self.enhance_amp = enhance_amp
self.season_limit = season_limit / 12 * np.pi # To radians
self.survey_features = {}
self.survey_features["last_observed"] = features.LastObserved(filtername=filtername)
self.empty = np.zeros(hp.nside2npix(self.nside), dtype=float)
# No map, try to drive the whole area
if apply_area is None:
self.apply_indx = np.arange(self.empty.size)
else:
self.apply_indx = np.where(apply_area != 0)[0]
def suppress_enhance(self, x):
result = x * 0
result -= trapezoid(x, self.delay_amp, self.delay_peak, self.delay_width, self.delay_slope)
result += trapezoid(
x,
self.enhance_amp,
self.enhance_peak,
self.enhance_width,
self.enhance_slope,
)
return result
def season_len(self, conditions):
ra_mid_season = (conditions.sunRA + np.pi) % (2.0 * np.pi)
angle_to_mid_season = np.abs(conditions.ra - ra_mid_season)
over = np.where(IntRounded(angle_to_mid_season) > IntRounded(np.pi))
angle_to_mid_season[over] = 2.0 * np.pi - angle_to_mid_season[over]
return angle_to_mid_season
def _calc_value(self, conditions, indx=None):
# copy an empty array
result = self.empty.copy()
if indx is not None:
ind = np.intersect1d(indx, self.apply_indx)
else:
ind = self.apply_indx
if np.size(ind) == 0:
result = 0
else:
mjd_diff = conditions.mjd - self.survey_features["last_observed"].feature[ind]
result[ind] += self.suppress_enhance(mjd_diff)
if self.season_limit is not None:
radians_to_midseason = self.season_len(conditions)
outside_season = np.where(radians_to_midseason > self.season_limit)
result[outside_season] = 0
return result
[docs]
class AzimuthBasisFunction(BaseBasisFunction):
"""Reward staying in the same azimuth range.
Possibly better than using slewtime, especially when selecting a
large area of sky.
"""
def __init__(self, nside=None):
super(AzimuthBasisFunction, self).__init__(nside=nside)
def _calc_value(self, conditions, indx=None):
az_dist = conditions.az - conditions.telAz
az_dist = az_dist % (2.0 * np.pi)
over = np.where(az_dist > np.pi)
az_dist[over] = 2.0 * np.pi - az_dist[over]
# Normalize sp between 0 and 1
result = az_dist / np.pi
return result
[docs]
class AzModuloBasisFunction(BaseBasisFunction):
"""Try to replicate the Rothchild et al cadence forcing by
only observing on limited az ranges per night.
Parameters
----------
az_limits : `list` of `float` pairs (None)
The azimuth limits (degrees) to use.
"""
def __init__(self, nside=None, az_limits=None, out_of_bounds_val=-1.0):
super(AzModuloBasisFunction, self).__init__(nside=nside)
self.result = np.ones(hp.nside2npix(self.nside))
if az_limits is None:
spread = 100.0 / 2.0
self.az_limits = np.radians(
[
[360 - spread, spread],
[90.0 - spread, 90.0 + spread],
[180.0 - spread, 180.0 + spread],
]
)
else:
self.az_limits = np.radians(az_limits)
self.mod_val = len(self.az_limits)
self.out_of_bounds_val = out_of_bounds_val
def _calc_value(self, conditions, indx=None):
result = self.result.copy()
az_lim = self.az_limits[np.max(conditions.night) % self.mod_val]
if az_lim[0] < az_lim[1]:
out_pix = np.where(
(IntRounded(conditions.az) < IntRounded(az_lim[0]))
| (IntRounded(conditions.az) > IntRounded(az_lim[1]))
)
else:
out_pix = np.where(
(IntRounded(conditions.az) < IntRounded(az_lim[0]))
| (IntRounded(conditions.az) > IntRounded(az_lim[1]))
)[0]
result[out_pix] = self.out_of_bounds_val
return result
[docs]
class DecModuloBasisFunction(BaseBasisFunction):
"""Emphasize dec bands on a nightly varying basis
Parameters
----------
dec_limits : `list` of `float` pairs (None)
The azimuth limits (degrees) to use.
"""
def __init__(self, nside=None, dec_limits=None, out_of_bounds_val=-1.0):
super(DecModuloBasisFunction, self).__init__(nside=nside)
npix = hp.nside2npix(nside)
hpids = np.arange(npix)
ra, dec = _hpid2_ra_dec(nside, hpids)
self.results = []
if dec_limits is None:
self.dec_limits = np.radians([[-90.0, -32.8], [-32.8, -12.0], [-12.0, 35.0]])
else:
self.dec_limits = np.radians(dec_limits)
self.mod_val = len(self.dec_limits)
self.out_of_bounds_val = out_of_bounds_val
for limits in self.dec_limits:
good = np.where((dec >= limits[0]) & (dec < limits[1]))[0]
tmp = np.zeros(npix)
tmp[good] = 1
self.results.append(tmp)
def _calc_value(self, conditions, indx=None):
night_index = np.max(conditions.night % self.mod_val)
result = self.results[night_index]
return result
[docs]
class MapModuloBasisFunction(BaseBasisFunction):
"""Similar to Dec_modulo, but now use input masks
Parameters
----------
inmaps : `list` of hp arrays
"""
def __init__(self, inmaps):
nside = hp.npix2nside(np.size(inmaps[0]))
super(MapModuloBasisFunction, self).__init__(nside=nside)
self.maps = inmaps
self.mod_val = len(inmaps)
def _calc_value(self, conditions, indx=None):
indx = np.max(conditions.night % self.mod_val)
result = self.maps[indx]
return result
[docs]
class GoodSeeingBasisFunction(BaseBasisFunction):
"""Drive observations in good seeing conditions"""
def __init__(
self,
nside=None,
filtername="r",
footprint=None,
fwhm_eff_limit=0.8,
mag_diff=0.75,
):
super(GoodSeeingBasisFunction, self).__init__(nside=nside)
self.filtername = filtername
self.fwhm_eff_limit = IntRounded(fwhm_eff_limit)
if footprint is None:
footprints, labels = get_current_footprint(nside=self.nside)
fp = footprints[self.filtername]
else:
fp = footprint
self.out_of_bounds = np.where(fp == 0)[0]
self.result = fp * 0
self.mag_diff = IntRounded(mag_diff)
self.survey_features = {}
self.survey_features["coadd_depth_all"] = features.CoaddedDepth(
filtername=self.filtername, nside=self.nside
)
self.survey_features["coadd_depth_good"] = features.CoaddedDepth(
filtername=self.filtername, nside=self.nside, fwhm_eff_limit=fwhm_eff_limit
)
def _calc_value(self, conditions, **kwargs):
# Seeing is "bad"
if IntRounded(conditions.FWHMeff[self.filtername].min()) > self.fwhm_eff_limit:
return 0
result = self.result.copy()
diff = (
self.survey_features["coadd_depth_all"].feature - self.survey_features["coadd_depth_good"].feature
)
# Where are there things we want to observe?
good_pix = np.where(
(IntRounded(diff) > self.mag_diff)
& (IntRounded(conditions.FWHMeff[self.filtername]) <= self.fwhm_eff_limit)
)
# Hm, should this scale by the mag differences? Probably.
result[good_pix] = diff[good_pix]
result[self.out_of_bounds] = 0
return result
[docs]
class TemplateGenerateBasisFunction(BaseBasisFunction):
"""Emphasize areas that have not been observed in a long time
Parameters
----------
nside : `int`, optional
The nside for the basis function and feature.
Default None uses `set_default_nside()`.
day_gap : `float`, optional
How long to wait before boosting the reward (days).
Default of 250 pushes visits into parts of the sky which missed
a significant chunk of a season.
filtername : `str`, optional
The filter to consider when tracking observations.
footprint : `np.array`, (N,)
The indices of the healpixels to apply the boost to.
Default None will call `get_current_footprint()`.
"""
def __init__(self, nside=None, day_gap=250.0, filtername="r", footprint=None):
super(TemplateGenerateBasisFunction, self).__init__(nside=nside)
self.day_gap = day_gap
self.filtername = filtername
self.survey_features = {}
self.survey_features["Last_observed"] = features.LastObserved(filtername=filtername)
self.result = np.zeros(hp.nside2npix(self.nside))
if footprint is None:
footprints, labels = get_current_footprint(self.nside)
fp = footprints[self.filtername]
else:
fp = footprint
self.out_of_bounds = np.where(fp == 0)
send_unused_deprecation_warning(self.__class__.__name__)
def _calc_value(self, conditions, **kwargs):
result = self.result.copy()
overdue = np.where(
(IntRounded(conditions.mjd - self.survey_features["Last_observed"].feature))
> IntRounded(self.day_gap)
)
result[overdue] = 1
result[self.out_of_bounds] = 0
return result
[docs]
class LimitRepeatBasisFunction(BaseBasisFunction):
"""Mask out pixels that haven't been observed in the night.
Parameters
----------
nside : `int` or None
Nside for the basis function values. Default None will use
default nside.
filtername : `str` or None
Filter to consider when tracking number of acquired observations.
"""
def __init__(self, nside=None, filtername="r", n_limit=2):
super(LimitRepeatBasisFunction, self).__init__(nside=nside)
self.filtername = filtername
self.n_limit = n_limit
self.survey_features = {}
self.survey_features["n_obs"] = features.NObsNight(nside=nside, filtername=filtername)
self.result = np.zeros(hp.nside2npix(self.nside))
send_unused_deprecation_warning(self.__class__.__name__)
def _calc_value(self, conditions, **kwargs):
result = self.result.copy()
good_pix = np.where(self.survey_features["n_obs"].feature >= self.n_limit)[0]
result[good_pix] = 1
return result
[docs]
class ObservedTwiceBasisFunction(BaseBasisFunction):
"""Mask out pixels that haven't been observed in the night"""
def __init__(self, nside=None, filtername="r", n_obs_needed=2, n_obs_in_filt_needed=1):
super(ObservedTwiceBasisFunction, self).__init__(nside=nside)
self.n_obs_needed = n_obs_needed
self.n_obs_in_filt_needed = n_obs_in_filt_needed
self.filtername = filtername
self.survey_features = {}
self.survey_features["n_obs_infilt"] = features.NObsNight(nside=nside, filtername=filtername)
self.survey_features["n_obs_all"] = features.NObsNight(nside=nside, filtername="")
self.result = np.zeros(hp.nside2npix(self.nside))
send_unused_deprecation_warning(self.__class__.__name__)
def _calc_value(self, conditions, **kwargs):
result = self.result.copy()
good_pix = np.where(
(self.survey_features["n_obs_infilt"].feature >= self.n_obs_in_filt_needed)
& (self.survey_features["n_obs_all"].feature >= self.n_obs_needed)
)[0]
result[good_pix] = 1
return result
[docs]
class VisitGap(BaseBasisFunction):
"""Basis function to create a visit gap based on the survey note field.
Parameters
----------
note : str
Value of the observation "scheduler_note" field to be masked.
filter_names : list [str], optional
List of filter names that will be considered when evaluating
if the gap has passed.
gap_min : float (optional)
Time gap (default=25, in minutes).
penalty_val : float or np.nan
Value of the penalty to apply (default is np.nan).
Notes
-----
When a list of filters is provided, all filters must be observed before
the gap requirement will be activated, and once activated, only
observations in these filters will be evaluated in context of whether
the last observation was at least gap in the past.
"""
def __init__(self, note, filter_names=None, gap_min=25.0, penalty_val=np.nan):
super().__init__()
self.penalty_val = penalty_val
self.gap = gap_min / 60.0 / 24.0
self.filter_names = filter_names
self.survey_features = dict()
if self.filter_names is not None:
for filtername in self.filter_names:
self.survey_features[f"NoteLastObserved::{filtername}"] = features.NoteLastObserved(
note=note, filtername=filtername
)
else:
self.survey_features["NoteLastObserved"] = features.NoteLastObserved(note=note)
[docs]
def check_feasibility(self, conditions):
notes_last_observed = [last_observed.feature for last_observed in self.survey_features.values()]
if any([last_observed is None for last_observed in notes_last_observed]):
return True
after_gap = [conditions.mjd - last_observed > self.gap for last_observed in notes_last_observed]
return all(after_gap)
def _calc_value(self, conditions, indx=None):
return 1.0 if self.check_feasibility(conditions) else self.penalty_val
[docs]
class AvoidDirectWind(BaseBasisFunction):
"""Mask the sky where the wind pressure exceeds `wind_speed_maximum`.
Parameters
----------
wind_speed_maximum : `float`, optional
Wind speed to mark regions as unobservable (in m/s).
nside : `int`, optional
The nside for the basis function. Default None uses
`set_default_nside()`.
"""
def __init__(self, wind_speed_maximum=20.0, nside=None):
super().__init__(nside=nside)
self.wind_speed_maximum = wind_speed_maximum
def _calc_value(self, conditions, indx=None):
reward_map = np.zeros(hp.nside2npix(self.nside))
if conditions.wind_speed is None or conditions.wind_direction is None:
return reward_map
wind_pressure = conditions.wind_speed * np.cos(conditions.az - conditions.wind_direction)
reward_map -= wind_pressure**2.0
mask = wind_pressure > self.wind_speed_maximum
reward_map[mask] = np.nan
return reward_map
[docs]
class BalanceVisits(BaseBasisFunction):
"""Balance visits across multiple surveys.
Parameters
----------
nobs_reference : `int`
Expected number of observations across all interested surveys.
note_survey : `str`
Note value for the current survey.
note_interest : `str`
Substring with the name of interested surveys to be accounted.
nside : `int`
Healpix map resolution.
Notes
-----
This basis function is designed to balance the reward of a group of
surveys, such that the group get a reward boost based on the required
collective number of observations.
For example, if you have 3 surveys (e.g. SURVEY_A_REGION_1,
SURVEY_A_REGION_2, SURVEY_A_REGION_3), when one of them is observed
once (SURVEY_A_REGION_1) they all get a small reward boost proportional
to the collective number of observations (`nobs_reference`). Further
observations of SURVEY_A_REGION_1 would now cause the other surveys
to gain a reward boost in relative to it.
"""
def __init__(self, nobs_reference, note_survey, note_interest, nside=None):
super().__init__(nside=nside)
self.nobs_reference = nobs_reference
self.survey_features = {}
self.survey_features["n_obs_survey"] = features.NObsSurvey(note=note_survey)
self.survey_features["n_obs_survey_interest"] = features.NObsSurvey(note=note_interest)
def _calc_value(self, conditions, indx=None):
return (1 + np.floor(self.survey_features["n_obs_survey_interest"].feature / self.nobs_reference)) / (
self.survey_features["n_obs_survey"].feature
if self.survey_features["n_obs_survey"].feature > 0
else 1
)
[docs]
class RewardNObsSequence(BaseBasisFunction):
"""Reward taking a sequence of observations.
Parameters
----------
n_obs_survey : `int`
Number of observations to reward.
note_survey : `str`
The value of the observation note, to take into account.
nside : `int`, optional
Healpix map resolution (ignored).
Notes
-----
This basis function is useful when a survey is composed of more than
one observation (e.g. in different filters) and one wants to make sure
they are all taken together.
"""
def __init__(self, n_obs_survey, note_survey, nside=None):
super().__init__(nside=nside)
self.n_obs_survey = n_obs_survey
self.survey_features = {}
self.survey_features["n_obs_survey"] = features.NObsSurvey(note=note_survey)
def _calc_value(self, conditions, indx=None):
return self.survey_features["n_obs_survey"].feature % self.n_obs_survey
[docs]
class RewardRisingBasisFunction(BaseBasisFunction):
"""Reward parts of the sky that are rising.
Optionally, mask out parts of the sky that are not rising.
This produces a reward that increases
as the field rises toward zenith, then abruptly
falls as the field passes zenith.
Negative hour angles (or hour angles > 180 degrees)
indicate a rising point on the sky.
Parameters
----------
slope : `float`
Sets the 'slope' of how fast the basis function
value changes with hour angle.
penalty_val : `float` or `np.nan`, optional
Sets the value for the part of the sky which is
not rising (hour angle between 0 and 180).
Using a value of np.nan will mask this region of sky,
a value of 0 will just make this non-rewarding.
nside : `int` or None, optional
Nside for the healpix map, default of None uses scheduler default.
"""
def __init__(self, slope=0.1, penalty_val=0, nside=None):
super().__init__(nside=nside)
self.slope = slope
self.penalty_val = penalty_val
# Probably not needed
[docs]
def check_feasibility(self, conditions):
return True
def _calc_value(self, conditions, indx=None):
# HA should be available in the conditions object
value = self.slope * conditions.HA
past_zenith = np.where(conditions.HA < np.pi)
value[past_zenith] = self.penalty_val
return value