__all__ = ("DescDdf", "generate_desc_dd_surveys")
import numpy as np
import rubin_scheduler.scheduler.basis_functions as basis_functions
from rubin_scheduler.scheduler.surveys import BaseSurvey
from rubin_scheduler.scheduler.utils import ObservationArray
[docs]
class DescDdf(BaseSurvey):
"""DDF survey based on Scolnic et al Cadence White Paper."""
def __init__(
self,
basis_functions,
RA,
dec,
sequences=None,
exptime=30.0,
nexp=1,
ignore_obs=None,
survey_name="DD_DESC",
reward_value=101.0,
readtime=2.0,
filter_change_time=120.0,
nside=None,
flush_pad=30.0,
seed=42,
detailers=None,
):
super(DescDdf, self).__init__(
nside=nside,
basis_functions=basis_functions,
detailers=detailers,
ignore_obs=ignore_obs,
)
self.ra = np.radians(RA)
self.ra_hours = RA / 360.0 * 24.0
self.dec = np.radians(dec)
self.survey_name = survey_name
self.reward_value = reward_value
self.flush_pad = flush_pad / 60.0 / 24.0 # To days
self.simple_obs = ObservationArray()
self.simple_obs["RA"] = np.radians(RA)
self.simple_obs["dec"] = np.radians(dec)
self.simple_obs["exptime"] = exptime
self.simple_obs["nexp"] = nexp
self.simple_obs["scheduler_note"] = survey_name
# Define the sequences we would like to do
if sequences is None:
self.sequences = [{"u": 2, "g": 2, "r": 4, "i": 8}, {"z": 25, "y": 4}, None]
else:
self.sequences = sequences
self.approx_times = []
for sequence in self.sequences:
if sequence is None:
self.approx_times.append(0)
else:
n_exp_in_seq = np.sum(list(sequence.values()))
time_needed = filter_change_time * len(sequence.keys())
time_needed += exptime * n_exp_in_seq
time_needed += readtime * n_exp_in_seq * nexp
self.approx_times.append(time_needed / 3600.0 / 24.0)
# Track what we last tried to do
# XXX-this should probably go into self.extra_features or
# something for consistency.
self.sequence_index = 0
self.last_night_observed = -100
[docs]
def check_continue(self, observation, conditions):
# feasibility basis functions?
"""
This method enables external calls to check if a given
observations that belongs to this survey is
feasible or not. This is called once a sequence has started to
make sure it can continue.
XXX--TODO: Need to decide if we want to develope check_continue
or instead hold the sequence in the survey, and be able to check
it that way.
"""
result = True
return result
def _check_feasibility(self, conditions):
"""
Check if the survey is feasable in the current conditions
"""
# No more if we've already observed this night
if self.last_night_observed == conditions.night:
return False
# Advance the sequence index if we have skipped a day intentionally
if (self.sequences[self.sequence_index] is None) & (conditions.night - self.last_night_observed > 1):
self.sequence_index = (self.sequence_index + 1) % len(self.sequences)
# If we want to skip this day
if self.sequences[self.sequence_index] is None:
return False
# The usual basis function checks
for bf in self.basis_functions:
result = bf.check_feasibility(conditions)
if not result:
return result
return result
[docs]
def calc_reward_function(self, conditions):
result = -np.inf
if self._check_feasibility(conditions):
result = self.reward_value
return result
[docs]
def generate_observations_rough(self, conditions):
result = []
for key in self.sequences[self.sequence_index]:
# Just skip adding the z-band ones if it's not loaded
if key in conditions.mounted_filters:
for i in np.arange(self.sequences[self.sequence_index][key]):
temp_obs = self.simple_obs.copy()
temp_obs["filter"] = key
# XXX--need to set flush by mjd
result.append(temp_obs)
for i, obs in enumerate(result):
result[i]["flush_by_mjd"] = (
conditions.mjd + self.approx_times[self.sequence_index] + self.flush_pad
)
# Just assuming this sequence gets observed.
self.last_night_observed = conditions.night
self.sequence_index = (self.sequence_index + 1) % len(self.sequences)
return result
def desc_dd_bfs(RA, dec, survey_name, ha_limits, frac_total=0.0185):
"""
Convienence function to generate all the feasibility basis functions
"""
bfs = []
bfs.append(basis_functions.Not_twilight_basis_function(sun_alt_limit=-18))
bfs.append(basis_functions.Time_to_twilight_basis_function(time_needed=30.0))
bfs.append(basis_functions.Hour_Angle_limit_basis_function(RA=RA, ha_limits=ha_limits))
bfs.append(basis_functions.Rising_more_basis_function(RA=RA))
bfs.append(basis_functions.Clouded_out_basis_function())
return bfs
def generate_desc_dd_surveys(nside=None, nexp=1, detailers=None):
surveys = []
# ELAIS S1
RA = 9.45
dec = -44.0
survey_name = "DD:ELAISS1"
ha_limits = ([0.0, 1.18], [21.82, 24.0])
bfs = desc_dd_bfs(RA, dec, survey_name, ha_limits)
surveys.append(
DescDdf(
bfs,
RA,
dec,
survey_name=survey_name,
reward_value=100,
nside=nside,
nexp=nexp,
detailers=detailers,
)
)
# XMM-LSS
survey_name = "DD:XMM-LSS"
RA = 35.708333
dec = -4 - 45 / 60.0
ha_limits = ([0.0, 1.3], [21.7, 24.0])
bfs = desc_dd_bfs(RA, dec, survey_name, ha_limits)
surveys.append(
DescDdf(
bfs,
RA,
dec,
survey_name=survey_name,
reward_value=100,
nside=nside,
nexp=nexp,
detailers=detailers,
)
)
# Extended Chandra Deep Field South
RA = 53.125
dec = -28.0 - 6 / 60.0
survey_name = "DD:ECDFS"
ha_limits = [[0.5, 3.0], [20.0, 22.5]]
bfs = desc_dd_bfs(RA, dec, survey_name, ha_limits)
surveys.append(
DescDdf(
bfs,
RA,
dec,
survey_name=survey_name,
reward_value=100,
nside=nside,
nexp=nexp,
detailers=detailers,
)
)
# COSMOS
RA = 150.1
dec = 2.0 + 10.0 / 60.0 + 55 / 3600.0
survey_name = "DD:COSMOS"
ha_limits = ([0.0, 1.5], [21.5, 24.0])
bfs = desc_dd_bfs(RA, dec, survey_name, ha_limits)
# have a special sequence for COSMOS
sequences = [{"g": 2, "r": 4, "i": 8}, {"z": 25, "y": 4}]
surveys.append(
DescDdf(
bfs,
RA,
dec,
survey_name=survey_name,
reward_value=100,
nside=nside,
nexp=nexp,
detailers=detailers,
sequences=sequences,
)
)
# Just do the two Euclid fields independently for now
survey_name = "DD:EDFSa"
RA = 58.97
dec = -49.28
ha_limits = ([0.0, 1.5], [23.0, 24.0])
bfs = desc_dd_bfs(RA, dec, survey_name, ha_limits)
surveys.append(
DescDdf(
bfs,
RA,
dec,
survey_name=survey_name,
reward_value=100,
nside=nside,
nexp=nexp,
detailers=detailers,
)
)
survey_name = "DD:EDFSb"
RA = 63.6
dec = -47.60
ha_limits = ([0.0, 1.5], [23.0, 24.0])
bfs = desc_dd_bfs(RA, dec, survey_name, ha_limits)
surveys.append(
DescDdf(
bfs,
RA,
dec,
survey_name=survey_name,
reward_value=100,
nside=nside,
nexp=nexp,
detailers=detailers,
)
)
return surveys
pass