Source code for rubin_scheduler.scheduler.example.example_scheduler

__all__ = (
    "generate_baseline_coresched",
    "example_scheduler",
    "sched_argparser",
    "set_run_info",
    "run_sched",
)

import argparse
import os
import subprocess
import sys

import numpy as np
import numpy.typing as npt
from astropy.utils import iers

import rubin_scheduler
import rubin_scheduler.scheduler.detailers as detailers
from rubin_scheduler.scheduler import sim_runner
from rubin_scheduler.scheduler.model_observatory import ModelObservatory, tma_movement
from rubin_scheduler.scheduler.schedulers import CoreScheduler, SimpleBandSched
from rubin_scheduler.scheduler.targetofo import gen_all_events
from rubin_scheduler.scheduler.utils import (
    CurrentAreaMap,
    Footprint,
    ObservationArray,
    make_rolling_footprints,
)
from rubin_scheduler.site_models import Almanac
from rubin_scheduler.utils import DEFAULT_NSIDE, SURVEY_START_MJD

from .lsst_ddf_gen import gen_ddf_surveys
from .lsst_surveys import (
    CAMERA_ROT_LIMITS,
    EXPTIME,
    NEXP,
    U_EXPTIME,
    U_NEXP,
    gen_greedy_surveys,
    gen_long_gaps_survey,
    gen_template_surveys,
    generate_blobs,
    generate_short_blobs,
    generate_twilight_near_sun,
)
from .roman_surveys import gen_roman_off_season, gen_roman_on_season
from .too_surveys import gen_too_surveys

# So things don't fail on hyak
iers.conf.auto_download = False
# XXX--note this line probably shouldn't be in production
iers.conf.auto_max_age = None


