# Source code for pyeodh.resource_catalog

from __future__ import annotations

import logging
from enum import Enum
from functools import cached_property
from typing import TYPE_CHECKING, Any, Literal, TypeVar

import pystac
import pystac.catalog
from ceda_datapoint.core.cloud import DataPointCloudProduct, DataPointCluster
from ceda_datapoint.core.item import identify_cloud_type
from owslib.wmts import WebMapTileService
from pystac import Extent, RelType, STACObject, STACTypeError, Summaries
from pystac.asset import Asset
from pystac.provider import Provider

from pyeodh import consts
from pyeodh.eodh_object import EodhObject, is_optional
from pyeodh.pagination import PaginatedList
from pyeodh.types import Headers, SearchFields, SearchSortField
from pyeodh.utils import ConformanceError, join_url, remove_null_items

if TYPE_CHECKING:
    # avoids conflicts since there are also kwargs and attrs called `datetime`
    from datetime import datetime as Datetime

    from pyeodh.client import Client

logger = logging.getLogger(__name__)


C = TypeVar("C", bound="STACObject")


class Conformance(Enum):
    """Conformance class URIs that this module checks against the API."""

    # URI tested before the create/update/delete calls in this module.
    TRANSACTION_EXTENSION = (
        "https://api.stacspec.org/v1.0.0/"
        "ogcapi-features/extensions/transaction"
    )
