__all__ = (
"SolarElongMaskBasisFunction",
"ZenithShadowMaskBasisFunction",
"HaMaskBasisFunction",
"MoonAvoidanceBasisFunction",
"MapCloudBasisFunction",
"PlanetMaskBasisFunction",
"MaskAzimuthBasisFunction",
"SolarElongationMaskBasisFunction",
"AreaCheckMaskBasisFunction",
"AltAzShadowMaskBasisFunction",
)
import warnings
import healpy as hp
import numpy as np
from rubin_scheduler.scheduler.basis_functions import BaseBasisFunction
from rubin_scheduler.scheduler.utils import HpInLsstFov, IntRounded
from rubin_scheduler.utils import Site, _angular_separation, _hpid2_ra_dec
from .basis_functions import send_unused_deprecation_warning
[docs]
class SolarElongMaskBasisFunction(BaseBasisFunction):
"""Mask regions larger than some solar elongation limit
Parameters
----------
elong_limit : float (45)
The limit beyond which to mask (degrees)
"""
def __init__(self, elong_limit=45.0, nside=32):
super(SolarElongMaskBasisFunction, self).__init__(nside=nside)
self.elong_limit = IntRounded(np.radians(elong_limit))
self.result = np.zeros(hp.nside2npix(self.nside), dtype=float)
def _calc_value(self, conditions, indx=None):
result = self.result.copy()
to_mask = np.where(IntRounded(conditions.solar_elongation) > self.elong_limit)[0]
result[to_mask] = np.nan
return result
[docs]
class HaMaskBasisFunction(BaseBasisFunction):
"""Limit the sky based on hour angle
Parameters
----------
ha_min : float (None)
The minimum hour angle to accept (hours)
ha_max : float (None)
The maximum hour angle to accept (hours)
"""
def __init__(self, ha_min=None, ha_max=None, nside=32):
super(HaMaskBasisFunction, self).__init__(nside=nside)
self.ha_max = ha_max
self.ha_min = ha_min
self.result = np.zeros(hp.nside2npix(self.nside), dtype=float)
def _calc_value(self, conditions, **kwargs):
result = self.result.copy()
if self.ha_min is not None:
good = np.where(conditions.HA < (self.ha_min / 12.0 * np.pi))[0]
result[good] = np.nan
if self.ha_max is not None:
good = np.where(conditions.HA > (self.ha_max / 12.0 * np.pi))[0]
result[good] = np.nan
return result
[docs]
class AreaCheckMaskBasisFunction(BaseBasisFunction):
"""Take a list of other mask basis functions, and do an additional
check for area available"""
def __init__(self, bf_list, nside=32, min_area=1000.0):
super(AreaCheckMaskBasisFunction, self).__init__(nside=nside)
self.bf_list = bf_list
self.result = np.zeros(hp.nside2npix(self.nside), dtype=float)
self.min_area = min_area
[docs]
def check_feasibility(self, conditions):
result = True
for bf in self.bf_list:
if not bf.check_feasibility(conditions):
return False
area_map = self.result.copy()
for bf in self.bf_list:
area_map *= bf(conditions)
good_pix = np.where(area_map == 0)[0]
if hp.nside2pixarea(self.nside, degrees=True) * good_pix.size < self.min_area:
result = False
return result
def _calc_value(self, conditions, **kwargs):
result = self.result.copy()
for bf in self.bf_list:
result *= bf(conditions)
return result
[docs]
class SolarElongationMaskBasisFunction(BaseBasisFunction):
"""Mask things at various solar elongations
Parameters
----------
min_elong : float (0)
The minimum solar elongation to consider (degrees).
max_elong : float (60.)
The maximum solar elongation to consider (degrees).
"""
def __init__(self, min_elong=0.0, max_elong=60.0, nside=None, penalty=np.nan):
super(SolarElongationMaskBasisFunction, self).__init__(nside=nside)
self.min_elong = np.radians(min_elong)
self.max_elong = np.radians(max_elong)
self.penalty = penalty
self.result = np.empty(hp.nside2npix(self.nside), dtype=float)
self.result.fill(self.penalty)
def _calc_value(self, conditions, indx=None):
result = self.result.copy()
in_range = np.where(
(IntRounded(conditions.solar_elongation) >= IntRounded(self.min_elong))
& (IntRounded(conditions.solar_elongation) <= IntRounded(self.max_elong))
)[0]
result[in_range] = 1
return result
[docs]
class PlanetMaskBasisFunction(BaseBasisFunction):
"""Mask the bright planets.
Parameters
----------
mask_radius : float (3.5)
The radius to mask around a planet (degrees).
planets : list of str (None)
A list of planet names to mask. Defaults to ['venus', 'mars',
'jupiter']. Not including Saturn because it moves really slowly
and has average apparent mag of ~0.4, so fainter than Vega.
"""
def __init__(self, mask_radius=3.5, planets=None, nside=None, scale=1e5):
super(PlanetMaskBasisFunction, self).__init__(nside=nside)
if planets is None:
planets = ["venus", "mars", "jupiter"]
self.planets = planets
self.mask_radius = np.radians(mask_radius)
self.result = np.zeros(hp.nside2npix(nside))
# set up a kdtree. Could maybe use healpy.query_disc instead.
self.in_fov = HpInLsstFov(nside=nside, fov_radius=mask_radius, scale=scale)
def _calc_value(self, conditions, indx=None):
result = self.result.copy()
for pn in self.planets:
indices = self.in_fov(
np.max(conditions.planet_positions[pn + "_RA"]),
np.max(conditions.planet_positions[pn + "_dec"]),
)
result[indices] = np.nan
return result
[docs]
class AltAzShadowMaskBasisFunction(BaseBasisFunction):
"""Mask out a range of altitudes and azimuths, including
regions which will enter the mask within `shadow_minutes`.
Masks any alt/az regions as specified by the conditions object in
`conditions.tel_az_limits` and `conditions.tel_alt_limits`,
as well as values as provided by `min_alt`, `max_alt`,
`min_az` and `max_az`.
Parameters
----------
nside : `int` or None
HEALpix nside. Default None will look up the package-wide default.
min_alt : `float`
Minimum altitude to apply to the mask. Default 20 (degrees).
max_alt : `float`
Maximum altitude to allow. Default 82 (degrees).
min_az : `float`
Minimum azimuth value to apply to the mask. Default 0 (degrees).
These azimuth values are absolute azimuth, not cumulative.
The absolute and cumulative azimuth only diverge if the azimuth
range is greater than 360 degrees.
max_az : `float`
Maximum azimuth value to apply to the mask. Default 360 (degrees).
shadow_minutes : `float`
How long to extend masked area in longitude. Default 40 (minutes).
Choose this value based on the time between when a field might
be chosen to be scheduled and when it might be observed.
For pairs, the minimum pair time + some buffer is good.
For sequences, try the length of the sequence + some buffer.
Note that this just sets up the shadow *at* the shadow_minutes time
(and not all times up to shadow_minutes).
"""
def __init__(
self,
nside=None,
min_alt=20.0,
max_alt=86.5,
min_az=0,
max_az=360,
shadow_minutes=40.0,
):
super().__init__(nside=nside)
self.min_alt = np.radians(min_alt)
self.max_alt = np.radians(max_alt)
self.min_az = np.radians(min_az)
self.max_az = np.radians(max_az)
self.shadow_time = shadow_minutes / 60.0 / 24.0 # To days
self.r_min_alt = IntRounded(self.min_alt)
self.r_max_alt = IntRounded(self.max_alt)
def _calc_value(self, conditions, indx=None):
# Basis function value will be 0 except where masked (then np.nan)
result = np.zeros(hp.nside2npix(self.nside), dtype=float)
# Compute the alt,az values in the future. Use the conditions object
# so the results are cached and can be used by other surveys is
# needed. Technically this could fail if the masked region is
# very narrow or shadow time is very large.
future_alt, future_az = conditions.future_alt_az(float(np.max(conditions.mjd)) + self.shadow_time)
r_future_alt = IntRounded(future_alt)
r_current_alt = IntRounded(conditions.alt)
# Check the basis function altitude limits, now and future
result[np.where(r_current_alt < self.r_min_alt)] = np.nan
result[np.where(r_current_alt > self.r_max_alt)] = np.nan
result[np.where(r_future_alt < self.r_min_alt)] = np.nan
result[np.where(r_future_alt > self.r_max_alt)] = np.nan
# Check the conditions objects altitude limits, now and future
if (conditions.tel_alt_limits is not None) and (len(conditions.tel_alt_limits) > 0):
combined = np.zeros(hp.nside2npix(self.nside), dtype=float)
for limits in conditions.tel_alt_limits:
# For conditions-based limits, must add pad
# And remember that discontinuous areas can be allowed
in_bounds = np.ones(hp.nside2npix(self.nside), dtype=float)
min_alt = IntRounded(limits[0] + conditions.altaz_limit_pad)
max_alt = IntRounded(limits[1] - conditions.altaz_limit_pad)
in_bounds[np.where(r_current_alt < min_alt)] = 0
in_bounds[np.where(r_current_alt > max_alt)] = 0
in_bounds[np.where(r_future_alt < min_alt)] = 0
in_bounds[np.where(r_future_alt > max_alt)] = 0
combined += in_bounds
result[np.where(combined == 0)] = np.nan
# And check against the kinematic hard limits.
# These are separate so they don't override user tel_alt_limits.
min_alt = IntRounded(conditions.kinematic_alt_limits[0] + conditions.altaz_limit_pad)
max_alt = IntRounded(conditions.kinematic_alt_limits[1] - conditions.altaz_limit_pad)
result[np.where(r_current_alt < min_alt)] = np.nan
result[np.where(r_current_alt > max_alt)] = np.nan
result[np.where(r_future_alt < min_alt)] = np.nan
result[np.where(r_future_alt > max_alt)] = np.nan
# note that % (mod) is not supported for IntRounded
two_pi = 2 * np.pi
# Check the basis function azimuth limits, now and future
if np.abs(self.max_az - self.min_az) < two_pi:
az_range = (self.max_az - self.min_az) % (two_pi)
out_of_bounds = np.where((conditions.az - self.min_az) % (two_pi) > az_range)[0]
result[out_of_bounds] = np.nan
out_of_bounds = np.where((future_az - self.min_az) % (two_pi) > az_range)[0]
result[out_of_bounds] = np.nan
# Check the conditions objects azimuth limits, now and future
if (conditions.tel_az_limits is not None) and (len(conditions.tel_az_limits) > 0):
combined = np.zeros(hp.nside2npix(self.nside), dtype=float)
for limits in conditions.tel_az_limits:
in_bounds = np.ones(hp.nside2npix(self.nside), dtype=float)
min_az = limits[0]
max_az = limits[1]
if np.abs(max_az - min_az) < two_pi:
az_range = (max_az - min_az) % (two_pi) - 2 * conditions.altaz_limit_pad
out_of_bounds = np.where(
(conditions.az - min_az - conditions.altaz_limit_pad) % (two_pi) > az_range
)[0]
in_bounds[out_of_bounds] = 0
out_of_bounds = np.where(
(future_az - min_az - conditions.altaz_limit_pad) % (two_pi) > az_range
)[0]
in_bounds[out_of_bounds] = 0
combined += in_bounds
result[np.where(combined == 0)] = np.nan
# Check against the kinematic hard limits.
if np.abs(conditions.kinematic_az_limits[1] - conditions.kinematic_az_limits[0]) < two_pi:
az_range = (conditions.kinematic_az_limits[1] - conditions.kinematic_az_limits[0]) % (
two_pi
) - conditions.altaz_limit_pad * 2
out_of_bounds = np.where(
(conditions.az - conditions.kinematic_az_limits[0] - conditions.altaz_limit_pad) % (two_pi)
> az_range
)[0]
result[out_of_bounds] = np.nan
out_of_bounds = np.where(
(future_az - conditions.kinematic_az_limits[0] - conditions.altaz_limit_pad) % (two_pi)
> az_range
)[0]
result[out_of_bounds] = np.nan
return result
[docs]
class ZenithShadowMaskBasisFunction(BaseBasisFunction):
"""Mask the zenith, and things that will soon pass near zenith.
Useful for making sure observations will not be too close to zenith
when they need to be observed again (e.g. for a pair).
Parameters
----------
min_alt : float (20.)
The minimum alititude to alow. Everything lower is masked. (degrees)
max_alt : float (82.)
The maximum altitude to alow. Everything higher is masked. (degrees)
shadow_minutes : float (40.)
Mask anything that will pass through the max alt in the next
shadow_minutes time. (minutes)
"""
def __init__(
self,
nside=None,
min_alt=20.0,
max_alt=82.0,
shadow_minutes=40.0,
penalty=np.nan,
site="LSST",
):
warnings.warn(
"Deprecating ZenithShadowMaskBasisFunction in favor of AltAzShadowMaskBasisFunction.",
DeprecationWarning,
)
super(ZenithShadowMaskBasisFunction, self).__init__(nside=nside)
self.update_on_newobs = False
self.penalty = penalty
self.min_alt = np.radians(min_alt)
self.max_alt = np.radians(max_alt)
self.ra, self.dec = _hpid2_ra_dec(self.nside, np.arange(hp.nside2npix(self.nside)))
self.shadow_minutes = np.radians(shadow_minutes / 60.0 * 360.0 / 24.0)
# Compute the declination band where things could drift into zenith
self.decband = np.zeros(self.dec.size, dtype=float)
self.zenith_radius = np.radians(90.0 - max_alt) / 2.0
site = Site(name=site)
self.lat_rad = site.latitude_rad
self.lon_rad = site.longitude_rad
self.decband[
np.where(
(IntRounded(self.dec) < IntRounded(self.lat_rad + self.zenith_radius))
& (IntRounded(self.dec) > IntRounded(self.lat_rad - self.zenith_radius))
)
] = 1
self.result = np.empty(hp.nside2npix(self.nside), dtype=float)
self.result.fill(self.penalty)
def _calc_value(self, conditions, indx=None):
result = self.result.copy()
alt_limit = np.where(
(IntRounded(conditions.alt) > IntRounded(self.min_alt))
& (IntRounded(conditions.alt) < IntRounded(self.max_alt))
)[0]
result[alt_limit] = 1
to_mask = np.where(
(IntRounded(conditions.HA) > IntRounded(2.0 * np.pi - self.shadow_minutes - self.zenith_radius))
& (self.decband == 1)
)
result[to_mask] = np.nan
return result
[docs]
class MoonAvoidanceBasisFunction(BaseBasisFunction):
"""Avoid observing within `moon_distance` of the moon.
Parameters
----------
moon_distance: float (30.)
Minimum allowed moon distance. (degrees)
Notes
-----
As the current specified requirements for the observatory
are "observe more than 30 degrees from the moon", this basis
function simply does that at present. This includes
times the moon is below the horizon or if the moon close to new.
Most likely, this avoidance region should depend on lunar phase,
the filter used for observations, and whether the moon is above
or below the horizon.
"""
def __init__(self, nside=None, moon_distance=30.0):
super(MoonAvoidanceBasisFunction, self).__init__(nside=nside)
self.update_on_newobs = False
self.moon_distance = IntRounded(np.radians(moon_distance))
self.result = np.ones(hp.nside2npix(self.nside), dtype=float)
def _calc_value(self, conditions, indx=None):
result = self.result.copy()
angular_distance = _angular_separation(
conditions.az, conditions.alt, conditions.moon_az, conditions.moon_alt
)
result[IntRounded(angular_distance) < self.moon_distance] = np.nan
return result
class BulkCloudBasisFunction(BaseBasisFunction):
"""Mark healpixels on a map if their cloud values are greater than
the same healpixels on a maximum cloud map.
Parameters
----------
nside: int (default_nside)
The healpix resolution.
max_cloud_map : numpy array (None)
A healpix map showing the maximum allowed cloud values for all
points on the sky
out_of_bounds_val : float (10.)
Point value to give regions where there are no observations
requested
"""
def __init__(self, nside=None, max_cloud_map=None, max_val=0.7, out_of_bounds_val=np.nan):
super(BulkCloudBasisFunction, self).__init__(nside=nside)
self.update_on_newobs = False
if max_cloud_map is None:
self.max_cloud_map = np.zeros(hp.nside2npix(nside), dtype=float) + max_val
else:
self.max_cloud_map = max_cloud_map
self.out_of_bounds_area = np.where(self.max_cloud_map > 1.0)[0]
self.out_of_bounds_val = out_of_bounds_val
self.result = np.ones(hp.nside2npix(self.nside))
def _calc_value(self, conditions, indx=None):
"""
Parameters
----------
indx : list (None)
Index values to compute, if None, full map is computed
Returns
-------
Healpix map where pixels with a cloud value greater than the
max_cloud_map value are marked as unseen.
"""
result = self.result.copy()
clouded = np.where(self.max_cloud_map <= conditions.bulk_cloud)
result[clouded] = self.out_of_bounds_val
return result
[docs]
class MapCloudBasisFunction(BaseBasisFunction):
"""Mark healpixels on a map if their cloud values are greater than
the same healpixels on a maximum cloud map. Currently a placeholder for
when the telemetry stream can include a full sky cloud map.
Parameters
----------
nside: int (default_nside)
The healpix resolution.
max_cloud_map : numpy array (None)
A healpix map showing the maximum allowed cloud values for all
points on the sky
out_of_bounds_val : float (10.)
Point value to give regions where there are no observations
requested
"""
def __init__(self, nside=None, max_cloud_map=None, max_val=0.7, out_of_bounds_val=np.nan):
super(BulkCloudBasisFunction, self).__init__(nside=nside)
self.update_on_newobs = False
if max_cloud_map is None:
self.max_cloud_map = np.zeros(hp.nside2npix(nside), dtype=float) + max_val
else:
self.max_cloud_map = max_cloud_map
self.out_of_bounds_area = np.where(self.max_cloud_map > 1.0)[0]
self.out_of_bounds_val = out_of_bounds_val
self.result = np.ones(hp.nside2npix(self.nside))
def _calc_value(self, conditions, indx=None):
"""
Parameters
----------
indx : list (None)
Index values to compute, if None, full map is computed
Returns
-------
Healpix map where pixels with a cloud value greater than the
max_cloud_map value are marked as unseen.
"""
result = self.result.copy()
clouded = np.where(self.max_cloud_map <= conditions.bulk_cloud)
result[clouded] = self.out_of_bounds_val
return result
[docs]
class MaskAzimuthBasisFunction(BaseBasisFunction):
"""Mask pixels based on azimuth.
Superseded by AltAzShadowMaskBasisFunction.
"""
def __init__(self, nside=None, out_of_bounds_val=np.nan, az_min=0.0, az_max=180.0):
super(MaskAzimuthBasisFunction, self).__init__(nside=nside)
self.az_min = IntRounded(np.radians(az_min))
self.az_max = IntRounded(np.radians(az_max))
self.out_of_bounds_val = out_of_bounds_val
self.result = np.ones(hp.nside2npix(self.nside))
send_unused_deprecation_warning(self.__class__.__name__)
def _calc_value(self, conditions, indx=None):
to_mask = np.where(
(IntRounded(conditions.az) > self.az_min) & (IntRounded(conditions.az) < self.az_max)
)[0]
result = self.result.copy()
result[to_mask] = self.out_of_bounds_val
return result