[docs] def example_scheduler(mjd_start=SURVEY_START_MJD, **kwargs) -> CoreScheduler: """Renamed""" # In case folks are using old kwarg name if "survey_start_mjd" not in kwargs: kwargs["survey_start_mjd"] = mjd_start return generate_baseline_coresched(**kwargs)
[docs] def generate_baseline_coresched( nside: int = DEFAULT_NSIDE, survey_start_mjd: float = SURVEY_START_MJD, no_too: bool = False, ) -> CoreScheduler: """Provide an example baseline survey-strategy scheduler. Parameters ---------- nside : `int` Nside for the scheduler maps and basis functions. survey_start_mjd : `float` Start date for the survey (MJD). no_too : `bool` Turn off ToO simulation. Default False. Returns ------- scheduler : `rubin_scheduler.scheduler.CoreScheduler` A scheduler set up as the baseline survey strategy. """ parser = sched_argparser() args = parser.parse_args(args=[]) args.setup_only = True args.no_too = no_too args.dbroot = "example_" args.outDir = "." args.nside = nside args.survey_start_mjd = survey_start_mjd scheduler = gen_scheduler(args) return scheduler
[docs] def set_run_info(dbroot: str | None = None, file_end: str = "", out_dir: str = ".") -> tuple[str, dict]: """Gather versions of software used to record""" extra_info = {} exec_command = "" for arg in sys.argv: exec_command += " " + arg extra_info["exec command"] = exec_command try: extra_info["git hash"] = subprocess.check_output(["git", "rev-parse", "HEAD"]) except subprocess.CalledProcessError: extra_info["git hash"] = "Not in git repo" extra_info["file executed"] = os.path.realpath(__file__) try: rs_path = rubin_scheduler.__path__[0] hash_file = os.path.join(rs_path, "../", ".git/refs/heads/main") extra_info["rubin_scheduler git hash"] = subprocess.check_output(["cat", hash_file]) except subprocess.CalledProcessError: pass # Use the filename of the script to name the output database if dbroot is None: fileroot = os.path.basename(sys.argv[0]).replace(".py", "") + "_" else: fileroot = dbroot + "_" fileroot = os.path.join(out_dir, fileroot + file_end) return fileroot, extra_info
[docs] def run_sched( scheduler: CoreScheduler, survey_length: float = 365.25, nside: int = DEFAULT_NSIDE, filename: str | None = None, verbose: bool = False, extra_info: dict | None = None, illum_limit: float = 40.0, survey_start_mjd: float = 60796.0, event_table: npt.NDArray | None = None, sim_to_o=None, snapshot_dir: str | None = None, readtime: float = 3.07, band_changetime: float = 140.0, tma_performance: float = 40.0, ) -> tuple[ModelObservatory, CoreScheduler, ObservationArray]: """Run survey""" n_visit_limit = None fs = SimpleBandSched(illum_limit=illum_limit) observatory = ModelObservatory(nside=nside, mjd_start=survey_start_mjd, sim_to_o=sim_to_o) tma_kwargs = tma_movement(percent=tma_performance) observatory.setup_telescope(**tma_kwargs) observatory.setup_camera(band_changetime=band_changetime, readtime=readtime) observatory, scheduler, observations = sim_runner( observatory, scheduler, sim_duration=survey_length, filename=filename, delete_past=True, n_visit_limit=n_visit_limit, verbose=verbose, extra_info=extra_info, band_scheduler=fs, event_table=event_table, snapshot_dir=snapshot_dir, ) return observatory, scheduler, observations
def gen_scheduler( args: argparse.ArgumentParser, ) -> tuple[ModelObservatory, CoreScheduler, ObservationArray] | CoreScheduler: survey_length = args.survey_length # Days out_dir = args.out_dir verbose = args.verbose dbroot = args.dbroot nside = args.nside mjd_plus = args.mjd_plus snapshot_dir = args.snapshot_dir too = not args.no_too # Parameters that were previously command-line # arguments. max_dither = 0.2 # Degrees. For DDFs illum_limit = 40.0 # Percent. Lunar illumination used for band loading nslice = 2 # N slices for rolling rolling_scale = 0.9 # Strength of rolling rolling_uniform = True # Should we use the uniform rolling flag nights_off = 3 # For long gaps ei_night_pattern = 4 # select doing earth interior observation every 4 nights ei_bands = "riz" # Bands to use for earth interior observations. ei_repeat = 4 # Number of times to repeat earth interior observations ei_am = 2.5 # Earth interior airmass limit ei_elong_req = 45.0 # Solar elongation required for inner solar system ei_area_req = 0.0 # Sky area required before attempting inner solar system per_night = False # Dither DDF per night, if False, dither all observations camera_ddf_rot_limit = 75.0 # degrees repeat_weight = 0 # Maybe set to -1? # Be sure to also update and regenerate DDF grid save file # if changing survey_start_mjd survey_start_mjd = SURVEY_START_MJD + mjd_plus fileroot, extra_info = set_run_info( dbroot=dbroot, file_end="v5.1.0_", out_dir=out_dir, ) pattern_dict = { 1: [True], 2: [True, False], 3: [True, False, False], 4: [True, False, False, False], # 4 on, 4 off 5: [True, True, True, True, False, False, False, False], # 3 on 4 off 6: [True, True, True, False, False, False, False], 7: [True, True, False, False, False, False], } ei_night_pattern = pattern_dict[ei_night_pattern] reverse_ei_night_pattern = [not val for val in ei_night_pattern] sky = CurrentAreaMap(nside=nside) footprints_hp_array, labels = sky.return_maps() wfd_indx = np.where((labels == "lowdust") | (labels == "virgo"))[0] wfd_footprint = footprints_hp_array["r"] * 0 wfd_footprint[wfd_indx] = 1 footprints_hp = {} for key in footprints_hp_array.dtype.names: footprints_hp[key] = footprints_hp_array[key] footprint_mask = footprints_hp["r"] * 0 footprint_mask[np.where(footprints_hp["r"] > 0)] = 1 # Use the Almanac to find the position of the sun at the start of survey almanac = Almanac(mjd_start=survey_start_mjd) sun_moon_info = almanac.get_sun_moon_positions(survey_start_mjd) sun_ra_start = sun_moon_info["sun_RA"].copy() footprints = make_rolling_footprints( fp_hp=footprints_hp, mjd_start=survey_start_mjd, sun_ra_start=sun_ra_start, nslice=nslice, scale=rolling_scale, nside=nside, wfd_indx=wfd_indx, order_roll=1, n_cycles=3, uniform=rolling_uniform, ) gaps_night_pattern = [True] + [False] * nights_off long_gaps = gen_long_gaps_survey( nside=nside, footprints=footprints, night_pattern=gaps_night_pattern, u_exptime=U_EXPTIME, nexp=NEXP, ) # Set up the DDF surveys to dither u_detailer = detailers.BandNexp(bandname="u", nexp=U_NEXP, exptime=U_EXPTIME) dither_detailer = detailers.DitherDetailer(per_night=per_night, max_dither=max_dither) dither_detailer = detailers.SplitDetailer(dither_detailer, detailers.EuclidDitherDetailer()) details = [ detailers.CameraRotDetailer(min_rot=-camera_ddf_rot_limit, max_rot=camera_ddf_rot_limit), dither_detailer, u_detailer, detailers.BandSortDetailer(), detailers.LabelRegionsAndDDFs(), detailers.TruncatePreTwiDetailer(), ] ddfs = gen_ddf_surveys( detailer_list=details, nside=nside, ) greedy = gen_greedy_surveys(nside, nexp=NEXP, footprints=footprints) neo = generate_twilight_near_sun( nside=nside, night_pattern=ei_night_pattern, bands=ei_bands, n_repeat=ei_repeat, footprint_mask=footprint_mask, max_airmass=ei_am, max_elong=ei_elong_req, min_area=ei_area_req, ) blobs = generate_blobs( nside=nside, nexp=NEXP, footprints=footprints, survey_start=survey_start_mjd, u_exptime=U_EXPTIME, ) twi_blobs = generate_short_blobs( nside=nside, nexp=NEXP, footprints=footprints, repeat_weight=repeat_weight, night_pattern=reverse_ei_night_pattern, ) roman_surveys = [ gen_roman_on_season(nexp=NEXP, exptime=EXPTIME), gen_roman_off_season(nexp=NEXP, exptime=EXPTIME), ] # Make the templates footprint template_fp = Footprint(SURVEY_START_MJD, sun_ra_start, nside=nside) for key in footprints_hp_array.dtype.names: indx = np.where(footprints_hp_array[key] > 0)[0] tmp_fp = footprints_hp_array[key] * 0 + np.nan tmp_fp[indx] = 1 template_fp.set_footprint(key, tmp_fp) template_surveys = gen_template_surveys( template_fp, nside=nside, nexp=NEXP, u_exptime=U_EXPTIME, u_nexp=U_NEXP, n_obs_template={"u": 4, "g": 4, "r": 4, "i": 4, "z": 4, "y": 4}, ) if too: too_scale = 1.0 sim_ToOs, event_table = gen_all_events(scale=too_scale, nside=nside) camera_rot_limits = CAMERA_ROT_LIMITS detailer_list = [] detailer_list.append( detailers.CameraRotDetailer(min_rot=np.min(camera_rot_limits), max_rot=np.max(camera_rot_limits)) ) detailer_list.append(detailers.LabelRegionsAndDDFs()) detailer_list.append(detailers.BandSubstituteDetailer(band_original="z", band_replacement="y")) # Let's make a footprint to follow up ToO events too_footprint = footprints_hp["r"] * 0 + np.nan too_footprint[np.where(footprints_hp["r"] > 0)[0]] = 1.0 toos = gen_too_surveys( nside=nside, detailer_list=detailer_list, too_footprint=too_footprint, n_snaps=NEXP, ) surveys = [ toos, roman_surveys, ddfs, template_surveys, long_gaps, blobs, twi_blobs, neo, greedy, ] else: surveys = [ roman_surveys, ddfs, template_surveys, long_gaps, blobs, twi_blobs, neo, greedy, ] sim_ToOs = None event_table = None fileroot = fileroot.replace("baseline", "no_too") scheduler = CoreScheduler(surveys, nside=nside, survey_start_mjd=survey_start_mjd) if args.setup_only: return scheduler else: years = np.round(survey_length / 365.25) observatory, scheduler, observations = run_sched( scheduler, survey_length=survey_length, verbose=verbose, filename=os.path.join(fileroot + "%iyrs.db" % years), extra_info=extra_info, nside=nside, illum_limit=illum_limit, survey_start_mjd=survey_start_mjd, event_table=event_table, sim_to_o=sim_ToOs, snapshot_dir=snapshot_dir, ) return observatory, scheduler, observations def sched_argparser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser() parser.add_argument("--verbose", dest="verbose", action="store_true", help="Print more output") parser.set_defaults(verbose=False) parser.add_argument("--survey_length", type=float, default=365.25 * 10, help="Survey length in days") parser.add_argument("--out_dir", type=str, default="", help="Output directory") parser.add_argument("--dbroot", type=str, help="Database root") parser.add_argument( "--setup_only", dest="setup_only", default=False, action="store_true", help="Only construct scheduler, do not simulate", ) parser.add_argument( "--nside", type=int, default=DEFAULT_NSIDE, help="Nside should be set to default (32) except for tests.", ) parser.add_argument( "--mjd_plus", type=float, default=0, help="number of days to add to the mjd start", ) parser.add_argument( "--snapshot_dir", type=str, default="", help="Directory for scheduler snapshots.", ) parser.set_defaults(split_long=False) parser.add_argument("--no_too", dest="no_too", action="store_true") parser.set_defaults(no_too=False) return parser if __name__ == "__main__": parser = sched_argparser() args = parser.parse_args() gen_scheduler(args)