Source code for rubin_scheduler.scheduler.schedulers.queue_manager

__all__ = ("BaseQueueManager",)

import healpy as hp
import numpy as np

from rubin_scheduler.scheduler.utils import IntRounded, ObservationArray


[docs] class BaseQueueManager: """Class for managing a queue of desired observations. NOTE: If using masking basis functions, one should ensure Survey objects are using the same masks as the QueueManager, else one will end up in a loop where surveys propose visits that the QueueManager always rejects. E.g., if the surveys don't mask clouds, all their observations can be rejected by the QueueManager. Parameters ---------- detailers : `list` List of detailers to apply to observations as they are selected from the queue. Mostly for setting camera rotation angles dynamically, or skipping observations that have become clouded out or gotten close to zenith. basis_functions : `list` List of basis functions. Assumed to be only used for masking, thus no weights for the basis functions. """ def __init__(self, detailers=None, basis_functions=None): if detailers is None: self.detailers = [] else: self.detailers = detailers if basis_functions is None: self.basis_functions = [] else: self.basis_functions = basis_functions self.desired_observations_array = ObservationArray(n=0) # Array to track which desired_observations_array # have been observed self.need_observing = [] def flush_queue(self): self.desired_observations_array = ObservationArray(n=0) self.need_observing = [] def set_queue(self, observation_array): self.desired_observations_array = observation_array self.need_observing = np.ones(observation_array.size, dtype=bool) def add_observation(self, observation, **kwargs): if self.desired_observations_array is not None: match_indx = np.where(self.desired_observations_array["target_id"] == observation["target_id"])[0] if np.size(match_indx) > 0: self.need_observing[match_indx] = False for bf in self.basis_functions: bf.add_observation(observation, **kwargs) for detailer in self.detailers: detailer.add_observation(observation, **kwargs) def add_observations_array(self, observation_array, observations_hpid_in): if self.desired_observations_array is not None: indx = np.isin(self.desired_observations_array["target_id"], observation_array["target_id"]) if np.size(indx) > 0: self.need_observing[indx] = False good = np.isin(observations_hpid_in["ID"], observation_array["ID"]) observations_hpid = observations_hpid_in[good] for bf in self.basis_functions: bf.add_observations_array(observation_array, observations_hpid) for detailer in self.detailers: detailer.add_observations_array(observation_array, observations_hpid) def compute_reward(self, conditions): reward = 0 for bf in self.basis_functions: reward += bf(conditions) return reward def _check_queue_mjd_only(self, mjd, conditions): """ Check if there are things in the queue that can be executed using only MJD and not full conditions. This is primarily used by sim_runner to reduce calls calculating updated conditions when they are not needed. """ # With queue manager doing cloud dodging, need full conditions # now. Go ahead and use stale conditions, but update the # mjd so things get flushed if needed. valid_obs = self.find_valid_observations_indx(conditions, mjd=mjd) return np.size(valid_obs) > 0
[docs] def find_valid_observations_indx(self, conditions, mjd=None): """Return the indices of desired_observations_array that can be observed """ if mjd is None: mjd = conditions.mjd # Still valid date mjd_ok = np.where( (IntRounded(mjd) < IntRounded(self.desired_observations_array["flush_by_mjd"])) | (self.desired_observations_array["flush_by_mjd"] == 0) )[0] valid = np.where(self.need_observing)[0] valid = np.intersect1d(mjd_ok, valid) # Compute reward from basis functions reward = self.compute_reward(conditions) # If reward is an array, then it's a HEALpy map and we # need to interpolate to the actual positions we want. # now to interpolate to the reward positions if np.size(reward) > 1: reward_interp = hp.get_interp_val( reward, np.degrees(self.desired_observations_array["RA"][valid]), np.degrees(self.desired_observations_array["dec"][valid]), lonlat=True, ) sub_valid_reward = np.isfinite(reward_interp) valid = np.ravel(valid[sub_valid_reward]) return valid
[docs] def request_observation(self, conditions, whole_queue=False, n_return=1): """ Parameters ---------- conditions : `` Conditions object whole_queue : `bool` If True, return all desired observations that pass basis function checks. Default False. n_return : `int` Number of observations to return. Ignored if whole_queue is True. """ indx = self.find_valid_observations_indx(conditions) if not whole_queue: if indx.size > n_return: indx = indx[0:n_return] result = self.desired_observations_array[indx].copy() if np.size(result) > 0: for det in self.detailers: result = det(result, conditions) return result
[docs] def return_active_queue(self): """Return array of observations that are waiting to be executed""" return self.desired_observations_array[self.need_observing]