Source code for rubin_scheduler.scheduler.example.simple_examples

__all__ = (
    "get_ideal_model_observatory",
    "update_model_observatory_sunset",
    "standard_masks",
    "simple_rewards",
    "simple_pairs_survey",
    "simple_greedy_survey",
    "simple_rewards_field_survey",
    "simple_field_survey",
)


import numpy as np
from astropy.time import Time

import rubin_scheduler.scheduler.basis_functions as basis_functions
import rubin_scheduler.scheduler.detailers as detailers
import rubin_scheduler.scheduler.features as features
from rubin_scheduler.scheduler.model_observatory import (
    KinemModel,
    ModelObservatory,
    rotator_movement,
    tma_movement,
)
from rubin_scheduler.scheduler.schedulers import FilterSwapScheduler
from rubin_scheduler.scheduler.surveys import BlobSurvey, FieldSurvey, GreedySurvey
from rubin_scheduler.scheduler.utils import Footprint, get_current_footprint
from rubin_scheduler.site_models import Almanac, ConstantSeeingData, ConstantWindData
from rubin_scheduler.utils import DEFAULT_NSIDE, SURVEY_START_MJD


[docs] def get_ideal_model_observatory( nside: int = DEFAULT_NSIDE, dayobs: str = "2024-09-09", fwhm_500: float = 1.6, wind_speed: float = 5.0, wind_direction: float = 340, tma_percent: float = 70, rotator_percent: float = 100, survey_start: Time = Time("2024-09-09T12:00:00", format="isot", scale="utc").mjd, ) -> ModelObservatory: """Set up a model observatory with constant seeing and wind speed, running on 'dayobs'. Track a constant 'survey_start' over different 'dayobs' so that the 'night' value in the simulation outputs is consistent, and any basis functions which might use 'season' values from the conditions have appropriate values. Parameters ---------- nside : `int` The nside for the model observatory. dayobs : `str` DAYOBS formatted str (YYYY-MM-DD) for the evening to start up the observatory. fwhm_500 : `float`, optional The value to set for atmospheric component of seeing, constant seeing throughout the night (arcseconds). Ad-hoc value for start of comcam on-sky operations about 2.0". wind_speed : `float`, optional Set a (constant) wind speed for the night, (m/s). Default of 5.0 is minimal but noticeable. wind_direction : `float`, optional Set a (constant) wind direction for the night (deg). Default of 340 is midrange between typical wind directions for the site (see SITCOMTN-126). tma_percent : `float`, optional Set a percent of full-performance for the telescope TMA (0-100). Value of 10(%) is likely for start of comcam on-sky SV surveys. rotator_percent : `float`, optional Set a percent of full-performance for the rotator. Default of 100% is likely for the start of comcam on-sky SV surveys. survey_start : `float`, optional MJD of the day of the survey start of operations. This should be kept constant for a given set of simulations, so that the "night" value in the output is consistent. For surveys which use basis functions depending on season, this is also important to be constant. Returns ------- observatory : `~.scheduler.model_observatory.ModelObservatory` A ModelObservatory set up to start operations in the evening of DAYOBS. Notes ----- The time for the model observatory will be advanced to the time of `sunset_start_key` (default -12 degree sunset) in the model observatory. The filters may not be correct however; use `update_model_observatory_sunset` to get filters in place. """ # Set up a fresh model observatory mjd_now = Time(f"{dayobs}T12:00:00", format="isot", scale="utc").mjd kinematic_model = KinemModel(mjd0=mjd_now) rot = rotator_movement(rotator_percent) kinematic_model.setup_camera(**rot) tma = tma_movement(tma_percent) kinematic_model.setup_telescope(**tma) # Some weather telemetry that might be useful seeing_data = ConstantSeeingData(fwhm_500=fwhm_500) wind_data = ConstantWindData(wind_direction=wind_direction, wind_speed=wind_speed) # Set up the model observatory observatory = ModelObservatory( nside=nside, mjd=mjd_now, mjd_start=survey_start, kinem_model=kinematic_model, # Modified kinematics cloud_data="ideal", # No clouds seeing_data=seeing_data, # Modified seeing wind_data=wind_data, # Add some wind downtimes="ideal", # No downtime lax_dome=True, # dome crawl? init_load_length=1, # size of skybrightness files to load first ) return observatory
[docs] def update_model_observatory_sunset( observatory: ModelObservatory, filter_scheduler: FilterSwapScheduler, twilight: int | float = -12 ) -> ModelObservatory: """Ensure correct filters are in place according to the filter_scheduler. Parameters ---------- observatory : `~.scheduler.model_observatory.ModelObservatory` The ModelObservatory simulating the observatory. filter_scheduler : `~.scheduler.schedulers.FilterScheduler` The filter scheduler providing appropriate information on the filters that should be in place on the current observatory day. twilight : `int` or `float` If twilight is -12 or -18, the Almanac -12 or -18 degree twilight times are used to set the current observatory time. If any other value is provided, it is assumed to be a specific MJD to start operating the observatory. Filter choices are based on the time after advancing to twilight. Returns ------- observatory : `~.scheduler.model_observatory.ModelObservatory` The ModelObservatory simulating the observatory, updated to the time of 'twilight' and with mounted_filters matching the filters chosen by the filter_scheduler for the current time at 'twilight'. """ # Move to *next* possible sunset if twilight in (-12, -18): time_key = f"sun_n{twilight*-1 :0d}_setting" # Use the observatory almanac to find the exact time to forward to alm_indx = np.searchsorted(observatory.almanac.sunsets[time_key], observatory.mjd, side="right") - 1 new_mjd = observatory.almanac.sunsets[time_key][alm_indx] if new_mjd < observatory.mjd: alm_indx += 1 new_mjd = observatory.almanac.sunsets[time_key][alm_indx] observatory.mjd = new_mjd else: # Assume twilight was a particular (MJD) time observatory.mjd = twilight # Make sure correct filters are mounted conditions = observatory.return_conditions() filters_needed = filter_scheduler(conditions) observatory.observatory.mount_filters(filters_needed) return observatory
[docs] def standard_masks( nside: int, moon_distance: float = 30.0, wind_speed_maximum: float = 20.0, min_alt: float = 20, max_alt: float = 86.5, min_az: float = 0, max_az: float = 360, shadow_minutes: float = 30, ) -> list[basis_functions.BaseBasisFunction]: """A set of standard mask functions. Avoids the moon, bright planets, high wind, and areas on the sky out of bounds, using the MoonAvoidanceBasisFunction, PlanetMaskBasisFunction, AvoidDirectWindBasisFunction, and the AltAzShadowMaskBasisFunction. Parameters ---------- nside : `int` or None The healpix nside to use. Default of None uses rubin_scheduler.utils.get_default_nside. moon_distance : `float`, optional Moon avoidance distance, in degrees. wind_speed_maximum : `float`, optional Wind speed maximum to apply to the wind avoidance basis function, in m/s. min_alt : `float`, optional Minimum altitude (in degrees) to observe. max_alt : `float`, optional Maximum altitude (in degrees) to observe. min_az : `float`, optional Minimum azimuth angle (in degrees) to observe. max_az : `float`, optional Maximum azimuth angle (in degrees) to observe. shadow_minutes : `float`, optional Avoid inaccessible alt/az regions, as well as parts of the sky which will move into those regions within `shadow_minutes` (minutes). Returns ------- mask_basis_functions : `list` [`BaseBasisFunction`] Mask basis functions should always be used with a weight of 0. The masked (np.nan or -np.inf) regions will remain masked, but the basis function values won't influence the reward. """ masks = [] # Add the Moon avoidance mask masks.append(basis_functions.MoonAvoidanceBasisFunction(nside=nside, moon_distance=moon_distance)) # Add a mask around bright planets masks.append(basis_functions.PlanetMaskBasisFunction(nside=nside)) # Add the wind avoidance mask masks.append(basis_functions.AvoidDirectWind(nside=nside, wind_speed_maximum=wind_speed_maximum)) # Avoid inaccessible parts of the sky, as well as places that will # move into those places within shadow_minutes. masks.append( basis_functions.AltAzShadowMaskBasisFunction( nside=nside, min_alt=min_alt, max_alt=max_alt, min_az=min_az, max_az=max_az, shadow_minutes=shadow_minutes, ) ) return masks
[docs] def simple_rewards( footprints: Footprint, filtername: str, nside: int = DEFAULT_NSIDE, m5_weight: float = 6.0, footprint_weight: float = 1.5, slewtime_weight: float = 3.0, stayfilter_weight: float = 3.0, repeat_weight: float = -20, ) -> list[basis_functions.BaseBasisFunction]: """A simple set of rewards for area-based surveys. Parameters ---------- footprints : `Footprint` A Footprint class, which takes a target map and adds a time-dependent weighting on top (such as seasonal weighting). filtername : `str` The filtername active for these rewards. nside : `int`, optional The nside for the rewards. m5_weight : `float`, optional The weight to give the M5Diff basis function. footprint_weight : `float`, optional The weight to give the footprint basis function. slewtime_weight : `float`, optional The weight to give the slewtime basis function. stayfilter_weight : `float`, optional The weight to give the FilterChange basis function. repeat_weight : `float`, optional The weight to give the VisitRepeat basis function. This is negative by default, to avoid revisiting the same part of the sky within `gap_max` (3 hours) in any filter (except in the pairs survey itself). Returns ------- reward_functions : `list` [(`BaseBasisFunction`, `float`)] List of tuples, each tuple is a reward function followed by its respective weight. Notes ----- Increasing the m5_weight moves visits toward darker skies. Increasing the footprint weight distributes visits more evenly. Increasing the slewtime weight acquires visits with shorter slewtime. The balance in the defaults here has worked reasonably for pair surveys in simulations. """ reward_functions = [] # Add M5 basis function (rewards dark sky) reward_functions.append( (basis_functions.M5DiffBasisFunction(filtername=filtername, nside=nside), m5_weight) ) # Add a footprint basis function # (rewards sky with fewer visits) reward_functions.append( ( basis_functions.FootprintBasisFunction( filtername=filtername, footprint=footprints, out_of_bounds_val=np.nan, nside=nside, ), footprint_weight, ) ) # Add a reward function for small slewtimes. reward_functions.append( ( basis_functions.SlewtimeBasisFunction(filtername=filtername, nside=nside), slewtime_weight, ) ) # Add a reward to stay in the same filter as much as possible. reward_functions.append( (basis_functions.FilterChangeBasisFunction(filtername=filtername), stayfilter_weight) ) # And this is technically a mask, to avoid asking for observations # which are not possible. However, it depends on the filters # requested, compared to the filters available in the camera. reward_functions.append((basis_functions.FilterLoadedBasisFunction(filternames=filtername), 0)) # And add a basis function to avoid repeating the same pointing # (VisitRepeat can be used to either encourage or discourage repeats, # depending on the repeat_weight value). reward_functions.append( ( basis_functions.VisitRepeatBasisFunction( gap_min=0, gap_max=3 * 60.0, filtername=None, nside=nside, npairs=20 ), repeat_weight, ) ) return reward_functions
[docs] def simple_pairs_survey( nside: int = DEFAULT_NSIDE, filtername: str = "g", filtername2: str | None = None, mask_basis_functions: list[basis_functions.BaseBasisFunction] | None = None, reward_basis_functions: list[basis_functions.BaseBasisFunction] | None = None, reward_basis_functions_weights: list[float] | None = None, survey_start: float = SURVEY_START_MJD, footprints_hp: np.ndarray | None = None, camera_rot_limits: list[float] = [-80.0, 80.0], pair_time: float = 30.0, exptime: float = 30.0, nexp: int = 1, science_program: str | None = None, ) -> BlobSurvey: """Set up a simple blob survey to acquire pairs of visits. Parameters ---------- nside : `int`, optional Nside for the surveys. filtername : `str`, optional Filtername for the first visit of the pair. filtername2 : `str` or None, optional Filtername for the second visit of the pair. If None, the first filter will be used for both visits. mask_basis_functions : `list` [`BaseBasisFunction`] or None List of basis functions to use as masks (with implied weight 0). If None, `standard_masks` is used with default parameters. reward_basis_functions : `list` [`BaseBasisFunction`] or None List of basis functions to use as rewards. If None, a basic set of basis functions will be used. reward_basis_functions_weights : `list` [`float`] or None List of values to use as weights for the reward basis functions. If None, default values for the basic set will be used. survey_start : `float` or None The start of the survey, in MJD. If None, `survey_start_mjd()` is used. This should be the start of the survey, not the current time. footprints_hp : `np.ndarray` (N,) or None An array of healpix maps with the target survey area, with dtype like [(filtername, '<f8'), (filtername2, '<f8')]. If None, `get_current_footprint()` will be used, which will cover the expected LSST survey footprint. camera_rot_limits : `list` [`float`] The rotator limits to expect for the camera. These should be slightly padded from true limits, to allow for slight delays between requesting observations and acquiring them. pair_time : `float` The ideal time between pairs of visits, in minutes. exptime : `float` The on-sky exposure time per visit. nexp : `int` The number of exposures per visit (exptime * nexp = total on-sky time). science_program : `str` | None The science_program key for the FieldSurvey. This maps to the BLOCK and `science_program` in the consDB. Returns ------- pair_survey : `BlobSurvey` A blob survey configured to take pairs at spacing of pair_time, in filtername + filtername2. """ # Use the Almanac to find the position of the sun at the start of survey. almanac = Almanac(mjd_start=survey_start) sun_moon_info = almanac.get_sun_moon_positions(survey_start) sun_ra_start = sun_moon_info["sun_RA"].copy() if footprints_hp is None: footprints_hp, labels = get_current_footprint(nside=nside) footprints = Footprint(mjd_start=survey_start, sun_ra_start=sun_ra_start, nside=nside) for f in footprints_hp.dtype.names: footprints.set_footprint(f, footprints_hp[f]) if mask_basis_functions is None: mask_basis_functions = standard_masks(nside=nside) # Mask basis functions have zero weights. mask_basis_functions_weights = [0 for mask in mask_basis_functions] if reward_basis_functions is None: # Create list of (tuples of) basic reward basis functions and weights. m5_weight = 6.0 footprint_weight = 1.5 slewtime_weight = 3.0 stayfilter_weight = 3.0 repeat_weight = -20 if filtername2 is None: reward_functions = simple_rewards( footprints=footprints, filtername=filtername, nside=nside, m5_weight=m5_weight, footprint_weight=footprint_weight, slewtime_weight=slewtime_weight, stayfilter_weight=stayfilter_weight, repeat_weight=repeat_weight, ) else: # Add the same basis functions, but M5 and footprint # basis functions need to be added twice, with half the weight. rf1 = simple_rewards( footprints=footprints, filtername=filtername, nside=nside, m5_weight=m5_weight / 2.0, footprint_weight=footprint_weight / 2.0, slewtime_weight=slewtime_weight, stayfilter_weight=stayfilter_weight, repeat_weight=repeat_weight, ) rf2 = simple_rewards( footprints=footprints, filtername=filtername2, nside=nside, m5_weight=m5_weight / 2.0, footprint_weight=footprint_weight / 2.0, slewtime_weight=0, stayfilter_weight=0, repeat_weight=0, ) # Now clean up and combine these - and remove the separate # BasisFunction for FilterLoadedBasisFunction. reward_functions = [(i[0], i[1]) for i in rf1 if i[1] > 0] + [ (i[0], i[1]) for i in rf2 if i[1] > 0 ] # Then put back in the FilterLoadedBasisFunction with both filters. filternames = [fn for fn in [filtername, filtername2] if fn is not None] reward_functions.append((basis_functions.FilterLoadedBasisFunction(filternames=filternames), 0)) # unpack the basis functions and weights reward_basis_functions_weights = [val[1] for val in reward_functions] reward_basis_functions = [val[0] for val in reward_functions] # Set up blob surveys. if filtername2 is None: survey_name = "simple pair %i, %s" % (pair_time, filtername) else: survey_name = "simple pair %i, %s%s" % (pair_time, filtername, filtername2) # Set up detailers for each requested observation. detailer_list = [] # Avoid camera rotator limits. detailer_list.append( detailers.CameraRotDetailer(min_rot=np.min(camera_rot_limits), max_rot=np.max(camera_rot_limits)) ) # Convert rotTelPos to rotSkyPos_desired detailer_list.append(detailers.Rottep2RotspDesiredDetailer(telescope="rubin")) # Reorder visits in a blob so that closest to current altitude is first. detailer_list.append(detailers.CloseAltDetailer()) # Sets a flush-by date to avoid running into prescheduled visits. detailer_list.append(detailers.FlushForSchedDetailer()) # Add a detailer to label visits as either first or second of the pair. if filtername2 is not None: detailer_list.append(detailers.TakeAsPairsDetailer(filtername=filtername2)) # Set up the survey. ignore_obs = ["DD"] BlobSurvey_params = { "slew_approx": 7.5, "filter_change_approx": 140.0, "read_approx": 2.4, "flush_time": pair_time * 3, "smoothing_kernel": None, "nside": nside, "seed": 42, "dither": True, "twilight_scale": False, } pair_survey = BlobSurvey( reward_basis_functions + mask_basis_functions, reward_basis_functions_weights + mask_basis_functions_weights, filtername1=filtername, filtername2=filtername2, exptime=exptime, ideal_pair_time=pair_time, survey_name=survey_name, ignore_obs=ignore_obs, nexp=nexp, detailers=detailer_list, science_program=science_program, **BlobSurvey_params, ) # Tucking this here so we can look at how many observations # recorded for this survey and what was the last one. pair_survey.extra_features["ObsRecorded"] = features.NObsCount() pair_survey.extra_features["LastObs"] = features.LastObservation() return pair_survey
[docs] def simple_greedy_survey( nside: int = DEFAULT_NSIDE, filtername: str = "r", mask_basis_functions: list[basis_functions.BaseBasisFunction] | None = None, reward_basis_functions: list[basis_functions.BaseBasisFunction] | None = None, reward_basis_functions_weights: list[float] | None = None, survey_start: float = SURVEY_START_MJD, footprints_hp: np.ndarray | None = None, camera_rot_limits: list[float] = [-80.0, 80.0], exptime: float = 30.0, nexp: int = 1, science_program: str | None = None, ) -> GreedySurvey: """Set up a simple greedy survey to just observe single visits. Parameters ---------- nside : `int`, optional Nside for the surveys. filtername : `str`, optional Filtername for the visits. mask_basis_functions : `list` [`BaseBasisFunction`] or None List of basis functions to use as masks (with implied weight 0). If None, `standard_masks` is used with default parameters. reward_basis_functions : `list` [`BaseBasisFunction`] or None List of basis functions to use as rewards. If None, a basic set of basis functions will be used. reward_basis_functions_weights : `list` [`float`] or None List of values to use as weights for the reward basis functions. If None, default values for the basic set will be used. survey_start : `float` or None The start of the survey, in MJD. If None, `survey_start_mjd()` is used. This should be the start of the survey, not the current time. footprints_hp : `np.ndarray` (N,) or None An array of healpix maps with the target survey area, with dtype like [(filtername, '<f8'), (filtername2, '<f8')]. If None, `get_current_footprint()` will be used, which will cover the expected LSST survey footprint. camera_rot_limits : `list` [`float`] The rotator limits to expect for the camera. These should be slightly padded from true limits, to allow for slight delays between requesting observations and acquiring them. exptime : `float` The on-sky exposure time per visit. nexp : `int` The number of exposures per visit (exptime * nexp = total on-sky time). science_program : `str` | None The science_program key for the FieldSurvey. This maps to the BLOCK and `science_program` in the consDB. Returns ------- greedy_survey : `GreedySurvey` A greedy survey configured to take the next best (single) visit in filtername. """ # Use the Almanac to find the position of the sun at the start of survey. almanac = Almanac(mjd_start=survey_start) sun_moon_info = almanac.get_sun_moon_positions(survey_start) sun_ra_start = sun_moon_info["sun_RA"].copy() if footprints_hp is None: footprints_hp, labels = get_current_footprint(nside=nside) footprints = Footprint(mjd_start=survey_start, sun_ra_start=sun_ra_start, nside=nside) for f in footprints_hp.dtype.names: footprints.set_footprint(f, footprints_hp[f]) if mask_basis_functions is None: mask_basis_functions = standard_masks(nside=nside) # Mask basis functions have zero weights. mask_basis_functions_weights = [0 for mask in mask_basis_functions] if reward_basis_functions is None: # Create list of (tuples of) basic reward basis functions and weights. m5_weight = 6.0 footprint_weight = 1.5 slewtime_weight = 3.0 stayfilter_weight = 3.0 repeat_weight = -5 reward_functions = simple_rewards( footprints=footprints, filtername=filtername, nside=nside, m5_weight=m5_weight, footprint_weight=footprint_weight, slewtime_weight=slewtime_weight, stayfilter_weight=stayfilter_weight, repeat_weight=repeat_weight, ) # unpack the basis functions and weights reward_basis_functions_weights = [val[1] for val in reward_functions] reward_basis_functions = [val[0] for val in reward_functions] # Set up survey name, use also for scheduler note. survey_name = f"simple greedy {filtername}" # Set up detailers for each requested observation. detailer_list = [] # Avoid camera rotator limits. detailer_list.append( detailers.CameraRotDetailer(min_rot=np.min(camera_rot_limits), max_rot=np.max(camera_rot_limits)) ) # Convert rotTelPos to rotSkyPos_desired detailer_list.append(detailers.Rottep2RotspDesiredDetailer(telescope="rubin")) # Reorder visits in a blob so that closest to current altitude is first. detailer_list.append(detailers.CloseAltDetailer()) # Sets a flush-by date to avoid running into prescheduled visits. detailer_list.append(detailers.FlushForSchedDetailer()) # Set up the survey. ignore_obs = ["DD"] GreedySurvey_params = { "nside": nside, "seed": 42, "dither": True, } greedy_survey = GreedySurvey( reward_basis_functions + mask_basis_functions, reward_basis_functions_weights + mask_basis_functions_weights, filtername=filtername, exptime=exptime, survey_name=survey_name, ignore_obs=ignore_obs, nexp=nexp, detailers=detailer_list, science_program=science_program, **GreedySurvey_params, ) # Tucking this here so we can look at how many observations # recorded for this survey and what was the last one. greedy_survey.extra_features["ObsRecorded"] = features.NObsCount() greedy_survey.extra_features["LastObs"] = features.LastObservation() return greedy_survey
[docs] def simple_rewards_field_survey( scheduler_note: str, nside: int = DEFAULT_NSIDE, sun_alt_limit: float = -12.0, ) -> list[basis_functions.BaseBasisFunction]: """Get some simple rewards to observe a field survey for a long period. Parameters ---------- scheduler_note : `str` The scheduler note for the field survey. Typically this will be the same as the field name. nside : `int` The nside value for the healpix grid. sun_alt_limit : `float`, optional Value for the sun's altitude at which to allow observations to start (or finish). Returns ------- bfs : `list` of `~.scheduler.basis_functions.BaseBasisFunction` """ bfs = [ basis_functions.NotTwilightBasisFunction(sun_alt_limit=sun_alt_limit), # Avoid revisits within 30 minutes - but we'll have to replace "note" basis_functions.VisitGap(filter_names=None, note=scheduler_note, gap_min=30.0), # reward fields which are rising, but don't mask out after zenith basis_functions.RewardRisingBasisFunction(nside=nside, slope=0.1, penalty_val=0), # Reward parts of the sky which are darker -- # note that this is only for r band, so relying on skymap in r band. # if there isn't a strong reason to go with the darkest pointing, # it might be reasonable to just drop this basis function basis_functions.M5DiffBasisFunction(filtername="r", nside=nside), ] return bfs
[docs] def simple_field_survey( field_ra_deg: float, field_dec_deg: float, field_name: str, mask_basis_functions: list[basis_functions.BaseBasisFunction] | None = None, reward_basis_functions: list[basis_functions.BaseBasisFunction] | None = None, detailers: list[detailers.BaseDetailer] | None = None, sequence: str | list[str] = "ugrizy", nvisits: dict | None = None, exptimes: dict | None = None, nexps: dict | None = None, nside: int = DEFAULT_NSIDE, science_program: str | None = None, ) -> FieldSurvey: """Set up a simple field survey. Parameters ---------- field_ra_deg : `float` The RA (in degrees) of the field. field_dec_deg : `float` The Dec (in degrees) of the field. field_name : `str` The name of the field. This is used for the survey_name and transferred to the 'target' information in the output observation. Also used in 'scheduler_note', which is important for the FieldSurvey to know whether to count particular observations for the Survey. mask_basis_functions : `list` [`BaseBasisFunction`] or None List of basis functions to use as masks (with implied weight 0). If None, `standard_masks` is used with default parameters. reward_basis_functions : `list` [`BaseBasisFunction`] or None List of basis functions to use as rewards. If None, a basic set of basis functions useful for long observations of a field within a night will be used (`get detailers : `list` of [`~.scheduler.detailer` objects] Detailers for the survey. Detailers can add information to output observations, including specifying rotator or dither positions. sequence : `str` or `list` [`str`] The filters (in order?) for the sequence of observations. nvisits : `dict` {`str`:`int`} | None Number of visits per filter to program in the sequence. Default of None uses nvisits = {"u": 20, "g": 20, "r": 20, "i": 20, "z": 20, "y": 20} exptimes : `dict` {`str`: `float`} | None Exposure times per filter to program in the sequence. Default of None uses exptimes = {"u": 38, "g": 30, "r": 30, "i": 30, "z": 30, "y": 30} nexps : `dict` {`str`: `int`} | None Number of exposures per filter to program in the sequence. Default of None uses nexps = {"u": 1, "g": 2, "r": 2, "i": 2, "z": 2, "y": 2} nside : `int`, optional Nside for the survey. Default DEFAULT_NSIDE. science_program : `str` | None The science_program key for the FieldSurvey. This maps to the BLOCK and `science_program` in the consDB. Returns ------- field_survey : `~.scheduler.surveys.FieldSurvey` The configured FieldSurvey. Notes ----- The sequences for a given field survey can be set via kwargs, not necessarily easily accessible here. Only portions of the sequence which correspond to mounted filters will be requested by the FieldSurvey. field_survey.extra_features['ObsRecord'] tracks how many observations have been accepted by the Survey (and can be useful for diagnostics). """ if mask_basis_functions is None: mask_basis_functions = standard_masks(nside=nside) if reward_basis_functions is None: reward_basis_functions = simple_rewards_field_survey(field_name, nside=nside) basis_functions = mask_basis_functions + reward_basis_functions if nvisits is None: nvisits = {"u": 20, "g": 20, "r": 20, "i": 20, "z": 20, "y": 20} if exptimes is None: exptimes = {"u": 38, "g": 30, "r": 30, "i": 30, "z": 30, "y": 30} if nexps is None: nexps = {"u": 1, "g": 2, "r": 2, "i": 2, "z": 2, "y": 2} field_survey = FieldSurvey( basis_functions, field_ra_deg, field_dec_deg, sequence=sequence, nvisits=nvisits, exptimes=exptimes, nexps=nexps, ignore_obs=None, survey_name=field_name, scheduler_note=field_name, target_name=field_name, readtime=2.4, filter_change_time=120.0, nside=nside, flush_pad=30.0, detailers=detailers, science_program=science_program, ) return field_survey