Source code for rubin_nights.consdb_query

import datetime
import logging
from json import JSONDecodeError

import httpx
import numpy as np
import pandas as pd
import pyvo
from astropy.time import Time
from pyvo.dal import DALQueryError

try:
    import sqlalchemy
    from psycopg import ProgrammingError

    HAS_SQLALCHEMY = True
except ModuleNotFoundError:
    HAS_SQLALCHEMY = False

from .augment_visits import augment_visits

logger = logging.getLogger(__name__)

__all__ = ["ConsDbTap", "ConsDbFastAPI", "ConsDbSql"]


class ConsDb:

    def query(self, query: str) -> pd.DataFrame:
        """The query method is implemented in the child classes,
        according to the specific service/interface used to access the ConsDB.
        """
        logger.error("Use a specific-service version of the ConsDB.")
        logger.error("Query is implemented in these classes only.")
        raise NotImplementedError

    def get_visits(
        self,
        instrument: str,
        t_start: Time | None = None,
        t_end: Time | None = None,
        visit_constraint: str | None = None,
        augment: bool = True,
    ) -> pd.DataFrame:
        """Fetch visit and quicklook values from the ConsDB.

        Parameters
        ----------
        instrument
            The instrument to search for.
            Typical values would include lsstcomcam, latiss, and lsstcam.
            See https://sdm-schemas.lsst.io/ for more details.
        t_start
            The earliest time to match obs_start.
        t_end
            The latest time to match obs_start.
        visit_constraint
            A constraint to apply to the cdb_{instrument}.visit1 table.
            Example: `"visit1.science_program = 'BLOCK-365'"`
        augment
            If True, immediately call `augment_visits.augment_visits`
            after fetching visit1 and visit1_quicklook values from the ConsDB.

        Returns
        -------
        visits : `pd.DataFrame`
            The visit information from cdb_{instrument}.visit1 and
            cdb_{instrument}.visit1_quicklook (if available).
            Additional information may be added, such as `visit_gap`
            if `augment_visits` is True.
        """

        query = (
            f"select visit1.*, visit1_quicklook.* "
            f"from  cdb_{instrument}.visit1 as visit1 "
            f"left join cdb_{instrument}.visit1_quicklook as visit1_quicklook "
            f"on visit1.visit_id = visit1_quicklook.visit_id "
        )
        constraint = []
        if t_start is not None:
            constraint.append(f" visit1.obs_start_mjd >= {t_start.mjd} ")
        if t_end is not None:
            constraint.append(f" visit1.obs_start_mjd <= {t_end.mjd} ")
        if visit_constraint is not None:
            constraint.append(f" ({visit_constraint}) ")
        constraint_str = "and".join(constraint)
        if len(constraint_str) > 0:
            query = query + f" where {constraint_str}"
        query += " order by visit1.visit_id"
        logger.debug(f"Query executed: {query}")
        visits = self.query(query)

        if len(visits) == 0:
            logger.info(f"No visits for {instrument} retrieved from consdb")
            return pd.DataFrame([])

        if augment:
            visits = augment_visits(visits, instrument=instrument)
        return visits

    def query_ccdvisits(
        self,
        instrument: str,
        visit_id: int,
        detector_min: int | None = None,
        detector_max: int | None = None,
    ) -> pd.DataFrame:
        """Fetch ccdvisit data.

        Parameters
        ----------
        instrument
            The instrument to search for.
            Typical values would include lsstcomcam, latiss, and lsstcam.
            See https://sdm-schemas.lsst.io/ for more details.
        visit_id
            The visit for which to fetch the detector values.
        detector_min, detector_max
            The minimum and maximum detector number to fetch.
            Values of None will fetch all detectors.
            Values of `detector_min=90, detector_max=98` will fetch
            the center raft.

        Returns
        -------
        ccdvisits : `pd.DataFrame`
            The visit information from cdb_{instrument}.visit1 and the
            per-detector ccdvisit information.
        """
        query = (
            f"select v.*, c.detector, cq.* "
            f"from cdb_{instrument}.visit1 as v join cdb_{instrument}.ccdvisit1 as c "
            f"on v.visit_id = c.visit_id  "
            f"left join cdb_{instrument}.ccdvisit1_quicklook as cq "
            f"on c.ccdvisit_id = cq.ccdvisit_id "
            f"where v.visit_id = {visit_id}"
        )
        if detector_min is not None:
            query += f" and c.detector >= {detector_min}"
        if detector_max is not None:
            query += f" and c.detector <= {detector_max}"
        ccdvisits = self.query(query)

        return ccdvisits


