import logging
import os
from urllib.parse import urlparse
from .consdb_query import ConsDbFastAPI, ConsDbTap
from .influx_query import InfluxQueryClient
from .logging_query import ExposureLogClient, NarrativeLogClient, NightReportClient
from .reference_values import API_ENDPOINTS
__all__ = ["get_access_token", "get_clients", "usdf_lfa"]
logger = logging.getLogger(__name__)
DEFAULT_TOKENFILE = "usdf_rsp"
[docs]
def get_access_token(tokenfile: str | None = None) -> str:
"""Retrieve RSP access token.
Parameters
----------
tokenfile
Path to the RSP token file. See documentation on RSP tokens at
https://rsp.lsst.io/guides/auth/creating-user-tokens.html
The token will be read from the tokenfile if available.
If tokenfile is None, then further attempts will be made to
access the token value from:
1. `lsst.rsp.get_access_token`
2. the environment variable "ACCESS_TOKEN"
3. the environment variable "ACCESS_TOKEN_FILE"
4. the home directory + '.lsst' + DEFAULT_TOKENFILE
If no RSP token is available, access to most services will not
be available.
Returns
-------
token : `str`
Token value.
A zero-length token will not be valid for use.
Notes
-----
RSP access tokens are unique to the different RSP sites, and the
services which run on a particular site must receive tokens from the
same site.
"""
token = None
# First - tokenfile explicitly provided.
if tokenfile is not None:
with open(tokenfile, "r") as f:
token = f.read().strip()
else:
logger.debug("Tokenfile not specified.")
# Second - are we at an RSP and should use lsst.rsp.get_access_token
try:
import lsst.rsp.get_access_token as rsp_get_access_token
token = rsp_get_access_token(tokenfile=tokenfile)
except ImportError:
# Not on an RSP.
logger.debug("Attempt to import lsst.rsp.get_access_token failed.")
pass
# Third - try environment variable ACCESS_TOKEN (containing token)
if token is None:
token = os.environ.get("ACCESS_TOKEN", None)
# Fourth - try environment variable ACCESS_TOKEN_FILE (file location)
if token is None:
logger.debug("$ACCESS_TOKEN not set.")
tokenfile = os.environ.get("ACCESS_TOKEN_FILE", None)
if tokenfile is not None:
logger.debug(f"Checking $ACCESS_TOKEN_FILE {tokenfile}")
# Try to read this, but an error is not an exception.
try:
with open(tokenfile, "r") as f:
token = f.read().strip()
except FileNotFoundError:
logger.debug(f"{tokenfile} does not exist.")
pass
# Fifth - try a default home directory location.
if token is None:
logger.debug("$ACCESS_TOKEN_FILE not set.")
tokenfile = os.path.join(os.path.expanduser("~"), ".lsst", DEFAULT_TOKENFILE)
logger.debug(f"Checking {tokenfile}")
# Try to read this, but an error is not an exception.
try:
with open(tokenfile, "r") as f:
token = f.read().strip()
except FileNotFoundError:
logger.debug(f"{tokenfile} does not exist.")
pass
# Final check on token value, in order to issue warning.
if token is None:
token = ""
logging.error("No RSP token found.")
return token
[docs]
def get_clients(
tokenfile: str | None = None,
site: str | None = None,
auth_token: str | None = None,
) -> dict:
"""Return a wide set of site-specific client connections.
Parameters
----------
tokenfile
Path to the RSP tokenfile. See also `get_access_token`.
Can be None if one of other methods to set token will be successful.
site
Override site location to a preferred site.
Most likely to be used to specify `usdf-dev` vs `usdf`.
auth_token
The bare authentication token string.
If not None, this will override any tokenfile argument.
Useful in services running behind Gafaelfawr authentication.
Returns
-------
endpoints : `dict`
Dictionary with `efd`, `obsenv`, `sasquatch`,
`narrative_log`, `exposure_log`, `night_log`, and `consdb`
connection information.
Note
----
The authentication token required to access the log services
is an RSP token, and is RSP site-specific (including usdf vs usdf-dev).
For users outside the RSP, a token can be created as described in
https://rsp.lsst.io/v/usdfprod/guides/auth/creating-user-tokens.html
"""
# For more information on rubin tokens see DMTN-234.
# For information on scopes, see DMTN-235.
if auth_token is not None:
# Override and use provided token as-is.
token = auth_token
else:
# Set up authentication
token = get_access_token(tokenfile)
auth = ("user", token)
if site is None:
# Guess site from EXTERNAL_INSTANCE_URL (set for RSPs)
location = os.getenv("EXTERNAL_INSTANCE_URL", "")
if "summit-lsp" in location:
site = "summit"
elif "base-lsp" in location:
site = "base"
elif "usdf-rsp-int" in location:
site = "usdf-int"
elif "usdf-rsp-dev" in location:
site = "usdf-dev"
elif "usdf-rsp" in location:
site = "usdf"
elif "base" in location:
site = "base"
# Otherwise, use the USDF resources, outside of the RSP
if site is None:
site = "usdf"
else:
site = site
if site not in API_ENDPOINTS:
raise ValueError(f"Site {site} must be in {list(API_ENDPOINTS.keys())}")
api_base = API_ENDPOINTS[site]
narrative_log = NarrativeLogClient(api_base, auth)
exposure_log = ExposureLogClient(api_base, auth)
night_report = NightReportClient(api_base, auth)
consdb_query = ConsDbFastAPI(api_base, auth)
consdb_tap = ConsDbTap(api_base, token=token)
# We'll pass along the auth for the InfluxQueryClients
# although there's still work to be done on auth + service site.
efd_client = InfluxQueryClient(site, db_name="efd", auth=auth)
obsenv_client = InfluxQueryClient(site, db_name="lsst.obsenv", auth=auth)
pp_client = InfluxQueryClient(site, db_name="lsst.prompt", auth=auth)
# Some special clients that are site-agnostic (only one location)
too_client = InfluxQueryClient("summit", db_name="lsst.scimma")
# Be extra helpful with environment variables if using USDF for LFA
if "usdf" in site:
# And some env variables for S3 through USDF
os.environ["LSST_DISABLE_BUCKET_VALIDATION"] = "1"
os.environ["S3_ENDPOINT_URL"] = "https://s3dfrgw.slac.stanford.edu/"
# Or if you're actually using one of the USDF RSPs (or kubernetes)
if "usdf" in os.getenv("EXTERNAL_INSTANCE_URL", ""):
if os.getenv("RUBIN_SIM_DATA_DIR") is None:
# Use shared RUBIN_SIM_DATA_DIR
os.environ["RUBIN_SIM_DATA_DIR"] = "/sdf/data/rubin/shared/rubin_sim_data"
endpoints = {
"api_base": api_base,
"efd": efd_client,
"obsenv": obsenv_client,
"pp": pp_client,
"too": too_client,
"consdb": consdb_query,
"consdb_tap": consdb_tap,
"narrative_log": narrative_log,
"exposure_log": exposure_log,
"night_report": night_report,
}
logger.info(f"Endpoint base url: {endpoints['api_base']}")
return endpoints
[docs]
def usdf_lfa(uri: str, bucket: str = "s3://lfa@") -> str:
"""Convert LFA uri recorded in the EFD to a version accessible at USDF.
Parameters
----------
uri : `str`
The URI written into the EFD from the summit.
bucket : `str`
The bucket access at the USDF.
Returns
-------
uri : `str`
The LFA uri at USDF.
"""
filekey = urlparse(uri).path.lstrip("/")
return bucket + filekey