Source code for rubin_scheduler.scheduler.basis_functions.rolling_funcs

__all__ = ("FootprintBasisFunction",)

import warnings

import numpy as np

from rubin_scheduler.scheduler import features, utils
from rubin_scheduler.scheduler.basis_functions import BaseBasisFunction
from rubin_scheduler.scheduler.utils import ConstantFootprint


def send_unused_deprecation_warning(name):
    message = (
        f"The basis function {name} is not in use by the current "
        "baseline scheduler and may be deprecated shortly. "
        "Please contact the rubin_scheduler maintainers if "
        "this is in use elsewhere."
    )
    warnings.warn(message, FutureWarning)


[docs] class FootprintBasisFunction(BaseBasisFunction): """Basis function that tries to maintain a uniformly covered footprint Parameters ---------- filtername : `str`, optional The filter for this footprint. Default r. nside : `int`, optional The nside for the basis function and features. Default None uses `~utils.set_default_nside()` footprint : `~rubin_scheduler.scheduler.utils.Footprint` object The desired footprint. The default will set this to None, but in general this is really not desirable. In order to make default a kwarg, a current baseline footprint is set up with a Constant footprint (not rolling, not even season aware). out_of_bounds_val : `float`, optional The value to set the basis function for regions that are not in the footprint. Default -10, np.nan is another good value to use. """ def __init__( self, filtername="r", nside=None, footprint=None, out_of_bounds_val=-10.0, ): super().__init__(nside=nside, filtername=filtername) if footprint is None: # This is useful as a backup, but really footprint SHOULD # be specified when basis function is set up. # This just uses whole survey, but doesn't set up rolling. warnings.warn("No Footprint set, using a constant default.") target_maps, labels = utils.get_current_footprint(self.nside) fp = ConstantFootprint(nside=self.nside) for f in "ugrizy": fp.set_footprint(f, target_maps[f]) self.footprint = footprint self.survey_features = {} # All the observations in all filters self.survey_features["N_obs_all"] = features.NObservations(nside=self.nside, filtername=None) self.survey_features["N_obs"] = features.NObservations(nside=self.nside, filtername=filtername) # should probably actually loop over all the target maps? self.out_of_bounds_area = np.where(footprint.get_footprint(self.filtername) <= 0)[0] self.out_of_bounds_val = out_of_bounds_val def _calc_value(self, conditions, indx=None): # Find out what the footprint object thinks we should have been # observed desired_footprint_normed = self.footprint(conditions.mjd)[self.filtername] # Compute how many observations we should have on the sky desired = desired_footprint_normed * np.sum(self.survey_features["N_obs_all"].feature) result = desired - self.survey_features["N_obs"].feature result[self.out_of_bounds_area] = self.out_of_bounds_val return result