Coverage for pds_crawler/transformer/pds_to_stac.py: 78%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# -*- coding: utf-8 -*-
2# pds-crawler - ETL to index PDS data to pdssp
3# Copyright (C) 2023 - CNES (Jean-Christophe Malapert for Pôle Surfaces Planétaires)
4# This file is part of pds-crawler <https://github.com/pdssp/pds_crawler>
5# SPDX-License-Identifier: LGPL-3.0-or-later
6"""
7Module Name:
8 pds_to_stac
10Description:
11 The pds_to_stac module converts the PDS3 objects and the records from the ODE web service into a single
12 STAC PDS catalog.
14Classes:
15 StacTransformer:
16 Abstract class.
17 StacRecordsTransformer :
18 Converts records from ODE webservice to PDS STAC catalog.
19 StacCatalogTransformer :
20 Converts PDS3 object from ODE archive to PDS STAC catalog (without items)
22Author:
23 Jean-Christophe Malapert
24"""
25import logging
26from abc import ABC
27from typing import Any
28from typing import cast
29from typing import Dict
30from typing import Iterable
31from typing import Iterator
32from typing import List
33from typing import Optional
34from typing import Union
36import pystac
38from ..exception import CrawlerError
39from ..extractor import PDSCatalogsDescription
40from ..extractor import PdsRecordsWs
41from ..load import Database
42from ..models import PdsRecordModel
43from ..models import PdsRecordsModel
44from ..models import PdsRegistryModel
45from ..report import MessageModel
46from ..utils import Observable
47from ..utils import ProgressLogger
48from .pds3_objects import StacPdsCollection
50logger = logging.getLogger(__name__)
class StacTransformer(ABC, Observable):
    """Abstract class for STAC transformation.

    Validates and keeps the database handle given at construction and
    exposes it through the read-only :attr:`database` property.
    """

    def __init__(self, database: Database):
        """Store the database after checking its type.

        Args:
            database (Database): Database

        Raises:
            TypeError: When *database* is not an instance of ``Database``.
        """
        super().__init__()
        if not isinstance(database, Database):
            raise TypeError(
                "[StacTransformer] must be initialized with database attribute of type Database."
            )
        self.__database = database

    @property
    def database(self) -> Database:
        """Returns the database

        Returns:
            Database: Database
        """
        return self.__database
