__all__ = ("FieldSurvey", "FieldAltAzSurvey")
import copy
from functools import cached_property
import numpy as np
from rubin_scheduler.utils import DEFAULT_NSIDE, _approx_alt_az2_ra_dec, _ra_dec2_hpid
from ..detailers import AltAz2RaDecDetailer
from ..features import LastObservation, NObsCount
from ..utils import ObservationArray
from . import BaseSurvey
class FieldSurvey(BaseSurvey):
"""A survey class for running field surveys.
Parameters
----------
basis_functions : `list` [`rubin_scheduler.scheduler.basis_function`]
List of basis_function objects
detailers : `list` [`rubin_scheduler.scheduler.detailer`] objects
The detailers to apply to the list of observations.
RA : `float`
The RA of the field (degrees)
dec : `float`
The dec of the field to observe (degrees)
sequence : `list` [`str`]
The sequence of observations to take (i.e. which filters to use).
nvisits : `dict` {`str`: `int`}
Dictionary of the number of visits in each filter.
Default of None will use a backup sequence of 20 visits per filter.
Must contain all filters in sequence.
exptimes : `dict` {`str`: `float`}
Dictionary of the exposure time for visits in each filter.
Default of None will use a backup sequence of 38s in u, and
29.2s in all other bands. Must contain all filters in sequence.
nexps : `dict` {`str`: `int`}
Dictionary of the number of exposures per visit in each filter.
Default of None will use a backup sequence of 1 exposure per visit
in u band, 2 in all other bands. Must contain all filters in sequence.
ignore_obs : `list` [`str`] or None
Ignore observations with this string in the `scheduler_note`.
Matching is done on substrings, so ignoring 'mysurvey2' will also
ignore observations noted 'mysurvey23'.
survey_name : `str` or None
The name to give this survey, for debugging and visualization purposes.
Also propagated to the 'target_name' in the observation.
The default None will construct a name based on the
RA/Dec of the field.
scheduler_note : `str` or None
The value to include in the scheduler note.
The scheduler note is for internal scheduler use, to identify
observations to ignore or include for a survey or feature.
readtime : `float`
Readout time for computing approximate time of observing
the sequence. (seconds)
filter_change_time : `float`
Filter change time, on average. Used for computing approximate
time for the observing sequence. (seconds)
nside : `int` or None
Nside for computing survey basis functions and maps.
The default uses `rubin_scheduler.utils.DEFAULT_NSIDE`.
flush_pad : `float`
How long to hold observations in the queue after they
were expected to be completed (minutes).
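
Examples
--------
A minimal sketch of configuring this survey. The specific basis
functions, their parameters, and the import paths below are
illustrative assumptions rather than requirements::

    from rubin_scheduler.scheduler.basis_functions import (
        M5DiffBasisFunction,
        MoonAvoidanceBasisFunction,
    )
    from rubin_scheduler.scheduler.surveys import FieldSurvey

    bfs = [
        M5DiffBasisFunction(filtername="r"),
        MoonAvoidanceBasisFunction(),
    ]
    # With a string sequence, scalar nvisits/exptimes/nexps are
    # broadcast to every filter in the sequence.
    survey = FieldSurvey(
        bfs,
        RA=90.0,
        dec=-30.0,
        sequence="gri",
        nvisits=10,
        exptimes=30.0,
        nexps=2,
        survey_name="my_field",
    )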
"""
def __init__(
self,
basis_functions,
RA,
dec,
sequence="ugrizy",
nvisits=None,
exptimes=None,
nexps=None,
ignore_obs=None,
survey_name=None,
target_name=None,
science_program=None,
observation_reason=None,
scheduler_note=None,
readtime=2.4,
filter_change_time=120.0,
nside=DEFAULT_NSIDE,
flush_pad=30.0,
detailers=None,
):
default_nvisits = {"u": 20, "g": 20, "r": 20, "i": 20, "z": 20, "y": 20}
default_exptimes = {"u": 38, "g": 29.2, "r": 29.2, "i": 29.2, "z": 29.2, "y": 29.2}
default_nexps = {"u": 1, "g": 2, "r": 2, "i": 2, "z": 2, "y": 2}
self.ra = np.radians(RA)
self.ra_hours = RA / 360.0 * 24.0
self.dec = np.radians(dec)
self.ra_deg, self.dec_deg = RA, dec
self.survey_name = survey_name
if self.survey_name is None:
self._generate_survey_name(target_name=target_name)
# Backfill target name if it wasn't set
if target_name is None:
target_name = self.survey_name
super().__init__(
nside=nside,
basis_functions=basis_functions,
detailers=detailers,
ignore_obs=ignore_obs,
survey_name=self.survey_name,
target_name=target_name,
science_program=science_program,
observation_reason=observation_reason,
)
# It's useful to save these as class attributes too
self.target_name = target_name
self.science_program = science_program
self.observation_reason = observation_reason
# This is the only index in the healpix arrays that will be considered.
self.indx = _ra_dec2_hpid(self.nside, self.ra, self.dec)
# Set all basis function weights equal.
self.basis_weights = np.ones(len(basis_functions)) / len(basis_functions)
self.flush_pad = flush_pad / 60.0 / 24.0 # To days
self.filter_sequence = []
self.scheduler_note = scheduler_note
if self.scheduler_note is None:
self.scheduler_note = self.survey_name
# This sets up what a requested "observation" looks like.
# For sequences, each 'observation' is more than one exposure.
# When generating actual observations, filters which are not available
# are not included in the requested sequence.
if nvisits is None:
nvisits = default_nvisits
if exptimes is None:
exptimes = default_exptimes
if nexps is None:
nexps = default_nexps
# Do a little shuffling if this was not configured quite as expected
if isinstance(sequence, str):
if isinstance(nvisits, (float, int)):
nvisits = dict([(filtername, nvisits) for filtername in sequence])
if isinstance(exptimes, (float, int)):
exptimes = dict([(filtername, exptimes) for filtername in sequence])
if isinstance(nexps, (float, int)):
nexps = dict([(filtername, nexps) for filtername in sequence])
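# e.g. sequence="gri" with nvisits=10, exptimes=30.0, nexps=2 becomes
# per-filter dicts such as nvisits={"g": 10, "r": 10, "i": 10}.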
if isinstance(sequence, ObservationArray) or isinstance(sequence[0], ObservationArray):
self.observations = sequence
else:
self.observations = []
for filtername in sequence:
for j in range(nvisits[filtername]):
obs = ObservationArray()
obs["filter"] = filtername
obs["exptime"] = exptimes[filtername]
obs["RA"] = self.ra
obs["dec"] = self.dec
obs["nexp"] = nexps[filtername]
obs["scheduler_note"] = self.scheduler_note
self.observations.append(obs)
# Let's just make this an array for ease of use
self.observations = np.concatenate(self.observations)
order = np.argsort(self.observations["filter"])
self.observations = self.observations[order]
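# Sorting by filter groups the requested visits so that each filter
# only needs to be changed into once during the sequence.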
n_filter_change = np.size(np.unique(self.observations["filter"]))
# Make an estimate of how long a sequence will take.
# Assumes no major rotational or spatial
# dithering slowing things down.
# Does not account for unavailable filters.
self.approx_time = (
np.sum(self.observations["exptime"] + readtime * self.observations["nexp"])
+ filter_change_time * n_filter_change
)
# convert seconds to days, for internal approximation in timestep sizes
self.approx_time /= 3600.0 * 24.0
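# For the default ugrizy sequence this is approximately
# 20 * (38 + 2.4) + 5 * 20 * (29.2 + 2 * 2.4) + 6 * 120 ~= 4930 seconds,
# or about 0.057 days.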
# Tucking this here so we can look at how many observations
# recorded - both for any note and for this note.
self.extra_features["ObsRecorded"] = NObsCount(scheduler_note=None)
self.extra_features["LastObs"] = LastObservation(scheduler_note=None)
self.extra_features["ObsRecorded_note"] = NObsCount(scheduler_note=self.scheduler_note)
self.extra_features["LastObs_note"] = LastObservation(scheduler_note=self.scheduler_note)
def _generate_survey_name(self, target_name=None):
if target_name is not None:
self.survey_name = target_name
else:
self.survey_name = f"Field {self.ra_deg :.2f} {self.dec_deg :.2f}"
@cached_property
def roi_hpid(self):
hpid = _ra_dec2_hpid(self.nside, self.ra, self.dec)
return hpid
def check_continue(self, observation, conditions):
# feasibility basis functions?
"""
This method enables external calls to check whether a given
observation that belongs to this survey is feasible.
This is called once a sequence has started, to make sure it
can continue.
XXX--TODO: Need to decide if we want to develop check_continue,
or instead hold the sequence in the survey, and be able to check
it that way.
(note that this may depend a lot on how the SchedulerCSC works)
"""
return True
def calc_reward_function(self, conditions):
# only calculates reward at the index for the RA/Dec of the field
self.reward_checked = True
if self._check_feasibility(conditions):
self.reward = 0.0
for bf, weight in zip(self.basis_functions, self.basis_weights):
basis_value = bf(conditions, indx=self.indx)
self.reward += basis_value * weight
if not np.isscalar(self.reward):
self.reward = np.sum(self.reward[self.indx])
if np.any(np.isinf(self.reward)):
self.reward = np.inf
else:
# If not feasible, negative infinity reward
self.reward = -np.inf
return self.reward
def generate_observations_rough(self, conditions):
result = []
if self._check_feasibility(conditions):
result = copy.deepcopy(self.observations)
# Set the flush_by
result["flush_by_mjd"] = conditions.mjd + self.approx_time + self.flush_pad
# remove filters that are not mounted
mask = np.isin(result["filter"], conditions.mounted_filters)
result = result[mask]
# Put current loaded filter first
ind1 = np.where(result["filter"] == conditions.current_filter)[0]
ind2 = np.where(result["filter"] != conditions.current_filter)[0]
result = result[ind1.tolist() + ind2.tolist()]
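# e.g. if "r" is currently loaded, all r visits are requested before
# any visits that require a filter change.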
# convert to list of array.
final_result = [
row.reshape(
1,
)
for row in result
]
result = final_result
return result
def __repr__(self):
return (
f"<{self.__class__.__name__} survey_name='{self.survey_name}'"
f", RA={self.ra}, dec={self.dec} at {hex(id(self))}>"
)
class FieldAltAzSurvey(FieldSurvey):
"""A clone of FieldSurvey that takes alt,az rather than RA,dec.
Parameters
----------
basis_functions : `list` [`rubin_scheduler.scheduler.basis_function`]
List of basis_function objects
detailers : `list` [`rubin_scheduler.scheduler.detailer`] objects
The detailers to apply to the list of observations.
az : `float`
The azimuth of the field (degrees)
alt : `float`
The altitude of the field to observe (degrees)
sequence : `list` [`str`]
The sequence of observations to take (i.e. which filters to use).
nvisits : `dict` {`str`: `int`}
Dictionary of the number of visits in each filter.
Default of None will use a backup sequence of 20 visits per filter.
Must contain all filters in sequence.
exptimes : `dict` {`str`: `float`}
Dictionary of the exposure time for visits in each filter.
Default of None will use a backup sequence of 38s in u, and
29.2s in all other bands. Must contain all filters in sequence.
nexps : `dict` {`str`: `int`}
Dictionary of the number of exposures per visit in each filter.
Default of None will use a backup sequence of 1 exposure per visit
in u band, 2 in all other bands. Must contain all filters in sequence.
ignore_obs : `list` [`str`] or None
Ignore observations with this string in the `scheduler_note`.
Matching is done on substrings, so ignoring 'mysurvey2' will also
ignore observations noted 'mysurvey23'.
survey_name : `str` or None
The name to give this survey, for debugging and visualization purposes.
Also propagated to the 'target_name' in the observation.
The default None will construct a name based on the
alt,az of the field.
scheduler_note : `str` or None
The value to include in the scheduler note.
The scheduler note is for internal scheduler use, to identify
observations to ignore or include for a survey or feature.
readtime : `float`
Readout time for computing approximate time of observing
the sequence. (seconds)
filter_change_time : `float`
Filter change time, on average. Used for computing approximate
time for the observing sequence. (seconds)
nside : `int` or None
Nside for computing survey basis functions and maps.
The default uses `rubin_scheduler.utils.DEFAULT_NSIDE`.
flush_pad : `float`
How long to hold observations in the queue after they
were expected to be completed (minutes).
"""
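# Illustrative sketch (not executed): with `bfs` a list of basis functions,
# a field near zenith could be requested as
#     FieldAltAzSurvey(bfs, az=180.0, alt=80.0, sequence="r", nvisits=10)
# An AltAz2RaDecDetailer is added automatically when detailers is None.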
def __init__(
self,
basis_functions,
az,
alt,
sequence="ugrizy",
nvisits=None,
exptimes=None,
nexps=None,
ignore_obs=None,
survey_name=None,
target_name=None,
science_program=None,
observation_reason=None,
scheduler_note=None,
readtime=2.4,
filter_change_time=120.0,
nside=DEFAULT_NSIDE,
flush_pad=30.0,
detailers=None,
):
if detailers is None:
detailers = [AltAz2RaDecDetailer()]
# Check that an AltAz detailer is present
names_match = ["AltAz2RaDecDetailer" in det.__class__.__name__ for det in detailers]
if not np.any(names_match):
raise ValueError(
"detailers list does not include an AltAz2RaDecDetailer, which is needed for FieldAltAzSurvey"
)
self.az = np.radians(az)
self.alt = np.radians(alt)
self.alt_deg, self.az_deg = alt, az
super().__init__(
basis_functions=basis_functions,
RA=0.0,
dec=0.0,
sequence="ugrizy",
nvisits=nvisits,
exptimes=exptimes,
nexps=nexps,
ignore_obs=ignore_obs,
survey_name=survey_name,
target_name=target_name,
science_program=science_program,
observation_reason=observation_reason,
scheduler_note=scheduler_note,
readtime=readtime,
filter_change_time=filter_change_time,
nside=nside,
flush_pad=flush_pad,
detailers=detailers,
)
# Clobber and set to None
self.ra = None
self.dec = None
# Backfill target name if it wasn't set
if target_name is None:
target_name = self.survey_name
# Default self.observations have ra,dec. Swap to alt,az
self.observations["RA"] = None
self.observations["dec"] = None
self.observations["alt"] = self.alt
self.observations["az"] = self.az
@property
def roi_hpid(self):
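# Note: self.lat, self.lon and self.mjd are set in calc_reward_function
# before this property is evaluated.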
ra, dec = _approx_alt_az2_ra_dec([self.alt], [self.az], self.lat, self.lon, self.mjd)
hpid = _ra_dec2_hpid(self.nside, ra, dec)
return hpid
def _generate_survey_name(self, target_name=None):
if target_name is not None:
self.survey_name = target_name
else:
self.survey_name = f"Field alt,az {self.alt_deg :.2f} {self.az_deg :.2f}"
def calc_reward_function(self, conditions):
# Only calculates reward at the roi_hpid for the RA,dec coordinate
# of the alt,az position at the current time.
self.reward_checked = True
self.mjd = conditions.mjd
self.lat = np.radians(conditions.site.latitude)
self.lon = np.radians(conditions.site.longitude)
hpid = self.roi_hpid
if self._check_feasibility(conditions):
self.reward = 0.0
for bf, weight in zip(self.basis_functions, self.basis_weights):
basis_value = bf(conditions, indx=hpid)
self.reward += basis_value * weight
if not np.isscalar(self.reward):
self.reward = np.sum(self.reward[hpid])
if np.any(np.isinf(self.reward)):
self.reward = np.inf
else:
# If not feasible, negative infinity reward
self.reward = -np.inf
return self.reward