class Item(EodhObject):
    """Resource catalog item wrapping a ``pystac.Item``."""

    _pystac_object: pystac.Item

    def __init__(self, client: Client, headers: Headers, data: Any, **kwargs):
        super().__init__(client, headers, data, pystac.Item, **kwargs)

    def _set_props(self, obj: pystac.Item) -> None:
        """Copy the public fields of ``obj`` onto this instance."""
        self.id = obj.id
        self.geometry = obj.geometry
        self.bbox = obj.bbox
        self.datetime = obj.datetime
        self.properties = obj.properties
        self.collection = obj.collection_id
        self.assets = obj.assets

    def delete(self) -> None:
        """Delete this item.

        Calls: DELETE /catalogs/{catalog_id}/collections/{collection_id}/items/{item_id}

        Raises:
            ConformanceError: If the root object does not conform to the
                transaction extension.
        """
        if not self.get_root().check_conforms_to(
            Conformance.TRANSACTION_EXTENSION.value
        ):
            raise ConformanceError(
                f"{Conformance.TRANSACTION_EXTENSION.value}",
            )
        self._client._request_raw("DELETE", self._pystac_object.self_href)

    def update(
        self,
        geometry: dict[str, Any] | None = None,
        bbox: list[float] | None = None,
        datetime: Datetime | None = None,
        properties: dict[str, Any] | None = None,
        collection: str | pystac.Collection | None = None,
        assets: dict[str, Any] | None = None,
    ) -> None:
        """Update this item's attributes with new values.

        Only provide the values you want to update. Any argument that is
        falsy (``None``, but also e.g. an empty dict or list) falls back to
        the value currently stored on this item.

        Calls: PUT /catalogs/{catalog_id}/collections/{collection_id}/items/{item_id}

        Args:
            geometry (dict[str, Any] | None, optional): Geometry. Defaults to None.
            bbox (list[float] | None, optional): Bounding box. Defaults to None.
            datetime (Datetime | None, optional): Datetime. Defaults to None.
            properties (dict[str, Any] | None, optional): Properties. Defaults to
                None.
            collection (str | pystac.Collection | None, optional): Collection.
                Defaults to None.
            assets (dict[str, Any] | None, optional): Assets. Defaults to None.

        Raises:
            ConformanceError: If the root object does not conform to the
                transaction extension.
        """
        if not self.get_root().check_conforms_to(
            Conformance.TRANSACTION_EXTENSION.value
        ):
            raise ConformanceError(
                f"{Conformance.TRANSACTION_EXTENSION.value}",
            )

        put_data = remove_null_items(
            {
                "id": self.id,
                "geometry": geometry or self.geometry,  # ! getting 500 with this
                "bbox": bbox or self.bbox,
                "datetime": datetime or self.datetime,
                "properties": properties or self.properties,
                "collection": collection or self.collection,
                "assets": assets or self.assets,
            }
        )
        _, resp_data = self._client._request_json(
            "PUT", self._pystac_object.self_href, data=put_data
        )
        # Only refresh the local attributes when the server returned a body.
        if resp_data:
            self._set_props(self._pystac_object.from_dict(resp_data))

    def get_cloud_products(
        self,
    ) -> DataPointCloudProduct | DataPointCluster | list[DataPointCloudProduct]:
        """Retrieve the cloud product(s) attributed to this item.

        Feature added from the CEDA DataPoint library to access cloud products
        as either an individual product object or a set of products represented
        by a cluster. See the documentation for using cloud products and
        clusters at: https://cedadev.github.io/datapoint/usage.html

        Typical usage:
            product = item.get_cloud_products()
            ds = product.open_dataset()
            # Continue with the `ds` xarray.Dataset

        Returns:
            An empty list when no asset yields a cloud type, the single product
            when exactly one does, otherwise a cluster of all products.
        """
        products = []

        # Iterate over assets in this item; assets without a cloud type are
        # skipped.
        for asset_id, asset in self.assets.items():
            cf = identify_cloud_type(asset_id, asset)
            if cf is None:
                continue

            products.append(
                DataPointCloudProduct(
                    asset.to_dict(),  # Convert Asset to dict
                    id=f"{self.id}-{asset_id}",
                    cf=cf,
                    meta={"bbox": self.bbox},
                    properties=self.properties,
                )
            )

        if not products:
            return []

        if len(products) == 1:
            return products[0]

        # Could also just return the list.
        # See https://cedadev.github.io/datapoint/cloud_formats.html
        # for reasons to use the cluster.
        return DataPointCluster(
            products, parent_id=f"{self.id}-cluster", meta={"bbox": self.bbox}
        )

    def commercial_data_order(
        self,
        product_bundle: str,
        workspace: str | None = None,
        aoi: list[list[list[float]]] | None = None,
        extra_data: dict | None = None,
    ) -> None:
        """Order commercial data. Available only for specific catalogs.

        Args:
            product_bundle (str): The product bundle to order from the
                commercial data provider.
            workspace (str | None, optional): The workspace to order the data
                from. Defaults to the username pyeodh client was initialized
                with.
            aoi (list[list[list[float]]] | None, optional): Coordinates to limit
                the AOI of the item for purchase where possible. Given in the
                same nested format as STAC. Defaults to None.
            extra_data (dict | None, optional): A placeholder for future data
                options to include in the item. Defaults to None.
        """
        if workspace is None:
            workspace = self._client.username
        api_url = (
            f"/api/catalogue/manage/catalogs/user-datasets/{workspace}/commercial-data"
        )

        payload: dict[str, Any] = {
            "url": self._pystac_object.self_href,
            "product_bundle": product_bundle,
        }
        # Optional keys are only sent when a non-empty value was supplied.
        if aoi:
            payload["coordinates"] = aoi
        if extra_data:
            payload["extra_data"] = extra_data

        _, data = self._client._request_json("POST", api_url, data=payload)
        # Guard against an empty response body, as update() does, instead of
        # failing with AttributeError after the order has already been placed.
        message = data.get("message") if data else None
        logger.info("Commercial data ordered: %s", message)