class StacRecordsTransformer(StacTransformer):
    """Convert the records to STAC."""

    def __init__(
        self,
        database: Database,
        *args,
        **kwargs,
    ):
        """Init the transformer of :class:`pds_crawler.extractor.PdsRecordModel`
        provided by the response :class:`pds_crawler.extractor.PdsRecordsWs`

        In addition, it is possible to pass an object by the *report* keyword;
        when it is truthy it is subscribed to this transformer and receives
        the messages sent through ``notify_observers``.

        Args:
            database (Database): Database
        """
        super().__init__(database)
        report = kwargs.get("report")
        if report:
            self.__report = report
            self.subscribe(report)
        self.init()

    def init(self):
        """Declare the root catalog attribute.

        This is a bare annotation: no value is bound here, so reading
        :attr:`catalog` raises ``AttributeError`` until
        :meth:`load_root_catalog` has been called.
        """
        self.__catalog: pystac.Catalog

    @property
    def catalog(self) -> pystac.Catalog:
        """Return a pySTAC catalog

        Returns:
            pystac.Catalog: pySTAC catalog
        """
        return self.__catalog

    def load_root_catalog(self):
        """Refreshes the STAC storage and loads its root catalog."""
        self.database.stac_storage.refresh()
        self.__catalog = cast(
            pystac.Catalog, self.database.stac_storage.root_catalog
        )

    def _iter_new_items(
        self,
        pages: Iterator[PdsRecordsModel],
        pds_collection: PdsRegistryModel,
        progress_bar: bool = True,
    ) -> Iterator[pystac.Item]:
        """Yields one STAC item per record that is not stored yet.

        Records for which ``stac_storage.item_exists`` is true are skipped
        with a warning. A record whose conversion raises ``CrawlerError`` is
        reported to the observers and skipped as well.

        Args:
            pages (Iterator[PdsRecordsModel]): the different pages of the web service response
            pds_collection (PdsRegistryModel): information about the collection
            progress_bar (bool, optional) : Set progress bar. Defaults to True

        Yields:
            pystac.Item: Item created from a record
        """
        for page in pages:
            pds_records_model = page.pds_records_model
            with ProgressLogger(
                total=len(pds_records_model),
                iterable=pds_records_model,
                logger=logger,
                description="STAC Item objects creation",
                position=2,
                leave=False,
                disable_tqdm=not progress_bar,
            ) as pds_records_model_pbar:
                for record in cast(
                    List[PdsRecordModel], pds_records_model_pbar
                ):
                    if self.database.stac_storage.item_exists(record):
                        logger.warning(
                            f"this {record} exists in STAC directory, skip it"
                        )
                        continue
                    # Only the conversion is guarded, so that the error
                    # handling covers exactly the call that can fail.
                    try:
                        item = record.to_stac_item(pds_collection)
                    except CrawlerError as err:
                        self.notify_observers(
                            MessageModel(record.ode_id, err)
                        )
                    else:
                        yield item

    def _create_items_stac(
        self,
        pds_records: PdsRecordsWs,
        pds_collection: PdsRegistryModel,
        progress_bar: bool = True,
    ) -> pystac.ItemCollection:
        """Creates a collection of STAC items of records from a PDS collection.

        The pages of records are obtained from
        ``pds_records.parse_pds_collection_from_cache``.

        Args:
            pds_records (PdsRecordsWs): Object that handle Records
            pds_collection (PdsRegistryModel): PDS collection data
            progress_bar (bool, optional) : Set progress bar. Defaults to True

        Returns:
            pystac.ItemCollection: Collection of items
        """
        pages: Iterator[
            PdsRecordsModel
        ] = pds_records.parse_pds_collection_from_cache(pds_collection)
        return pystac.ItemCollection(
            self._iter_new_items(
                pages, pds_collection, progress_bar=progress_bar
            )
        )

    def _is_exist(
        self, catlog_or_collection: Union[pystac.Catalog, pystac.Collection]
    ) -> bool:
        """Check if the catalog or the collection exists.

        Args:
            catlog_or_collection (Union[pystac.Catalog, pystac.Collection]): STAC catalog or collection

        Returns:
            bool: True when the catalog or the collection exists otherwise False
        """
        return catlog_or_collection is not None

    def _lookup_hierarchy(
        self, pds_collection: PdsRegistryModel
    ) -> List[Optional[Union[pystac.Catalog, pystac.Collection]]]:
        """Looks up the STAC nodes that already exist for a PDS collection.

        Args:
            pds_collection (PdsRegistryModel): PDS collection data

        Returns:
            List[Optional[Union[pystac.Catalog, pystac.Collection]]]: the
            body, mission, platform, instrument and collection nodes, in that
            order; an entry is None when the node was not found
        """
        root = self.catalog
        # All lookups are done before anything is created, from the most
        # specific node up to the body.
        stac_collection = root.get_child(
            pds_collection.get_collection_id(), recursive=True
        )
        stac_instru = root.get_child(
            pds_collection.get_instrument_id(), recursive=True
        )
        stac_host = root.get_child(
            pds_collection.get_plateform_id(), recursive=True
        )
        stac_mission = root.get_child(
            pds_collection.get_mission_id(), recursive=True
        )
        # The body is searched among the direct children of the root only.
        stac_body = root.get_child(pds_collection.get_body_id())
        return [
            stac_body,
            stac_mission,
            stac_host,
            stac_instru,
            stac_collection,
        ]

    def _store_items(
        self,
        pds_collection: PdsRegistryModel,
        items_stac: pystac.ItemCollection,
    ) -> None:
        """Attaches the items to their STAC collection and saves the result.

        Every missing level of the chain body / mission / platform /
        instrument / collection is created and attached to the level above
        it (the body is attached to the root catalog).

        Args:
            pds_collection (PdsRegistryModel): PDS collection data
            items_stac (pystac.ItemCollection): items to add to the collection
        """
        factories = [
            pds_collection.create_stac_body_catalog,
            pds_collection.create_stac_mission_catalog,
            pds_collection.create_stac_platform_catalog,
            pds_collection.create_stac_instru_catalog,
            pds_collection.create_stac_collection,
        ]

        parent: Union[pystac.Catalog, pystac.Collection] = self.catalog
        # Highest node created during this call; saving starts from it.
        new_catalog: Optional[Union[pystac.Catalog, pystac.Collection]] = None
        for node, factory in zip(
            self._lookup_hierarchy(pds_collection), factories
        ):
            if not self._is_exist(node):
                node = factory()
                if new_catalog is None:
                    new_catalog = node
                parent.add_child(node)
            parent = node

        # After the loop, parent is the last level of the chain: the collection.
        stac_collection = cast(pystac.Collection, parent)
        stac_collection.add_items(items_stac)

        # When nothing was created only the collection is saved, otherwise
        # the whole new branch. In both cases the parent of the saved node is
        # saved again after the node itself.
        to_save = stac_collection if new_catalog is None else new_catalog
        self.database.stac_storage.normalize_and_save(to_save)
        parent_of_saved = cast(pystac.Catalog, to_save.get_parent())
        parent_of_saved.save_object(include_self_link=False)

    def to_stac(
        self,
        pds_records: PdsRecordsWs,
        pds_collections: List[PdsRegistryModel],
        progress_bar: bool = True,
    ):
        """Create STAC catalogs with its children for all collections.

        Collections without any new item are skipped. The root catalog must
        have been loaded with :meth:`load_root_catalog` beforehand.

        Args:
            pds_records (PdsRecordsWs): Web service that handles the query to get the responses for a given collection
            pds_collections (List[PdsRegistryModel]): All PDS collections data
            progress_bar (bool, optional): Set progress bar. Defaults to True
        """
        # Create a progress logger to track the processing of collections
        with ProgressLogger(
            total=len(pds_collections),
            iterable=pds_collections,
            logger=logger,
            description="Processing collection",
            position=0,
            disable_tqdm=not progress_bar,
        ) as progress_logger:
            # Iterate over each PDS collection and process it
            for pds_collection in cast(
                List[PdsRegistryModel], progress_logger
            ):
                progress_logger.write_msg(
                    f"Processing the collection {pds_collection}"
                )

                # Create items for the current collection
                items_stac = self._create_items_stac(
                    pds_records, pds_collection, progress_bar=progress_bar
                )
                if not items_stac.items:
                    progress_logger.write_msg(
                        "No new item, skip the STAC catalogs creation"
                    )
                    continue
                progress_logger.write_msg(
                    f"{len(items_stac.items)} items to add"
                )

                self._store_items(pds_collection, items_stac)

    def describe(self):
        """Describes the STAC catalog and its children as a tree"""
        self.catalog.describe()

    def save(self):
        """Nothing happens.
        Everything is saved in to_stac method"""
        pass
