Source code for rubin_scheduler.scheduler.detailers.dither_detailer

__all__ = (
    "DitherDetailer",
    "EuclidDitherDetailer",
    "SplitDetailer",
    "CameraRotDetailer",
    "CameraSmallRotPerObservationListDetailer",
    "ComCamGridDitherDetailer",
    "DeltaCoordDitherDetailer",
)

import warnings

import numpy as np

from rubin_scheduler.scheduler.detailers import BaseDetailer
from rubin_scheduler.scheduler.features import Conditions, NObsCount
from rubin_scheduler.scheduler.utils import ObservationArray, rotx, thetaphi2xyz, wrap_ra_dec, xyz2thetaphi
from rubin_scheduler.utils import (
    _approx_altaz2pa,
    _approx_ra_dec2_alt_az,
    bearing,
    ddf_locations,
    dest_latlon,
    gnomonic_project_tosky,
    rotation_converter,
)


[docs] class DitherDetailer(BaseDetailer): """ make a uniform dither pattern. Offset by a maximum radius in a random direction. Mostly intended for DDF pointings, the BaseMarkovDF_survey class includes dithering for large areas. Parameters ---------- max_dither : `float` (0.7) The maximum dither size to use (degrees). per_night : `bool` (True) If true, use the same dither offset for an entire night. If False, generate a dither position per visit. n_in_night : `int` (1200) The number of unique positions to generate in a night. Only used if per_night is False. """ def __init__(self, max_dither=0.7, per_night=True, n_in_night=1200, nnights=None, seed=None): if nnights is not None: warnings.warn("kwarg nnights deprecated", FutureWarning) if seed is not None: warnings.warn("kwarg seed deprecated", FutureWarning) self.survey_features = {"n_in_night": NObsCount(per_night=True)} self.current_night = -np.inf self.max_dither = np.radians(max_dither) self.n_in_night = n_in_night self.per_night = per_night self.offset_ra = None self.offset_dec = None self.current_night = -np.inf def _new_ang_rad(self, night): # Seed with the current night rng = np.random.default_rng(night) if self.per_night: n_gen = 1 else: n_gen = self.n_in_night self.angles = rng.random(n_gen) * 2 * np.pi self.radii = self.max_dither * np.sqrt(rng.random(n_gen)) def _generate_offsets(self, n_offsets, night): if self.per_night: if night != self.current_night: self._new_ang_rad(night) self.current_night = night angle = self.angles radius = self.radii self.offsets_ra = radius * np.cos(angle) self.offsets_dec = radius * np.sin(angle) else: if night != self.current_night: self._new_ang_rad(night) self.current_night = night indx_in_night = self.survey_features["n_in_night"].feature angle = self.angles[indx_in_night : indx_in_night + n_offsets] radius = self.radii[indx_in_night : indx_in_night + n_offsets] self.offsets_ra = radius * np.cos(angle) self.offsets_dec = radius * np.sin(angle) return self.offsets_ra, self.offsets_dec def __call__(self, obs_array, conditions): if len(obs_array) == 0: return obs_array # Generate offsets in RA and Dec offsets_ra, offsets_dec = self._generate_offsets(len(obs_array), conditions.night) new_ra, new_dec = gnomonic_project_tosky(offsets_ra, offsets_dec, obs_array["RA"], obs_array["dec"]) obs_array["RA"] = new_ra obs_array["dec"] = new_dec return obs_array
[docs] class DeltaCoordDitherDetailer(BaseDetailer): """Dither pattern set by user. Each input observation is expanded to have an observation at each dither position. Parameters ---------- delta_ra : `np.array` Angular distances to move on sky in the RA-direction (degrees). delta_dec : `np.array` Angular distances to move on sky in the dec-direction (degree). delta_rotskypos : `np.array` Angular shifts to make in the rotskypos values (degrees). Default None applies no rotational shift """ def __init__(self, delta_ra, delta_dec, delta_rotskypos=None): if delta_rotskypos is not None: if np.size(delta_ra) != np.size(delta_rotskypos): raise ValueError( "size of delta_ra (%i) is not equal to size of delta_rotskypos (%i)" % (np.size(delta_ra), np.size(delta_rotskypos)) ) if np.size(delta_ra) != np.size(delta_dec): raise ValueError( "Sizes of delta_ra (%i) and delta_dec (%i) do not match." % (np.size(delta_ra), np.size(delta_dec)) ) self.survey_features = {} self.delta_ra = np.radians(delta_ra) self.delta_dec = np.radians(delta_dec) if delta_rotskypos is None: self.delta_rotskypos = delta_rotskypos else: self.delta_rotskypos = np.radians(delta_rotskypos) def __call__(self, obs_array, conditions): output_array_list = [] for obs in obs_array: dithered_observations = ObservationArray(self.delta_ra.size) for key in obs.dtype.names: dithered_observations[key] = obs[key] # Generate grid on the equator (so RA offsets are not small) new_decs = self.delta_dec.copy() # Adding 90 deg here for later rotation about x-axis new_ras = self.delta_ra + np.pi / 2 # Rotate ra and dec about the x-axis # pi/2 term to convert dec to phi x, y, z = thetaphi2xyz(new_ras, new_decs + np.pi / 2.0) # rotate so offsets are now centered on proper dec xp, yp, zp = rotx(obs["dec"], x, y, z) theta, phi = xyz2thetaphi(xp, yp, zp) new_decs = phi - np.pi / 2 # Remove 90 degree rot from earlier and center on proper RA new_ras = theta - np.pi / 2 + obs["RA"] # Make sure coords are in proper range new_ras, new_decs = wrap_ra_dec(new_ras, new_decs) dithered_observations["RA"] = new_ras dithered_observations["dec"] = new_decs if self.delta_rotskypos is not None: dithered_observations["rotSkyPos_desired"] += self.delta_rotskypos output_array_list.append(dithered_observations) result = np.concatenate(output_array_list) return result
[docs] class EuclidDitherDetailer(BaseDetailer): """Directional dithering for Euclid DDFs Parameters ---------- dither_bearing_dir : `list` A list with the dither amplitude in along the axis connecting the two positions. Default [-0.25, 1] in degrees. dither_bearing_perp : `list` A list with the dither amplitude perpendicular to the axis connecting the two positions. Default [-0.25, 1] in degrees. seed : `float` Random number seed to use (42). per_night : `bool` If dither shifts should be per night (default True), or if dithers should be every pointing (False). ra_a, dec_a, ra_b, dec_b : `float` Positions for the two field centers. Default None will load the positions from rubin_scheduler.utils.ddf_locations n_in_night : `int` Number of positions to generate dither positions for within a night. """ def __init__( self, dither_bearing_dir=[-0.25, 1], dither_bearing_perp=[-0.25, 0.25], per_night=True, ra_a=None, dec_a=None, ra_b=None, dec_b=None, n_in_night=1200, nnights=None, seed=None, ): if nnights is not None: warnings.warn("kwarg nnights deprecated", FutureWarning) if seed is not None: warnings.warn("kwarg seed deprecated", FutureWarning) self.n_in_night = n_in_night self.survey_features = {"n_in_night": NObsCount(per_night=True)} default_locations = ddf_locations() if ra_a is None: self.ra_a = np.radians(default_locations["EDFS_a"][0]) else: self.ra_a = np.radians(ra_a) if ra_b is None: self.ra_b = np.radians(default_locations["EDFS_b"][0]) else: self.ra_b = np.radians(ra_b) if dec_a is None: self.dec_a = np.radians(default_locations["EDFS_a"][1]) else: self.dec_a = np.radians(dec_a) if dec_b is None: self.dec_b = np.radians(default_locations["EDFS_b"][1]) else: self.dec_b = np.radians(dec_b) self.dither_bearing_dir = np.radians(dither_bearing_dir) self.dither_bearing_perp = np.radians(dither_bearing_perp) self.bearing_atob = bearing(self.ra_a, self.dec_a, self.ra_b, self.dec_b) self.bearing_btoa = bearing(self.ra_b, self.dec_b, self.ra_a, self.dec_a) self.current_night = -np.inf self.per_night = per_night self.shifted_ra_a = None self.shifted_dec_a = None self.shifted_ra_b = None self.shifted_dec_b = None def _new_ang_rad(self, night): # seed with the current night rng = np.random.default_rng(night) if self.per_night: n_gen = 1 else: n_gen = self.n_in_night self.bearings_mag_1 = rng.uniform( low=self.dither_bearing_dir.min(), high=self.dither_bearing_dir.max(), size=n_gen, ) self.perp_mag_1 = rng.uniform( low=self.dither_bearing_perp.min(), high=self.dither_bearing_perp.max(), size=n_gen, ) self.bearings_mag_2 = rng.uniform( low=self.dither_bearing_dir.min(), high=self.dither_bearing_dir.max(), size=n_gen, ) self.perp_mag_2 = rng.uniform( low=self.dither_bearing_perp.min(), high=self.dither_bearing_perp.max(), size=n_gen, ) def _generate_offsets(self, n_offsets, night): indx_in_night = self.survey_features["n_in_night"].feature if night != self.current_night: self.indx_in_night = 0 self._new_ang_rad(night) self.current_night = night if self.per_night: bearing_mag = self.bearings_mag_1 perp_mag = self.perp_mag_1 else: bearing_mag = self.bearings_mag_1[indx_in_night : indx_in_night + n_offsets] perp_mag = self.perp_mag_1[indx_in_night : indx_in_night + n_offsets] # Move point a along the bearings self.shifted_dec_a, self.shifted_ra_a = dest_latlon( bearing_mag, self.bearing_atob, self.dec_a, self.ra_a ) self.shifted_dec_a, self.shifted_ra_a = dest_latlon( perp_mag, self.bearing_atob + np.pi / 2.0, self.shifted_dec_a, self.shifted_ra_a, ) # Shift the second position if self.per_night: bearing_mag = self.bearings_mag_2 perp_mag = self.perp_mag_2 else: bearing_mag = self.bearings_mag_2[indx_in_night : indx_in_night + n_offsets] perp_mag = self.perp_mag_2[indx_in_night : indx_in_night + n_offsets] self.shifted_dec_b, self.shifted_ra_b = dest_latlon( bearing_mag, self.bearing_btoa, self.dec_b, self.ra_b ) self.shifted_dec_b, self.shifted_ra_b = dest_latlon( perp_mag, self.bearing_btoa + np.pi / 2.0, self.shifted_dec_b, self.shifted_ra_b, ) return ( self.shifted_ra_a, self.shifted_dec_a, self.shifted_ra_b, self.shifted_dec_b, ) def __call__(self, obs_array, conditions): # Generate offsets in RA and Dec ra_a, dec_a, ra_b, dec_b = self._generate_offsets(len(obs_array), conditions.night) for i, obs in enumerate(obs_array): if "DD:EDFS_a" in obs["scheduler_note"][0:9]: obs_array[i]["RA"] = ra_a obs_array[i]["dec"] = dec_a elif "DD:EDFS_b" in obs["scheduler_note"][0:9]: obs_array[i]["RA"] = ra_b obs_array[i]["dec"] = dec_b else: raise ValueError("scheduler_note does not contain EDFS_a or EDFS_b.") return obs_array
[docs] class SplitDetailer(BaseDetailer): """Combine two detailers, but choose which one of them to use, based on the presence of a specified string in the scheduler_note. Useful to identify different kinds of observations in a ScriptedSurvey and then apply different detailers (such as EuclidDetailer vs. standard DDF dither detailer). Parameters ---------- det1 : `detailers.BaseDetailer` The first detailer, to use if the `split_str` is not present in scheduler_note. det2 : `detailers.BaseDetailer` The second detailer, to use if `split_str` is present. split_str : `str` Search for this string in scheduler_note (matches sub-string). """ def __init__( self, det1: BaseDetailer, det2: BaseDetailer, split_str: str = "EDFS", ): self.det1 = det1 self.det2 = det2 self.split_str = split_str def __call__(self, observation_array: ObservationArray, conditions: Conditions) -> ObservationArray: string_in = [self.split_str in note for note in observation_array["scheduler_note"]] string_out = np.logical_not(string_in) observation_array[string_out] = self.det1(observation_array[string_out], conditions) observation_array[string_in] = self.det2(observation_array[string_in], conditions) return observation_array
[docs] class CameraRotDetailer(BaseDetailer): """ Randomly set the camera rotation, either for each exposure, for each time the detailer is called, or per night. Note dithering for each exposure may not be repeatable if scheduler is restarted. Parameters ---------- max_rot : `float` (90.) The maximum amount to offset the camera (degrees) min_rot : `float` (90) The minimum to offset the camera (degrees) dither : `str` If "night", change positions per night. If call, change per call. If "all", randomize per visit. Default "night". telescope : `str` Telescope name. Options of "rubin" or "auxtel". Default "rubin". n_in_night : `int` Number of dither positions to generate. Should be greater than the maximum number of visits expected in a night (if dither set to "night"). Default 2000. """ def __init__( self, max_rot=90.0, min_rot=-90.0, dither="night", per_night=None, seed=42, nnights=None, telescope="rubin", n_in_night=2000, ): self.survey_features = {} if nnights is not None: warnings.warn("nnights deprecated", FutureWarning) if per_night is True: warnings.warn("per_night deprecated, setting dither='night'", FutureWarning) dither = "night" if per_night is False: warnings.warn("per_night deprecated, setting dither='all'", FutureWarning) dither = "all" if dither is True: warnings.warn("dither=True deprecated, swapping to dither='night'", FutureWarning) dither = "night" if dither is False: warnings.warn("dither=False deprecated, swapping to dither='all'", FutureWarning) dither = "all" self.current_night = -np.inf self.max_rot = np.radians(max_rot) self.min_rot = np.radians(min_rot) self.range = self.max_rot - self.min_rot self.dither = dither self.rng = np.random.default_rng(seed) self.n_in_night = n_in_night if dither == "call": self.offsets = None else: self.offsets = self.rng.random(n_in_night) self.offset = None self.rc = rotation_converter(telescope=telescope) self.call_num = 0 def _generate_offsets(self, n_offsets, night): if self.dither == "night": if night != self.current_night: self.current_night = night self.offset = self.offsets[night] * self.range + self.min_rot offsets = np.ones(n_offsets) * self.offset elif self.dither == "call": if night != self.current_night: self.current_night = night self.call_num = 0 self.rng = np.random.default_rng(night) self.offsets = self.rng.random(self.n_in_night) offsets = self.offsets[self.call_num] * self.range + self.min_rot self.call_num += 1 offsets = np.ones(n_offsets) * offsets elif self.dither == "all": self.rng = np.random.default_rng() offsets = self.rng.random(n_offsets) * self.range + self.min_rot else: raise ValueError("dither kwarg must be set to 'night', 'call', or 'all'.") return offsets def __call__(self, observation_array, conditions): # Generate offsets in camamera rotator offsets = self._generate_offsets(len(observation_array), conditions.night) alt, az = _approx_ra_dec2_alt_az( observation_array["RA"], observation_array["dec"], conditions.site.latitude_rad, conditions.site.longitude_rad, conditions.mjd, ) obs_pa = _approx_altaz2pa(alt, az, conditions.site.latitude_rad) observation_array["rotSkyPos"] = self.rc._rottelpos2rotskypos(offsets, obs_pa) observation_array["rotTelPos"] = offsets return observation_array
[docs] class CameraSmallRotPerObservationListDetailer(BaseDetailer): """ Randomly set the camera rotation for each observation list. Generates a small sequential offset for sequential visits in the same band; adds a random offset for each band change. Parameters ---------- max_rot : `float`, optional The maximum amount to offset the camera (degrees). Default of 85 allows some padding for camera rotator. min_rot : `float`, optional The minimum to offset the camera (degrees) Default of -85 allows some padding for camera rotator. seed : `int`, optional Seed for random number generation (per night). per_visit_rot : `float`, optional Sequential rotation to add per visit. telescope : `str`, optional Telescope name. Options of "rubin" or "auxtel". Default "rubin". This is used to determine conversions between rotSkyPos and rotTelPos. """ def __init__(self, max_rot=85.0, min_rot=-85.0, seed=42, per_visit_rot=0.0, telescope="rubin"): self.survey_features = {} self.current_night = -1 self.max_rot = np.radians(max_rot) self.min_rot = np.radians(min_rot) self.rot_range = self.max_rot - self.min_rot self.seed = seed self.per_visit_rot = np.radians(per_visit_rot) self.offset = None self.rc = rotation_converter(telescope=telescope) def _generate_offsets_band_change(self, band_list, mjd, initial_offset): """Generate a random camera rotation for each band change or add a small offset for each sequential observation. """ mjd_hash = round(100 * (np.asarray(mjd).item() % 100)) rng = np.random.default_rng(mjd_hash * self.seed) offsets = np.zeros(len(band_list)) # Find the locations of the band changes band_changes = np.where(np.array(band_list[:-1]) != np.array(band_list[1:]))[0] band_changes = np.concatenate([np.array([-1]), band_changes]) # But add one because of counting and offsets above. band_changes += 1 # Count visits per band in the sequence. nvis_per_band = np.concatenate( [np.diff(band_changes), np.array([len(band_list) - 1 - band_changes[-1]])] ) # Set up the random rotator offsets for each band change # This includes first rotation .. maybe not needed? for fchange_idx, nvis_f in zip(band_changes, nvis_per_band): rot_range = self.rot_range - self.per_visit_rot * nvis_f # At the band change spot, update to random offset offsets[fchange_idx:] = rng.random() * rot_range + self.min_rot # After the band change point, add incremental rotation # (we'll wipe this when we get to next fchange_idx) offsets[fchange_idx:] += self.per_visit_rot * np.arange(len(band_list) - fchange_idx) offsets = np.where(offsets > self.max_rot, self.max_rot, offsets) return offsets def __call__(self, observation_list, conditions): # Generate offsets in camera rotator band_list = [np.asarray(obs["band"]).item() for obs in observation_list] offsets = self._generate_offsets_band_change(band_list, conditions.mjd, conditions.rot_tel_pos) for i, obs in enumerate(observation_list): alt, az = _approx_ra_dec2_alt_az( obs["RA"], obs["dec"], conditions.site.latitude_rad, conditions.site.longitude_rad, conditions.mjd, ) obs_pa = _approx_altaz2pa(alt, az, conditions.site.latitude_rad) obs["rotSkyPos"] = self.rc._rottelpos2rotskypos(offsets[i], obs_pa) obs["rotTelPos"] = offsets[i] return observation_list
[docs] class ComCamGridDitherDetailer(BaseDetailer): """ Generate an offset pattern to synthesize a 2x2 grid of ComCam pointings. Parameters ---------- rotTelPosDesired : `float`, (0.) The physical rotation angle of the camera rotator (degrees) scale : `float` (0.355) Half of the offset between grid pointing centers. (degrees) dither : `float` (0.05) Dither offsets within grid to fill chip gaps. (degrees) telescope : `str`, ("comcam") Telescope name. Default "comcam". This is used to determine conversions between rotSkyPos and rotTelPos. """ def __init__(self, rotTelPosDesired=0.0, scale=0.355, dither=0.05, telescope="comcam"): self.survey_features = {} self.rotTelPosDesired = np.radians(rotTelPosDesired) self.scale = np.radians(scale) self.dither = np.radians(dither) self.rc = rotation_converter(telescope=telescope) def _rotate(self, x, y, angle): x_rot = x * np.cos(angle) - y * np.sin(angle) y_rot = x * np.sin(angle) + y * np.cos(angle) return x_rot, y_rot def _generate_offsets(self, n_offsets, band_list, rotSkyPos): # 2 x 2 pointing grid x_grid = np.array([-1.0 * self.scale, -1.0 * self.scale, self.scale, self.scale]) y_grid = np.array([-1.0 * self.scale, self.scale, self.scale, -1.0 * self.scale]) x_grid_rot, y_grid_rot = self._rotate(x_grid, y_grid, -1.0 * rotSkyPos) offsets_grid_rot = np.array([x_grid_rot, y_grid_rot]).T # Dither pattern within grid to fill chip gaps # Psuedo-random offsets x_dither = np.array( [ 0.0, -0.5 * self.dither, -1.25 * self.dither, 1.5 * self.dither, 0.75 * self.dither, ] ) y_dither = np.array( [ 0.0, -0.75 * self.dither, 1.5 * self.dither, 1.25 * self.dither, -0.5 * self.dither, ] ) x_dither_rot, y_dither_rot = self._rotate(x_dither, y_dither, -1.0 * rotSkyPos) offsets_dither_rot = np.array([x_dither_rot, y_dither_rot]).T # Find the indices of the band changes band_changes = np.where(np.array(band_list[:-1]) != np.array(band_list[1:]))[0] band_changes = np.concatenate([np.array([-1]), band_changes]) band_changes += 1 offsets = [] index_band = 0 for ii in range(0, n_offsets): if ii in band_changes: # Reset the count after each band change index_band = 0 index_grid = index_band % 4 index_dither = np.floor(index_band / 4).astype(int) % 5 offsets.append(offsets_grid_rot[index_grid] + offsets_dither_rot[index_dither]) index_band += 1 return np.vstack(offsets) def __call__(self, observation_list, conditions): if len(observation_list) == 0: return observation_list band_list = [np.asarray(obs["band"]).item() for obs in observation_list] # Initial estimate of rotSkyPos corresponding to desired rotTelPos alt, az, pa = _approx_ra_dec2_alt_az( observation_list[0]["RA"], observation_list[0]["dec"], conditions.site.latitude_rad, conditions.site.longitude_rad, conditions.mjd, return_pa=True, ) rotSkyPos = self.rc._rottelpos2rotskypos(self.rotTelPosDesired, pa) # Generate offsets in RA and Dec offsets = self._generate_offsets(len(observation_list), band_list, rotSkyPos) # Project offsets onto sky obs_array = np.concatenate(observation_list) new_ra, new_dec = gnomonic_project_tosky( offsets[:, 0], offsets[:, 1], obs_array["RA"], obs_array["dec"] ) new_ra, new_dec = wrap_ra_dec(new_ra, new_dec) # Update observations for ii in range(0, len(observation_list)): observation_list[ii]["RA"] = new_ra[ii] observation_list[ii]["dec"] = new_dec[ii] alt, az, pa = _approx_ra_dec2_alt_az( new_ra[ii], new_dec[ii], conditions.site.latitude_rad, conditions.site.longitude_rad, conditions.mjd, return_pa=True, ) observation_list[ii]["rotSkyPos"] = rotSkyPos observation_list[ii]["rotTelPos"] = self.rc._rotskypos2rottelpos(rotSkyPos, pa) return observation_list