[docs] class ConsDbTap(ConsDb): """Query the ConsDB through the TAP service. Parameters ---------- api_base Base API for services. e.g. https://usdf-rsp.slac.stanford.edu token The token for authentication. query_timeout Seconds to wait for a query to complete, in remote service. Notes ----- This class provides a `pyvo.dal.TAPService` connection to the Consdb, accessible at `ConsDbTap.tap`. """ def __init__(self, api_base: str, token: str, query_timeout: float = 10 * 60): url = api_base + "/api/consdbtap" self.query_timeout = query_timeout cred = pyvo.auth.CredentialStore() cred.set_password("x-oauth-basic", token) self.credential = cred.get("ivo://ivoa.net/sso#BasicAA") self.tap = pyvo.dal.TAPService(url, session=self.credential) def __repr__(self) -> str: return self.tap.baseurl
[docs] def query(self, query: str) -> pd.DataFrame: """Execute TAP ConsDB query. Parameters ---------- query SQL query. Returns ------- results : `pd.DataFrame` """ job = self.tap.submit_job(query) job.run() job.wait(phases=["COMPLETED", "ERROR", "ABORTED"], timeout=self.query_timeout) if job.phase == "COMPLETED": results = job.fetch_result().to_table().to_pandas() else: try: job.raise_if_error() except DALQueryError as e: logger.error(e) results = pd.DataFrame([]) return results
[docs] async def async_query(self, query: str) -> pd.DataFrame: """PyvoTapService handles async queries outside of asyncio. Use the `tap` attribute directly. """ logger.error( "async queries via TAP should be handled by interacting" "with the ConsDbTAP.tap service directly." ) raise NotImplementedError
[docs] class ConsDbFastAPI(ConsDb): """Query the ConsDB through the REST API / FastAPI interface. Parameters ---------- api_base Base API for services. e.g. https://usdf-rsp.slac.stanford.edu auth The username and password for authentication. query_timeout Seconds to wait for the query to return data. """ # From within the USDF RSP, you could also use # http://consdb-pq.consdb:8080/ for the ConsDB api_base. # This may be slightly faster without F5 load balancer packet checking. def __init__(self, api_base: str, auth: tuple, query_timeout: float = 10 * 60) -> None: self.base_url = api_base + "/consdb" timeout = httpx.Timeout(timeout=query_timeout, connect=60.0) transport = httpx.HTTPTransport(retries=2) self.httpx_client = httpx.Client( base_url=self.base_url, timeout=timeout, transport=transport, auth=auth ) atransport = httpx.AsyncHTTPTransport(retries=2) self.async_client = httpx.AsyncClient( base_url=self.base_url, timeout=timeout, transport=atransport, auth=auth ) def __del__(self) -> None: self.httpx_client.close() def __repr__(self) -> str: return self.base_url def _to_pandas(self, messages: dict) -> pd.DataFrame: # Turn json dictionary returned from consdb into pandas dataframe # De-duplicate columns (assumes they are identical). # Non-duplication breaks later groupby. results = pd.DataFrame(messages["data"], columns=messages["columns"]) # Check for duplicate columns. indices = np.where(pd.Series(results.columns.duplicated()))[0] newcols = results.columns.to_list() for i in indices: newcols[i] = newcols[i] + "_duplicate" # Have to change only some instances of the duplicates results.columns = newcols results.drop(results.columns[indices], axis=1, inplace=True) return results
[docs] def query(self, query: str) -> pd.DataFrame: """Execute synchronous FastAPI ConsDB query. Parameters ---------- query SQL query. Returns ------- results : `pd.DataFrame` """ params = {"query": query} try: response = self.httpx_client.post("/query", json=params) # We add this little test here because sometimes the consdb # FastAPI connections to the consdb itself fall asleep. if response.status_code == 500: try: sql_problems = response.json()["message"].replace("\n\n", "\n") if "OperationalError" in sql_problems: # Just try again - consdb to FastAPI fell asleep? logger.info( f"Consdb Operational error at " f"{datetime.datetime.now(datetime.UTC).strftime('%Y-%m-%d %H:%M:%S')} " f"- trying again." ) response = self.httpx_client.post("/query", json=params) except JSONDecodeError: error_message = "SQL query error, failing in sqlalchemy not postgres." error_message += " A common issue might be using a single % for wildcards, instead of %%." logger.error(error_message) response.raise_for_status() except httpx.RequestError as exc: error_message = f"An error occurred while requesting {exc.request.url!r}.\n" error_message += ( f"Error at UTC time " f"{datetime.datetime.now(datetime.UTC).strftime('%Y-%m-%d %H:%M:%S')}" ) logger.error(error_message) except httpx.HTTPStatusError as exc: # This might be a problem with the server closing the connection # Or it might be a problem with the sql query. # All messages from the database are in the response. error_message = ( f"Error response {exc.response.status_code} while requesting {exc.request.url!r}.\n" ) try: sql_problems = response.json()["message"].replace("\n\n", "\n") error_message += f"{sql_problems}\n" except Exception: pass error_message += ( f"Error at UTC time " f"{datetime.datetime.now(datetime.UTC).strftime('%Y-%m-%d %H:%M:%S')}" ) logger.error(error_message) if response.status_code != 200: messages = dict() else: messages = response.json() if len(messages) > 0: results = self._to_pandas(messages) else: results = pd.DataFrame([]) return results
[docs] async def async_query(self, query: str) -> pd.DataFrame: """Execute asynchronous FastAPI ConsDB query. Parameters ---------- query SQL query. Returns ------- results : `pd.DataFrame` """ params = {"query": query} try: response = await self.async_client.post("/query", json=params) if response.status_code == 500: sql_problems = response.json()["message"].replace("\n\n", "\n") if "OperationalError" in sql_problems: # Just try again - consdb to FastAPI fell asleep? logger.info( f"Consdb Operational error at " f"{datetime.datetime.now(datetime.UTC).strftime('%Y-%m-%d %H:%M:%S')} " f"- trying again." ) response = await self.async_client.post("/query", json=params) response.raise_for_status() except httpx.RequestError as exc: error_message = f"An error occurred while requesting {exc.request.url!r}.\n" error_message += ( f"Error at UTC time " f"{datetime.datetime.now(datetime.UTC).strftime('%Y-%m-%d %H:%M:%S')}" ) logger.error(error_message) except httpx.HTTPStatusError as exc: # This might be a problem with the server closing the connection # Or it might be a problem with the sql query. # All messages from the database are in the response. error_message = ( f"Error response {exc.response.status_code} while requesting {exc.request.url!r}.\n" ) try: sql_problems = response.json()["message"].replace("\n\n", "\n") error_message += f"{sql_problems}\n" except Exception: pass error_message += ( f"Error at UTC time " f"{datetime.datetime.now(datetime.UTC).strftime('%Y-%m-%d %H:%M:%S')}" ) logger.error(error_message) if response.status_code != 200: messages = dict() else: messages = response.json() if len(messages) > 0: results = self._to_pandas(messages) else: results = pd.DataFrame([]) return results
[docs] class ConsDbSql(ConsDb): """Query the ConsDB through pandas with a SQLAlchemy Postgres connection. Parameters ---------- site Two options for site, to connect directly to the postgres servers, either 'usdf' or 'summit'. Note that these postgres servers are not exposed outside of the USDF or Summit; you must use one of the other ConsDb query services in that case. connection_string Optional kwarg to explicitly set the connection string instead of using the defaults for 'usdf' or 'summit'. Notes ----- Credentials must be available in ~/.lsst/postgres-credentials.txt For access external to the USDF, base or summit, a different method must be used. """ def __init__(self, site: str = "usdf", connection_string: str | None = None) -> None: # Authentication for the native postgres connection is via # credentials in ~/.lsst/postgres-credentials.txt if not HAS_SQLALCHEMY: logging.warning( "Cannot use ConsDbSql class without installing " "SQLAlchemy. Please install sqlalchemy or use a different method." ) return if connection_string is None: if site.lower() == "summit": self.conn_str = "postgresql+psycopg://usdf@postgresdb01.cp.lsst.org/exposurelog" else: self.conn_str = "postgresql+psycopg://usdf@usdf-summitdb-logical-replica-svc.sdf.slac.stanford.edu/exposurelog" else: self.conn_str = connection_string self.engine = sqlalchemy.create_engine(self.conn_str) self.conn = self.engine.connect() def __del__(self) -> None: self.conn.close() self.engine.dispose() def __repr__(self) -> str: return self.conn_str
[docs] def query(self, query: str) -> pd.DataFrame: """Execute a SQL query Parameters ---------- query : `str` SQL query. Returns ------- results : `pd.DataFrame` """ try: result = pd.read_sql(query, self.conn) except ProgrammingError as e: self.conn.rollback() logger.error(e) result = pd.DataFrame([]) return result
[docs] async def async_query(self, query: str) -> pd.DataFrame: """The ConsDbSql client uses sqlalchemy + pandas.read_sql. Async queries are unavailable. """ logger.error("Using sqlalchemy + pandas; async unavailable.") raise NotImplementedError