class StacCatalogTransformer(StacTransformer):
    """Transformer that converts the PDS catalogs to STAC."""

    def __init__(self, database: Database, *args, **kwargs):
        """Set up the transformer on top of the given database.

        When a truthy *report* keyword is given, it is subscribed to this
        transformer. The root catalog is then loaded through :meth:`init`.

        Args:
            database (Database): Database
        """
        super().__init__(database)
        observer = kwargs.get("report")
        if observer:
            self.__report = observer
            self.subscribe(observer)
        self.init()

    def init(self):
        """Load the root catalog and prepare the PDS collection converter."""
        db = self.database
        db.stac_storage._load_root_catalog()
        root = cast(pystac.Catalog, db.stac_storage.root_catalog)
        self.__catalog = root
        # The converter is built once on the root catalog and reused for
        # every set of PDS catalogs.
        self.__stac_pds_collection = StacPdsCollection(root)

    @property
    def catalog(self) -> pystac.Catalog:
        """Root catalog read from the STAC storage.

        Returns:
            pystac.Catalog: the root catalog
        """
        return self.__catalog

    def _build_stac_cats_and_colls_for_all_pds_catalogs(
        self, catalogs_pds_collections: Iterator[Dict[str, Any]]
    ):
        """Convert, one after the other, the catalogs of each PDS collection.

        Args:
            catalogs_pds_collections (Iterator[Dict[str, Any]]): Catalogs for all PDS collections
        """
        converter = self.__stac_pds_collection
        for pds_catalogs in catalogs_pds_collections:
            logger.info(f"Creating STAC catalog for {pds_catalogs}")
            converter.catalogs = pds_catalogs
            converter.to_stac()

    def to_stac(
        self,
        pds_ode_catalogs: PDSCatalogsDescription,
        pds_collections: List[PdsRegistryModel],
        **kwargs,
    ):
        """Build the STAC catalog and its children from the ODE catalogs.

        The optional *timeout* keyword (30 when absent) is forwarded to
        ``pds_ode_catalogs.get_ode_catalogs``.

        Args:
            pds_ode_catalogs (PDSCatalogsDescription): PDS3 objects
            pds_collections (List[PdsRegistryModel]): PDS Collection data
        """
        timeout = kwargs.get("timeout", 30)
        ode_catalogs: Iterator[
            Dict[str, Any]
        ] = pds_ode_catalogs.get_ode_catalogs(pds_collections, timeout)
        self._build_stac_cats_and_colls_for_all_pds_catalogs(ode_catalogs)

    def describe(self):
        """Call ``describe`` on the root catalog."""
        self.catalog.describe()

    def save(self):
        """Normalize and save the root catalog through the STAC storage."""
        self.database.stac_storage.root_normalize_and_save(self.catalog)