Source code for rubin_nights.influx_query

import getpass
import logging
from functools import cache

import httpx
import numpy as np
import pandas as pd
from astropy.time import Time

from .reference_values import API_ENDPOINTS

logger = logging.getLogger(__name__)

__all__ = ["InfluxQueryClient", "day_obs_from_efd_index"]


def day_obs_from_efd_index(x: pd.Series) -> int:
    """Use with pandas apply(efd_values, axis=1) to get dayobs.

    Parameters
    ----------
    x
        A row of an EFD DataFrame; the row's index value (``x.name``)
        is the UTC timestamp of the record.

    Returns
    -------
    day_obs : `int`
        The day_obs as an integer, formatted YYYYMMDD.
    """
    # Shift the TAI MJD back by half a day and floor it, so the day
    # boundary falls mid-day rather than at midnight, then format
    # the resulting date as YYYYMMDD.
    shifted_mjd = Time(x.name, scale="utc").tai.mjd - 0.5
    day_start = Time(np.floor(shifted_mjd), format="mjd", scale="tai")
    date_part = day_start.isot.split("T")[0]
    return int(date_part.replace("-", ""))
class RepertoireCredsError(Exception):
    """Raised when influxdb credentials cannot be fetched or parsed from repertoire."""
class InfluxQueryClient:
    """Query for InfluxDB data such as EFD.

    Parameters
    ----------
    site
        The site to use for the EFD, e.g. usdf, summit, base.
        Note that influxdb sites can be special: e.g. currently the EFD
        at USDF is only on usdf-rsp (usdf, not -dev) and Sasquatch
        (PP metrics) is only at usdf-rsp-dev (usdf-dev).
    db_name
        The database to query. Not used for credentials info, but will
        be used to help guide to the correct location to fetch the
        credentials (usdf-int efd => usdf efd). Default is "efd".
    auth
        The username and password for authentication to repertoire.
        Note that *repertoire* auth is site-specific, even though influx
        databases can be only available at one site (thus you may need
        cross-site authentication). Also only usdf-rsp currently works.
        If None, will fallback to segwarides.
    id_tag
        Add the service name or user name as a comment to the query.
        This aids in tracking query sources in EFD logs.
        If None, will be set to username.
    results_as_dataframe
        If True, convert query results into a pandas DataFrame.
        If False, results are returned as a list of dictionaries.
    query_timeout
        Time (in seconds) to wait for query to return.
    """

    def __init__(
        self,
        site: str = "usdf",
        db_name: str = "efd",
        auth: tuple | None = None,
        id_tag: str | None = None,
        results_as_dataframe: bool = True,
        query_timeout: float = 5 * 60,
    ) -> None:
        # Site should match general user-expectations and keys in API_ENDPOINTS
        self.site = site.lower()
        # db_name is the identifier when sending query params to the RESTAPI
        self.db_name = db_name
        # influx_db will be the name in repertoire for the influxdb,
        # so create that name from some potential values:
        # e.g. lsst.prompt @ usdf -> usdf_prompt, efd @ usdf-dev -> usdfdev_efd.
        # Single-quoted arguments inside the f-string keep this valid on
        # Python < 3.12 (nested same-type quotes were a SyntaxError before PEP 701).
        self.influx_db = f"{self.site.replace('-', '')}_{db_name.lower().replace('lsst.', '')}"
        self.results_as_dataframe = results_as_dataframe
        # null_result is what failed/empty queries return, matching the
        # configured result type.
        if self.results_as_dataframe:
            self.null_result = pd.DataFrame([])
        else:
            self.null_result = []
        if id_tag is None:
            id_tag = getpass.getuser()
        self.query_tag = f" /* {id_tag} via rubin_nights.InfluxQueryClient */"
        # For user convenience, save the last query.
        self.last_query = "No query issued yet."
        # Fetch the influxdb credentials: prefer repertoire (requires auth),
        # falling back to segwarides.
        if auth is not None:
            try:
                self.url, influx_auth = self._fetch_credentials_repertoire(auth)
            except RepertoireCredsError:
                logger.error("Failed to fetch credentials from repertoire. Trying segwarides.")
                self.url, influx_auth = self._fetch_credentials_segwarides()
        else:
            logger.warning(
                "Fetching influx credentials from segwarides, "
                "without an auth token will soon be deprecated. "
                "Please add an auth tuple to your kwarg values."
            )
            self.url, influx_auth = self._fetch_credentials_segwarides()
        # Set up connections to influx database RestAPI endpoint.
        timeout = httpx.Timeout(query_timeout, connect=10.0)
        self.httpx_client = httpx.Client(base_url=self.url, timeout=timeout, auth=influx_auth)
        self.async_client = httpx.AsyncClient(base_url=self.url, timeout=timeout, auth=influx_auth)

    def _fetch_credentials_repertoire(self, auth: tuple) -> tuple[str, tuple[str, bytes]]:
        """Fetch the credentials via repertoire.

        Parameters
        ----------
        auth
            The username and password for authentication to repertoire
            (RSP/gafaelfawr token).

        Raises
        ------
        RepertoireCredsError
            If the credentials cannot be fetched or parsed.
        """
        creds_service = f"{API_ENDPOINTS[self.site]}/repertoire/discovery/influxdb/{self.influx_db}"
        logger.debug(f"Attempting to fetch credentials from {creds_service}")
        try:
            response = httpx.get(creds_service, auth=auth)
            response.raise_for_status()
        except Exception as e:
            logger.error(f"Could not fetch credentials from repertoire at {creds_service}.")
            logger.error(e)
            # Chain the cause so the original failure is visible in tracebacks.
            raise RepertoireCredsError from e
        # Parse the influx db credentials.
        try:
            influx_creds = response.json()
        except Exception as e:
            logger.error(f"Could not parse credentials from repertoire at {creds_service}.")
            logger.error(e)
            raise RepertoireCredsError from e
        auth = (influx_creds["username"], influx_creds["password"])
        url = influx_creds["url"]
        logger.info(f"Fetched credentials from repertoire for {self.influx_db}.")
        return url, auth

    def _fetch_credentials_segwarides(self) -> tuple[str, tuple[str, bytes]]:
        """Fetch the credentials via segwarides (to be deprecated)."""
        # Segwarides fallback is more complicated: only two credential sets
        # exist there, so map the requested influx_db onto one of them.
        if self.influx_db in ("usdf_efd", "usdfdev_efd", "usdf_obsenv"):
            segwarides_db = "usdf_efd"
        else:
            segwarides_db = "usdfdev_efd"
        creds_service = f"https://roundtable.lsst.codes/segwarides/creds/{segwarides_db}"
        try:
            response = httpx.get(creds_service)
            response.raise_for_status()
        except Exception as e:
            logger.error(
                f"Could not fetch credentials from segwarides for {self.influx_db} "
                f"using {segwarides_db}"
            )
            logger.error(e)
            raise
        # Parse the creds.
        logger.info(f"Fetched credentials from segwarides for {self.influx_db}.")
        influx_creds = response.json()
        auth = (influx_creds["username"], influx_creds["password"])
        url = "https://" + influx_creds["host"] + influx_creds["path"].rstrip("/")
        return url, auth

    def __repr__(self) -> str:
        return f"{self.influx_db} at {self.url}"

    def _to_dataframe(self, response: dict) -> pd.DataFrame:
        """Convert an InfluxDB response to a dataframe.

        Parameters
        ----------
        response
            The JSON response from the InfluxDB API.

        Returns
        -------
        result : `pd.DataFrame`
            Values indexed by UTC time (when a `time` column is present),
            with any series tags added as constant columns.
        """
        # One InfluxQL query is submitted at a time
        statement = response["results"][0]
        if "series" not in statement:
            # zero results
            return pd.DataFrame([])
        # One InfluxDB measurement queried at a time
        series = statement["series"][0]
        result = pd.DataFrame(series.get("values", []), columns=series["columns"])
        if "time" not in result.columns:
            return result
        result = result.set_index(pd.to_datetime(result["time"], format="ISO8601")).drop("time", axis=1)
        # Influx timestamps are UTC; localize naive indexes explicitly.
        if result.index.tzinfo is None:
            result.index = result.index.tz_localize("UTC")
        if "tags" in series:
            for k, v in series["tags"].items():
                result[k] = v
        if "name" in series:
            result.name = series["name"]
        return result
