Source code for rubin_scheduler.scheduler.surveys.long_gap_survey

__all__ = ("LongGapSurvey",)

import logging
from copy import copy, deepcopy

import numpy as np
import pandas as pd

from rubin_scheduler.scheduler.surveys import BaseSurvey
from rubin_scheduler.scheduler.utils import ScheduledObservationArray
from rubin_scheduler.utils import Site, _approx_ra_dec2_alt_az

log = logging.getLogger(__name__)


[docs] class LongGapSurvey(BaseSurvey): """ Parameters ---------- blob_survey : rubin_scheduler.scheduler.surveys.BlobSurvey A survey object that we will want to take repeat measurments of sometime later in the evening scripted_survey : rubin_scheduler.scheduler.surveys.ScriptedSurvey A scripted survey object that will have a queue updated with objects to observe later. gap range : list of 2 floats The desired gap range (hours) long_name : str The string to put in the observation 'note' for the scripted observations scripted_tol : float The tolerance for when scripted observations can execute (hours) after_meridian : bool (False) If True, force the scripted obsrevations to happen after they pass through the meridian. This can help make sure we don't hit the zenith exclusion zone. hour_step : float (0.5) The amount of time to step scheduled observations forward if they could try to execute in the zenith avoidance area (hours). Only used if `avoid_zenith` is True. ha_min(_max) : float (0,24) Trying to set so they don't acctually get used. """ def __init__( self, blob_survey, scripted_survey, gap_range=[2, 10], long_name="long", scripted_tol=2.0, alt_min=20, alt_max=80.0, ha_min=24, ha_max=0.0, sun_alt_max=-18.0, flush_time=2.0, dist_tol=1.0, block_length=33.0, reverse=True, seed=42, night_max=50000, avoid_zenith=True, site=None, hour_step=0.5, survey_name=None, ): self.blob_survey = blob_survey self.scripted_survey = scripted_survey # Set the nside to be the same as the blob self.nside = self.blob_survey.nside self.night = -1 self.gap_range = np.array(gap_range) / 24.0 # To days rng = np.random.default_rng(seed) self.gaps = rng.uniform(self.gap_range.min(), self.gap_range.max(), night_max) self.gap = 0.0 if site is None: self.site = Site("LSST") self.long_name = long_name if survey_name is None: self._generate_survey_name() else: self.survey_name = survey_name self.scripted_tol = scripted_tol / 24.0 # To days self.alt_min = np.radians(alt_min) self.alt_max = np.radians(alt_max) self.sun_alt_max = np.radians(sun_alt_max) self.ha_min = ha_min self.ha_max = ha_max self.flush_time = flush_time / 24.0 self.dist_tol = np.radians(dist_tol) self.block_length = block_length / 60 / 24.0 self.reverse = reverse self.avoid_zenith = avoid_zenith self.mjd_step = hour_step / 24.0 def _generate_survey_name(self): self.blob_survey.survey_name = f"Long Gap {self.blob_survey.survey_name}" self.scripted_survey.survey_name = "Long Gap Scripted Triplet" self.survey_name = f"Long Gap ({self.blob_survey.survey_name} + Triplet)" def _schedule_obs(self, observations): """Take incoming observations and decide if they should be added to the scripted survey to try and be observed again later """ # Only match if we have completed the second of a pair and are # in most recent night. # note, np.where doesn't support using scalars if np.size(observations) == 1: if (observations["scheduler_note"] == self.blob_survey.scheduler_note + ", b") & ( observations["night"] == np.max(observations["night"]) ): need_to_observe = np.array([0]) else: need_to_observe = np.array([]) else: need_to_observe = np.where( (observations["scheduler_note"] == self.blob_survey.scheduler_note + ", b") & (observations["night"] == np.max(observations["night"])) )[0] # Set to the proper gap self.gap = self.gaps[np.max(observations["night"])] # If the incoming observation needs to have something # scheduled later if np.size(need_to_observe) > 0: sched_array = ScheduledObservationArray(n=need_to_observe.size) for dt in np.intersect1d(observations.dtype.names, sched_array.dtype.names): if np.size(observations) == 1: sched_array[dt] = observations[dt] else: sched_array[dt] = observations[need_to_observe][dt] sched_array["mjd_tol"] = self.scripted_tol sched_array["alt_min"] = self.alt_min sched_array["alt_max"] = self.alt_max sched_array["HA_min"] = self.ha_min sched_array["HA_max"] = self.ha_max sched_array["sun_alt_max"] = self.sun_alt_max if np.size(observations) == 1: sched_array["flush_by_mjd"] = observations["mjd"] + self.flush_time + self.gap sched_array["mjd"] = observations["mjd"] + self.gap else: sched_array["flush_by_mjd"] = ( observations[need_to_observe]["mjd"] + self.flush_time + self.gap ) sched_array["mjd"] = observations[need_to_observe]["mjd"] + self.gap sched_array["dist_tol"] = self.dist_tol if self.avoid_zenith: # when is the earliest we expect things could execute earliest_mjd = sched_array["mjd"] - sched_array["mjd_tol"] alts = [] mjds = np.arange( np.min(earliest_mjd), np.max(sched_array["mjd"]) + self.mjd_step, self.mjd_step, ) # Let's compute the alt of everything at earliest and # scheduled for mjd in mjds: alt, az = _approx_ra_dec2_alt_az( sched_array["RA"], sched_array["dec"], self.site.latitude_rad, self.site.longitude_rad, mjd, ) alts.append(np.max(alt)) while np.max(alts) > self.alt_max: alts = [] sched_array["mjd"] += self.mjd_step sched_array["flush_by_mjd"] += self.mjd_step earliest_mjd = sched_array["mjd"] - sched_array["mjd_tol"] # Let's compute the alt of everything then mjds = np.arange( np.min(earliest_mjd), np.max(sched_array["mjd"]) + self.mjd_step, self.mjd_step, ) # Let's compute the alt of everything at earliest # and scheduled for mjd in mjds: alt, az = _approx_ra_dec2_alt_az( sched_array["RA"], sched_array["dec"], self.site.latitude_rad, self.site.longitude_rad, mjd, ) alts.append(np.max(alt)) # Make sure these have the note filled in sched_array["scheduler_note"] = self.long_name sched_array["target_name"] = "" sched_array["observation_reason"] = "FBS" # Don't let the desired rotSkyPos block the observation. sched_array["rotSkyPos_desired"] = sched_array["rotSkyPos"] sched_array["rotSkyPos"] = np.nan # See if we need to append things to the scripted survey object if self.scripted_survey.obs_wanted is not None: sched_array = np.concatenate([self.scripted_survey.obs_wanted, sched_array]) self.scripted_survey.set_script(sched_array)
[docs] def add_observations_array(self, observations_array_in, observations_hpid_in): self._schedule_obs(observations_array_in) self.blob_survey.add_observations_array(observations_array_in, observations_hpid_in) self.scripted_survey.add_observations_array(observations_array_in, observations_hpid_in)
def add_observation(self, observation, **kwargs): self._schedule_obs(observation) self.blob_survey.add_observation(observation, **kwargs) self.scripted_survey.add_observation(observation, **kwargs) def _check_feasibility(self, conditions): f1 = self.blob_survey._check_feasibility(conditions) f2 = self.scripted_survey._check_feasibility(conditions) # If either one is able to go, we can observe result = f1 | f2 return result def get_scheduled_obs(self): return self.scripted_survey.get_scheduled_obs()
[docs] def calc_reward_function(self, conditions): if conditions.night != self.night: # Clear out the scheduled observations self.scripted_survey.clear_script() self.night = copy(conditions.night) self.gap = self.gaps[conditions.night] time_remaining = conditions.sun_n18_rising - conditions.mjd if self.gap > time_remaining: self.gap = time_remaining - self.block_length # XXX-need to reach into the blob and set what the gap # is I guess self.r1 = self.blob_survey.calc_reward_function(conditions) self.r2 = self.scripted_survey.calc_reward_function(conditions) self.reward_checked = True return np.nanmax([np.nanmax(self.r1), np.nanmax(self.r2)])
def generate_observations_rough(self, conditions): """""" pass def generate_observations(self, conditions): if not self.reward_checked: self.reward = self.calc_reward_function(conditions) # Check for any pre-scheduled if self.r2 > -np.inf: observations = self.scripted_survey.generate_observations(conditions) else: observations = None # Check if it's a good time for a blob if observations is None: observations = self.blob_survey.generate_observations(conditions) return observations
[docs] def make_reward_df(self, conditions, accum=True): """Create a pandas.DataFrame describing the reward from the survey. Parameters ---------- conditions : `rubin_scheduler.scheduler.features.Conditions` Conditions for which rewards are to be returned accum : `bool` Include accumulated rewards Returns ------- reward_df : `pandas.DataFrame` A table of surveys listing the rewards. """ test_survey = deepcopy(self) reward = test_survey.calc_reward_function(conditions) feasible = test_survey._check_feasibility(conditions) and reward > np.finfo(reward).min if accum: reward_df = pd.DataFrame( { "basis_function": ["None"], "feasible": [feasible], "max_basis_reward": [reward], "basis_area": [0], "basis_weight": [1], "max_accum_reward": [reward], "accum_area": [0], } ) else: reward_df = pd.DataFrame( { "basis_function": ["None"], "feasible": [feasible], "max_basis_reward": [reward], "basis_area": [0], "basis_weight": [1], } ) return reward_df