Source code for rubin_scheduler.scheduler.detailers.dither_detailer

__all__ = (
    "DitherDetailer",
    "EuclidDitherDetailer",
    "CameraRotDetailer",
    "CameraSmallRotPerObservationListDetailer",
)

import numpy as np

from rubin_scheduler.scheduler.detailers import BaseDetailer
from rubin_scheduler.scheduler.utils import wrap_ra_dec
from rubin_scheduler.utils import (
    _approx_altaz2pa,
    _approx_ra_dec2_alt_az,
    bearing,
    ddf_locations,
    dest_latlon,
    gnomonic_project_tosky,
    rotation_converter,
)


[docs] class DitherDetailer(BaseDetailer): """ make a uniform dither pattern. Offset by a maximum radius in a random direction. Mostly intended for DDF pointings, the BaseMarkovDF_survey class includes dithering for large areas. Parameters ---------- max_dither : `float` (0.7) The maximum dither size to use (degrees). per_night : `bool` (True) If true, us the same dither offset for an entire night nnights : `int` (7305) The number of nights to pre-generate random dithers for """ def __init__(self, max_dither=0.7, seed=42, per_night=True, nnights=7305): self.survey_features = {} self.current_night = -1 self.max_dither = np.radians(max_dither) self.per_night = per_night self.rng = np.random.default_rng(seed) self.angles = self.rng.random(nnights) * 2 * np.pi self.radii = self.max_dither * np.sqrt(self.rng.random(nnights)) self.offsets = (self.rng.random((nnights, 2)) - 0.5) * 2.0 * self.max_dither self.offset = None def _generate_offsets(self, n_offsets, night): if self.per_night: if night != self.current_night: self.current_night = night self.offset = self.offsets[night, :] angle = self.angles[night] radius = self.radii[night] self.offset = np.array([radius * np.cos(angle), radius * np.sin(angle)]) offsets = np.tile(self.offset, (n_offsets, 1)) else: angle = self.rng.random(n_offsets) * 2 * np.pi radius = self.max_dither * np.sqrt(self.rng.random(n_offsets)) offsets = np.array([radius * np.cos(angle), radius * np.sin(angle)]).T return offsets def __call__(self, observation_list, conditions): if len(observation_list) == 0: return observation_list # Generate offsets in RA and Dec offsets = self._generate_offsets(len(observation_list), conditions.night) obs_array = np.concatenate(observation_list) new_ra, new_dec = gnomonic_project_tosky( offsets[:, 0], offsets[:, 1], obs_array["RA"], obs_array["dec"] ) new_ra, new_dec = wrap_ra_dec(new_ra, new_dec) for i, obs in enumerate(observation_list): observation_list[i]["RA"] = new_ra[i] observation_list[i]["dec"] = new_dec[i] return observation_list
[docs] class EuclidDitherDetailer(BaseDetailer): """Directional dithering for Euclid DDFs Parameters ---------- dither_bearing_dir : `list` A list with the dither amplitude in along the axis connecting the two positions. Default [-0.25, 1] in degrees. dither_bearing_perp : `list` A list with the dither amplitude perpendicular to the axis connecting the two positions. Default [-0.25, 1] in degrees. seed : `float` Random number seed to use (42). per_night : `bool` If dither shifts should be per night (default True), or if dithers should be every pointing (False). ra_a, dec_a, ra_b, dec_b : `float` Positions for the two field centers. Default None will load the positions from rubin_scheduler.utils.ddf_locations nnights : `int` Number of nights to generate dither positions for. Default 7305 (20 years). """ def __init__( self, dither_bearing_dir=[-0.25, 1], dither_bearing_perp=[-0.25, 0.25], seed=42, per_night=True, ra_a=None, dec_a=None, ra_b=None, dec_b=None, nnights=7305, ): self.survey_features = {} default_locations = ddf_locations() if ra_a is None: self.ra_a = np.radians(default_locations["EDFS_a"][0]) else: self.ra_a = np.radians(ra_a) if ra_b is None: self.ra_b = np.radians(default_locations["EDFS_b"][0]) else: self.ra_b = np.radians(ra_b) if dec_a is None: self.dec_a = np.radians(default_locations["EDFS_a"][1]) else: self.dec_a = np.radians(dec_a) if dec_b is None: self.dec_b = np.radians(default_locations["EDFS_b"][1]) else: self.dec_b = np.radians(dec_b) self.dither_bearing_dir = np.radians(dither_bearing_dir) self.dither_bearing_perp = np.radians(dither_bearing_perp) self.bearing_atob = bearing(self.ra_a, self.dec_a, self.ra_b, self.dec_b) self.bearing_btoa = bearing(self.ra_b, self.dec_b, self.ra_a, self.dec_a) self.current_night = -1 self.per_night = per_night self.shifted_ra_a = None self.shifted_dec_a = None self.shifted_ra_b = None self.shifted_dec_b = None rng = np.random.default_rng(seed) self.bearings_mag_1 = rng.uniform( low=self.dither_bearing_dir.min(), high=self.dither_bearing_dir.max(), size=nnights, ) self.perp_mag_1 = rng.uniform( low=self.dither_bearing_perp.min(), high=self.dither_bearing_perp.max(), size=nnights, ) self.bearings_mag_2 = rng.uniform( low=self.dither_bearing_dir.min(), high=self.dither_bearing_dir.max(), size=nnights, ) self.perp_mag_2 = rng.uniform( low=self.dither_bearing_perp.min(), high=self.dither_bearing_perp.max(), size=nnights, ) def _generate_offsets(self, n_offsets, night): if self.per_night: if night != self.current_night: self.current_night = night bearing_mag = self.bearings_mag_1[night] perp_mag = self.perp_mag_1[night] # Move point a along the bearings self.shifted_dec_a, self.shifted_ra_a = dest_latlon( bearing_mag, self.bearing_atob, self.dec_a, self.ra_a ) self.shifted_dec_a, self.shifted_ra_a = dest_latlon( perp_mag, self.bearing_atob + np.pi / 2.0, self.shifted_dec_a, self.shifted_ra_a, ) # Shift the second position bearing_mag = self.bearings_mag_2[night] perp_mag = self.perp_mag_2[night] self.shifted_dec_b, self.shifted_ra_b = dest_latlon( bearing_mag, self.bearing_btoa, self.dec_b, self.ra_b ) self.shifted_dec_b, self.shifted_ra_b = dest_latlon( perp_mag, self.bearing_btoa + np.pi / 2.0, self.shifted_dec_b, self.shifted_ra_b, ) else: ValueError("not implamented") return ( self.shifted_ra_a, self.shifted_dec_a, self.shifted_ra_b, self.shifted_dec_b, ) def __call__(self, observation_list, conditions): # Generate offsets in RA and Dec ra_a, dec_a, ra_b, dec_b = self._generate_offsets(len(observation_list), conditions.night) for i, obs in enumerate(observation_list): if obs[0]["scheduler_note"][-1] == "a": observation_list[i]["RA"] = ra_a observation_list[i]["dec"] = dec_a elif obs[0]["scheduler_note"][-1] == "b": observation_list[i]["RA"] = ra_b observation_list[i]["dec"] = dec_b else: ValueError("observation note does not end in a or b.") return observation_list
[docs] class CameraRotDetailer(BaseDetailer): """ Randomly set the camera rotation, either for each exposure, or per night. Parameters ---------- max_rot : `float` (90.) The maximum amount to offset the camera (degrees) min_rot : `float` (90) The minimum to offset the camera (degrees) per_night : `bool` (True) If True, only set a new offset per night. If False, randomly rotates every observation. telescope : `str` Telescope name. Options of "rubin" or "auxtel". Default "rubin". """ def __init__(self, max_rot=90.0, min_rot=-90.0, per_night=True, seed=42, nnights=7305, telescope="rubin"): self.survey_features = {} self.current_night = -1 self.max_rot = np.radians(max_rot) self.min_rot = np.radians(min_rot) self.range = self.max_rot - self.min_rot self.per_night = per_night self.rng = np.random.default_rng(seed) self.offsets = self.rng.random(nnights) self.offset = None self.rc = rotation_converter(telescope=telescope) def _generate_offsets(self, n_offsets, night): if self.per_night: if night != self.current_night: self.current_night = night self.offset = self.offsets[night] * self.range + self.min_rot offsets = np.ones(n_offsets) * self.offset else: self.rng = np.random.default_rng() offsets = self.rng.random(n_offsets) * self.range + self.min_rot return offsets def __call__(self, observation_list, conditions): # Generate offsets in camamera rotator offsets = self._generate_offsets(len(observation_list), conditions.night) for i, obs in enumerate(observation_list): alt, az = _approx_ra_dec2_alt_az( obs["RA"], obs["dec"], conditions.site.latitude_rad, conditions.site.longitude_rad, conditions.mjd, ) obs_pa = _approx_altaz2pa(alt, az, conditions.site.latitude_rad) obs["rotSkyPos"] = self.rc._rottelpos2rotskypos(offsets[i], obs_pa) obs["rotTelPos"] = offsets[i] return observation_list
[docs] class CameraSmallRotPerObservationListDetailer(BaseDetailer): """ Randomly set the camera rotation for each observation list. Generates a small sequential offset for sequential visits in the same filter; adds a random offset for each filter change. Parameters ---------- max_rot : `float`, optional The maximum amount to offset the camera (degrees). Default of 85 allows some padding for camera rotator. min_rot : `float`, optional The minimum to offset the camera (degrees) Default of -85 allows some padding for camera rotator. seed : `int`, optional Seed for random number generation (per night). per_visit_rot : `float`, optional Sequential rotation to add per visit. telescope : `str`, optional Telescope name. Options of "rubin" or "auxtel". Default "rubin". This is used to determine conversions between rotSkyPos and rotTelPos. """ def __init__(self, max_rot=85.0, min_rot=-85.0, seed=42, per_visit_rot=0.0, telescope="rubin"): self.survey_features = {} self.current_night = -1 self.max_rot = np.radians(max_rot) self.min_rot = np.radians(min_rot) self.rot_range = self.max_rot - self.min_rot self.seed = seed self.per_visit_rot = np.radians(per_visit_rot) self.offset = None self.rc = rotation_converter(telescope=telescope) def _generate_offsets_filter_change(self, filter_list, mjd, initial_offset): """Generate a random camera rotation for each filter change or add a small offset for each sequential observation. """ mjd_hash = round(100 * (np.asarray(mjd).item() % 100)) rng = np.random.default_rng(mjd_hash * self.seed) offsets = np.zeros(len(filter_list)) # Find the locations of the filter changes filter_changes = np.where(np.array(filter_list[:-1]) != np.array(filter_list[1:]))[0] filter_changes = np.concatenate([np.array([-1]), filter_changes]) # But add one because of counting and offsets above. filter_changes += 1 # Count visits per filter in the sequence. nvis_per_filter = np.concatenate( [np.diff(filter_changes), np.array([len(filter_list) - 1 - filter_changes[-1]])] ) # Set up the random rotator offsets for each filter change # This includes first rotation .. maybe not needed? for fchange_idx, nvis_f in zip(filter_changes, nvis_per_filter): rot_range = self.rot_range - self.per_visit_rot * nvis_f # At the filter change spot, update to random offset offsets[fchange_idx:] = rng.random() * rot_range + self.min_rot # After the filter change point, add incremental rotation # (we'll wipe this when we get to next fchange_idx) offsets[fchange_idx:] += self.per_visit_rot * np.arange(len(filter_list) - fchange_idx) offsets = np.where(offsets > self.max_rot, self.max_rot, offsets) return offsets def __call__(self, observation_list, conditions): # Generate offsets in camera rotator filter_list = [np.asarray(obs["filter"]).item() for obs in observation_list] offsets = self._generate_offsets_filter_change(filter_list, conditions.mjd, conditions.rot_tel_pos) for i, obs in enumerate(observation_list): alt, az = _approx_ra_dec2_alt_az( obs["RA"], obs["dec"], conditions.site.latitude_rad, conditions.site.longitude_rad, conditions.mjd, ) obs_pa = _approx_altaz2pa(alt, az, conditions.site.latitude_rad) obs["rotSkyPos"] = self.rc._rottelpos2rotskypos(offsets[i], obs_pa) obs["rotTelPos"] = offsets[i] return observation_list