import json
import logging
import os
import urllib.parse
from typing import Any, Callable, Optional
import requests
from pyeodh import consts
from pyeodh.ades import Ades
from pyeodh.resource_catalog import CatalogService
from pyeodh.types import Headers, Params, RequestMethod
from pyeodh.utils import is_absolute_url
from pyeodh.workspace import Workspace
logger = logging.getLogger(__name__)
def _encode_json(data: dict) -> tuple[str, str]:
return "application/json", json.dumps(data)
[docs]
class Client:
def __init__(
self,
base_url: Optional[str] = None,
username: Optional[str] = None,
token: Optional[str] = None,
) -> None:
"""Initialize the client.
Args:
base_url (str, optional): Base URL of the EODH API. If not provided,
the EODH_BASE_URL environment variable is used. Defaults to EODH
production URL.
username (str, optional): Username for authentication. Defaults to None.
token (str, optional): Token for authentication. Defaults to None.
"""
self.url_base = base_url or os.getenv("EODH_BASE_URL", consts.API_BASE_URL)
if not is_absolute_url(self.url_base):
raise ValueError(
"base_url parameter or EODH_BASE_URL environment variable must be an "
"absolute URL"
)
self.username = username
self.token = token
self.environment = self._get_environment(self.url_base)
self._build_session()
self.workspace = Workspace(self)
def _get_environment(self, url: str) -> str:
"""Get the environment from the base URL.
Returns:
str: The environment
"""
env = urllib.parse.urlparse(url).netloc.split(".")[0]
logger.debug(f"Environment: {env}")
if env == "eodatahub": # prod environment doesn't have a subdomain
return consts.Environment.PRODUCTION.value
if env not in set(e.value for e in consts.Environment):
raise ValueError(f"Invalid environment: {env}")
return env
def _build_session(
self,
) -> None:
self._session = requests.Session()
if self.token is not None:
self._session.headers.update({"Authorization": f"Bearer {self.token}"})
def _request_raw(
self,
method: RequestMethod,
url: str,
headers: Optional[Headers] = None,
params: Optional[Params] = None,
data: Optional[Any] = None,
encode: Callable[[Any], tuple[str, Any]] = _encode_json,
) -> requests.Response:
"""Make a raw request.
Args:
method (RequestMethod): HTTP method to use
url (str): Target URL
headers (Optional[Headers], optional): Headers to send with the request.
Defaults to None.
params (Optional[Params], optional): Query parameters to send with the
request. Defaults to None.
data (Optional[Any], optional): Raw data to send with the request. Data is
encoded by the `encode` function before sending. Defaults to None.
encode (Callable[[Any], tuple[str, Any]], optional): Function to encode the
data. Has to return a tuple of (content-type string, encoded data).
Defaults to _encode_json.
Returns:
requests.Response: Response from the request
"""
logger.debug(f"_request_raw received {locals()}")
if not is_absolute_url(url):
logger.debug(f"Received not absolute url: {url}")
url = urllib.parse.urljoin(self.url_base, url)
logger.debug(f"Created url from base: {url}")
if ".org.uk/search" in url:
url = url.replace(".org.uk/search", ".org.uk/api/catalogue/stac/search")
headers = Headers() if headers is None else headers
encoded_data = None
if data is not None:
headers["Content-Type"], encoded_data = encode(data)
logger.debug(
f"Making request: {method} {url}\nheaders: {headers}\nparams: {params}"
f"\nbody: {encoded_data}"
)
response = self._session.request(
method,
url,
headers=headers,
params=params,
data=encoded_data,
)
logger.debug(
f"Received response {response.status_code}\nheaders: {response.headers}"
f"\ncontent: {response.text}"
)
# TODO consider moving this to _requst_json() and raise own exceptions
# so that we can user _raw in e.g. delete methods where we expect a 409 and
# want to recover
response.raise_for_status()
return response
def _request_json(
self,
method: RequestMethod,
url: str,
headers: Optional[Headers] = None,
params: Optional[Params] = None,
data: Optional[Any] = None,
encode: Callable[[Any], tuple[str, Any]] = _encode_json,
) -> tuple[Headers, Any]:
"""Make a request and return the headers and deserialized JSON data. Input data
can be anything, not just JSON, providing an `encode` function is supplied. Use
when expecting JSON data in the response.
Args:
method (RequestMethod): HTTP method to use
url (str): Target URL
headers (Optional[Headers], optional): Headers to send with the request.
Defaults to None.
params (Optional[Params], optional): Query parameters to send with the
request. Defaults to None.
data (Optional[Any], optional): Raw data to send with the request. Data is
encoded by the `encode` function before sending. Defaults to None.
encode (Callable[[Any], tuple[str, Any]], optional): Function to encode the
data. Has to return a tuple of (content-type string, encoded data).
Defaults to _encode_json.
Returns:
tuple[Headers, Any]: Response headers and deserialized JSON data
"""
response = self._request_raw(method, url, headers, params, data, encode)
if not len(response.text):
return response.headers, None
return response.headers, json.loads(response.text)
[docs]
def get_catalog_service(self) -> CatalogService:
"""Initializes the resource catalog API client.
Calls: GET /api/catalogue/stac
Returns:
CatalogService: Object representing the Resource catalog service.
"""
headers, data = self._request_json("GET", "/api/catalogue/stac")
return CatalogService(self, headers, data)
[docs]
def get_ades(self) -> Ades:
"""Initializes the workflow execution service (ADES) client.
Calls: GET /ades/ogc-api
Returns:
Ades: Object representing the ADES.
"""
if self.username is None or self.token is None:
raise ValueError(
"Valid username and token required for accessing protected API "
"endpoints."
)
data = {
"links": [
{
"rel": "self",
"href": (
f"{self.url_base}/api/catalogue/stac/catalogs/user/catalogs/"
f"{self.username}"
),
}
]
}
return Ades(self, Headers(), data)