class Collection(EodhObject):
    """Resource catalog collection wrapping a ``pystac.Collection``."""

    _pystac_object: pystac.Collection

    def __init__(self, client: Client, headers: Headers, data: Any, **kwargs):
        super().__init__(client, headers, data, pystac.Collection, **kwargs)

    def _set_props(self, obj: pystac.Collection) -> None:
        """Mirror the fields of ``obj`` as attributes of this instance."""
        self.id = obj.id
        self.description = obj.description
        self.extent = obj.extent
        self.title = obj.title
        self.license = obj.license
        self.keywords = obj.keywords
        self.providers = obj.providers
        self.summaries = obj.summaries
        self.assets = obj.assets

    @cached_property
    def items_href(self) -> str:
        """Href of this collection's items link.

        Raises:
            RuntimeError: If the underlying object has no items link.
        """
        items_link = self._pystac_object.get_single_link(RelType.ITEMS)
        if items_link:
            return items_link.href
        raise RuntimeError("Object does not have items link!")

    def get_items(self) -> PaginatedList[Item]:
        """Fetch the items of this collection.

        Calls: GET /catalogs/{catalog_id}/collections/{collection_id}/items

        Returns:
            PaginatedList[Item]: Iterable list of items. Automatically handles
                paginated results.
        """
        query_params = {"limit": consts.PAGINATION_LIMIT}
        return PaginatedList(
            Item,
            self._client,
            "GET",
            self.items_href,
            "features",
            params=query_params,
            parent=self,
        )

    def get_item(self, item_id: str) -> Item:
        """Fetch a single item of this collection.

        Calls: GET /catalogs/{catalog_id}/collections/{collection_id}/items/{item_id}

        Args:
            item_id (str): ID of a collection item.

        Returns:
            Item: Initialized item object with this collection as parent.
        """
        headers, item_data = self._client._request_json(
            "GET", join_url(self.items_href, item_id)
        )
        return Item(self._client, headers, item_data, parent=self)

    def update(
        self,
        description: str | None = None,
        extent: Extent | None = None,
        title: str | None = None,
        license: str | None = None,
        keywords: list[str] | None = None,
        providers: list[Provider] | None = None,
        summaries: Summaries | None = None,
        assets: dict[str, Asset] | None = None,
    ) -> None:
        """Update collection attributes with new values.

        Provide only the values you want to update; any falsy argument falls
        back to the value currently stored on this collection.

        Calls: PUT /catalogs/{catalog_id}/collections/{collection_id}

        Args:
            description (str | None, optional): Description. Defaults to None.
            extent (Extent | None, optional): Extent. Defaults to None.
            title (str | None, optional): Title. Defaults to None.
            license (str | None, optional): License. Defaults to None.
            keywords (list[str] | None, optional): Keywords. Defaults to None.
            providers (list[Provider] | None, optional): Providers. Defaults to
                None.
            summaries (Summaries | None, optional): Summaries. Defaults to None.
            assets (dict[str, Asset] | None, optional): Assets. Defaults to None.

        Raises:
            ConformanceError: If the root object does not conform to the
                transaction extension.
        """
        transaction_uri = Conformance.TRANSACTION_EXTENSION.value
        if not self.get_root().check_conforms_to(transaction_uri):
            raise ConformanceError(transaction_uri)

        # Type-check every argument, in signature order.
        expected_types = (
            (description, str),
            (extent, Extent),
            (title, str),
            (license, str),
            (keywords, list),
            (providers, list),
            (summaries, Summaries),
            (assets, dict),
        )
        for value, expected in expected_types:
            assert is_optional(value, expected), value

        new_values = {
            "description": description,
            "extent": extent,
            "title": title,
            "license": license,
            "keywords": keywords,
            "providers": providers,
            "summaries": summaries,
            "assets": assets,
        }
        payload: dict[str, Any] = {"id": self.id}
        for key, new_value in new_values.items():
            # Each payload key matches the attribute name set in _set_props.
            payload[key] = new_value or getattr(self, key)

        _, updated = self._client._request_json(
            "PUT", self._pystac_object.self_href, data=remove_null_items(payload)
        )
        if not updated:
            return
        self._set_props(self._pystac_object.from_dict(updated))

    def delete(self) -> None:
        """Delete this collection.

        Calls: DELETE /catalogs/{catalog_id}/collections/{collection_id}

        Raises:
            ConformanceError: If the root object does not conform to the
                transaction extension.
        """
        transaction_uri = Conformance.TRANSACTION_EXTENSION.value
        if not self.get_root().check_conforms_to(transaction_uri):
            raise ConformanceError(transaction_uri)
        self._client._request_raw("DELETE", self._pystac_object.self_href)

    def create_item(
        self,
        id: str,
        geometry: dict[str, Any] | None,
        bbox: list[float] | None,
        datetime: Datetime | None,
        properties: dict[str, Any] | None,
        collection: str | pystac.Collection | None = None,
        assets: dict[str, Any] | None = None,
    ) -> Item:
        """Create an item as part of this collection.

        Calls: POST /catalogs/{catalog_id}/collections/{collection_id}/items

        Args:
            id (str): A unique ID.
            geometry (dict[str, Any] | None): Geometry
            bbox (list[float] | None): Bounding box
            datetime (Datetime | None): Datetime
            properties (dict[str, Any] | None): Properties
            collection (str | pystac.Collection | None, optional): ID of a parent
                collection. Defaults to None.
            assets (dict[str, Any] | None, optional): Assets. Defaults to None.

        Returns:
            Item: Item initialized from the response, with this collection as
                parent.

        Raises:
            ConformanceError: If the root object does not conform to the
                transaction extension.
        """
        transaction_uri = Conformance.TRANSACTION_EXTENSION.value
        if not self.get_root().check_conforms_to(transaction_uri):
            raise ConformanceError(transaction_uri)

        item_fields = {
            "id": id,
            "geometry": geometry,
            "bbox": bbox,
            "datetime": datetime,
            "properties": properties,
            "collection": collection,
            "assets": assets,
        }
        headers, created = self._client._request_json(
            "POST", self.items_href, data=remove_null_items(item_fields)
        )
        return Item(self._client, headers, created, parent=self)