[docs] @staticmethod def build_influxdb_query( measurement: str, fields: list[str] | str = "*", time_range: tuple[Time, Time] | None = None, filters: list[tuple[str, str]] | None = None, ) -> str: """Build an influx DB query for `fields` from `measurement` (topic), usually over a time range. Parameters ---------- measurement The name of the topic / measurement. fields List of fields to return from the topic. Default `*` returns all fields. time_range The time window (in astropy.time.Time) to query. filters The additional conditions to match for the query. e.g. ('salIndex', 1) would add salIndex=1 to the query. Returns ------- query : `str` """ if isinstance(fields, str): fields = [fields] fields = ", ".join(fields) if fields else "*" query = f'SELECT {fields} FROM "{measurement}"' conditions = [] if time_range: t_start, t_end = time_range conditions.append(f"time >= '{t_start.utc.isot}Z' AND time <= '{t_end.utc.isot}Z'") if filters: for key, value in filters: conditions.append(f"{key} = {value}") if conditions: query += " WHERE " + " AND ".join(conditions) return query
[docs] @staticmethod def build_influxdb_top_n_query( measurement: str, fields: list[str] | str = "*", num: int = 10, time_cut: Time | None = None, filters: list[tuple[str, str]] | None = None, ) -> str: """Build an influx DB query for `fields` from `measurement`, restricted to `num` records. Parameters ---------- measurement The name of the topic / measurement. fields List of fields to return from the topic. Default `*` will return all fields. num The maximum number of records to return. time_cut Search for only records at or before this time. filters The additional conditions to match for the query. e.g. ('salIndex', 1) would add salIndex=1 to the query. Returns ------- query : `str` """ if isinstance(fields, str): fields = [fields] fields = ", ".join(fields) if fields else "*" query = f'SELECT {fields} FROM "{measurement}"' conditions = [] if time_cut: conditions.append(f"time <= '{time_cut.utc.isot}Z'") if filters: for key, value in filters: conditions.append(f"{key} = {value}") if conditions: query += " WHERE " + " AND ".join(conditions) limit = f" GROUP BY * ORDER BY DESC LIMIT {num}" query += limit return query
[docs] def query(self, query: str) -> dict | pd.DataFrame: """Send and receive results from the InfluxDB API, with a synchronous query. Parameters ---------- query The query to send to the InfluxDb. Returns ------- result : `dict` or `pd.DataFrame` """ # Add an identifier string to the query params = {"db": self.db_name, "q": query + self.query_tag} self.last_query = query + self.query_tag try: response = self.httpx_client.get( "/query", params=params, ) logger.debug(f"Issued query: {params['q']}") response.raise_for_status() except Exception as e: logger.warning(e) response = None if response: if self.results_as_dataframe: result = self._to_dataframe(response.json()) else: result = response.json() else: result = [] if self.results_as_dataframe: result = pd.DataFrame(result) return result
[docs] async def async_query(self, query: str) -> dict | pd.DataFrame: """Send and receive results from the InfluxDB API, with an asynchronous query. Parameters ---------- query The query to send to the InfluxDb. Returns ------- result : `dict` or `pd.DataFrame` """ # Add an identifier string to the query params = {"db": self.db_name, "q": query + self.query_tag} self.last_query = query + self.query_tag try: response = await self.async_client.get( "/query", params=params, ) response.raise_for_status() logger.debug(f"Issued query: {params['q']}") except Exception as e: logger.warning(e) response = None if response: if self.results_as_dataframe: result = self._to_dataframe(response.json()) else: result = response.json() else: result = [] if self.results_as_dataframe: result = pd.DataFrame(result) return result
[docs] @cache def get_topics(self) -> list[str]: """Find all available topics.""" # Just use sync query. It runs once. r = self.query("show measurements") self.topics = list(r["name"].values) return self.topics
[docs] def get_fields(self, measurement: str) -> pd.DataFrame: """Query the list of field names for a topic. Parameters ---------- measurement Name of measurement/topic to query for field names. Returns ------- fields : `pd.DataFrame` DataFrame with fieldKey / fieldType columns. """ query = f'show field keys from "{measurement}"' return self.query(query)
[docs] async def async_get_fields(self, measurement: str) -> pd.DataFrame: """Query the list of field names for a topic. Parameters ---------- measurement Name of measurement/topic to query for field names. Returns ------- fields : `pd.DataFrame` DataFrame with fieldKey / fieldType columns. """ query = f'show field keys from "{measurement}"' return await self.async_query(query)
def _time_series_query( self, topic_name: str, fields: str | list[str], t_start: Time, t_end: Time, index: int | None = None, ) -> str: """Build specific query between t_start and t_end. Adds some logging and checks around build_influxdb_query. Parameters ---------- topic_name The name of the topic or measurement to query. fields The field or fields to return from topic_name. The entry '*' will return all fields. t_start The start of the time window for the query. t_end The end of the time window for the query. Returns ------- query: `str` """ if topic_name not in self.get_topics(): logger.error(f"{topic_name} not in {self.db_name} topics.") return self.null_result if index: filters = [("salIndex", str(index))] else: filters = None query = self.build_influxdb_query( topic_name, fields=fields, time_range=(t_start, t_end), filters=filters ) return query
[docs] def select_time_series( self, topic_name: str, fields: str | list[str], t_start: Time, t_end: Time, index: int | None = None, ) -> pd.DataFrame | list[dict]: """Sync query to return data from `topic_name` between `t_start` and `t_end`. Parameters ---------- topic_name The name of the topic or measurement to query. fields The field or fields to return from topic_name. The entry '*' will return all fields. t_start The start of the time window for the query. t_end The end of the time window for the query. Returns ------- query_results: `pd.DataFrame` or `list` [ `dict` ] The result of the query. """ query = self._time_series_query( topic_name=topic_name, fields=fields, t_start=t_start, t_end=t_end, index=index ) return self.query(query)
[docs] async def async_select_time_series( self, topic_name: str, fields: str | list[str], t_start: Time, t_end: Time, index: int | None = None, ) -> pd.DataFrame | list[dict]: """Async query to return data from `topic_name` between `t_start` and `t_end`. Parameters ---------- topic_name The name of the topic or measurement to query. fields The field or fields to return from topic_name. The entry '*' will return all fields. t_start The start of the time window for the query. t_end The end of the time window for the query. Returns ------- query_results: `pd.DataFrame` or `list` [ `dict` ] The result of the query. """ query = self._time_series_query( topic_name=topic_name, fields=fields, t_start=t_start, t_end=t_end, index=index ) return await self.async_query(query)
def _top_n_query( self, topic_name: str, fields: str | list[str], num: int, time_cut: Time = None, index: int | None = None, ) -> str: """Build specific query for most recent `num` records. Adds some logging and checks around build_influxdb_top_n_query. Parameters ---------- topic_name The name of the topic or measurement to query. fields The field or fields to return from topic_name. The entry '*' will return all fields. num The number of records to return. time_cut If not None, looks for records prior to this time. Returns ------- query: `str` """ if topic_name not in self.get_topics(): logger.error(f"{topic_name} not in {self.db_name} topics.") return self.null_result if index: filters = [("salIndex", str(index))] else: filters = None query = self.build_influxdb_top_n_query( topic_name, fields=fields, num=num, time_cut=time_cut, filters=filters ) return query
[docs] def select_top_n( self, topic_name: str, fields: str | list[str], num: int, time_cut: Time = None, index: int | None = None, ) -> pd.DataFrame | list[dict]: """Sync query to return `num` records from `topic_name`. Parameters ---------- topic_name The name of the topic or measurement to query. fields The field or fields to return from topic_name. The entry '*' will return all fields. num The number of records to return. time_cut If not None, looks for records prior to this time. Returns ------- query_results: `pd.DataFrame` or `list` [ `dict` ] The result of the query. """ query = self._top_n_query( topic_name=topic_name, fields=fields, num=num, time_cut=time_cut, index=index ) return self.query(query)
[docs] async def async_select_top_n( self, topic_name: str, fields: str | list[str], num: int, time_cut: Time = None, index: int | None = None, ) -> pd.DataFrame | list[dict]: """Async query to return `num` records from `topic_name`. Parameters ---------- topic_name The name of the topic or measurement to query. fields The field or fields to return from topic_name. The entry '*' will return all fields. num The number of records to return. time_cut If not None, looks for records prior to this time. Returns ------- query_results: `pd.DataFrame` or `list` [ `dict` ] The result of the query. """ query = self._top_n_query( topic_name=topic_name, fields=fields, num=num, time_cut=time_cut, index=index ) return await self.async_query(query)