Source code for rubin_scheduler.sim_archive.prenight

"""Tools for running the set of simulations used for a pre-night briefing."""

__all__ = [
    "AnomalousOverheadFunc",
    "run_prenights",
    "prenight_sim_cli",
]

import argparse
import bz2
import gzip
import io
import logging
import lzma
import os
import pickle
import typing
from datetime import datetime
from functools import partial
from tempfile import TemporaryFile
from typing import Callable, Optional, Sequence
from warnings import warn

import numpy as np
import numpy.typing as npt
from astropy.time import Time
from numpy.random import Generator

from rubin_scheduler.scheduler.example import example_scheduler
from rubin_scheduler.scheduler.model_observatory import ModelObservatory
from rubin_scheduler.scheduler.schedulers.core_scheduler import CoreScheduler
from rubin_scheduler.scheduler.utils import SchemaConverter
from rubin_scheduler.sim_archive.sim_archive import drive_sim
from rubin_scheduler.site_models import Almanac

from .make_snapshot import add_make_scheduler_snapshot_args, get_scheduler, save_scheduler

try:
    from rubin_sim.data import get_baseline  # type: ignore
except ModuleNotFoundError:
    get_baseline = partial(warn, "Cannot find default baseline because rubin_sim is not installed.")

DEFAULT_ARCHIVE_URI = "s3://rubin-scheduler-prenight/opsim/"


def _run_sim(
    mjd_start: float,
    archive_uri: str,
    scheduler_io: io.BufferedRandom,
    label: str,
    tags: Sequence[str] = tuple(),
    sim_length: float = 2,
    anomalous_overhead_func: Optional[Callable] = None,
    opsim_metadata: dict | None = None,
) -> None:
    logging.info(f"Running {label}.")

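    # Re-read the scheduler from the pickled copy so that every simulation in
    # this batch starts from an identical scheduler state.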
    scheduler_io.seek(0)
    scheduler = pickle.load(scheduler_io)

    observatory = ModelObservatory(
        nside=scheduler.nside,
        cloud_data="ideal",
        seeing_data="ideal",
        downtimes="ideal",
    )

    drive_sim(
        observatory=observatory,
        scheduler=scheduler,
        archive_uri=archive_uri,
        label=label,
        tags=tags,
        script=__file__,
        mjd_start=mjd_start,
        survey_length=sim_length,
        record_rewards=True,
        anomalous_overhead_func=anomalous_overhead_func,
        opsim_metadata=opsim_metadata,
    )


def _mjd_now() -> float:
    # Used instead of just Time.now().mjd to make type checker happy.
    mjd = Time.now().mjd
    assert isinstance(mjd, float)
    return mjd


def _iso8601_now() -> str:
    # Used instead of just Time.now().iso to make type checker happy.
    now_iso = Time.now().iso
    assert isinstance(now_iso, str)
    return now_iso


def _create_scheduler_io(
    day_obs_mjd: float,
    scheduler_fname: Optional[str] = None,
    scheduler_instance: Optional[CoreScheduler] = None,
    opsim_db: str | None = None,
) -> io.BufferedRandom:
    if scheduler_instance is not None:
        scheduler = scheduler_instance
    elif scheduler_fname is None:
        sample_scheduler = example_scheduler()
        if not isinstance(sample_scheduler, CoreScheduler):
            raise TypeError()

        scheduler = sample_scheduler

    else:
        opener: typing.Callable = open

        if scheduler_fname.endswith(".bz2"):
            opener = bz2.open
        elif scheduler_fname.endswith(".xz"):
            opener = lzma.open
        elif scheduler_fname.endswith(".gz"):
            opener = gzip.open

        with opener(scheduler_fname, "rb") as sched_io:
            scheduler = pickle.load(sched_io)

    scheduler.keep_rewards = True

    if opsim_db is not None:
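        # Keep only visits taken before the nominal start of the requested
        # day_obs (day_obs_mjd + 0.5).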
        last_preexisting_obs_mjd = day_obs_mjd + 0.5
        obs = SchemaConverter().opsim2obs(opsim_db)
        obs = obs[obs["mjd"] < last_preexisting_obs_mjd]
        if len(obs) > 0:
            scheduler.add_observations_array(obs)

    scheduler_io = TemporaryFile()
    pickle.dump(scheduler, scheduler_io)
    scheduler_io.seek(0)
    return scheduler_io
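
# Illustrative sketch (comment only, not executed): _create_scheduler_io
# pickles a scheduler to a temporary file so each simulation can reload an
# identical copy. The day_obs MJD and file names below are hypothetical.
#
#   scheduler_io = _create_scheduler_io(
#       60500.0,
#       scheduler_fname="scheduler.pickle.xz",  # .bz2, .xz, .gz, or plain pickle
#       opsim_db="completed_visits.db",
#   )
#   _run_sim(
#       60500.9,
#       archive_uri="s3://my-test-archive/opsim/",
#       scheduler_io=scheduler_io,
#       label="example run",
#   )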


class AnomalousOverheadFunc:
    """Callable to return random overhead.

    Parameters
    ----------
    seed : `int`
        Random number seed.
    slew_scale : `float`
        The scale for the scatter in the slew offset (seconds).
    visit_scale : `float`, optional
        The scale for the scatter in the visit overhead offset (seconds).
        Defaults to 0.0.
    slew_loc : `float`, optional
        The location of the scatter in the slew offset (seconds).
        Defaults to 0.0.
    visit_loc : `float`, optional
        The location of the scatter in the visit offset (seconds).
        Defaults to 0.0.
    """

    def __init__(
        self,
        seed: int,
        slew_scale: float,
        visit_scale: float = 0.0,
        slew_loc: float = 0.0,
        visit_loc: float = 0.0,
    ) -> None:
        self.rng: Generator = np.random.default_rng(seed)
        self.visit_loc: float = visit_loc
        self.visit_scale: float = visit_scale
        self.slew_loc: float = slew_loc
        self.slew_scale: float = slew_scale

    def __call__(self, visittime: float, slewtime: float) -> float:
        """Return a randomized offset for the visit overhead.

        Parameters
        ----------
        visittime : `float`
            The visit time (seconds).
        slewtime : `float`
            The slew time (seconds).

        Returns
        -------
        offset : `float`
            Random offset (seconds).
        """
        slew_overhead: float = slewtime * self.rng.normal(self.slew_loc, self.slew_scale)

        # Slew might be faster than expected, but it will never take negative
        # time.
        if (slewtime + slew_overhead) < 0:
            slew_overhead = 0.0

        visit_overhead: float = visittime * self.rng.normal(self.visit_loc, self.visit_scale)
        # There might be anomalous overhead that makes visits take longer,
        # but faster is unlikely.
        if visit_overhead < 0:
            visit_overhead = 0.0

        return slew_overhead + visit_overhead
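

# Example (comment only; the seed and times are arbitrary): an
# AnomalousOverheadFunc seeded with 101 and a 10% slew scatter returns a
# random overhead offset, in seconds, for a visit with the given visit and
# slew times.
#
#   overhead_func = AnomalousOverheadFunc(seed=101, slew_scale=0.1)
#   extra_seconds = overhead_func(visittime=30.0, slewtime=5.0)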


def run_prenights(
    day_obs_mjd: float,
    archive_uri: str,
    scheduler_file: Optional[str] = None,
    opsim_db: Optional[str] = None,
    minutes_delays: tuple[float, ...] = (0, 1, 10, 60, 240),
    anomalous_overhead_seeds: tuple[int, ...] = (101, 102, 103, 104, 105),
    sim_nights: int = 2,
    opsim_metadata: dict | None = None,
) -> None:
    """Run the set of scheduler simulations needed to prepare for a night.

    Parameters
    ----------
    day_obs_mjd : `float`
        The starting MJD.
    archive_uri : `str`
        The URI of the archive in which to store the simulations.
    scheduler_file : `str`, optional
        File from which to load the scheduler. None defaults to the example
        scheduler from rubin_scheduler.scheduler.example.
        The default is None.
    opsim_db : `str`, optional
        The file name of the visit database for visits preceding the
        simulation.
        The default is None.
    minutes_delays : `tuple` of `float`
        Delayed starts to be simulated, in minutes.
    anomalous_overhead_seeds : `tuple` of `int`
        Random number seeds to use for anomalous overhead runs.
    sim_nights : `int`
        Number of nights to simulate. Defaults to 2.
    opsim_metadata : `dict`
        Extra metadata for the archive.
    """
    exec_time: str = _iso8601_now()
    scheduler_io: io.BufferedRandom = _create_scheduler_io(
        day_obs_mjd, scheduler_fname=scheduler_file, opsim_db=opsim_db
    )

    # Assign args common to all sims for this execution.
    run_sim = partial(
        _run_sim, archive_uri=archive_uri, scheduler_io=scheduler_io, opsim_metadata=opsim_metadata
    )

    # Find the start of observing for the specified day_obs.
    # Almanac.get_sunset_info does not use day_obs, so just index
    # Almanac.sunsets for what we want directly.
    all_sun_n12_setting: npt.NDArray[np.float_] = Almanac().sunsets["sun_n12_setting"]
    before_first_day_obs: npt.NDArray[np.bool_] = all_sun_n12_setting < day_obs_mjd + 0.5
    after_first_day_obs: npt.NDArray[np.bool_] = all_sun_n12_setting > day_obs_mjd + 1.5
    on_first_day_obs: npt.NDArray[np.bool_] = ~(before_first_day_obs | after_first_day_obs)
    mjd_start: float = all_sun_n12_setting[on_first_day_obs].item()

    all_sun_n12_rising: npt.NDArray[np.float_] = Almanac().sunsets["sun_n12_rising"]
    before_last_day_obs: npt.NDArray[np.bool_] = all_sun_n12_setting < day_obs_mjd + sim_nights + 0.5
    after_last_day_obs: npt.NDArray[np.bool_] = all_sun_n12_setting > day_obs_mjd + sim_nights + 1.5
    on_last_day_obs: npt.NDArray[np.bool_] = ~(before_last_day_obs | after_last_day_obs)
    mjd_end: float = all_sun_n12_rising[on_last_day_obs].item()
    sim_length: float = mjd_end - mjd_start

    # Begin with an ideal pure model sim.
    completed_run_without_delay: bool = False
    if len(minutes_delays) == 0 or (np.array(minutes_delays) == 0).any():
        run_sim(
            mjd_start,
            label=f"Nominal start and overhead, ideal conditions, run at {exec_time}",
            tags=["ideal", "nominal"],
        )
        completed_run_without_delay = True

    # Delayed start
    for minutes_delay in minutes_delays:
        if completed_run_without_delay and minutes_delay == 0:
            # Did this already.
            continue

        delayed_mjd_start = mjd_start + minutes_delay / (24.0 * 60)
        sim_length = mjd_end - delayed_mjd_start
        run_sim(
            delayed_mjd_start,
            label=f"Start time delayed by {minutes_delay} minutes,"
            + f" Nominal slew and visit overhead, ideal conditions, run at {exec_time}",
            tags=["ideal", f"delay_{minutes_delay}"],
            sim_length=sim_length,
        )

    # Run a few different scatters of visit time
    anomalous_overhead_scale = 0.1
    for anomalous_overhead_seed in anomalous_overhead_seeds:
        anomalous_overhead_func = AnomalousOverheadFunc(anomalous_overhead_seed, anomalous_overhead_scale)
        run_sim(
            mjd_start,
            label=f"Anomalous overhead {anomalous_overhead_seed, anomalous_overhead_scale},"
            + f" Nominal start, ideal conditions, run at {exec_time}",
            tags=["ideal", "anomalous_overhead"],
            anomalous_overhead_func=anomalous_overhead_func,
            sim_length=sim_length,
        )
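

# Example call (comment only; the archive URI and day_obs MJD below are
# hypothetical): run a reduced pre-night set for day_obs 60500, storing the
# results in a local archive instead of the default S3 archive.
#
#   run_prenights(
#       day_obs_mjd=60500.0,
#       archive_uri="file:///tmp/prenight_archive/",
#       minutes_delays=(0, 10),
#       anomalous_overhead_seeds=(101,),
#   )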


def _parse_dayobs_to_mjd(dayobs: str | float) -> float:
    try:
        day_obs_mjd = Time(dayobs).mjd
    except ValueError:
        try:
            day_obs_mjd = Time(datetime.strptime(str(dayobs), "%Y%m%d")).mjd
        except ValueError:
            day_obs_mjd = Time(dayobs, format="mjd").mjd

    if not isinstance(day_obs_mjd, float):
        raise ValueError

    return day_obs_mjd


def prenight_sim_cli(cli_args: list = []) -> None:
    parser = argparse.ArgumentParser(description="Run prenight simulations")
    default_time = Time(int(_mjd_now() - 0.5), format="mjd")
    parser.add_argument(
        "--dayobs",
        type=str,
        default=default_time.iso,
        help="The day_obs of the night to simulate.",
    )
    parser.add_argument(
        "--archive",
        type=str,
        default=DEFAULT_ARCHIVE_URI,
        help="Archive in which to store simulation results.",
    )
    parser.add_argument("--scheduler", type=str, default=None, help="pickle file of the scheduler to run.")

    # Only pass a default if we have an opsim
    baseline = get_baseline()
    if baseline is not None:
        parser.add_argument(
            "--opsim", default=baseline, type=str, help="Opsim database from which to load previous visits."
        )
    else:
        parser.add_argument("--opsim", type=str, help="Opsim database from which to load previous visits.")

    add_make_scheduler_snapshot_args(parser)

    args = parser.parse_args() if len(cli_args) == 0 else parser.parse_args(cli_args)

    day_obs_mjd = _parse_dayobs_to_mjd(args.dayobs)
    archive_uri = args.archive
    opsim_db = None if args.opsim in ("", "None") else args.opsim

    scheduler_file = args.scheduler
    if args.repo is not None:
        if os.path.exists(scheduler_file):
            raise ValueError(f"File {scheduler_file} already exists!")

        scheduler: CoreScheduler = get_scheduler(args.repo, args.script, args.branch)
        save_scheduler(scheduler, scheduler_file)

        opsim_metadata = {
            "opsim_config_repository": args.repo,
            "opsim_config_script": args.script,
            "opsim_config_branch": args.branch,
        }
    else:
        opsim_metadata = None

    run_prenights(
        day_obs_mjd,
        archive_uri,
        scheduler_file,
        opsim_db,
        minutes_delays=(0, 1, 10, 60),
        anomalous_overhead_seeds=(101, 102),
        opsim_metadata=opsim_metadata,
    )


if __name__ == "__main__":
    prenight_sim_cli()
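

# Command-line sketch (comment only; the paths, date, and file names are
# hypothetical): the same set of simulations can be launched from the shell,
# e.g. by running this module directly.
#
#   python -m rubin_scheduler.sim_archive.prenight \
#       --dayobs 2025-06-01 \
#       --archive file:///tmp/prenight_archive/ \
#       --scheduler scheduler.pickle.xz \
#       --opsim baseline.db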