Source code for rubin_nights.rubin_scheduler_addons

import logging
import warnings

import numpy as np
import numpy.typing as npt
import pandas as pd
from astropy.time import Time

from .influx_query import InfluxQueryClient
from .observatory_status import get_tma_limits
from .reference_values import PLATESCALE, SIGMA_TO_FWHM

try:
    from rubin_scheduler.scheduler.model_observatory import KinemModel, rotator_movement, tma_movement
    from rubin_scheduler.site_models import Almanac, SeeingModel
    from rubin_scheduler.utils import (
        Site,
        SysEngVals,
        angular_separation,
        approx_altaz2pa,
        approx_ra_dec2_alt_az,
    )

    HAS_RUBIN_SCHEDULER = True
except ModuleNotFoundError:
    HAS_RUBIN_SCHEDULER = False

logger = logging.getLogger(__name__)

SKIPTIME = 600.0 / 60 / 60 / 24  # a big slew in JD/days
CAM_FWHM = 0.207  # arcseconds

__all__ = ["add_rubin_scheduler_cols", "add_model_slew_times"]


[docs] def add_rubin_scheduler_cols( visits: pd.DataFrame, cols_from: str = "visit1_quicklook", ) -> pd.DataFrame: """Add columns that require rubin_scheduler (including Almanac) parallactic angle and rotator angle, LST, and moon information. Parameters ---------- visits The visit information from cdb_{instrument}.visit1 and cdb_{instrument}.visit1_quicklook (if available). cols_from Use columns expected from the visit1_quicklook table or from the ccdvisit1_quicklook table. The difference is whether _median is at the end of the column name. Returns ------- visits : `pd.DataFrame` The visit information, with additional columns added for predicted zeropoint values, sky background in magnitudes, an estimated m5 depth (from zeropoint + sky). Notes ----- Columns calculated and added: lst, HA (via astropy) moon_alt, moon_az, moon_RA, moon_dec, moon_distance, moon_illum (via the rubin_scheduler almanac) fwhm_eff, fwhm_geom, fwhm_500_zenith (via something close to the rubin_scheduler SeeingModel (but lambda^-0.2) approx_pa, approx_rotTelPos (via rubin_scheduler approx values) """ if not HAS_RUBIN_SCHEDULER: logger.info("No rubin_scheduler available, simply returning visits.") return visits if cols_from.startswith("visit"): psf_col = "psf_sigma_median" psf_area_col = "psf_area_median" pixel_scale_col = "pixel_scale_median" elif cols_from.startswith("ccd"): psf_col = "psf_sigma" psf_area_col = "psf_area" pixel_scale_col = "pixel_scale" else: raise ValueError( "cols_from should indicate either ccd or visit table, and start with 'ccd' or 'visit'", ) # Try to add seeing columns, if psf_sigma column in visits. if psf_col in visits.columns: # replace PLATESCALE with x.pixel_scale_median when available and good pixel_scale: float | npt.NDArray if pixel_scale_col in visits.columns: pixel_scale = np.where( np.isnan(visits[pixel_scale_col].values), PLATESCALE, visits[pixel_scale_col].values ) # Remove nonsense values pixel_scale = np.where(visits[pixel_scale_col].values > PLATESCALE * 2.5, PLATESCALE, pixel_scale) else: pixel_scale = PLATESCALE # psf_sigma -> fwhm_geom ("fwhm_second_moment") fwhm_geom = visits[psf_col] * SIGMA_TO_FWHM * pixel_scale # fwhm_geom -> fwhm_eff .. perhaps incorrect for rubin + PSF model fwhm_eff_geom = SeeingModel.fwhm_geom_to_fwhm_eff(fwhm_geom) # n_eff -> fwhm_eff fwhm_eff = np.sqrt(visits[psf_area_col] / 2.266) * pixel_scale sev = SysEngVals() wavelen_corrections = np.zeros(len(visits), float) for band in visits.band.unique(): match = np.where(visits.band.values == band) if band not in "ugrizy": wavelen_corrections[match] = 1 else: # SeeingModel uses 0.3, but summit_utils (+RHL) says 0.2 wavelen_corrections[match] = np.power(500 / sev.eff_wavelengths[band], 0.2) # SeeingModel uses 0.6 and summit_utils agrees airmass_corrections = np.power(visits.airmass.values, 0.6) # Note: these corrections *multiply* the 500nm/zenith to the # actual airmass/bandpass values (so divide the actual values to get # back to 500nm/zenith). This is the opposite sense to summit_utils. if "aos_fwhm" in visits.columns and "donut_blur_fwhm" in visits.columns: idiq_aos_cam = np.sqrt(visits.aos_fwhm**2 + CAM_FWHM**2) # donut_blur (especially in y band) can be larger than fwhm with warnings.catch_warnings(): warnings.simplefilter("ignore", category=RuntimeWarning) idiq_donut_blur = np.sqrt(fwhm_geom**2 - visits.donut_blur_fwhm**2 + CAM_FWHM**2) atm_fwhm_donut_blur = np.sqrt(visits.donut_blur_fwhm**2 - CAM_FWHM**2) atm_fwhm_aos_cam = np.sqrt(fwhm_geom**2 - idiq_aos_cam**2) atm_500_zenith = atm_fwhm_aos_cam / wavelen_corrections / airmass_corrections atm_500_zenith_donut_blur = atm_fwhm_donut_blur / wavelen_corrections / airmass_corrections # Simple quadrature fwhm_500_zenith = np.sqrt(atm_500_zenith**2 + idiq_aos_cam**2) seeing_df = pd.DataFrame( { "fwhm_eff": fwhm_eff, "fwhm_geom": fwhm_geom, "fwhm_eff_geom": fwhm_eff_geom, "fwhm_500_zenith": fwhm_500_zenith, "atm_fwhm": atm_fwhm_aos_cam, "atm_500_zenith": atm_500_zenith, "idiq_aos_cam": idiq_aos_cam, "atm_fwhm_donut_blur": atm_fwhm_donut_blur, "atm_500_zenith_donut_blur": atm_500_zenith_donut_blur, "idiq_donut_blur": idiq_donut_blur, "pixel_scale_est": pixel_scale, } ) else: seeing_df = pd.DataFrame( {"fwhm_eff": fwhm_eff, "fwhm_geom": fwhm_geom, "pixel_scale_est": pixel_scale} ) for col in seeing_df.columns: if col in visits.columns: visits.drop(labels=col, axis=1, inplace=True) visits = visits.merge(seeing_df, right_index=True, left_index=True) # Add more columns about sky conditions lsst_loc = Site("LSST") times = Time(visits.obs_start_mjd, format="mjd", scale="tai", location=lsst_loc.to_earth_location()) lst = times.sidereal_time("mean").deg hour_angle = (visits.s_ra - lst) / 360 * 12 % 24 if "altitude" in visits and "azimuth" in visits: alt = visits.altitude az = visits.azimuth else: alt, az = approx_ra_dec2_alt_az( visits.s_ra.values, visits.s_dec.values, lsst_loc.latitude, lsst_loc.longitude, visits.exp_midpt_mjd.values, lmst=None, ) approx_parallactic = approx_altaz2pa(alt, az, lsst_loc.latitude) almanac = Almanac() almanac_values = almanac.get_sun_moon_positions(visits.exp_midpt_mjd.values) moon_RA = np.degrees(almanac_values["moon_RA"]) moon_dec = np.degrees(almanac_values["moon_dec"]) moon_distance = angular_separation(moon_RA, moon_dec, visits.s_ra.values, visits.s_dec.values) moon_illum = almanac.get_sun_moon_positions(visits.exp_midpt_mjd.values)["moon_phase"] new_df = pd.DataFrame( { "lst": lst, "HA": hour_angle, "approx_parallactic": approx_parallactic, "sun_alt": np.degrees(almanac_values["sun_alt"]), "sun_az": np.degrees(almanac_values["sun_az"]), "sun_RA": np.degrees(almanac_values["sun_RA"]), "sun_Dec": np.degrees(almanac_values["sun_dec"]), "moon_alt": np.degrees(almanac_values["moon_alt"]), "moon_az": np.degrees(almanac_values["moon_az"]), "moon_RA": moon_RA, "moon_Dec": moon_dec, "moon_distance": moon_distance, "moon_illum": moon_illum, }, index=visits.index, ) for n in new_df.columns: if n in visits.columns: visits.drop(labels=n, axis=1, inplace=True) visits = visits.merge(new_df, right_index=True, left_index=True) return visits
[docs] def add_model_slew_times( visits: pd.DataFrame, efd_client: InfluxQueryClient, model_settle: float = 1, dome_crawl: bool = False, slew_while_changing_filter: bool = False, ideal_tma: float = 40, ) -> tuple[pd.DataFrame, pd.DataFrame]: """ "Add model (applied tma limits plus FBS-default tma limits) calculated slewtimes to visits dataframe, in `slew_model` and `slew_model_ideal`. This is only applicable to SimonyiTel at present! Parameters ---------- visits The visit information. Expected to contain columns of s_ra, s_dec, sky_rotation, obs_start_mjd and band for slewtime calculation. efd_client Used to query the EFD for the applied TMA limits at the time of the visits. model_settle The amount of settle time to add to the model_slew. This should make the model_slew time match the TMAevent time. Might vary over time. dome_crawl Enable dome crawl when calculating slew times, if True. slew_while_changing_filter Slew and change filter at the same time, or if False - sequentially. ideal_tma Model TMA movement value to use for the ideal model, in percent. Returns ------- visits_with_slews, slews : `pd.DataFrame`, `pd.DataFrame` Same visit information, with additional columns `slew_model` and `slew_model_ideal`. Notes ----- Since the slew should be calculated from the previous location on the sky, subsets of visits that do not include the starting position may have inaccurate first slew estimates. Slews are the model slewtime *to* the visit (and compare against `visit_gap` for the same visit). """ if not HAS_RUBIN_SCHEDULER: logger.info("No rubin_scheduler available, cannot calculate model slew times.") return visits, None t_start = Time(visits.obs_start_mjd.min(), format="mjd", scale="tai") t_end = Time(visits.obs_start_mjd.max(), format="mjd", scale="tai") tma_speeds = get_tma_limits(t_start, t_end, efd_client) readtime = 3.07 kinematic_model_ideal = KinemModel(mjd0=t_start.mjd - 0.1) # When evaluating slew times between actual images, need to # remove delay for closed-loop (the image itself represents the delay) kinematic_model_ideal.setup_optics(cl_delay=[0, 0]) kinematic_model_ideal.setup_telescope( **tma_movement(ideal_tma), altitude_minpos=15, altitude_maxpos=86.5, azimuth_minpos=-262, azimuth_maxpos=262, ) kinematic_model_ideal.setup_camera(**rotator_movement(100), readtime=readtime) kinematic_model_ideal.mount_bands(["u", "g", "r", "i", "z", "y"]) # Slower kinematic model to modify with actual telescope parameters # Set up current kinematic model. kinematic_model = KinemModel(mjd0=t_start.mjd - 0.1) # When evaluating slew times between actual images, need to # remove delay for closed-loop (the image itself represents the delay) kinematic_model.setup_optics(cl_delay=[0, 0]) kinematic_model.setup_camera(band_changetime=120, **rotator_movement(100), readtime=readtime) kinematic_model.mount_bands(["u", "g", "r", "i", "z", "y"]) model_slewtimes = {} # current performance model model_slewtimes_ideal = {} # ideal performance model tma_alt_maxv = {} tma_az_maxv = {} for dayobs in visits.day_obs.unique(): night_visits = visits.query("day_obs == @dayobs").sort_values(by="seq_num") if len(night_visits) > 0: # Park the kinematic models at the start of the night kinematic_model.park() kinematic_model_ideal.park() # Now sequentially slew through visits for visitid, v in night_visits.iterrows(): last_idx = np.where(tma_speeds.index.values - np.datetime64(v.obs_start) < 0)[0][-1] tma = dict(tma_speeds.iloc[last_idx]) tma["settle_time"] = model_settle # Change speeds on non-ideal kinematic model kinematic_model.setup_telescope(**tma) tma_alt_maxv[visitid] = tma["altitude_maxspeed"] tma_az_maxv[visitid] = tma["azimuth_maxspeed"] if np.isnan(v.s_ra) | np.isnan(v.s_dec): model_slewtimes[visitid] = np.nan model_slewtimes_ideal[visitid] = np.nan else: ra_rad = np.array([np.radians(v.s_ra)]) dec_rad = np.array([np.radians(v.s_dec)]) sky_angle = np.array([np.radians(v.sky_rotation)]) # MJD should be time at start of slew # But we may have large gaps or skipped visits if (v.obs_start_mjd - v.prev_obs_end_mjd) > SKIPTIME: mjd = v.obs_start_mjd min_overhead = 0.0 else: mjd = v.prev_obs_end_mjd min_overhead = readtime band = np.array([v.band]) slewtime = kinematic_model.slew_times( ra_rad, dec_rad, mjd, rot_sky_pos=sky_angle, bandname=band, lax_dome=dome_crawl, slew_while_changing_filter=slew_while_changing_filter, constant_band_changetime=False, update_tracking=True, ) if isinstance(slewtime, float): model_slewtimes[visitid] = max(slewtime, min_overhead) else: model_slewtimes[visitid] = max(slewtime[0], min_overhead) slewtime = kinematic_model_ideal.slew_times( ra_rad, dec_rad, mjd, rot_sky_pos=sky_angle, bandname=band, lax_dome=True, slew_while_changing_filter=slew_while_changing_filter, constant_band_changetime=False, update_tracking=True, ) if isinstance(slewtime, float): model_slewtimes_ideal[visitid] = max(slewtime, min_overhead) else: model_slewtimes_ideal[visitid] = max(slewtime[0], min_overhead) slewing = pd.DataFrame( { "slew_model": model_slewtimes, "slew_model_ideal": model_slewtimes_ideal, "tma_alt_maxv": tma_alt_maxv, "tma_ax_maxv": tma_az_maxv, }, index=visits.index, ) if "visit_gap" in visits: slewing["model_gap"] = visits.visit_gap - slewing.slew_model # Add also the distance on the sky between the visits (degrees) # This isn't always the slew distance, but it's the best we can do here distances = angular_separation( visits.s_ra[1:].values, visits.s_dec[1:].values, visits.s_ra[0:-1].values, visits.s_dec[0:-1].values ) slewing["slew_distance"] = np.concatenate([np.array([0]), distances]) visits = visits.merge(slewing, right_index=True, left_index=True) return visits, slewing