Source code for rubin_scheduler.scheduler.surveys.base_survey

__all__ = ("BaseSurvey", "BaseMarkovSurvey")

from copy import copy, deepcopy
from functools import cached_property

import healpy as hp
import numpy as np
import pandas as pd

from rubin_scheduler.scheduler.detailers import TrackingInfoDetailer, ZeroRotDetailer
from rubin_scheduler.scheduler.utils import (
    HpInLsstFov,
    ObservationArray,
    comcam_tessellate,
    thetaphi2xyz,
    xyz2thetaphi,
)
from rubin_scheduler.site_models import _read_fields
from rubin_scheduler.utils import DEFAULT_NSIDE, _build_tree, _hpid2_ra_dec, _xyz_from_ra_dec


[docs] class BaseSurvey: """A baseclass for survey objects. Parameters ---------- basis_functions : list List of basis_function objects extra_features : list XXX--should this be a dict for clarity? List of any additional features the survey may want to use e.g., for computing final dither positions. extra_basis_functions : dict of rubin_scheduler.scheduler.basis_function Extra basis function objects. Typically not passed in, but et in the __init__. ignore_obs : list of str (None) If an incoming observation has this string in the note, ignore it. Handy if one wants to ignore DD fields or observations requested by self. Take note, if a survey is called 'mysurvey23', setting ignore_obs to 'mysurvey2' will ignore it because 'mysurvey2' is a substring of 'mysurvey23'. detailers : list of rubin_scheduler.scheduler.detailers objects The detailers to apply to the list of observations. scheduled_obs : np.array An array of MJD values for when observations should execute. target_name : `str` A target name label. Will be added to a final detailer, so should override any target_name set elsewhere. Default None. science_program : `str` A science program label. Will be added to a final detailer, so should override any science_program set elsewhere. Default None. observation_reason : `str` An observation reason label. Will be added to a final detailer, so should override any observation_reason set elsewhere. Default None. """ def __init__( self, basis_functions, extra_features=None, extra_basis_functions=None, ignore_obs=None, survey_name=None, scheduler_note=None, nside=DEFAULT_NSIDE, detailers=None, scheduled_obs=None, target_name=None, science_program=None, observation_reason=None, ): if ignore_obs is None: ignore_obs = [] if isinstance(ignore_obs, str): ignore_obs = [ignore_obs] self.nside = nside if survey_name is None: self._generate_survey_name() else: self.survey_name = survey_name self.scheduler_note = scheduler_note if self.scheduler_note is None: self.scheduler_note = survey_name self.ignore_obs = ignore_obs self.reward = None self.survey_index = None self.basis_functions = basis_functions if extra_features is None: self.extra_features = {} else: self.extra_features = extra_features if extra_basis_functions is None: self.extra_basis_functions = {} else: self.extra_basis_functions = extra_basis_functions self.reward_checked = False # Attribute to track if the reward function is up-to-date. self.reward_checked = False # If there's no detailers, add one to set rotation to near zero if detailers is None: self.detailers = [ZeroRotDetailer(nside=nside)] else: # If there are detailers -- copy them, because we're # about to change the list (and users can be reusing this list). self.detailers = deepcopy(detailers) # Scheduled observations self.scheduled_obs = scheduled_obs # Strings to pass onto observations if (target_name is not None) | (science_program is not None) | (observation_reason is not None): self.detailers.append( TrackingInfoDetailer( target_name=target_name, science_program=science_program, observation_reason=observation_reason, ) ) @cached_property def roi_hpid(self): return None @cached_property def roi_mask(self): if self.roi_hpid is None: mask = np.ones(hp.nside2npix(self.nside), dtype=bool) else: mask = np.zeros(hp.nside2npix(self.nside), dtype=bool) mask[self.roi_hpid] = True return mask def _generate_survey_name(self): self.survey_name = "" def get_scheduled_obs(self): return self.scheduled_obs
[docs] def add_observations_array(self, observations_array_in, observations_hpid_in): """Add an array of observations rather than one at a time Parameters ---------- observations_array_in : ObservationArray An array of completed observations, rubin_scheduler.scheduler.utils.ObservationArray observations_hpid_in : np.array Same as observations_array_in, but larger and with an additional column for HEALpix id. Each observation is listed mulitple times, once for every HEALpix it overlaps. """ # Just to be sure things are sorted observations_array_in.sort(order="mjd") observations_hpid_in.sort(order="mjd") # Copy so we don't prune things for other survey objects observations_array = observations_array_in.copy() observations_hpid = observations_hpid_in.copy() for ig in self.ignore_obs: not_ignore = np.where(np.char.find(observations_array["scheduler_note"], ig) == -1)[0] observations_array = observations_array[not_ignore] not_ignore = np.where(np.char.find(observations_hpid["scheduler_note"], ig) == -1)[0] observations_hpid = observations_hpid[not_ignore] for feature in self.extra_features: self.extra_features[feature].add_observations_array(observations_array, observations_hpid) for bf in self.extra_basis_functions: self.extra_basis_functions[bf].add_observations_array(observations_array, observations_hpid) for bf in self.basis_functions: bf.add_observations_array(observations_array, observations_hpid) for detailer in self.detailers: detailer.add_observations_array(observations_array, observations_hpid) self.reward_checked = False
def add_observation(self, observation, **kwargs): # Check each posible ignore string checks = [io not in str(observation["scheduler_note"]) for io in self.ignore_obs] # ugh, I think here I have to assume observation is an # array and not a dict. if all(checks): for feature in self.extra_features: self.extra_features[feature].add_observation(observation, **kwargs) for bf in self.extra_basis_functions: self.extra_basis_functions[bf].add_observation(observation, **kwargs) for bf in self.basis_functions: bf.add_observation(observation, **kwargs) for detailer in self.detailers: detailer.add_observation(observation, **kwargs) self.reward_checked = False def _check_feasibility(self, conditions): """ Check if the survey is feasible in the current conditions Returns ------- result : `bool` """ result = True for bf in self.basis_functions: result = bf.check_feasibility(conditions) if not result: return result return result
[docs] def calc_reward_function(self, conditions): """ Parameters ---------- conditions : rubin_scheduler.scheduler.features.Conditions object Returns ------- reward : float (or array) """ if self._check_feasibility(conditions): self.reward = 0 else: # If we don't pass feasability self.reward = -np.inf self.reward_checked = True return self.reward
[docs] def generate_observations_rough(self, conditions): """ Returns ------- ObservationArray """ # If the reward function hasn't been updated with the # latest info, calculate it if not self.reward_checked: self.reward = self.calc_reward_function(conditions) obs = ObservationArray() return obs
def generate_observations(self, conditions): observations = self.generate_observations_rough(conditions) if np.size(observations) > 0: for detailer in self.detailers: observations = detailer(observations, conditions) return observations def viz_config(self): # This primarily lives in schedview but could be somewhat added here pass def __repr__(self): try: repr = f"<{self.__class__.__name__} survey_name='{self.survey_name}' at {hex(id(self))}>" except AttributeError: repr = f"<{self.__class__.__name__} at {hex(id(self))}>" return repr def _reward_to_scalars(self, reward): if np.isscalar(reward): scalar_reward = reward if np.isnan(reward) or reward == -np.inf: unmasked_area = 0 scalar_reward = -np.inf else: try: pix_area = hp.nside2pixarea(self.nside, degrees=True) unmasked_area = pix_area * np.count_nonzero(self.roi_mask) except AttributeError: unmasked_area = 4 * np.pi * (180 / np.pi) ** 2 else: reward_in_roi = np.where(self.roi_mask, reward, -np.inf) pix_area = hp.nside2pixarea(self.nside, degrees=True) unmasked_area = pix_area * np.count_nonzero(reward_in_roi > -np.inf) if unmasked_area == 0: scalar_reward = -np.inf else: scalar_reward = np.nanmax(reward_in_roi) return scalar_reward, unmasked_area
[docs] def make_reward_df(self, conditions, accum=True): """Create a pandas.DataFrame describing the reward from the survey. Parameters ---------- conditions : `rubin_scheduler.scheduler.features.Conditions` Conditions for which rewards are to be returned accum : `bool` Include accumulated reward (more compute intensive) Defaults to True Returns ------- reward_df : `pandas.DataFrame` A table of surveys listing the rewards. """ feasibility = [] max_rewards = [] basis_areas = [] accum_rewards = [] accum_areas = [] bf_label = [] bf_class = [] basis_functions = [] basis_weights = [] try: full_basis_weights = self.basis_weights except AttributeError: full_basis_weights = [1.0 for df in self.basis_functions] short_labels = self.bf_short_labels _, scalar_area = self._reward_to_scalars(1) for weight, basis_function in zip(full_basis_weights, self.basis_functions): bf_label.append(short_labels[basis_function.label()]) bf_class.append(basis_function.__class__.__name__) bf_reward = basis_function(conditions) max_reward, basis_area = self._reward_to_scalars(bf_reward) if basis_area == 0: this_feasibility = False else: this_feasibility = np.array(basis_function.check_feasibility(conditions)).any() feasibility.append(this_feasibility) max_rewards.append(max_reward) basis_areas.append(basis_area) if accum: basis_functions.append(basis_function) basis_weights.append(weight) test_survey = deepcopy(self) test_survey.basis_functions = basis_functions test_survey.basis_weights = basis_weights this_accum_reward = test_survey.calc_reward_function(conditions) accum_reward, accum_area = self._reward_to_scalars(this_accum_reward) accum_rewards.append(accum_reward) accum_areas.append(accum_area) reward_data = { "basis_function": bf_label, "basis_function_class": bf_class, "feasible": feasibility, "max_basis_reward": max_rewards, "basis_area": basis_areas, "basis_weight": full_basis_weights, } if accum: reward_data["max_accum_reward"] = accum_rewards reward_data["accum_area"] = accum_areas reward_df = pd.DataFrame(reward_data) return reward_df
[docs] def reward_changes(self, conditions): """List the rewards for each basis function used by the survey. Parameters ---------- conditions : `rubin_scheduler.scheduler.features.Conditions` Conditions for which rewards are to be returned Returns ------- rewards : `list` A list of tuples, each with a basis function name and the maximum reward returned by that basis function for the provided conditions. """ reward_values = [] basis_functions = [] basis_weights = [] try: full_basis_weights = self.basis_weights except AttributeError: full_basis_weights = [1 for bf in self.basis_functions] for weight, basis_function in zip(full_basis_weights, self.basis_functions): test_survey = deepcopy(self) basis_functions.append(basis_function) test_survey.basis_functions = basis_functions basis_weights.append(weight) test_survey.basis_weights = basis_weights try: reward_values.append(np.nanmax(test_survey.calc_reward_function(conditions))) except IndexError: reward_values.append(None) bf_names = [bf.label() for bf in self.basis_functions] return list(zip(bf_names, reward_values))
@property def bf_short_labels(self): try: long_labels = [bf.label() for bf in self.basis_functions] except AttributeError: return [] label_bases = [label.split(" @")[0] for label in long_labels] duplicated_labels = set([label for label in label_bases if label_bases.count(label) > 1]) short_labels = [] label_count = {k: 0 for k in duplicated_labels} for label_base in label_bases: if label_base in duplicated_labels: label_count[label_base] += 1 short_labels.append(f"{label_base} {label_count[label_base]}") else: short_labels.append(label_base) label_map = dict(zip(long_labels, short_labels)) return label_map
def rotx(theta, x, y, z): """rotate the x,y,z points theta radians about x axis""" sin_t = np.sin(theta) cos_t = np.cos(theta) xp = x yp = y * cos_t + z * sin_t zp = -y * sin_t + z * cos_t return xp, yp, zp
[docs] class BaseMarkovSurvey(BaseSurvey): """A Markov Decision Function survey object. Uses Basis functions to compute a final reward function and decide what to observe based on the reward. Includes methods for dithering and defaults to dithering nightly. Parameters ---------- basis_function : list of rubin_scheduler.scheduler.basis_function basis_weights : list of float Must be same length as basis_function seed : hashable Random number seed, used for randomly orienting sky tessellation. camera : str ('LSST') Should be 'LSST' or 'comcam' fields : np.array (None) An array of field positions. Should be numpy array with columns of "RA" and "dec" in radians. If none, site_models.read_fields or utils.comcam_tessellate is used to read field positions. area_required : float (None) The valid area that should be present in the reward function (square degrees). npositions : int (7305) The number of dither positions to pre-compute. Defaults to 7305 (so good for 20 years) """ def __init__( self, basis_functions, basis_weights, extra_features=None, smoothing_kernel=None, ignore_obs=None, survey_name=None, scheduler_note=None, nside=DEFAULT_NSIDE, seed=42, dither=True, detailers=None, camera="LSST", fields=None, area_required=None, npositions=7305, target_name=None, science_program=None, observation_reason=None, ): super(BaseMarkovSurvey, self).__init__( basis_functions=basis_functions, extra_features=extra_features, ignore_obs=ignore_obs, survey_name=survey_name, scheduler_note=scheduler_note, nside=nside, detailers=detailers, target_name=target_name, science_program=science_program, observation_reason=observation_reason, ) self.basis_weights = basis_weights # Check that weights and basis functions are same length if len(basis_functions) != np.size(basis_weights): raise ValueError("basis_functions and basis_weights must be same length.") self.camera = camera # Load the OpSim field tesselation and map healpix to fields if fields is None: if self.camera == "LSST": ra, dec = _read_fields() self.fields_init = np.empty(ra.size, dtype=list(zip(["RA", "dec"], [float, float]))) self.fields_init["RA"] = ra self.fields_init["dec"] = dec elif self.camera == "comcam": self.fields_init = comcam_tessellate() else: ValueError('camera %s unknown, should be "LSST" or "comcam"' % camera) else: self.fields_init = fields self.fields = self.fields_init.copy() self.hp2fields = np.array([]) self._hp2fieldsetup(self.fields["RA"], self.fields["dec"]) if smoothing_kernel is not None: self.smoothing_kernel = np.radians(smoothing_kernel) else: self.smoothing_kernel = None if area_required is None: self.area_required = area_required else: self.area_required = area_required * (np.pi / 180.0) ** 2 # To steradians # Start tracking the night self.night = -1 self.dither = dither # Generate and store rotation positions to use. # This way, if different survey objects are seeded the same, # they will use the same dither positions each night rng = np.random.default_rng(seed) self.lon = rng.random(npositions) * np.pi * 2 # Make sure latitude points spread correctly # http://mathworld.wolfram.com/SpherePointPicking.html self.lat = np.arccos(2.0 * rng.random(npositions) - 1.0) self.lon2 = rng.random(npositions) * np.pi * 2 def _check_feasibility(self, conditions): """ Check if the survey is feasable in the current conditions """ for bf in self.basis_functions: result = bf.check_feasibility(conditions) if not result: return result if self.area_required is not None: reward = self.calc_reward_function(conditions) good_pix = np.where(np.isfinite(reward) == True)[0] area = hp.nside2pixarea(self.nside) * np.size(good_pix) if area < self.area_required: return False return result def _hp2fieldsetup(self, ra, dec): """Map each healpixel to nearest field. This will only work if healpix resolution is higher than field resolution. Parameters ---------- ra : `float` The RA of the possible pointings (radians) dec : `float` The decs of the possible pointings (radians) """ self.hp2fields = np.zeros(hp.nside2npix(self.nside), dtype=int) if self.camera == "LSST": pointing2hpindx = HpInLsstFov(nside=self.nside) for i in range(len(ra)): hpindx = pointing2hpindx(ra[i], dec[i], rotSkyPos=0.0) self.hp2fields[hpindx] = i elif self.camera == "comcam": # Let's just map each healpix to the closest field location tree = _build_tree(ra, dec) hp_ra, hp_dec = _hpid2_ra_dec(self.nside, np.arange(hp.nside2npix(self.nside))) x, y, z = _xyz_from_ra_dec(hp_ra, hp_dec) dist, ind = tree.query(np.vstack([x, y, z]).T, k=1) self.hp2fields = ind def _spin_fields(self, conditions, lon=None, lat=None, lon2=None): """Spin the field tessellation to generate a random orientation The default field tesselation is rotated randomly in longitude, and then the pole is rotated to a random point on the sphere. Parameters ---------- lon : float (None) The amount to initially rotate in longitude (radians). Will use a random value between 0 and 2 pi if None (default). lat : float (None) The amount to rotate in latitude (radians). lon2 : float (None) The amount to rotate the pole in longitude (radians). """ if lon is None: lon = self.lon[conditions.night] if lat is None: lat = self.lat[conditions.night] if lon2 is None: lon2 = self.lon2[conditions.night] # rotate longitude ra = (self.fields_init["RA"] + lon) % (2.0 * np.pi) dec = copy(self.fields_init["dec"]) # Now to rotate ra and dec about the x-axis x, y, z = thetaphi2xyz(ra, dec + np.pi / 2.0) xp, yp, zp = rotx(lat, x, y, z) theta, phi = xyz2thetaphi(xp, yp, zp) dec = phi - np.pi / 2 ra = theta + np.pi # One more RA rotation ra = (ra + lon2) % (2.0 * np.pi) self.fields["RA"] = ra self.fields["dec"] = dec # Rebuild the kdtree with the new positions # XXX-may be doing some ra,dec to conversions xyz more # than needed. self._hp2fieldsetup(ra, dec)
[docs] def smooth_reward(self): """If we want to smooth the reward function.""" if hp.isnpixok(self.reward.size): # Need to swap NaNs to hp.UNSEEN so smoothing doesn't # spread mask reward_temp = copy(self.reward) mask = np.isnan(reward_temp) reward_temp[mask] = hp.UNSEEN self.reward_smooth = hp.sphtfunc.smoothing(reward_temp, fwhm=self.smoothing_kernel, verbose=False) self.reward_smooth[mask] = np.nan self.reward = self.reward_smooth
[docs] def calc_reward_function(self, conditions): self.reward_checked = True if self._check_feasibility(conditions): self.reward = 0 indx = np.arange(hp.nside2npix(self.nside)) for bf, weight in zip(self.basis_functions, self.basis_weights): basis_value = bf(conditions, indx=indx) self.reward += basis_value * weight else: # If not feasable, negative infinity reward self.reward = -np.inf return self.reward if self.smoothing_kernel is not None: self.smooth_reward() if self.area_required is not None: good_area = np.where(np.abs(self.reward) >= 0)[0].size * hp.nside2pixarea(self.nside) if good_area < self.area_required: self.reward = -np.inf return self.reward
[docs] def generate_observations_rough(self, conditions): self.reward = self.calc_reward_function(conditions) # Check if we need to spin the tesselation if self.dither & (conditions.night != self.night): self._spin_fields(conditions) self.night = copy(conditions.night) # XXX Use self.reward to decide what to observe. return None