class Catalog(EodhObject):
    """Resource catalog wrapping a ``pystac.Catalog``."""

    _pystac_object: pystac.Catalog

    def __init__(self, client: Client, headers: Headers, data: Any, **kwargs):
        super().__init__(client, headers, data, pystac.Catalog, **kwargs)

    def _set_props(self, obj: pystac.Catalog) -> None:
        """Copy the public fields of ``obj`` onto this instance."""
        self.id = obj.id
        self.description = obj.description
        self.title = obj.title
        # Copied so later changes do not alias the list held in extra_fields.
        self.conforms_to = obj.extra_fields.get("conformsTo", []).copy()

    @cached_property
    def collections_href(self) -> str:
        """URL pointing to collections endpoint."""
        return join_url(self._pystac_object.self_href, "collections")

    @cached_property
    def wmts_href(self) -> str:
        """URL pointing to WMTS endpoint."""
        return join_url(self._pystac_object.self_href, "wmts")

    def get_catalogs(self) -> list[Catalog]:
        """Fetches children catalogs of this parent catalog.

        Calls: GET /catalogs/{catalog_path}/catalogs

        Entries that raise ``STACTypeError`` on construction are logged and
        skipped.

        Returns:
            list[Catalog]: List of children catalogs. Empty when the response
                has no body or no ``catalogs`` entry.
        """
        url = join_url(self._pystac_object.self_href, "catalogs")
        headers, data = self._client._request_json("GET", url)

        catalogs = []
        # Tolerate an empty body or a missing/null "catalogs" key, matching
        # get_collections(), rather than failing with TypeError on None.
        for cat in (data or {}).get("catalogs") or []:
            try:
                catalogs.append(Catalog(self._client, headers, cat, parent=self))
            except STACTypeError as e:
                logger.warning(f"{e} => Skipping")
        return catalogs

    def get_collections(self) -> list[Collection]:
        """Fetches all resource catalog collections.

        Calls: GET /catalogs/{catalog_id}/collections

        Returns:
            list[Collection]: List of available collections
        """
        headers, response = self._client._request_json("GET", self.collections_href)
        if not response:
            return []
        return [
            Collection(self._client, headers, item, parent=self)
            for item in response.get("collections", [])
        ]

    def get_collection(self, collection_id: str) -> Collection:
        """Fetches a resource catalog collection.

        Calls: GET /catalogs/{catalog_id}/collections/{collection_id}

        Args:
            collection_id (str): ID of a collection

        Returns:
            Collection: Collection for given ID
        """
        url = join_url(self.collections_href, collection_id)
        headers, response = self._client._request_json("GET", url)
        return Collection(self._client, headers, response, parent=self)

    def create_collection(
        self,
        id: str,
        description: str,
        extent: Extent,
        title: str | None = None,
        license: str | None = None,
        keywords: list[str] | None = None,
        providers: list[Provider] | None = None,
        summaries: Summaries | None = None,
        assets: dict[str, Asset] | None = None,
    ) -> Collection:
        """Create a new collection inside a catalog.

        Calls: POST /catalogs/{catalog_id}/collections

        Args:
            id (str)
            description (str)
            extent (Extent)
            title (str | None, optional): Defaults to None.
            license (str | None, optional): Defaults to None.
            keywords (list[str] | None, optional): Defaults to None.
            providers (list[Provider] | None, optional): Defaults to None.
            summaries (Summaries | None, optional): Defaults to None.
            assets (dict[str, Asset] | None, optional): Defaults to None.

        Returns:
            Collection: An initialized collection object.

        Raises:
            ConformanceError: If the root object does not conform to the
                transaction extension.
        """
        if not self.get_root().check_conforms_to(
            Conformance.TRANSACTION_EXTENSION.value
        ):
            raise ConformanceError(
                f"{Conformance.TRANSACTION_EXTENSION.value}",
            )
        assert isinstance(id, str), id
        assert isinstance(description, str), description
        assert isinstance(extent, Extent), extent
        assert is_optional(title, str), title
        assert is_optional(license, str), license
        assert is_optional(keywords, list), keywords
        assert is_optional(providers, list), providers
        assert is_optional(summaries, Summaries), summaries
        assert is_optional(assets, dict), assets

        post_data = remove_null_items(
            {
                "id": id,
                "description": description,
                "extent": extent,
                "title": title,
                "license": license,
                "keywords": keywords,
                "providers": providers,
                "summaries": summaries,
                "assets": assets,
            }
        )
        headers, response = self._client._request_json(
            "POST", self.collections_href, data=post_data
        )
        return Collection(self._client, headers, response, parent=self)

    def update(
        self,
        description: str | None = None,
        title: str | None = None,
    ) -> None:
        """Updates catalog.

        A falsy argument falls back to the value currently stored on this
        catalog.

        Calls: PUT /catalogs/{catalog_id}

        Args:
            description (str | None, optional): New description. Defaults to None.
            title (str | None, optional): New title. Defaults to None.

        Raises:
            ConformanceError: If the root object does not conform to the
                transaction extension.
        """
        if not self.get_root().check_conforms_to(
            Conformance.TRANSACTION_EXTENSION.value
        ):
            raise ConformanceError(
                f"{Conformance.TRANSACTION_EXTENSION.value}",
            )
        assert is_optional(description, str), description
        assert is_optional(title, str), title

        put_data = remove_null_items(
            {
                "id": self.id,
                "description": description or self.description,
                "title": title or self.title,
            }
        )
        _, resp_data = self._client._request_json(
            "PUT", self._pystac_object.self_href, data=put_data
        )
        # Only refresh the local attributes when the server returned a body.
        if resp_data:
            self._set_props(self._pystac_object.from_dict(resp_data))

    def delete(self) -> None:
        """Delete this catalog.

        Calls: DELETE /catalogs/{catalog_id}

        Raises:
            ConformanceError: If the root object does not conform to the
                transaction extension.
        """
        if not self.get_root().check_conforms_to(
            Conformance.TRANSACTION_EXTENSION.value
        ):
            raise ConformanceError(
                f"{Conformance.TRANSACTION_EXTENSION.value}",
            )
        self._client._request_raw("DELETE", self._pystac_object.self_href)

    def search(
        self,
        limit: int = consts.PAGINATION_LIMIT,
        collections: list[str] | None = None,
        catalog_paths: list[str] | None = None,
        ids: list[str] | None = None,
        bbox: list[Any] | None = None,
        intersects: dict | None = None,
        datetime: str | None = None,
        fields: SearchFields | None = None,
        query: dict[str, Any] | list[str] | None = None,
        sort_by: list[SearchSortField] | None = None,
        filter: dict | None = None,
        filter_crs: str | None = None,
        filter_lang: Literal["cql-json", "cql2-json", "cql2-text"] | None = None,
    ) -> PaginatedList[Item]:
        """Search this catalog for items.

        Calls: POST /catalogs/{catalog_id}/search

        The arguments are collected into the request body and passed through
        ``remove_null_items`` before being sent.

        Args:
            limit (int, optional): Sent as ``limit``. Defaults to
                consts.PAGINATION_LIMIT.
            collections (list[str] | None, optional): Sent as ``collections``.
            catalog_paths (list[str] | None, optional): Sent as
                ``catalog_paths``. Ignored, with a warning, when this catalog's
                id is ``"planet"``.
            ids (list[str] | None, optional): Sent as ``ids``.
            bbox (list[Any] | None, optional): Sent as ``bbox``.
            intersects (dict | None, optional): Passed to
                ``_format_intersects`` and sent as ``intersects``.
            datetime (str | None, optional): Sent as ``datetime``.
            fields (SearchFields | None, optional): Sent as ``fields``.
            query (dict[str, Any] | list[str] | None, optional): Passed to
                ``_make_query_dict`` and sent as ``query``.
            sort_by (list[SearchSortField] | None, optional): Sent as ``sortby``.
            filter (dict | None, optional): Sent as ``filter``.
            filter_crs (str | None, optional): Sent as ``filter_crs``.
            filter_lang (str | None, optional): One of ``"cql-json"``,
                ``"cql2-json"``, ``"cql2-text"``. Sent as ``filter_lang``.

        Returns:
            PaginatedList[Item]: Items read from the ``features`` key of the
                search responses.
        """
        assert isinstance(limit, int), limit
        assert is_optional(collections, list), collections
        assert is_optional(catalog_paths, list), catalog_paths
        assert is_optional(ids, list), ids
        assert is_optional(bbox, list), bbox
        # `intersects` is not type-checked here; it is handed to
        # `_format_intersects` below.
        assert is_optional(datetime, str), datetime
        assert is_optional(fields, dict), fields
        assert is_optional(query, (dict, list)), query
        assert is_optional(sort_by, list), sort_by
        assert is_optional(filter, dict), filter
        assert is_optional(filter_crs, str), filter_crs
        assert filter_lang in ["cql-json", "cql2-json", "cql2-text", None]

        if self.id == "planet" and catalog_paths is not None:
            logger.warning(
                "Catalog paths are not supported for planet catalog, ignoring."
            )
            catalog_paths = None

        # NOTE(review): `_format_intersects` and `_make_query_dict` are not
        # defined in this class as shown; presumably they are provided
        # elsewhere (e.g. by the base class) — confirm.
        data = remove_null_items(
            {
                "limit": limit,
                "catalog_paths": catalog_paths,
                "collections": collections,
                "ids": ids,
                "bbox": bbox,
                "intersects": self._format_intersects(intersects),
                "datetime": datetime,
                "fields": fields,
                "query": self._make_query_dict(query),
                "sortby": sort_by,
                "filter": filter,
                "filter_crs": filter_crs,
                "filter_lang": filter_lang,
            }
        )
        url = join_url(self._pystac_object.self_href, "search")
        return PaginatedList(
            Item, self._client, "POST", url, "features", first_data=data, parent=self
        )

    def get_wmts(self) -> WebMapTileService:
        """Initializes the OWSLib WebMapTileService

        Returns:
            WebMapTileService: Initialized WMTS
        """
        wmts = WebMapTileService(self.wmts_href)

        # Patch wmts object attribute error
        # see https://github.com/geopython/OWSLib/issues/572
        for op in wmts.operations:
            if not hasattr(op, "name"):
                op.name = ""

        return wmts
