Source code for rubin_nights.targets_and_visits

import logging

import numpy as np
import pandas as pd
from astropy.time import Time

from .ts_xml_enums import CSCState

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Names exported by `from <module> import *`.
__all__ = [
    "targets_and_visits",
]


def targets_and_visits(
    t_start: Time, t_end: Time, endpoints: dict, queue_index: int = 1
) -> tuple[pd.DataFrame, list[str], pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Dataframe showing linked Targets, Observations, NextVisits and Visits.

    Parameters
    ----------
    t_start
        Time of the start of the events.
    t_end
        Time of the end of the events.
    endpoints
        Endpoints is a dictionary of client connections to the EFD and the
        ConsDb, such as returned by `rubin_nights.connections.get_clients`.
        The ``"efd"`` and ``"consdb"`` entries are used.
    queue_index
        The SalIndex to query for Targets, corresponding to the Scheduler
        queue. Default of 1 corresponds to the Simonyi queue.
        Using queue_index = 2 will trigger a request for latiss visits.

    Returns
    -------
    targets_and_visits : `pd.DataFrame`
        A Dataframe of Target, Observation, NextVisit and Visits.
    cols : `list` [`str`]
        The short-list of columns for display in the table.
    target_and_observations : `pd.DataFrame`
        A Dataframe of Targets joined to Observations.
    nextvisit_and_visits : `pd.DataFrame`
        A Dataframe of nextVisits joined to Visits.
    visits : `pd.DataFrame`
        A dataframe of all of the visits during the time period.

    Notes
    -----
    If no targets are found, the first four return values are empty and
    only ``visits`` is populated. If no visits or no nextVisits are found,
    the first two return values are empty, the third is the joined
    targets/observations and the fourth is the (unjoined) nextVisits.
    """
    efd = endpoints["efd"]

    # Fetch the targets
    topic = "lsst.sal.Scheduler.logevent_target"
    targets = efd.select_time_series(topic, "*", t_start, t_end, index=queue_index)
    if len(targets) > 0:
        targets = targets.query("snapshotUri != ''")
    logger.debug(f"{len(targets)} targets events")

    # Fetch the observations
    topic = "lsst.sal.Scheduler.logevent_observation"
    fields = [
        "additionalInformation",
        "blockId",
        "decl",
        "exptime",
        "filter",
        "mjd",
        "nexp",
        "ra",
        "rotSkyPos",
        "salIndex",
        "targetId",
    ]
    observations = efd.select_time_series(topic, fields, t_start, t_end, index=queue_index)
    if len(observations) == 0:
        # Placeholder so later code can rely on the expected column names.
        # Note "time" is a column here, not the index.
        observations = pd.DataFrame([], columns=fields + ["time"])
    logger.debug(f"{len(observations)} observation events")

    # Fetch and consolidate the nextVisits
    topic = "lsst.sal.ScriptQueue.logevent_nextVisit"
    fields = ["scriptSalIndex", "groupId", "position0", "position1", "cameraAngle"]
    nextvisits = efd.select_time_series(topic, fields, t_start, t_end, index=queue_index)
    logger.debug(f"{len(nextvisits)} next visit events")
    if len(nextvisits) > 0:
        # Multiple next visit events can be issued for the same target, so
        # group next visit events on script salindex if the target is the same.
        # Only the last groupId will be the acquired exposure.
        nextvisits = (
            nextvisits.reset_index()
            .groupby(["scriptSalIndex", "position0", "position1", "cameraAngle"])
            .last()
            .reset_index()
        )
        nextvisits = nextvisits.set_index("time")
        logger.debug(f"{len(nextvisits)} next visit events for unique targets")

    # Fetch the visits from the ConsDB
    instrument = "latiss" if queue_index == 2 else "lsstcam"
    visits = endpoints["consdb"].get_visits(instrument, t_start, t_end, augment=True)
    logger.debug(f"{len(visits)} visits")

    if len(targets) == 0:
        logger.info("Found 0 targets; returning visits")
        return pd.DataFrame([]), [], pd.DataFrame([]), pd.DataFrame([]), visits

    # In theory, targets and observations could be merged directly on targetId.
    # However, targetId is not unique across Scheduler re-enable times.
    # This can be due to resetting unused targetIds OR it could be due
    # to using different FBS databases for recording observations.
    # Merge only within periods where the scheduler was continuously enabled.

    # Scheduler restarts:
    enabled_state = CSCState.ENABLED.value
    topic = "lsst.sal.Scheduler.logevent_summaryState"
    fields = ["summaryState"]
    dd = efd.select_time_series(topic, fields, t_start, t_end, index=queue_index)
    if len(dd) > 0:
        # Identify re-enable times
        restart_times = dd[dd["summaryState"] == enabled_state].index.values
        # Split targets up into sections.
        target_idxs = np.searchsorted(targets.index.values, restart_times)
        if len(observations) > 0:
            obs_idxs = np.searchsorted(observations.index.values, restart_times)
        else:
            # The placeholder frame has no time index to search; every
            # section boundary necessarily falls at position 0.
            obs_idxs = np.zeros(len(restart_times), dtype=int)
        target_idx_start = np.concatenate([np.array([0]), target_idxs])
        target_idx_end = np.concatenate([target_idxs, np.array([len(targets)])])
        obs_idx_start = np.concatenate([np.array([0]), obs_idxs])
        obs_idx_end = np.concatenate([obs_idxs, np.array([len(observations)])])
    else:
        # There were no restarts of the queue.
        target_idx_start = np.array([0])
        target_idx_end = np.array([len(targets)])
        obs_idx_start = np.array([0])
        obs_idx_end = np.array([len(observations)])

    merged_sections = []
    sections = zip(target_idx_start, target_idx_end, obs_idx_start, obs_idx_end)
    for t_lo, t_hi, o_lo, o_hi in sections:
        t_targets = targets.iloc[t_lo:t_hi]
        t_observations = observations.iloc[o_lo:o_hi]
        if len(t_targets) == 0:
            # Nothing to merge/add from this period.
            continue
        if len(t_observations) == 0:
            # No observations in this section: attach zero-filled placeholder
            # observation columns so the section has the same layout as the
            # sections that do have observations.
            new_df = pd.DataFrame(
                np.zeros((len(t_targets.index.values), len(observations.columns.values))),
                columns=observations.columns.values,
                index=t_targets.index,
            )
            new_df = new_df.rename(columns={"time": "time_o"})
            # Item assignment (not attribute assignment) so that the column
            # is created even when "time" was the index of `observations`
            # and therefore not among its columns.
            new_df["time_o"] = np.nan
            # Merge against this section's targets only, not the full set.
            t_to = pd.merge(
                t_targets, new_df, left_index=True, right_index=True, suffixes=("", "_o")
            )
            t_to = t_to.reset_index("time")
        else:
            t_to = pd.merge_asof(
                t_targets.sort_values("targetId").reset_index("time"),
                t_observations.sort_values("targetId").reset_index("time"),
                on="targetId",
                left_by=["ra", "decl", "skyAngle"],
                right_by=["ra", "decl", "rotSkyPos"],
                suffixes=("", "_o"),
                allow_exact_matches=True,
                direction="forward",
            )
            t_to = t_to.sort_values(by="time")
        merged_sections.append(t_to)

    if len(merged_sections) == 0:
        # No information; make a minimal dataframe to be able to continue.
        to = pd.DataFrame([], columns=["targetId", "blockId", "skyAngle"])
    else:
        to = pd.concat(merged_sections)
    to = to.astype({"targetId": int, "blockId": int, "skyAngle": float})
    to = to.drop(columns=[c for c in to.columns if "private" in c])
    logger.debug(f"Joined targets and observations for {len(to)} events")

    # If either visit or nextvisit are empty, just quit here.
    if len(visits) == 0:
        logger.warning("Could not retrieve any visits")
        return pd.DataFrame([]), [], to, nextvisits, visits
    if len(nextvisits) == 0:
        logger.warning("Could not find any nextVisits, can't link to visits")
        return pd.DataFrame([]), [], to, nextvisits, visits

    # nextVisit to visits groupId should be unique
    nv = pd.merge(
        visits,
        nextvisits.reset_index("time"),
        how="outer",
        left_on="group_id",
        right_on="groupId",
        suffixes=("", "_nv"),
    )
    # The outer join leaves NaN where one side has no match; use 0 as the
    # "missing" marker so the columns can be cast to int.
    for col in ("visit_id", "scriptSalIndex"):
        nv[col] = nv[col].fillna(0)
    nv = nv.astype({"visit_id": int, "scriptSalIndex": int, "cameraAngle": float})
    nv = nv.drop(columns=[c for c in nv.columns if "private" in c])
    logger.debug(f"Joined nextvisit and visits for {len(nv)} records")

    # Join targets and next visit BUT blockId == salScriptId
    # is only unique within times of scriptqueue restarts
    # We can narrow down the links using the angle of the rotator
    # (better would be to fetch times of restarts, but this is cheap)
    # (works for science visits, but other programs may not)
    # Make sure column names in visits take priority
    vt = pd.merge_asof(
        to.sort_values("blockId"),
        nv.sort_values("scriptSalIndex"),
        left_on="blockId",
        right_on="scriptSalIndex",
        suffixes=("_tob", ""),
        left_by=["skyAngle"],
        right_by=["cameraAngle"],
        allow_exact_matches=True,
        direction="forward",
    )
    int_cols = ["visit_id", "day_obs", "seq_num", "scriptSalIndex"]
    for col in int_cols:
        # Unmatched targets have NaN here; 0 marks "no visit".
        vt[col] = vt[col].fillna(0)
    vt = vt.astype({col: int for col in int_cols})
    # After the merge, the un-suffixed "time" is the nextVisit time.
    vt = vt.sort_values("time")
    logger.debug(f"Joined targets+observations with nextvisit+visit for {len(vt)} records")

    # Rename values for clarity
    vt = vt.rename(
        columns={
            "time_tob": "time_target",
            "time_o": "time_observation",
            "time": "time_nextvisit",
        }
    )

    # Some handy columns
    cols = [
        "visit_id",
        "day_obs",
        "seq_num",
        "time_target",
        "time_observation",
        "time_nextvisit",
        "obs_start",
        "group_id",
        "s_ra",
        "s_dec",
        "sky_rotation",
        "skyAngle",
        "band",
        "scriptSalIndex",
        "note",
        "observation_reason",
        "target_name",
    ]

    return vt, cols, to, nv, visits