__all__ = (
"DitherDetailer",
"EuclidDitherDetailer",
"CameraRotDetailer",
"CameraSmallRotPerObservationListDetailer",
"ComCamGridDitherDetailer",
)
import numpy as np
from rubin_scheduler.scheduler.detailers import BaseDetailer
from rubin_scheduler.scheduler.utils import wrap_ra_dec
from rubin_scheduler.utils import (
_approx_altaz2pa,
_approx_ra_dec2_alt_az,
bearing,
ddf_locations,
dest_latlon,
gnomonic_project_tosky,
rotation_converter,
)
[docs]
class DitherDetailer(BaseDetailer):
"""
make a uniform dither pattern. Offset by a maximum radius in a random
direction. Mostly intended for DDF pointings, the BaseMarkovDF_survey
class includes dithering for large areas.
Parameters
----------
max_dither : `float` (0.7)
The maximum dither size to use (degrees).
per_night : `bool` (True)
If true, us the same dither offset for an entire night
nnights : `int` (7305)
The number of nights to pre-generate random dithers for
"""
def __init__(self, max_dither=0.7, seed=42, per_night=True, nnights=7305):
self.survey_features = {}
self.current_night = -1
self.max_dither = np.radians(max_dither)
self.per_night = per_night
self.rng = np.random.default_rng(seed)
self.angles = self.rng.random(nnights) * 2 * np.pi
self.radii = self.max_dither * np.sqrt(self.rng.random(nnights))
self.offsets = (self.rng.random((nnights, 2)) - 0.5) * 2.0 * self.max_dither
self.offset = None
def _generate_offsets(self, n_offsets, night):
if self.per_night:
if night != self.current_night:
self.current_night = night
self.offset = self.offsets[night, :]
angle = self.angles[night]
radius = self.radii[night]
self.offset = np.array([radius * np.cos(angle), radius * np.sin(angle)])
offsets = np.tile(self.offset, (n_offsets, 1))
else:
angle = self.rng.random(n_offsets) * 2 * np.pi
radius = self.max_dither * np.sqrt(self.rng.random(n_offsets))
offsets = np.array([radius * np.cos(angle), radius * np.sin(angle)]).T
return offsets
def __call__(self, obs_array, conditions):
if len(obs_array) == 0:
return obs_array
# Generate offsets in RA and Dec
offsets = self._generate_offsets(len(obs_array), conditions.night)
new_ra, new_dec = gnomonic_project_tosky(
offsets[:, 0], offsets[:, 1], obs_array["RA"], obs_array["dec"]
)
new_ra, new_dec = wrap_ra_dec(new_ra, new_dec)
obs_array["RA"] = new_ra
obs_array["dec"] = new_dec
return obs_array
[docs]
class EuclidDitherDetailer(BaseDetailer):
"""Directional dithering for Euclid DDFs
Parameters
----------
dither_bearing_dir : `list`
A list with the dither amplitude in along the axis
connecting the two positions. Default [-0.25, 1] in degrees.
dither_bearing_perp : `list`
A list with the dither amplitude perpendicular to the
axis connecting the two positions. Default [-0.25, 1]
in degrees.
seed : `float`
Random number seed to use (42).
per_night : `bool`
If dither shifts should be per night (default True),
or if dithers should be every pointing (False).
ra_a, dec_a, ra_b, dec_b : `float`
Positions for the two field centers. Default None
will load the positions from
rubin_scheduler.utils.ddf_locations
nnights : `int`
Number of nights to generate dither positions for.
Default 7305 (20 years).
"""
def __init__(
self,
dither_bearing_dir=[-0.25, 1],
dither_bearing_perp=[-0.25, 0.25],
seed=42,
per_night=True,
ra_a=None,
dec_a=None,
ra_b=None,
dec_b=None,
nnights=7305,
):
self.survey_features = {}
default_locations = ddf_locations()
if ra_a is None:
self.ra_a = np.radians(default_locations["EDFS_a"][0])
else:
self.ra_a = np.radians(ra_a)
if ra_b is None:
self.ra_b = np.radians(default_locations["EDFS_b"][0])
else:
self.ra_b = np.radians(ra_b)
if dec_a is None:
self.dec_a = np.radians(default_locations["EDFS_a"][1])
else:
self.dec_a = np.radians(dec_a)
if dec_b is None:
self.dec_b = np.radians(default_locations["EDFS_b"][1])
else:
self.dec_b = np.radians(dec_b)
self.dither_bearing_dir = np.radians(dither_bearing_dir)
self.dither_bearing_perp = np.radians(dither_bearing_perp)
self.bearing_atob = bearing(self.ra_a, self.dec_a, self.ra_b, self.dec_b)
self.bearing_btoa = bearing(self.ra_b, self.dec_b, self.ra_a, self.dec_a)
self.current_night = -1
self.per_night = per_night
self.shifted_ra_a = None
self.shifted_dec_a = None
self.shifted_ra_b = None
self.shifted_dec_b = None
rng = np.random.default_rng(seed)
self.bearings_mag_1 = rng.uniform(
low=self.dither_bearing_dir.min(),
high=self.dither_bearing_dir.max(),
size=nnights,
)
self.perp_mag_1 = rng.uniform(
low=self.dither_bearing_perp.min(),
high=self.dither_bearing_perp.max(),
size=nnights,
)
self.bearings_mag_2 = rng.uniform(
low=self.dither_bearing_dir.min(),
high=self.dither_bearing_dir.max(),
size=nnights,
)
self.perp_mag_2 = rng.uniform(
low=self.dither_bearing_perp.min(),
high=self.dither_bearing_perp.max(),
size=nnights,
)
def _generate_offsets(self, n_offsets, night):
if self.per_night:
if night != self.current_night:
self.current_night = night
bearing_mag = self.bearings_mag_1[night]
perp_mag = self.perp_mag_1[night]
# Move point a along the bearings
self.shifted_dec_a, self.shifted_ra_a = dest_latlon(
bearing_mag, self.bearing_atob, self.dec_a, self.ra_a
)
self.shifted_dec_a, self.shifted_ra_a = dest_latlon(
perp_mag,
self.bearing_atob + np.pi / 2.0,
self.shifted_dec_a,
self.shifted_ra_a,
)
# Shift the second position
bearing_mag = self.bearings_mag_2[night]
perp_mag = self.perp_mag_2[night]
self.shifted_dec_b, self.shifted_ra_b = dest_latlon(
bearing_mag, self.bearing_btoa, self.dec_b, self.ra_b
)
self.shifted_dec_b, self.shifted_ra_b = dest_latlon(
perp_mag,
self.bearing_btoa + np.pi / 2.0,
self.shifted_dec_b,
self.shifted_ra_b,
)
else:
ValueError("not implamented")
return (
self.shifted_ra_a,
self.shifted_dec_a,
self.shifted_ra_b,
self.shifted_dec_b,
)
def __call__(self, obs_array, conditions):
# Generate offsets in RA and Dec
ra_a, dec_a, ra_b, dec_b = self._generate_offsets(len(obs_array), conditions.night)
for i, obs in enumerate(obs_array):
if obs["scheduler_note"][-1] == "a":
obs_array[i]["RA"] = ra_a
obs_array[i]["dec"] = dec_a
elif obs["scheduler_note"][-1] == "b":
obs_array[i]["RA"] = ra_b
obs_array[i]["dec"] = dec_b
else:
ValueError("observation note does not end in a or b.")
return obs_array
[docs]
class CameraRotDetailer(BaseDetailer):
"""
Randomly set the camera rotation, either for each exposure, or per night.
Parameters
----------
max_rot : `float` (90.)
The maximum amount to offset the camera (degrees)
min_rot : `float` (90)
The minimum to offset the camera (degrees)
per_night : `bool` (True)
If True, only set a new offset per night. If False, randomly
rotates every observation.
telescope : `str`
Telescope name. Options of "rubin" or "auxtel". Default "rubin".
"""
def __init__(self, max_rot=90.0, min_rot=-90.0, per_night=True, seed=42, nnights=7305, telescope="rubin"):
self.survey_features = {}
self.current_night = -1
self.max_rot = np.radians(max_rot)
self.min_rot = np.radians(min_rot)
self.range = self.max_rot - self.min_rot
self.per_night = per_night
self.rng = np.random.default_rng(seed)
self.offsets = self.rng.random(nnights)
self.offset = None
self.rc = rotation_converter(telescope=telescope)
def _generate_offsets(self, n_offsets, night):
if self.per_night:
if night != self.current_night:
self.current_night = night
self.offset = self.offsets[night] * self.range + self.min_rot
offsets = np.ones(n_offsets) * self.offset
else:
self.rng = np.random.default_rng()
offsets = self.rng.random(n_offsets) * self.range + self.min_rot
return offsets
def __call__(self, observation_array, conditions):
# Generate offsets in camamera rotator
offsets = self._generate_offsets(len(observation_array), conditions.night)
alt, az = _approx_ra_dec2_alt_az(
observation_array["RA"],
observation_array["dec"],
conditions.site.latitude_rad,
conditions.site.longitude_rad,
conditions.mjd,
)
obs_pa = _approx_altaz2pa(alt, az, conditions.site.latitude_rad)
observation_array["rotSkyPos"] = self.rc._rottelpos2rotskypos(offsets, obs_pa)
observation_array["rotTelPos"] = offsets
return observation_array
[docs]
class CameraSmallRotPerObservationListDetailer(BaseDetailer):
"""
Randomly set the camera rotation for each observation list.
Generates a small sequential offset for sequential visits
in the same filter; adds a random offset for each filter change.
Parameters
----------
max_rot : `float`, optional
The maximum amount to offset the camera (degrees).
Default of 85 allows some padding for camera rotator.
min_rot : `float`, optional
The minimum to offset the camera (degrees)
Default of -85 allows some padding for camera rotator.
seed : `int`, optional
Seed for random number generation (per night).
per_visit_rot : `float`, optional
Sequential rotation to add per visit.
telescope : `str`, optional
Telescope name. Options of "rubin" or "auxtel". Default "rubin".
This is used to determine conversions between rotSkyPos and rotTelPos.
"""
def __init__(self, max_rot=85.0, min_rot=-85.0, seed=42, per_visit_rot=0.0, telescope="rubin"):
self.survey_features = {}
self.current_night = -1
self.max_rot = np.radians(max_rot)
self.min_rot = np.radians(min_rot)
self.rot_range = self.max_rot - self.min_rot
self.seed = seed
self.per_visit_rot = np.radians(per_visit_rot)
self.offset = None
self.rc = rotation_converter(telescope=telescope)
def _generate_offsets_filter_change(self, filter_list, mjd, initial_offset):
"""Generate a random camera rotation for each filter change
or add a small offset for each sequential observation.
"""
mjd_hash = round(100 * (np.asarray(mjd).item() % 100))
rng = np.random.default_rng(mjd_hash * self.seed)
offsets = np.zeros(len(filter_list))
# Find the locations of the filter changes
filter_changes = np.where(np.array(filter_list[:-1]) != np.array(filter_list[1:]))[0]
filter_changes = np.concatenate([np.array([-1]), filter_changes])
# But add one because of counting and offsets above.
filter_changes += 1
# Count visits per filter in the sequence.
nvis_per_filter = np.concatenate(
[np.diff(filter_changes), np.array([len(filter_list) - 1 - filter_changes[-1]])]
)
# Set up the random rotator offsets for each filter change
# This includes first rotation .. maybe not needed?
for fchange_idx, nvis_f in zip(filter_changes, nvis_per_filter):
rot_range = self.rot_range - self.per_visit_rot * nvis_f
# At the filter change spot, update to random offset
offsets[fchange_idx:] = rng.random() * rot_range + self.min_rot
# After the filter change point, add incremental rotation
# (we'll wipe this when we get to next fchange_idx)
offsets[fchange_idx:] += self.per_visit_rot * np.arange(len(filter_list) - fchange_idx)
offsets = np.where(offsets > self.max_rot, self.max_rot, offsets)
return offsets
def __call__(self, observation_list, conditions):
# Generate offsets in camera rotator
filter_list = [np.asarray(obs["filter"]).item() for obs in observation_list]
offsets = self._generate_offsets_filter_change(filter_list, conditions.mjd, conditions.rot_tel_pos)
for i, obs in enumerate(observation_list):
alt, az = _approx_ra_dec2_alt_az(
obs["RA"],
obs["dec"],
conditions.site.latitude_rad,
conditions.site.longitude_rad,
conditions.mjd,
)
obs_pa = _approx_altaz2pa(alt, az, conditions.site.latitude_rad)
obs["rotSkyPos"] = self.rc._rottelpos2rotskypos(offsets[i], obs_pa)
obs["rotTelPos"] = offsets[i]
return observation_list
[docs]
class ComCamGridDitherDetailer(BaseDetailer):
"""
Generate an offset pattern to synthesize a 2x2 grid of ComCam pointings.
Parameters
----------
rotTelPosDesired : `float`, (0.)
The physical rotation angle of the camera rotator (degrees)
scale : `float` (0.355)
Half of the offset between grid pointing centers. (degrees)
dither : `float` (0.05)
Dither offsets within grid to fill chip gaps. (degrees)
telescope : `str`, ("comcam")
Telescope name. Default "comcam".
This is used to determine conversions between rotSkyPos and rotTelPos.
"""
def __init__(self, rotTelPosDesired=0.0, scale=0.355, dither=0.05, telescope="comcam"):
self.survey_features = {}
self.rotTelPosDesired = np.radians(rotTelPosDesired)
self.scale = np.radians(scale)
self.dither = np.radians(dither)
self.rc = rotation_converter(telescope=telescope)
def _rotate(self, x, y, angle):
x_rot = x * np.cos(angle) - y * np.sin(angle)
y_rot = x * np.sin(angle) + y * np.cos(angle)
return x_rot, y_rot
def _generate_offsets(self, n_offsets, filter_list, rotSkyPos):
# 2 x 2 pointing grid
x_grid = np.array([-1.0 * self.scale, -1.0 * self.scale, self.scale, self.scale])
y_grid = np.array([-1.0 * self.scale, self.scale, self.scale, -1.0 * self.scale])
x_grid_rot, y_grid_rot = self._rotate(x_grid, y_grid, -1.0 * rotSkyPos)
offsets_grid_rot = np.array([x_grid_rot, y_grid_rot]).T
# Dither pattern within grid to fill chip gaps
# Psuedo-random offsets
x_dither = np.array(
[
0.0,
-0.5 * self.dither,
-1.25 * self.dither,
1.5 * self.dither,
0.75 * self.dither,
]
)
y_dither = np.array(
[
0.0,
-0.75 * self.dither,
1.5 * self.dither,
1.25 * self.dither,
-0.5 * self.dither,
]
)
x_dither_rot, y_dither_rot = self._rotate(x_dither, y_dither, -1.0 * rotSkyPos)
offsets_dither_rot = np.array([x_dither_rot, y_dither_rot]).T
# Find the indices of the filter changes
filter_changes = np.where(np.array(filter_list[:-1]) != np.array(filter_list[1:]))[0]
filter_changes = np.concatenate([np.array([-1]), filter_changes])
filter_changes += 1
offsets = []
index_filter = 0
for ii in range(0, n_offsets):
if ii in filter_changes:
# Reset the count after each filter change
index_filter = 0
index_grid = index_filter % 4
index_dither = np.floor(index_filter / 4).astype(int) % 5
offsets.append(offsets_grid_rot[index_grid] + offsets_dither_rot[index_dither])
index_filter += 1
return np.vstack(offsets)
def __call__(self, observation_list, conditions):
if len(observation_list) == 0:
return observation_list
filter_list = [np.asarray(obs["filter"]).item() for obs in observation_list]
# Initial estimate of rotSkyPos corresponding to desired rotTelPos
alt, az, pa = _approx_ra_dec2_alt_az(
observation_list[0]["RA"],
observation_list[0]["dec"],
conditions.site.latitude_rad,
conditions.site.longitude_rad,
conditions.mjd,
return_pa=True,
)
rotSkyPos = self.rc._rottelpos2rotskypos(self.rotTelPosDesired, pa)
# Generate offsets in RA and Dec
offsets = self._generate_offsets(len(observation_list), filter_list, rotSkyPos)
# Project offsets onto sky
obs_array = np.concatenate(observation_list)
new_ra, new_dec = gnomonic_project_tosky(
offsets[:, 0], offsets[:, 1], obs_array["RA"], obs_array["dec"]
)
new_ra, new_dec = wrap_ra_dec(new_ra, new_dec)
# Update observations
for ii in range(0, len(observation_list)):
observation_list[ii]["RA"] = new_ra[ii]
observation_list[ii]["dec"] = new_dec[ii]
alt, az, pa = _approx_ra_dec2_alt_az(
new_ra[ii],
new_dec[ii],
conditions.site.latitude_rad,
conditions.site.longitude_rad,
conditions.mjd,
return_pa=True,
)
observation_list[ii]["rotSkyPos"] = rotSkyPos
observation_list[ii]["rotTelPos"] = self.rc._rotskypos2rottelpos(rotSkyPos, pa)
return observation_list