Source code for rubin_scheduler.scheduler.basis_functions.mask_basis_funcs

__all__ = (
    "SolarElongMaskBasisFunction",
    "HaMaskBasisFunction",
    "MoonAvoidanceBasisFunction",
    "MapCloudBasisFunction",
    "PlanetMaskBasisFunction",
    "SolarElongationMaskBasisFunction",
    "AreaCheckMaskBasisFunction",
    "AltAzShadowMaskBasisFunction",
)

import warnings

import healpy as hp
import numpy as np

from rubin_scheduler.scheduler.basis_functions import BaseBasisFunction
from rubin_scheduler.scheduler.utils import HpInLsstFov, IntRounded
from rubin_scheduler.utils import DEFAULT_NSIDE, _angular_separation, _hp_grow_mask


[docs] class SolarElongMaskBasisFunction(BaseBasisFunction): """Mask regions larger than some solar elongation limit Parameters ---------- elong_limit : float (45) The limit beyond which to mask (degrees) """ def __init__(self, elong_limit=45.0, nside=DEFAULT_NSIDE): msg = ( "SolarElongMaskBasisFunction is scheduled for" " deprecation, swap to SolarElongationMaskBasisFunction." ) warnings.warn(msg, DeprecationWarning) super(SolarElongMaskBasisFunction, self).__init__(nside=nside) self.elong_limit = IntRounded(np.radians(elong_limit)) self.result = np.zeros(hp.nside2npix(self.nside), dtype=float) def _calc_value(self, conditions, indx=None): result = self.result.copy() to_mask = np.where(IntRounded(conditions.solar_elongation) > self.elong_limit)[0] result[to_mask] = np.nan return result
[docs] class HaMaskBasisFunction(BaseBasisFunction): """Limit the sky based on hour angle Parameters ---------- ha_min : float (None) The minimum hour angle to accept (hours) ha_max : float (None) The maximum hour angle to accept (hours) """ def __init__(self, ha_min=None, ha_max=None, nside=DEFAULT_NSIDE): super(HaMaskBasisFunction, self).__init__(nside=nside) self.ha_max = ha_max self.ha_min = ha_min self.result = np.zeros(hp.nside2npix(self.nside), dtype=float) def _calc_value(self, conditions, **kwargs): result = self.result.copy() if self.ha_min is not None: good = np.where(conditions.HA < (self.ha_min / 12.0 * np.pi))[0] result[good] = np.nan if self.ha_max is not None: good = np.where(conditions.HA > (self.ha_max / 12.0 * np.pi))[0] result[good] = np.nan return result
[docs] class AreaCheckMaskBasisFunction(BaseBasisFunction): """Take a list of other mask basis functions, and do an additional check for area available""" def __init__(self, bf_list, nside=DEFAULT_NSIDE, min_area=1000.0): super(AreaCheckMaskBasisFunction, self).__init__(nside=nside) self.bf_list = bf_list self.result = np.zeros(hp.nside2npix(self.nside), dtype=float) self.min_area = min_area
[docs] def check_feasibility(self, conditions): result = True for bf in self.bf_list: if not bf.check_feasibility(conditions): return False area_map = self.result.copy() for bf in self.bf_list: area_map *= bf(conditions) good_pix = np.where(area_map == 0)[0] if hp.nside2pixarea(self.nside, degrees=True) * good_pix.size < self.min_area: result = False return result
def _calc_value(self, conditions, **kwargs): result = self.result.copy() for bf in self.bf_list: result *= bf(conditions) return result
[docs] class SolarElongationMaskBasisFunction(BaseBasisFunction): """Mask things at various solar elongations Parameters ---------- min_elong : float (0) The minimum solar elongation to consider (degrees). max_elong : float (60.) The maximum solar elongation to consider (degrees). """ def __init__(self, min_elong=0.0, max_elong=60.0, nside=DEFAULT_NSIDE, penalty=np.nan): super(SolarElongationMaskBasisFunction, self).__init__(nside=nside) self.min_elong = np.radians(min_elong) self.max_elong = np.radians(max_elong) self.penalty = penalty self.result = np.empty(hp.nside2npix(self.nside), dtype=float) self.result.fill(self.penalty) def _calc_value(self, conditions, indx=None): result = self.result.copy() in_range = np.where( (IntRounded(conditions.solar_elongation) >= IntRounded(self.min_elong)) & (IntRounded(conditions.solar_elongation) <= IntRounded(self.max_elong)) )[0] result[in_range] = 1 return result
[docs] class PlanetMaskBasisFunction(BaseBasisFunction): """Mask the bright planets. Parameters ---------- mask_radius : float (3.5) The radius to mask around a planet (degrees). planets : list of str (None) A list of planet names to mask. Defaults to ['venus', 'mars', 'jupiter']. Not including Saturn because it moves really slowly and has average apparent mag of ~0.4, so fainter than Vega. """ def __init__(self, mask_radius=3.5, planets=None, nside=DEFAULT_NSIDE, scale=1e5): super(PlanetMaskBasisFunction, self).__init__(nside=nside) if planets is None: planets = ["venus", "mars", "jupiter"] self.planets = planets self.mask_radius = np.radians(mask_radius) self.result = np.zeros(hp.nside2npix(nside)) # set up a kdtree. Could maybe use healpy.query_disc instead. self.in_fov = HpInLsstFov(nside=nside, fov_radius=mask_radius, scale=scale) def _calc_value(self, conditions, indx=None): result = self.result.copy() for pn in self.planets: indices = self.in_fov( np.max(conditions.planet_positions[pn + "_RA"]), np.max(conditions.planet_positions[pn + "_dec"]), ) result[indices] = np.nan return result
[docs] class AltAzShadowMaskBasisFunction(BaseBasisFunction): """Mask out a range of altitudes and azimuths, including regions which will enter the mask within `shadow_minutes`. Masks any alt/az regions as specified by the conditions object in `conditions.sky_az_limits` and `conditions.sky_alt_limits`, as well as values as provided by `min_alt`, `max_alt`, `min_az` and `max_az`. Parameters ---------- nside : `int` or None HEALpix nside. Default None will look up the package-wide default. min_alt : `float` Minimum altitude to apply to the mask. Default 20 (degrees). max_alt : `float` Maximum altitude to allow. Default 82 (degrees). min_az : `float` Minimum azimuth value to apply to the mask. Default 0 (degrees). These azimuth values are absolute azimuth, not cumulative. The absolute and cumulative azimuth only diverge if the azimuth range is greater than 360 degrees. max_az : `float` Maximum azimuth value to apply to the mask. Default 360 (degrees). shadow_minutes : `float` How long to extend masked area in longitude. Default 40 (minutes). Choose this value based on the time between when a field might be chosen to be scheduled and when it might be observed. For pairs, the minimum pair time + some buffer is good. For sequences, try the length of the sequence + some buffer. Note that this just sets up the shadow *at* the shadow_minutes time (and not all times up to shadow_minutes). pad : `float` The value by which to pad the telescope limits, to avoid healpix values mapping into pointings from the field tesselations which are actually out of bounds. This should typically be a bit more than the radius of the fov. (degrees). """ def __init__( self, nside=DEFAULT_NSIDE, min_alt=20.0, max_alt=86.5, min_az=0, max_az=360, shadow_minutes=40.0, pad=3.0, scale=1000, ): super().__init__(nside=nside) self.min_alt = np.radians(min_alt) self.max_alt = np.radians(max_alt) self.min_az = np.radians(min_az) self.max_az = np.radians(max_az) self.shadow_time = shadow_minutes / 60.0 / 24.0 # To days self.pad = np.radians(pad) self.r_min_alt = IntRounded(self.min_alt) self.r_max_alt = IntRounded(self.max_alt) self.scale = scale def _calc_value(self, conditions, indx=None): # Basis function value will be 0 except where masked (then np.nan) result = np.zeros(hp.nside2npix(self.nside), dtype=float) # Compute the alt,az values in the future. Use the conditions object # so the results are cached and can be used by other surveys is # needed. Technically this could fail if the masked region is # very narrow or shadow time is very large. future_alt, future_az = conditions.future_alt_az(float(np.max(conditions.mjd)) + self.shadow_time) r_future_alt = IntRounded(future_alt) r_current_alt = IntRounded(conditions.alt) # Check the basis function altitude limits, now and future result[np.where(r_current_alt < self.r_min_alt)] = np.nan result[np.where(r_current_alt > self.r_max_alt)] = np.nan result[np.where(r_future_alt < self.r_min_alt)] = np.nan result[np.where(r_future_alt > self.r_max_alt)] = np.nan # Check the conditions objects 'sky_alt_limit', now and future if (conditions.sky_alt_limits is not None) and (len(conditions.sky_alt_limits) > 0): combined = np.zeros(hp.nside2npix(self.nside), dtype=float) for limits in conditions.sky_alt_limits: # For conditions-based limits, must add pad # And remember that discontinuous areas can be allowed in_bounds = np.ones(hp.nside2npix(self.nside), dtype=float) min_alt = IntRounded(limits[0]) max_alt = IntRounded(limits[1]) in_bounds[np.where(r_current_alt < min_alt)] = 0 in_bounds[np.where(r_current_alt > max_alt)] = 0 in_bounds[np.where(r_future_alt < min_alt)] = 0 in_bounds[np.where(r_future_alt > max_alt)] = 0 combined += in_bounds result[np.where(combined == 0)] = np.nan # And check against the telescope 'tel_alt_limits'. # The tel_alt_limits could be combined with the sky_alt_limits, # but it's a bit tricky because sky_alt_limits are (potentially) # disjoint allowable areas, while the tel_alt_limits are a single # wide set of allowable area which explicitly disallows anything # outside that range. The az limits versions are similar. if conditions.tel_alt_limits is not None: min_alt = IntRounded(conditions.tel_alt_limits[0]) max_alt = IntRounded(conditions.tel_alt_limits[1]) result[np.where(r_current_alt < min_alt)] = np.nan result[np.where(r_current_alt > max_alt)] = np.nan result[np.where(r_future_alt < min_alt)] = np.nan result[np.where(r_future_alt > max_alt)] = np.nan # note that % (mod) is not supported for IntRounded two_pi = 2 * np.pi # Check the basis function azimuth limits, now and future if np.abs(self.max_az - self.min_az) < two_pi: az_range = (self.max_az - self.min_az) % (two_pi) out_of_bounds = np.where((conditions.az - self.min_az) % (two_pi) > az_range)[0] result[out_of_bounds] = np.nan out_of_bounds = np.where((future_az - self.min_az) % (two_pi) > az_range)[0] result[out_of_bounds] = np.nan # Check the conditions objects azimuth limits, now and future if (conditions.sky_az_limits is not None) and (len(conditions.sky_az_limits) > 0): combined = np.zeros(hp.nside2npix(self.nside), dtype=float) for limits in conditions.sky_az_limits: in_bounds = np.ones(hp.nside2npix(self.nside), dtype=float) min_az = limits[0] max_az = limits[1] if np.abs(max_az - min_az) < two_pi: az_range = (max_az - min_az) % (two_pi) out_of_bounds = np.where((conditions.az - min_az) % (two_pi) > az_range)[0] in_bounds[out_of_bounds] = 0 out_of_bounds = np.where((future_az - min_az) % (two_pi) > az_range)[0] in_bounds[out_of_bounds] = 0 combined += in_bounds result[np.where(combined == 0)] = np.nan # Check against the kinematic hard limits. if conditions.tel_az_limits is not None: if np.abs(conditions.tel_az_limits[1] - conditions.tel_az_limits[0]) < two_pi: az_range = (conditions.tel_az_limits[1] - conditions.tel_az_limits[0]) % (two_pi) out_of_bounds = np.where((conditions.az - conditions.tel_az_limits[0]) % (two_pi) > az_range)[ 0 ] result[out_of_bounds] = np.nan out_of_bounds = np.where((future_az - conditions.tel_az_limits[0]) % (two_pi) > az_range)[0] result[out_of_bounds] = np.nan # Grow the resulting mask by self.pad, to avoid field centers # falling outside the boundaries of what is actually reachable. if self.pad > 0: mask_indx = np.where(np.isnan(result))[0] to_mask_indx = _hp_grow_mask(self.nside, tuple(mask_indx), scale=self.scale, grow_size=self.pad) result[to_mask_indx] = np.nan return result
[docs] class MoonAvoidanceBasisFunction(BaseBasisFunction): """Avoid observing within `moon_distance` of the moon. Parameters ---------- moon_distance: float (30.) Minimum allowed moon distance. (degrees) Notes ----- As the current specified requirements for the observatory are "observe more than 30 degrees from the moon", this basis function simply does that at present. This includes times the moon is below the horizon or if the moon close to new. Most likely, this avoidance region should depend on lunar phase, the band used for observations, and whether the moon is above or below the horizon. """ def __init__(self, nside=DEFAULT_NSIDE, moon_distance=30.0): super(MoonAvoidanceBasisFunction, self).__init__(nside=nside) self.update_on_newobs = False self.moon_distance = IntRounded(np.radians(moon_distance)) self.result = np.ones(hp.nside2npix(self.nside), dtype=float) def _calc_value(self, conditions, indx=None): result = self.result.copy() angular_distance = _angular_separation( conditions.az, conditions.alt, conditions.moon_az, conditions.moon_alt ) result[IntRounded(angular_distance) < self.moon_distance] = np.nan return result
class BulkCloudBasisFunction(BaseBasisFunction): """Mark healpixels on a map if their cloud values are greater than the same healpixels on a maximum cloud map. Parameters ---------- nside: int (default_nside) The healpix resolution. max_cloud_map : numpy array (None) A healpix map showing the maximum allowed cloud values for all points on the sky out_of_bounds_val : float (10.) Point value to give regions where there are no observations requested """ def __init__(self, nside=DEFAULT_NSIDE, max_cloud_map=None, max_val=0.7, out_of_bounds_val=np.nan): super(BulkCloudBasisFunction, self).__init__(nside=nside) self.update_on_newobs = False if max_cloud_map is None: self.max_cloud_map = np.zeros(hp.nside2npix(nside), dtype=float) + max_val else: self.max_cloud_map = max_cloud_map self.out_of_bounds_area = np.where(self.max_cloud_map > 1.0)[0] self.out_of_bounds_val = out_of_bounds_val self.result = np.ones(hp.nside2npix(self.nside)) def _calc_value(self, conditions, indx=None): """ Parameters ---------- indx : list (None) Index values to compute, if None, full map is computed Returns ------- Healpix map where pixels with a cloud value greater than the max_cloud_map value are marked as unseen. """ result = self.result.copy() clouded = np.where(self.max_cloud_map <= conditions.bulk_cloud) result[clouded] = self.out_of_bounds_val return result
[docs] class MapCloudBasisFunction(BaseBasisFunction): """Mark healpixels on a map if their cloud values are greater than the same healpixels on a maximum cloud map. Currently a placeholder for when the telemetry stream can include a full sky cloud map. Parameters ---------- nside: int (default_nside) The healpix resolution. max_cloud_map : numpy array (None) A healpix map showing the maximum allowed cloud values for all points on the sky out_of_bounds_val : float (10.) Point value to give regions where there are no observations requested """ def __init__(self, nside=DEFAULT_NSIDE, max_cloud_map=None, max_val=0.7, out_of_bounds_val=np.nan): super().__init__(nside=nside) self.update_on_newobs = False if max_cloud_map is None: self.max_cloud_map = np.zeros(hp.nside2npix(nside), dtype=float) + max_val else: self.max_cloud_map = max_cloud_map self.out_of_bounds_area = np.where(self.max_cloud_map > 1.0)[0] self.out_of_bounds_val = out_of_bounds_val self.result = np.ones(hp.nside2npix(self.nside)) def _calc_value(self, conditions, indx=None): """ Parameters ---------- indx : list (None) Index values to compute, if None, full map is computed Returns ------- Healpix map where pixels with a cloud value greater than the max_cloud_map value are marked as unseen. """ result = self.result.copy() clouded = np.where(self.max_cloud_map <= conditions.bulk_cloud) result[clouded] = self.out_of_bounds_val return result