__all__ = ("CoreScheduler",)
import logging
import time
from collections import OrderedDict
from copy import deepcopy
from io import StringIO
import healpy as hp
import numpy as np
import pandas as pd
from astropy.time import Time
from rubin_scheduler.scheduler.utils import HpInComcamFov, HpInLsstFov, IntRounded, ObservationArray
from rubin_scheduler.utils import DEFAULT_NSIDE, _hpid2_ra_dec, rotation_converter
[docs]
class CoreScheduler:
"""Core scheduler that takes requests observations,
reports observatory status and provides completed observations.
Parameters
----------
surveys : list (or list of lists) of rubin_scheduler.scheduler.survey
A list of surveys to consider.
If multiple surveys return the same highest reward value,
the survey at the earliest position in the list will be selected.
Can also be a list of lists to make heirarchical priorities
('tiers').
nside : `int`
A HEALpix nside value.
camera : `str`
Which camera to use to compute the correspondence between
visits and HEALpixels when recording observations.
Can be 'LSST' or 'comcam'.
log : `logging.Logger`
If None, a new logger is created.
keep_rewards : `bool`
Flag whether to record the rewards and basis function values
(this can be useful for schedview).
telescope : `str`
Which telescope model to use for rotTelPos/rotSkyPos conversions.
Default "rubin".
target_id_counter : int
Starting value for the target_id. If restarting observations, could
be useful to set to whatever value the scheduler was at previously.
Default 0.
"""
def __init__(
self,
surveys,
nside=DEFAULT_NSIDE,
camera="LSST",
log=None,
keep_rewards=False,
telescope="rubin",
target_id_counter=0,
):
self.keep_rewards = keep_rewards
# Use integer ns just to be sure there are no rounding issues.
self.mjd_perf_counter_offset = np.int64(Time.now().mjd * 86400000000000) - time.perf_counter_ns()
if log is None:
self.log = logging.getLogger(type(self).__name__)
else:
self.log = log.getChild(type(self).__name__)
# initialize a queue of observations to request
self.queue = []
# The indices of self.survey_lists that provided
# the last addition(s) to the queue
self.survey_index = [None, None]
# If we have a list of survey objects, convert to list-of-lists
if isinstance(surveys[0], list):
self.survey_lists = surveys
else:
self.survey_lists = [surveys]
self.nside = nside
hpid = np.arange(hp.nside2npix(nside))
self.ra_grid_rad, self.dec_grid_rad = _hpid2_ra_dec(nside, hpid)
# Should just make camera a class that takes a pointing
# and returns healpix indices
if camera == "LSST":
self.pointing2hpindx = HpInLsstFov(nside=nside)
elif camera == "comcam":
self.pointing2hpindx = HpInComcamFov(nside=nside)
else:
raise ValueError("camera %s not implamented" % camera)
self.rc = rotation_converter(telescope=telescope)
# Keep track of how many observations get flushed from the queue
self.flushed = 0
# Counter for observations added to the queue
self.target_id_counter = target_id_counter
# Set to something so it doesn't fail if never set later
self.queue_fill_mjd_ns = -1
self.queue_reward_df = None
[docs]
def flush_queue(self):
"""Like it sounds, clear any currently queued desired observations."""
self.queue = []
self.survey_index = [None, None]
[docs]
def add_observations_array(self, obs):
"""Like add_observation, but for passing many observations at once.
Assigns overlapping HEALpix IDs to each observation, then passes
the observation array and constructed observations + healpix id
to each survey.
"""
# Need to add "band" here if it wasn't populated
missing_band = np.where(obs["band"] == "")
band = np.char.rstrip(obs["filter"][missing_band], chars="_0123456789")
obs["band"][missing_band] = band
obs.sort(order="mjd")
# Generate list-of-lists for HEALPix IDs for each pointing
lol_hpids = self.pointing2hpindx(obs["RA"], obs["dec"])
hpids = []
big_array_indx = []
# Unravel the list-of-lists
for i, indxs in enumerate(lol_hpids):
for indx in indxs:
hpids.append(indx)
big_array_indx.append(i)
hpids = np.array(hpids, dtype=[("hpid", int)])
names = list(obs.dtype.names)
types = [obs[name].dtype for name in names]
names.append(hpids.dtype.names[0])
types.append(hpids["hpid"].dtype)
ndt = list(zip(names, types))
obs_array_hpid = np.empty(hpids.size, dtype=ndt)
obs_array_hpid[list(obs.dtype.names)] = obs[big_array_indx]
obs_array_hpid[hpids.dtype.names[0]] = hpids
for surveys in self.survey_lists:
for survey in surveys:
survey.add_observations_array(obs, obs_array_hpid)
if np.max(obs["target_id"]) >= self.target_id_counter:
self.target_id_counter = np.max(obs["target_id"]) + 1
[docs]
def add_observation(self, observation):
"""
Record a completed observation and update features accordingly.
Parameters
----------
observation : dict-like
An object that contains the relevant information about a
completed observation
(e.g., mjd, ra, dec, filter, rotation angle, etc)
"""
# Catch if someone passed in a slice of an observation
# rather than a full observation array
if len(observation.shape) == 0:
full_obs = ObservationArray()
full_obs[0] = observation
observation = full_obs
# Need to add "band" here if it wasn't populated
missing_band = np.where(observation["band"] == "")
band = np.char.rstrip(observation["filter"][missing_band], chars="_0123456789")
observation["band"][missing_band] = band
# Find the healpixel centers that are included in an observation
indx = self.pointing2hpindx(
observation["RA"][0], observation["dec"][0], rotSkyPos=observation["rotSkyPos"][0]
)
for surveys in self.survey_lists:
for survey in surveys:
survey.add_observation(observation, indx=indx)
if np.max(observation["target_id"]) >= self.target_id_counter:
self.target_id_counter = np.max(observation["target_id"]) + 1
[docs]
def update_conditions(self, conditions_in):
"""
Parameters
----------
conditions : dict-like
The current conditions of the telescope (pointing position,
loaded filters, cloud-mask, etc)
"""
# Add the current queue and scheduled queue to the conditions
self.conditions = conditions_in
# put the local queue in the conditions
self.conditions.queue = self.queue
# Check if any surveys have upcomming scheduled observations.
# Note that we are accumulating all of the possible scheduled
# observations, so it's up to the user to make sure things don't
# collide. The ideal implementation would be to have all the
# scheduled observations in a single survey objects,
# presumably at the highest tier of priority.
all_scheduled = []
for sl in self.survey_lists:
for sur in sl:
scheduled = sur.get_scheduled_obs()
if scheduled is not None:
all_scheduled.append(scheduled.view(np.ndarray))
if len(all_scheduled) == 0:
self.conditions.scheduled_observations = []
else:
all_scheduled = np.sort(np.concatenate(all_scheduled).ravel())
# In case the surveys have not been removing executed observations
all_scheduled = all_scheduled[np.where(all_scheduled >= self.conditions.mjd)]
self.conditions.scheduled_observations = all_scheduled
def _check_queue_mjd_only(self, mjd):
"""
Check if there are things in the queue that can be executed
using only MJD and not full conditions.
This is primarily used by sim_runner to reduce calls calculating
updated conditions when they are not needed.
"""
result = False
if len(self.queue) > 0:
if (IntRounded(mjd) < IntRounded(self.queue[0]["flush_by_mjd"])) | (
self.queue[0]["flush_by_mjd"] == 0
):
result = True
return result
[docs]
def request_observation(self, mjd=None, whole_queue=False):
"""
Ask the scheduler what it wants to observe next
Parameters
----------
mjd : `float`
The Modified Julian Date.
If None, it uses the MJD from the conditions from the
last conditions update.
whole_queue : `bool`
Return the entire list of observations in the queue (True),
or just a single observation (False). Default False
Returns
-------
observation : `list` [`~.scheduler.utils.ObservationArray`]
Returns ~.scheduler.utils.ObservationArray if
whole_queue is False
Returns None if the queue fails to fill
Returns list of ObservationArray if whole_queue is True
Notes
-----
Calling request_observation repeatedly, even without updating
the time or conditions, can return different requested
observations. This is because the internal queue is filled
when it becomes empty, and then subsequent calls to
request_observation will pop successive visits from this queue,
until the queue is empty or is flushed.
"""
if mjd is None:
mjd = self.conditions.mjd
if len(self.queue) == 0:
self._fill_queue()
if len(self.queue) == 0:
return None
else:
# If the queue has gone stale, flush and refill.
# Zero means no flush_by was set.
if (IntRounded(mjd) > IntRounded(self.queue[0]["flush_by_mjd"])) & (
self.queue[0]["flush_by_mjd"] != 0
):
self.flushed += len(self.queue)
self.flush_queue()
self._fill_queue()
if len(self.queue) == 0:
return None
if whole_queue:
result = deepcopy(self.queue)
self.flush_queue()
else:
result = self.queue.pop(0)
return result
def _fill_queue(self):
"""
Compute reward function for each survey and fill the observing queue
with the observations from the highest reward survey.
"""
# If we loaded the scheduler from a pickle, self.keep_rewards
# might not have been initialized, but we want it to succeed
# anyway. So, assign it to false if it isn't present.
try:
keep_rewards = self.keep_rewards
except AttributeError:
keep_rewards = False
if keep_rewards:
# Use perf_counter_ns to get the best time resolution to guarantee
# successive calls do occur within the same resolution element.
self.queue_fill_mjd_ns = np.int64(self.mjd_perf_counter_offset + time.perf_counter_ns())
self.queue_reward_df = self.make_reward_df(accum=False)
self.queue_reward_df = self.queue_reward_df.assign(
queue_start_mjd=float(self.conditions.mjd),
queue_fill_mjd_ns=np.int64(self.queue_fill_mjd_ns),
)
rewards = None
for ns, surveys in enumerate(self.survey_lists):
rewards = np.zeros(len(surveys))
for i, survey in enumerate(surveys):
# For each survey, find the highest reward value.
rewards[i] = np.nanmax(survey.calc_reward_function(self.conditions))
# If we have a tier with a good reward, break out of the loop
if np.nanmax(rewards) > -np.inf:
self.survey_index[0] = ns
break
if (np.nanmax(rewards) == -np.inf) | (np.isnan(np.nanmax(rewards))):
self.flush_queue()
else:
to_fix = np.where(np.isnan(rewards) == True)
rewards[to_fix] = -np.inf
# Take a min here, so the surveys will be executed in the order
# they are entered if there is a tie.
self.survey_index[1] = np.min(np.where(rewards == np.nanmax(rewards)))
# Survey returns ObservationArray
result = self.survey_lists[self.survey_index[0]][self.survey_index[1]].generate_observations(
self.conditions
)
# Tag with a unique target_id
result["target_id"] = np.arange(self.target_id_counter, self.target_id_counter + result.size)
self.target_id_counter += result.size
# Convert to a list for the queue
self.queue = result.tolist()
self.queue_filled = self.conditions.mjd
if len(self.queue) == 0:
self.log.warning(f"Failed to fill queue at time {self.conditions.mjd}")
[docs]
def get_basis_functions(self, survey_index=None, conditions=None):
"""Get the basis functions for a specific survey,
in provided conditions.
Parameters
----------
survey_index : `List` [`int`]
A list with two elements: the survey list and the element
within that survey list for which the basis function should be
retrieved. If ``None``, use the latest survey to make an addition
to the queue.
conditions : `rubin_scheduler.scheduler.features.conditions.Conditions`
The conditions for which to return the basis functions.
If ``None``, use the conditions associated with this scheduler.
Returns
-------
basis_funcs : `OrderedDict` ['str`,
`~.scheduler.basis_functions.basis_functions.Base_basis_function`]
A dictionary of the basis functions, where the keys are names for
the basis functions and the values are the functions themselves.
"""
if survey_index is None:
survey_index = self.survey_index
if conditions is None:
conditions = self.conditions
survey = self.survey_lists[survey_index[0]][survey_index[1]]
basis_funcs = OrderedDict()
if hasattr(survey, "basis_functions"):
for basis_func in survey.basis_functions:
sample_values = basis_func(conditions)
if hasattr(sample_values, "__len__"):
key = f"{basis_func.__class__.__name__} @{id(basis_func)}"
basis_funcs[key] = basis_func
return basis_funcs
[docs]
def get_healpix_maps(self, survey_index=None, conditions=None):
"""Get the healpix maps for a specific survey, in provided conditions.
Parameters
----------
survey_index : `List` [`int`], opt
A list with two elements: the survey list and the element within
that survey list for which the maps that should be retrieved.
If ``None``, use the latest survey which added to the queue.
conditions : `~.scheduler.features.conditions.Conditions`, opt
The conditions for the maps to be returned. If ``None``, use
the conditions associated with this sceduler. By default None.
Returns
-------
basis_funcs : `OrderedDict` ['str`, `numpy.ndarray`]
A dictionary of the maps, where the keys are names for the maps
and values are the numpy arrays as used by ``healpy``.
"""
if survey_index is None:
survey_index = self.survey_index
if conditions is None:
conditions = self.conditions
maps = OrderedDict()
for band in conditions.skybrightness.keys():
maps[f"{band}_sky"] = deepcopy(conditions.skybrightness[band])
maps[f"{band}_sky"][maps[f"{band}_sky"] < -1e30] = np.nan
basis_functions = self.get_basis_functions(survey_index, conditions)
for this_basis_func in basis_functions.values():
label = this_basis_func.label()
if label in maps:
label = f"{label} @{id(this_basis_func)}"
maps[label] = this_basis_func(conditions)
return maps
def __repr__(self):
if isinstance(self.pointing2hpindx, HpInLsstFov):
camera = "LSST"
elif isinstance(self.pointing2hpindx, HpInComcamFov):
camera = "comcam"
else:
camera = None
this_repr = f"""{self.__class__.__qualname__}(
surveys={repr(self.survey_lists)},
camera="{camera}",
nside={repr(self.nside)},
survey_index={repr(self.survey_index)},
log={repr(self.log)}
)"""
return this_repr
def __str__(self):
# If dependencies of to_markdown are not installed, fall back on repr
try:
pd.DataFrame().to_markdown()
except ImportError:
return repr(self)
if isinstance(self.pointing2hpindx, HpInLsstFov):
camera = "LSST"
elif isinstance(self.pointing2hpindx, HpInComcamFov):
camera = "comcam"
else:
camera = None
output = StringIO()
print(f"# {self.__class__.__name__} at {hex(id(self))}", file=output)
try:
last_chosen = str(self.survey_lists[self.survey_index[0]][self.survey_index[1]])
except TypeError:
last_chosen = "None"
misc = pd.Series(
{
"camera": camera,
"nside": self.nside,
"survey index": list(self.survey_index),
"Last chosen": last_chosen,
}
)
misc.name = "value"
print(misc.to_markdown(), file=output)
print("", file=output)
print("## Surveys", file=output)
if len(self.survey_lists) == 0:
print("Scheduler contains no surveys.", file=output)
for tier_index, tier_surveys in enumerate(self.survey_lists):
print(file=output)
print(f"### Survey list {tier_index}", file=output)
print(self.surveys_df(tier_index).to_markdown(), file=output)
print("", file=output)
if hasattr(self, "conditions"):
print(str(self.conditions), file=output)
else:
print("No conditions set", file=output)
print("", file=output)
print("## Queue", file=output)
if len(self.queue) > 0:
print(
pd.concat(pd.DataFrame(q) for q in self.queue)[
["ID", "flush_by_mjd", "RA", "dec", "filter", "exptime", "note"]
]
.set_index("ID")
.to_markdown(),
file=output,
)
else:
print("Queue is empty", file=output)
result = output.getvalue()
return result
def _repr_markdown_(self):
# This is used by jupyter
return str(self)
[docs]
def surveys_df(self, tier):
"""Create a pandas.DataFrame describing rewards from surveys.
Parameters
----------
conditions : `rubin_scheduler.scheduler.features.Conditions`
Conditions for which rewards are to be returned.
tier : `int`
The level of the list of survey lists for which to return values.
Returns
-------
reward_df : `pandas.DataFrame`
A table of surveys listing the rewards.
"""
surveys = []
survey_list = self.survey_lists[tier]
for survey_list_elem, survey in enumerate(survey_list):
if (self.survey_index[0] is None) or (tier > self.survey_index[0]):
# This survey reward was not been evaluated
reward = None
elif not hasattr(survey, "reward"):
reward = None
elif survey.reward is None:
reward = None
elif np.isscalar(survey.reward):
reward = survey.reward
elif np.count_nonzero(survey.reward > -np.inf) == 0:
# The entire survey is masked
reward = -np.inf
else:
reward = np.nanmax(survey.reward)
try:
chosen = (tier == self.survey_index[0]) and (survey_list_elem == self.survey_index[1])
except TypeError:
chosen = False
surveys.append({"survey": str(survey), "reward": reward, "chosen": chosen})
df = pd.DataFrame(surveys).set_index("survey")
return df
[docs]
def make_reward_df(self, conditions=None, accum=True):
"""Create a pandas.DataFrame describing rewards from contained surveys.
Parameters
----------
conditions : `rubin_scheduler.scheduler.features.Conditions`
Conditions for which rewards are to be returned
accum : `bool`
Include accumulated rewards (defaults to True)
Returns
-------
reward_df : `pandas.DataFrame`
A table of surveys listing the rewards.
"""
if conditions is None:
conditions = self.conditions
survey_dfs = []
survey_labels = self.survey_labels
for index0, survey_list in enumerate(self.survey_lists):
for index1, survey in enumerate(survey_list):
survey_df = survey.make_reward_df(conditions, accum=accum)
if len(survey_df) == 0:
continue
survey_df["list_index"] = index0
survey_df["survey_index"] = index1
survey_df["tier_label"] = f"tier {index0}"
survey_df["survey_label"] = survey_labels[index0][index1]
survey_df["survey_class"] = survey.__class__.__name__
survey_df["survey_reward"] = np.nanmax(survey.calc_reward_function(conditions))
survey_dfs.append(survey_df)
reward_df = pd.concat(survey_dfs).set_index(["list_index", "survey_index"])
return reward_df
@property
def survey_labels(self):
"""Provide a list of labels for surveys
Returns
-------
label_lists : `list`
A list, or list of lists, of labels corresponding to
self.survey_lists
"""
label_lists = []
encountered_labels = set()
duplicated_labels = set()
def process_survey(survey):
try:
basic_label = survey.survey_name
except AttributeError:
basic_label = ""
if len(basic_label) == 0:
basic_label = survey.__class__.__name__
if basic_label in encountered_labels:
duplicated_labels.add(basic_label)
else:
encountered_labels.add(basic_label)
return basic_label
for item in self.survey_lists:
if hasattr(item, "generate_observations"):
basic_label = process_survey(item)
label_lists.append()
else:
labels = []
for survey in item:
basic_label = process_survey(survey)
labels.append(basic_label)
label_lists.append(labels)
label_count = {}
def unique_label(survey):
try:
basic_label = survey.survey_name
except AttributeError:
basic_label = ""
if len(basic_label) == 0:
basic_label = survey.__class__.__name__
if basic_label not in duplicated_labels:
return basic_label
if basic_label not in label_count:
label_count[basic_label] = 0
label_count[basic_label] += 1
label = f"{basic_label} {label_count[basic_label]}"
return label
label_lists = []
for item in self.survey_lists:
if hasattr(item, "generate_observations"):
label = unique_label(item)
label_lists.append(label)
else:
labels = []
for survey in item:
label = unique_label(survey)
labels.append(label)
label_lists.append(labels)
return label_lists