Source code for rubin_scheduler.scheduler.utils.footprints
"""Footprints: Take sky area maps and turn them into dynamic `footprint`
objects which understand seasons and time, in order to weight area on sky
appropriately for a given time.
"""
__all__ = (
"calc_norm_factor",
"calc_norm_factor_array",
"StepLine",
"Footprints",
"Footprint",
"StepSlopes",
"ConstantFootprint",
"BasePixelEvolution",
"slice_wfd_area_quad",
"slice_wfd_indx",
"slice_quad_galactic_cut",
"make_rolling_footprints",
)
import warnings
import healpy as hp
import numpy as np
from astropy import units as u
from astropy.coordinates import SkyCoord
from rubin_scheduler.utils import DEFAULT_NSIDE, _hpid2_ra_dec
from .sky_area import CurrentAreaMap
[docs]
def make_rolling_footprints(
fp_hp=None,
mjd_start=60218.0,
sun_ra_start=3.27717639,
nslice=2,
scale=0.8,
nside=DEFAULT_NSIDE,
wfd_indx=None,
order_roll=0,
n_cycles=None,
n_constant_start=2,
n_constant_end=6,
verbose=False,
uniform=True,
):
"""
Generate rolling footprints
Parameters
----------
fp_hp : dict-like
A dict with band name keys and HEALpix map values.
Default None will load CurrentAreaMap. Assumes
WFD is where r-band is 1.
mjd_start : `float`
The starting date of the survey.
sun_ra_start : `float`
The RA of the sun at the start of the survey
nslice : `int`
How much to slice the sky up. Can be 2, 3, 4, or 6.
scale : `float`
The strength of the rolling, value of 1 is full power rolling.
Zero is no rolling.
wfd_indx : array of ints
The indices of the HEALpix map that are to be included in the rolling.
order_roll : `int`
Change the order of when bands roll. Default 0.
n_cycles : `int`
Number of complete rolling cycles to attempt. If None, defaults to 3
full cycles for nslice=2, 2 cycles for nslice=3 or 4, and 1 cycle for
nslice=6.
n_constant_start : `int`
The number of constant non-rolling seasons to start with. Anything
less than 2 will start rolling too early near Y1. Defaults to 2.
n_constant_end : `int`
The number of constant seasons to end the survey with. Defaults to 6.
Returns
-------
Footprints object
"""
if fp_hp is None:
sky = CurrentAreaMap(nside=nside)
footprints, labels = sky.return_maps()
fp_hp = {}
for key in footprints.dtype.names:
fp_hp[key] = footprints[key]
nc_default = {2: 3, 3: 2, 4: 2, 6: 1}
if n_cycles is None:
n_cycles = nc_default[nslice]
hp_footprints = fp_hp
D = 1.0 - scale
U = nslice - D * (nslice - 1)
start = [1.0] * n_constant_start
# After n_cycles, just go to no-rolling for 6 years.
end = [1.0] * n_constant_end
rolling = [U] + [D] * (nslice - 1)
rolling = np.roll(rolling, order_roll).tolist()
all_slopes = []
if uniform:
extra_constant = [1]
else:
extra_constant = []
for i in range(nslice):
_roll = np.roll(rolling, i).tolist() + extra_constant
all_slopes.append(start + _roll * n_cycles + end)
for i in range(nslice):
_roll = np.roll(rolling, i).tolist() + extra_constant
_roll = [_roll[-1]] + _roll[1:-1] + [_roll[0]]
all_slopes.append(start + _roll * n_cycles + end)
dvals = {
1: "1",
D: "D",
U: "U",
}
abc = ["a", "b", "c", "d", "e", "f", "g", "h"]
slice_names = ["slice %s" % abc[i] for i in range(nslice)]
for i, s in enumerate(all_slopes):
if i >= nslice:
sname = slice_names[i - nslice] + " w/ ra - sun_ra in [90, 270]"
else:
sname = slice_names[i] + " w/ ra - sun_ra in [270, 90]"
if verbose:
print(sname + ": " + " ".join([dvals[x] for x in s]))
fp_non_wfd = Footprint(mjd_start, sun_ra_start=sun_ra_start, nside=nside)
rolling_footprints = []
for i in range(len(all_slopes)):
step_func = StepSlopes(rise=all_slopes[i])
rolling_footprints.append(
Footprint(
mjd_start,
sun_ra_start=sun_ra_start,
step_func=step_func,
nside=nside,
)
)
wfd = hp_footprints["r"] * 0
if wfd_indx is None:
wfd_indx = np.where(hp_footprints["r"] == 1)[0]
wfd[wfd_indx] = 1
non_wfd_indx = np.where(wfd == 0)[0]
if uniform:
split_wfd_indices = slice_quad_galactic_cut(
hp_footprints,
nslice=nslice,
wfd_indx=wfd_indx,
ra_range=(sun_ra_start + 1.5 * np.pi, sun_ra_start + np.pi / 2),
)
split_wfd_indices_delayed = slice_quad_galactic_cut(
hp_footprints,
nslice=nslice,
wfd_indx=wfd_indx,
ra_range=(sun_ra_start + np.pi / 2, sun_ra_start + 1.5 * np.pi),
)
else:
split_wfd_indices = slice_quad_galactic_cut(hp_footprints, nslice=nslice, wfd_indx=wfd_indx)
for key in hp_footprints:
temp = hp_footprints[key] + 0
temp[wfd_indx] = 0
fp_non_wfd.set_footprint(key, temp)
for i in range(nslice):
# make a copy of the current band
temp = hp_footprints[key] + 0
# Set the non-rolling area to zero
temp[non_wfd_indx] = 0
indx = split_wfd_indices[i]
# invert the indices
ze = temp * 0
ze[indx] = 1
temp = temp * ze
rolling_footprints[i].set_footprint(key, temp)
if uniform:
for _i in range(nslice, nslice * 2):
# make a copy of the current band
temp = hp_footprints[key] + 0
# Set the non-rolling area to zero
temp[non_wfd_indx] = 0
indx = split_wfd_indices_delayed[_i - nslice]
# invert the indices
ze = temp * 0
ze[indx] = 1
temp = temp * ze
rolling_footprints[_i].set_footprint(key, temp)
result = Footprints([fp_non_wfd] + rolling_footprints)
return result
def _is_in_ra_range(ra, low, high):
_low = low % (2.0 * np.pi)
_high = high % (2.0 * np.pi)
if _low <= _high:
return (ra >= _low) & (ra <= _high)
else:
return (ra >= _low) | (ra <= _high)
[docs]
def slice_quad_galactic_cut(target_map, nslice=2, wfd_indx=None, ra_range=None):
"""
Helper function for generating rolling footprints
Parameters
----------
target_map : dict of HEALpix maps
The final desired footprint as HEALpix maps. Keys are band names
nslice : `int`
The number of slices to make, can be 2 or 3.
wfd_indx : array of ints
The indices of target_map that should be used for rolling.
If None, assumes the rolling area should be where target_map['r'] == 1.
ra_range : tuple of floats, optional
If not None, then the indices are restricted to the given RA range
in radians.
"""
nside = hp.npix2nside(target_map["r"].size)
ra, dec = _hpid2_ra_dec(nside, np.arange(hp.nside2npix(nside)))
coord = SkyCoord(ra=ra * u.rad, dec=dec * u.rad)
_, gal_lat = coord.galactic.l.deg, coord.galactic.b.deg
indx_north = np.intersect1d(np.where(gal_lat >= 0)[0], wfd_indx)
indx_south = np.intersect1d(np.where(gal_lat < 0)[0], wfd_indx)
splits_north = slice_wfd_area_quad(target_map, nslice=nslice, wfd_indx=indx_north)
splits_south = slice_wfd_area_quad(target_map, nslice=nslice, wfd_indx=indx_south)
slice_indx = []
for j in np.arange(nslice):
indx_temp = []
for i in np.arange(j + 1, nslice * 2 + 1, nslice):
indx_temp += indx_north[splits_north[i - 1] : splits_north[i]].tolist()
indx_temp += indx_south[splits_south[i - 1] : splits_south[i]].tolist()
slice_indx.append(indx_temp)
if ra_range is not None:
ra_indx = np.where(_is_in_ra_range(ra, *ra_range))[0]
for j in range(nslice):
slice_indx[j] = np.intersect1d(ra_indx, slice_indx[j])
return slice_indx
[docs]
def slice_wfd_area_quad(target_map, nslice=2, wfd_indx=None):
"""
Divide a healpix map in an intelligent way
Parameters
----------
target_map : dict of HEALpix arrays
The input map to slice
nslice : int
The number of slices to divide the sky into (gets doubled).
wfd_indx : array of int
The indices of the healpix map to consider as part of the WFD area
that will be split.
If set to None, the pixels where target_map['r'] == 1 are
considered as WFD.
"""
nslice2 = nslice * 2
wfd = target_map["r"] * 0
if wfd_indx is None:
wfd_indices = np.where(target_map["r"] == 1)[0]
else:
wfd_indices = wfd_indx
wfd[wfd_indices] = 1
wfd_accum = np.cumsum(wfd)
split_wfd_indices = np.floor(np.max(wfd_accum) / nslice2 * (np.arange(nslice2) + 1)).astype(int)
split_wfd_indices = split_wfd_indices.tolist()
split_wfd_indices = [0] + split_wfd_indices
return split_wfd_indices
[docs]
def slice_wfd_indx(target_map, nslice=2, wfd_indx=None):
"""
simple map split
"""
wfd = target_map["r"] * 0
if wfd_indx is None:
wfd_indx = np.where(target_map["r"] == 1)[0]
wfd[wfd_indx] = 1
wfd_accum = np.cumsum(wfd)
split_wfd_indices = np.floor(np.max(wfd_accum) / nslice * (np.arange(nslice) + 1)).astype(int)
split_wfd_indices = split_wfd_indices.tolist()
split_wfd_indices = [0] + split_wfd_indices
return split_wfd_indices
[docs]
class BasePixelEvolution:
"""Helper class that can be used to describe the time evolution of a
HEALpix in a footprint.
"""
def __init__(self, period=365.25, rise=1.0, t_start=0.0):
self.period = period
self.rise = rise
self.t_start = t_start
def __call__(self, mjd_in, phase):
pass
[docs]
class StepLine(BasePixelEvolution):
def __call__(self, mjd_in, phase):
t = mjd_in + phase - self.t_start
n_periods = np.floor(t / (self.period))
result = n_periods * self.rise
tphased = t % self.period
step_area = np.where(tphased > self.period / 2.0)[0]
result[step_area] += (tphased[step_area] - self.period / 2) * self.rise / (0.5 * self.period)
result[np.where(t < 0)] = 0
return result
[docs]
class StepSlopes(BasePixelEvolution):
def __call__(self, mjd_in, phase):
steps = np.array(self.rise)
t = mjd_in + phase - self.t_start
season = np.floor(t / (self.period))
season = season.astype(int)
plateus = np.cumsum(steps) - steps[0]
result = plateus[season]
tphased = t % self.period
step_area = np.where(tphased > self.period / 2.0)[0]
result[step_area] += (
(tphased[step_area] - self.period / 2) * steps[season + 1][step_area] / (0.5 * self.period)
)
result[np.where(t < 0)] = 0
return result
[docs]
class Footprint:
"""An object to compute the desired survey footprint at a given time
Parameters
----------
mjd_start : `float`
The MJD the survey starts on.
sun_ra_start : `float`
The RA of the sun at the start of the survey (radians).
bands : `list` of `str`
The band names to include in the footprint.
filters : `list` of `str`
Deprecated version of bands.
period : `float`
Used for setting the phase of step_func (days). Default 365.25.
step_func : `BasePixelEvolution`
Callable class that determines how the footprint evolves with time.
Default of None will result in `StepLine` being used.
"""
def __init__(
self,
mjd_start,
sun_ra_start=0,
nside=DEFAULT_NSIDE,
bands=["u", "g", "r", "i", "z", "y"],
filters=None,
period=365.25,
step_func=None,
):
if filters is not None:
warnings.warn("Use of `filters` will be deprecated in favor of `bands` at v4", FutureWarning)
bands = filters
# Dict to map band name to array index.
if not isinstance(bands, dict):
bands_dict = {}
for i, bandname in enumerate(bands):
bands_dict[bandname] = i
bands = bands_dict
self.period = period
self.nside = nside
if step_func is None:
step_func = StepLine()
self.step_func = step_func
self.mjd_start = mjd_start
self.sun_ra_start = sun_ra_start
self.npix = hp.nside2npix(nside)
self.bands = bands
if filters is not None:
self.filters = filters
self.ra, self.dec = _hpid2_ra_dec(self.nside, np.arange(self.npix))
# Set the phase of each healpixel.
# If RA to sun is zero, we are at phase np.pi/2.
# This is similar to the season calculation, except
# that phase and season are offset by 90 degrees.
self.phase = (-self.ra + self.sun_ra_start + np.pi / 2) % (2.0 * np.pi)
self.phase = self.phase * (self.period / 2.0 / np.pi)
# Empty footprints to start
self.out_dtype = list(zip(bands, [float] * len(bands)))
self.footprints = np.zeros((len(bands), self.npix), dtype=float)
self.estimate = np.zeros((len(bands), self.npix), dtype=float)
self.current_footprints = np.zeros((len(bands), self.npix), dtype=float)
self.zero = self.step_func(0.0, self.phase)
self.mjd_current = None
def set_footprint(self, bandname, values):
self.footprints[self.bands[bandname], :] = values
def get_footprint(self, bandname):
return self.footprints[self.bands[bandname], :]
def _update_mjd(self, mjd, norm=True):
if mjd != self.mjd_current:
self.mjd_current = mjd
t_elapsed = mjd - self.mjd_start
norm_coverage = self.step_func(t_elapsed, self.phase)
norm_coverage -= self.zero
self.current_footprints = self.footprints * norm_coverage
c_sum = np.nansum(self.current_footprints)
if norm:
if c_sum != 0:
self.current_footprints = self.current_footprints / c_sum
[docs]
def arr2struc(self, inarr):
"""Take an array and convert it to labeled struc array"""
result = np.empty(self.npix, dtype=self.out_dtype)
for key in self.bands:
result[key] = inarr[self.bands[key]]
# Argle bargel, why doesn't this view work?
# struc = inarr.view(dtype=self.out_dtype).squeeze()
return result
[docs]
def estimate_counts(self, mjd, nvisits=2.2e6, fov_area=9.6):
"""Estimate the counts we'll get after some time and visits"""
pix_area = hp.nside2pixarea(self.nside, degrees=True)
pix_per_visit = fov_area / pix_area
self._update_mjd(mjd, norm=True)
self.estimate = self.current_footprints * pix_per_visit * nvisits
return self.arr2struc(self.estimate)
[docs]
def __call__(self, mjd, norm=True):
"""
Parameters
----------
mjd : `float`
Current MJD.
norm : `bool`
If normalized, the footprint retains the same range of values
over time.
Returns
-------
current_footprints : `np.ndarray`, (6, N)
A numpy structured array with the updated normalized number of
observations that should be requested at each Healpix.
Multiply by the number of HEALpix observations (all bands), to
convert to the number of observations desired.
"""
self._update_mjd(mjd, norm=norm)
return self.arr2struc(self.current_footprints)
[docs]
class ConstantFootprint(Footprint):
def __init__(self, nside=DEFAULT_NSIDE, bands=["u", "g", "r", "i", "z", "y"], filters=None):
if filters is not None:
warnings.warn("Use of `filters` will be deprecated in favor of `bands` at v4", FutureWarning)
bands = filters
if not isinstance(bands, dict):
bands_dict = {}
for i, bandname in enumerate(bands):
bands_dict[bandname] = i
bands = bands_dict
self.nside = nside
self.bands = bands
if filters is not None:
self.filters = filters
self.npix = hp.nside2npix(nside)
self.footprints = np.zeros((len(bands), self.npix), dtype=float)
self.out_dtype = list(zip(bands, [float] * len(bands)))
self.to_return = self.arr2struc(self.footprints)
def set_footprint(self, bandname, values):
self.footprints[self.bands[bandname], :] = values
self.to_return = self.arr2struc(self.footprints)
def __call__(self, mjd, array=False):
return self.to_return
[docs]
class Footprints(Footprint):
"""An object to combine multiple Footprint objects."""
def __init__(self, footprint_list):
self.footprint_list = footprint_list
self.mjd_current = None
self.current_footprints = 0
# Should probably run a check that all the footprints are compatible
# (same nside, etc)
self.npix = footprint_list[0].npix
self.out_dtype = footprint_list[0].out_dtype
self.bands = footprint_list[0].bands
self.nside = footprint_list[0].nside
self.footprints = np.zeros((len(self.bands), self.npix), dtype=float)
for fp in self.footprint_list:
self.footprints += fp.footprints
def set_footprint(self, bandname, values):
pass
def _update_mjd(self, mjd, norm=True):
if mjd != self.mjd_current:
self.mjd_current = mjd
self.current_footprints = 0.0
for fp in self.footprint_list:
fp._update_mjd(mjd, norm=False)
self.current_footprints += fp.current_footprints
c_sum = np.sum(self.current_footprints)
if norm:
if c_sum != 0:
self.current_footprints = self.current_footprints / c_sum
[docs]
def calc_norm_factor(goal_dict, radius=1.75):
"""Calculate how to normalize a Target_map_basis_function.
This is basically:
the area of the fov / area of a healpixel /
the sum of all of the weighted-healpix values in the footprint.
Parameters
-----------
goal_dict : dict of healpy maps
The target goal map(s) being used
radius : float (1.75)
Radius of the FoV (degrees)
Returns
-------
Value to use as Target_map_basis_function norm_factor kwarg
"""
all_maps_sum = 0
for key in goal_dict:
good = np.where(goal_dict[key] > 0)
all_maps_sum += goal_dict[key][good].sum()
nside = hp.npix2nside(goal_dict[key].size)
hp_area = hp.nside2pixarea(nside, degrees=True)
norm_val = radius**2 * np.pi / hp_area / all_maps_sum
return norm_val
[docs]
def calc_norm_factor_array(goal_map, radius=1.75):
"""Calculate how to normalize a Target_map_basis_function.
This is basically:
the area of the fov / area of a healpixel /
the sum of all of the weighted-healpix values in the footprint.
Parameters
-----------
goal_map : recarray of healpy maps
The target goal map(s) being used
radius : float
Radius of the FoV (degrees)
Returns
-------
Value to use as Target_map_basis_function norm_factor kwarg
"""
all_maps_sum = 0
for key in goal_map.dtype.names:
good = np.where(goal_map[key] > 0)
all_maps_sum += goal_map[key][good].sum()
nside = hp.npix2nside(goal_map[key].size)
hp_area = hp.nside2pixarea(nside, degrees=True)
norm_val = radius**2 * np.pi / hp_area / all_maps_sum
return norm_val