class CatalogService(Catalog):
    """Catalog subclass adding service-level calls.

    On top of ``Catalog`` it offers catalog lookup and creation, the
    conformance listing and a ping call.
    """

    def get_collections(self) -> list[Collection]:
        """Fetches all resource catalog collections.

        Calls: GET /collections

        Returns:
            list[Collection]: List of available collections
        """
        return super().get_collections()

    def get_catalog(self, catalog_id: str) -> Catalog:
        """Fetches a catalog.

        Calls: GET /catalogs/{catalog_id}

        Args:
            catalog_id (str): Catalog ID

        Returns:
            Catalog: An initialized resource catalog object.
        """
        url = join_url(self._pystac_object.self_href, "catalogs", catalog_id)
        headers, data = self._client._request_json("GET", url)
        return Catalog(self._client, headers, data, parent=self)

    def get_catalogs(self) -> list[Catalog]:
        """Fetches all catalogs.

        Calls: GET /catalogs

        Returns:
            list[Catalog]: List of all catalogs available.
        """
        return super().get_catalogs()

    def create_catalog(
        self,
        id: str,
        description: str,
        title: str | None = None,
    ) -> Catalog:
        """Creates a new catalog

        Calls: POST /catalogs

        Args:
            id (str): New catalog ID
            description (str): Catalog description
            title (str | None, optional): Catalog title. Defaults to None.

        Returns:
            Catalog: An initialized catalog object.

        Raises:
            ConformanceError: If the API does not conform to the transaction
                extension.
        """
        if not self.check_conforms_to(Conformance.TRANSACTION_EXTENSION.value):
            raise ConformanceError(
                f"{Conformance.TRANSACTION_EXTENSION.value}",
            )
        assert isinstance(id, str), id
        assert isinstance(description, str), description
        assert is_optional(title, str), title

        post_data = remove_null_items(
            {
                "id": id,
                "description": description,
                "title": title,
            }
        )
        # Post to the catalogs endpoint (same base as get_catalog/get_catalogs),
        # not to collections_href, so the request matches the documented call.
        catalogs_url = join_url(self._pystac_object.self_href, "catalogs")
        headers, response = self._client._request_json(
            "POST", catalogs_url, data=post_data
        )
        return Catalog(self._client, headers, response, parent=self)

    def get_conformance(self) -> list[str]:
        """Fetches list of standards the API conforms to.

        Calls: GET /conformance

        Returns:
            list[str]: Standards. Empty when the response has no body or no
                ``conformsTo`` entry.
        """
        url = join_url(self._pystac_object.self_href, "conformance")
        _, response = self._client._request_json("GET", url)
        return (response or {}).get("conformsTo", [])

    def ping(self) -> str | None:
        """Pings API.

        Calls: GET /_mgmt/ping

        Returns:
            str | None: Pong. None when the response has no body or no
                ``message`` entry.
        """
        _, response = self._client._request_json(
            "GET", join_url(self._pystac_object.self_href, "_mgmt/ping")
        )
        if not response:
            return None
        return response.get("message")

    def check_conforms_to(self, conformance_uri: str | Conformance) -> bool:
        """Test if API conforms to a specification.

        Each call fetches the conformance list via ``get_conformance``.

        Args:
            conformance_uri (str | Conformance): URI of the specification, or
                the ``Conformance`` member holding it.

        Returns:
            bool
        """
        # A plain Enum member never compares equal to a string, so unwrap it
        # before testing membership in the list of URIs.
        uri = (
            conformance_uri.value
            if isinstance(conformance_uri, Conformance)
            else conformance_uri
        )
        return uri in self.get_conformance()