Source code for rubin_scheduler.site_models.downtime_model

__all__ = ("DowntimeModel",)

import warnings


[docs] class DowntimeModel: """Downtime estimates, both scheduled and unscheduled. Parameters ---------- config: DowntimeModelConfig, optional A configuration class for the downtime model. This can be None, in which case the default DowntimeModelConfig is used. The user should set any non-default values for DowntimeModelConfig before configuration of the actual DowntimeModel. self.efd_requirements and self.target_requirements are also set. efd_requirements is a tuple: (list of str, float). This corresponds to the data columns required from the EFD and the amount of time history required. target_requirements is a list of str. This corresponds to the data columns required in the target dictionary passed when calculating the processed telemetry values. """ def __init__( self, sched_down_col="scheduled_downtimes", unsched_down_col="unscheduled_downtimes", time_col="time", ): self.sched_down = sched_down_col self.unsched_down = unsched_down_col self.target_requirements = time_col def configure(self, config=None): warnings.warn("The configure method is deprecated.") def config_info(self): warnings.warn("The configure method is deprecated.")
[docs] def __call__(self, efd_data, target_dict): """Calculate the sky coverage due to clouds. Parameters ---------- efd_data: dict Dictionary of input telemetry, typically from the EFD. This must contain columns self.efd_requirements. (work in progress on handling time history). target_dict: dict Dictionary of target values over which to calculate the processed telemetry. (e.g. mapDict = {'ra': [], 'dec': [], 'altitude': [], 'azimuth': [], 'airmass': []}) Here we use 'time', an astropy.time.Time, as we just need to know the time. Returns ------- dict of bool, astropy.time.Time, astropy.time.Time Status of telescope (True = Down, False = Up) at time, time of expected end of downtime (~noon of the first available day), time of next scheduled downtime (~noon of the first available day). """ # Check for downtime in scheduled downtimes. time = target_dict[self.target_requirements] next_start = efd_data[self.sched_down]["start"].searchsorted(time, side="right") next_end = efd_data[self.sched_down]["end"].searchsorted(time, side="right") if next_start > next_end: # Currently in a scheduled downtime. current_sched = efd_data[self.sched_down][next_end] else: # Not currently in a scheduled downtime. current_sched = None # This will be the next reported/expected downtime. next_sched = efd_data[self.sched_down][next_start] # Check for downtime in unscheduled downtimes. next_start = efd_data[self.unsched_down]["start"].searchsorted(time, side="right") next_end = efd_data[self.unsched_down]["end"].searchsorted(time, side="right") if next_start > next_end: # Currently in an unscheduled downtime. current_unsched = efd_data[self.unsched_down][next_end] else: current_unsched = None # Figure out what to report about current state. if current_sched is None and current_unsched is None: # neither down status = False end_down = None else: # we have a downtime from something .. if current_unsched is None: # sched down only status = True end_down = current_sched["end"] elif current_sched is None: # unsched down only status = True # should decide what to report on end of downtime here .. end_down = current_unsched["end"] else: # both down .. status = True end_down = max(current_sched["end"], current_unsched["end"]) return {"status": status, "end": end_down, "next": next_sched["start"]}