import logging
import pandas as pd
from astropy.time import Time
from .influx_query import InfluxQueryClient
logger = logging.getLogger(__name__)

__all__ = [
    "diasource_visit_summaries",
]

# Column layouts of the three per-visit summaries. The empty-result fallbacks
# use exactly the names (and order) produced by the aggregations below, so the
# schema of the returned DataFrame does not depend on which topics had records.
_DIA_COLUMNS = [
    "numAllDiaSources_sum",
    "numAllDiaSources_median",
    "numAllDiaSources_std",
    "numGoodDiaSources_sum",
    "numGoodDiaSources_median",
    "nDiaDetectors_count",
]
_SSO_COLUMNS = ["numSsObjects_sum", "numSsObjects_median", "nSsDetectors_count"]
_DIRECT_SSO_COLUMNS = ["numDirectSsObjects_sum", "nDSsDetectors_count"]


def _fetch_topic(
    influx_client: InfluxQueryClient, topic: str, fields: list[str], t_start: Time, t_end: Time
) -> pd.DataFrame:
    """Select ``fields`` of ``topic`` between ``t_start`` and ``t_end``, logging the row count."""
    records: pd.DataFrame = influx_client.select_time_series(topic, fields, t_start, t_end)
    logger.info("Retrieved %d records from %s.", len(records), topic)
    return records


def _flatten_columns(summary: pd.DataFrame) -> pd.DataFrame:
    """Replace a ``(column, aggregation)`` MultiIndex with ``column_aggregation`` names."""
    summary.columns = [f"{column}_{agg}" for column, agg in summary.columns]
    return summary


def _empty_summary(columns: list[str]) -> pd.DataFrame:
    """Return a zero-row per-visit summary with the given columns.

    The index is an empty integer index named ``visit``, matching the
    ``groupby("visit")`` summaries. The columns are float so they hold NaN
    cleanly after the outer merges in `diasource_visit_summaries`.
    """
    return pd.DataFrame(columns=columns, index=pd.Index([], dtype=int, name="visit"), dtype=float)


def _diasource_summary(influx_client: InfluxQueryClient, t_start: Time, t_end: Time) -> pd.DataFrame:
    """Aggregate the per-detector numDiaSourcesGood records into one row per visit."""
    topic = "lsst.prompt.prod.numDiaSourcesGood"
    dia_det = _fetch_topic(
        influx_client,
        topic,
        ["visit", "detector", "numAllDiaSources", "numGoodDiaSources", "run"],
        t_start,
        t_end,
    )
    if dia_det.empty:
        logger.warning("No records in %s.", topic)
        return _empty_summary(_DIA_COLUMNS)
    alertsum = dia_det.groupby("visit").agg(
        {
            "numAllDiaSources": ("sum", "median", "std"),
            "numGoodDiaSources": ("sum", "median"),
            "detector": "count",
        }
    )
    alertsum.index = alertsum.index.astype(int)
    return _flatten_columns(alertsum).rename(columns={"detector_count": "nDiaDetectors_count"})


def _ssobject_summary(influx_client: InfluxQueryClient, t_start: Time, t_end: Time) -> pd.DataFrame:
    """Aggregate the per-detector numSsObjects (solar system alert) records into one row per visit."""
    topic = "lsst.prompt.prod.numSsObjects"
    sso_det = _fetch_topic(influx_client, topic, ["visit", "detector", "NumSsObjectsMetric", "run"], t_start, t_end)
    if sso_det.empty:
        logger.warning("No records from %s", topic)
        return _empty_summary(_SSO_COLUMNS)
    ssosum = sso_det.groupby("visit").agg({"NumSsObjectsMetric": ("sum", "median"), "detector": "count"})
    ssosum.index = ssosum.index.astype(int)
    return _flatten_columns(ssosum).rename(
        columns={
            "detector_count": "nSsDetectors_count",
            "NumSsObjectsMetric_sum": "numSsObjects_sum",
            "NumSsObjectsMetric_median": "numSsObjects_median",
        }
    )


def _direct_ssobject_summary(influx_client: InfluxQueryClient, t_start: Time, t_end: Time) -> pd.DataFrame:
    """Aggregate the per-detector numDirectSsObjects (direct associations, not alerts) into one row per visit."""
    topic = "lsst.prompt.prod.numDirectSsObjects"
    asso_det = _fetch_topic(
        influx_client, topic, ["visit", "detector", "NumSsObjectsMetric", "run"], t_start, t_end
    )
    if asso_det.empty:
        logger.warning("No records from %s", topic)
        return _empty_summary(_DIRECT_SSO_COLUMNS)
    # Single aggregation per column, so the result already has flat column names.
    assosum = asso_det.groupby("visit").agg({"NumSsObjectsMetric": "sum", "detector": "count"})
    assosum.index = assosum.index.astype(int)
    return assosum.rename(
        columns={"detector": "nDSsDetectors_count", "NumSsObjectsMetric": "numDirectSsObjects_sum"}
    )


def diasource_visit_summaries(
    t_start: Time, t_end: Time, influx_client: InfluxQueryClient | None = None
) -> pd.DataFrame:
    """Query lsst.prompt.prod topics to retrieve diasource metrics outputs.

    Three topics are queried and each is aggregated per visit:
    ``lsst.prompt.prod.numDiaSourcesGood``, ``lsst.prompt.prod.numSsObjects``
    and ``lsst.prompt.prod.numDirectSsObjects``. A topic with no records
    contributes an empty summary with the same columns, and a warning is
    logged.

    Parameters
    ----------
    t_start
        Time of the start of the events.
    t_end
        Time of the end of the events.
    influx_client
        InfluxQueryClient with prompt processing metric outputs.
        If None, a client for ``usdf-dev`` with ``db_name="lsst.prompt"``
        is created. This should be equivalent to the endpoints['pp'] client.

    Returns
    -------
    alertsum : `pd.DataFrame`
        DataFrame with visit summaries of diasource metrics, indexed by
        integer visit. The topics are outer-joined, so every visit seen in
        any topic gets a row, with NaN where a topic has no record for it.
        Columns, in order:
        ``numDirectSsObjects_sum``, ``nDSsDetectors_count``,
        ``numSsObjects_sum``, ``numSsObjects_median``, ``nSsDetectors_count``,
        ``numAllDiaSources_sum``, ``numAllDiaSources_median``,
        ``numAllDiaSources_std``, ``numGoodDiaSources_sum``,
        ``numGoodDiaSources_median``, ``nDiaDetectors_count``.
    """
    if influx_client is None:
        influx_client = InfluxQueryClient("usdf-dev", db_name="lsst.prompt")
    # NOTE(review): the "run" field is fetched but records are grouped by visit
    # only. Records from several runs of the same visit are therefore combined
    # in the sums/counts; confirm this is intended.
    alertsum = _diasource_summary(influx_client, t_start, t_end)
    ssosum = _ssobject_summary(influx_client, t_start, t_end)
    assosum = _direct_ssobject_summary(influx_client, t_start, t_end)
    alertsum = pd.merge(ssosum, alertsum, how="outer", left_index=True, right_index=True)
    return pd.merge(assosum, alertsum, how="outer", left_index=True, right_index=True)