Source code for rubin_scheduler.scheduler.schedulers.summit_wrapper
__all__ = ("SummitWrapper",)
import copy
import warnings
import numpy as np
from rubin_scheduler.scheduler.utils import IntRounded
from rubin_scheduler.utils import _ra_dec2_hpid
class SummitWrapper:
"""Wrap a core scheduler so observations can be requested
and are assumed to be completed.
Parameters
----------
core_scheduler : `rubin_scheduler.scheduler.schedulers.CoreScheduler`
A CoreScheduler object.
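
    Examples
    --------
    A minimal sketch of the intended round trip, illustrative only.
    ``core``, ``conditions`` and ``completed_observation`` are
    hypothetical stand-ins for a configured `CoreScheduler`, a current
    `Conditions` object, and an observation actually taken on the
    summit::

        wrapper = SummitWrapper(core)
        wrapper.update_conditions(conditions)
        # Repeated requests do not return the same pointing, because
        # each requested observation is assumed to be completed.
        requested = wrapper.request_observation()
        # Once the observation has really been taken, feed it back in;
        # the next request rebuilds the ahead scheduler as needed.
        wrapper.add_observation(completed_observation)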
"""
def __init__(self, core_scheduler):
# A standard CoreScheduler object that
# is only given observations that have been
# completed.
self.core_scheduler = core_scheduler
self.conditions = None
self.clear_ahead()
# For compatibility with sim_runner.
# Not tracking flushed observations
self.flushed = 0
def clear_ahead(self):
"""Reset the ahead scheduler and pending observations"""
# The ahead_scheduler acts as a copy of
        # the core scheduler, but will assume any requested
# observations are completed instantly, so requesting
# multiple observations does not risk getting the same
# observation multiple times.
self.ahead_scheduler = copy.deepcopy(self.core_scheduler)
self.requested_but_unadded_ids = []
self.requested_but_unadded_obs = []
# Have we added observations so
# self.core_scheduler and self.ahead_scheduler
# are out of sync?
self.need_replay = False
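        # Illustrative timeline of how this flag is used (see
        # request_observation below):
        #   request_observation()      -> requested obs recorded as
        #                                 completed in ahead_scheduler only
        #   add_observation(obs)       -> obs added to core_scheduler and
        #                                 need_replay set to True
        #   next request_observation() -> ahead_scheduler rebuilt from a
        #                                 copy of core_scheduler plus any
        #                                 still-pending requested obs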
    def flush_queue(self):
        """Flush the core scheduler queue and reset the ahead scheduler."""
        self.core_scheduler.flush_queue()
        self.clear_ahead()
def add_observation(self, observation):
"""Add actually completed observations."""
self.core_scheduler.add_observation(observation)
# Assume everything up to the ID has been observed.
# Should be ok to add out of order, as long as everything up
# to the largest ID is added before request_observation is called.
indx = np.searchsorted(
self.requested_but_unadded_ids, observation["target_id"].view(np.ndarray).max(), side="right"
)
# Should this delete everything up to the indx?
# I think that's ok if we assume things will get added in order
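        # Worked example (values illustrative only): with
        # requested_but_unadded_ids == [3, 4, 5] and an incoming
        # observation whose largest target_id is 4,
        # np.searchsorted([3, 4, 5], 4, side="right") returns 2,
        # so ids 3 and 4 are dropped below and only 5 stays pending.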
if indx.size > 0:
self.requested_but_unadded_ids = self.requested_but_unadded_ids[indx:]
self.requested_but_unadded_obs = self.requested_but_unadded_obs[indx:]
self.need_replay = True
def _check_queue_mjd_only(self, mjd):
"""
Check if there are things in the queue that can be executed
using only MJD and not full conditions.
        This is primarily used by sim_runner to avoid recomputing the
        full conditions when they are not needed.
"""
result = False
if np.sum(self.ahead_scheduler.queue_manager.need_observing) > 0:
fb_mjd = self.ahead_scheduler.return_active_queue()["flush_by_mjd"]
not_flushed_yet = np.where(IntRounded(mjd) < IntRounded(fb_mjd))[0]
no_flush_set = np.where(fb_mjd == 0)[0]
if (np.size(not_flushed_yet) > 0) | (np.size(no_flush_set) > 0):
result = True
return result
    def update_conditions(self, conditions):
        """Pass updated conditions to both the core and ahead schedulers."""
        self.conditions = conditions
self.ahead_scheduler.update_conditions(conditions)
self.core_scheduler.update_conditions(conditions)
def _fill_obs_values(self, observation):
"""Fill in values of an observation assuming it will be
observed approximately now.
"""
# Nearest neighbor from conditions maps
hpid = _ra_dec2_hpid(self.conditions.nside, observation["RA"], observation["dec"])
# XXX--Need to go through and formally list the columns
# that are minimally required for adding observations to Features.
        # This seems to work, but there is nothing stopping someone from
        # asking for something random, like moon phase, in which case it
        # will fail. Maybe we should set all undefined columns to np.nan
        # so it's clear they can't be used unless this method is updated.
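        # For example (values illustrative only): with
        # conditions.nside == 32 the conditions maps have
        # 12 * 32**2 == 12288 healpixels, and hpid is the index of the
        # pixel nearest the requested RA/dec, so conditions.airmass[hpid]
        # below is the current airmass estimate at that pointing.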
observation["mjd"] = self.conditions.mjd
observation["FWHMeff"] = self.conditions.fwhm_eff[observation["band"][0]][hpid]
observation["airmass"] = self.conditions.airmass[hpid]
observation["fivesigmadepth"] = self.conditions.m5_depth[observation["band"][0]][hpid]
observation["night"] = self.conditions.night
return observation
def request_observation(self, mjd=None):
"""Request an observation, assuming previously requested
observations were successfully observed.
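
        Parameters
        ----------
        mjd : `float`, optional
            MJD to request an observation for. If None, the MJD of the
            most recently supplied conditions is used.

        Examples
        --------
        Illustrative only; assumes `update_conditions` has already been
        called with a current `Conditions` object::

            obs1 = wrapper.request_observation()
            obs2 = wrapper.request_observation()
            # obs1 and obs2 should not repeat the same pointing, because
            # the ahead scheduler treats obs1 as already completed.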
"""
if mjd is None:
mjd = self.conditions.mjd
# If observations have been added, need to
# rebuild ahead_scheduler
if self.need_replay:
self.ahead_scheduler = copy.deepcopy(self.core_scheduler)
for obs in self.requested_but_unadded_obs:
if mjd < obs["mjd"]:
msg = f"""Adding observation with MJD={obs["mjd"]}, but think it is
currently MJD={mjd}, so we are adding observations
from the future. That seems like it shouldn't be possible.
"""
warnings.warn(msg)
self.ahead_scheduler.add_observation(obs)
self.need_replay = False
self.ahead_scheduler.update_conditions(self.conditions)
        # If the ahead scheduler has to refill its queue below, we need
        # to append the new queue to the core scheduler so it knows
        # about it as well.
if self.ahead_scheduler.return_active_queue().size > 0:
result_plain = self.ahead_scheduler.request_observation(mjd=mjd)
else:
# Now, we either refill the queue or
# generate a one-off new observation and have an empty queue.
result_plain = self.ahead_scheduler.request_observation(mjd=mjd)
self.core_scheduler.append_to_queue(result_plain.copy())
ahead_active_queue = self.ahead_scheduler.return_active_queue()
if ahead_active_queue.size > 0:
self.core_scheduler.append_to_queue(ahead_active_queue)
obs_filled = self._fill_obs_values(result_plain.copy())
self.requested_but_unadded_ids.append(obs_filled["target_id"].view(np.ndarray).max())
self.requested_but_unadded_obs.append(obs_filled)
# Add requested observation to the ahead
# scheduler, so if we call request_observation again,
# it will think this has been completed.
self.ahead_scheduler.add_observation(obs_filled)
return result_plain