Coverage for pds_crawler/etl.py: 56%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# -*- coding: utf-8 -*-
2# pds-crawler - ETL to index PDS data to pdssp
3# Copyright (C) 2023 - CNES (Jean-Christophe Malapert for Pôle Surfaces Planétaires)
4# This file is part of pds-crawler <https://github.com/pdssp/pds_crawler>
5# SPDX-License-Identifier: LGPL-3.0-or-later
6"""Provides a ETL API.
8.. code:: python
10 from pds_crawler.etl import PdsSourceEnum, PdsDataEnum, StacETL
11 etl = StacETL("work/database")
12 etl.extract(PdsSourceEnum.COLLECTIONS_INDEX)
15"""
16import logging
17from abc import ABC
18from abc import abstractmethod
19from typing import Any
20from typing import cast
21from typing import List
22from typing import Optional
23from typing import Tuple
25from pystac import Catalog
26from pystac import Collection
28from .extractor import PDSCatalogsDescription
29from .extractor import PdsRecordsWs
30from .extractor import PdsRegistry
31from .load import Database
32from .models import PdsRegistryModel
33from .report import CrawlerReport
34from .transformer import StacCatalogTransformer
35from .transformer import StacRecordsTransformer
36from .utils import DocEnum
37from .utils import UtilsMonitoring
39logger = logging.getLogger(__name__)
class AbstractSourceEnum(DocEnum):
    """Marker base class for enums accepted as a *source* by the ETL API."""

    pass
class AbstractDataEnum(DocEnum):
    """Marker base class for enums accepted as a *data* kind by the ETL API."""

    pass
class CheckUpdateEnum(AbstractSourceEnum):
    """Sources against which updates can be checked (PDS or local cache)."""

    CHECK_PDS = (
        "pds",
        "Check updates at PDS",
    )
    CHECK_CACHE = (
        "cache",
        "Check if some collections in cache are not transformed",
    )

    @staticmethod
    def find_enum(name: str):
        """Find enum based on its value

        Args:
            name (str): enum value

        Raises:
            ValueError: Unknown value

        Returns:
            CheckUpdateEnum: Enum
        """
        # Iterate the members directly; comparing on the stringified value
        # keeps the original lookup semantics.
        for member in CheckUpdateEnum:
            if str(member.value) == name:
                return member
        raise ValueError(f"Unknown enum value for {name}")
class PdsSourceEnum(AbstractSourceEnum):
    """Kinds of PDS information the ETL can extract."""

    COLLECTIONS_INDEX = (
        "ode_collections",
        "Get the georeferenced products",
    )
    COLLECTIONS_INDEX_SAVE = (
        "ode_collections_save",
        "Get and save the PDS collections for the georeferenced products",
    )
    PDS_CATALOGS = (
        "pds_objects",
        "PDS objects describing a catalog (mission, instrument, plateform, ...)",
    )
    PDS_RECORDS = ("ode_records", "Records metadata for a given collection")

    @staticmethod
    def find_enum(name: str):
        """Find enum based on its value

        Args:
            name (str): enum value

        Raises:
            ValueError: Unknown value

        Returns:
            PdsSourceEnum: Enum
        """
        # Iterate the members directly; comparing on the stringified value
        # keeps the original lookup semantics.
        for member in PdsSourceEnum:
            if str(member.value) == name:
                return member
        raise ValueError(f"Unknown enum value for {name}")
class PdsDataEnum(AbstractDataEnum):
    """Kinds of downloaded PDS data the ETL can transform."""

    PDS_CATALOGS = (
        "pds_objects",
        "PDS objects describing a catalog (mission, instrument, plateform, ...)",
    )
    PDS_RECORDS = ("ode_records", "Records metadata for a given collection")

    @staticmethod
    def find_enum(name: str):
        """Find enum based on its value

        Args:
            name (str): enum value

        Raises:
            ValueError: Unknown value

        Returns:
            PdsDataEnum: Enum
        """
        # Iterate the members directly; comparing on the stringified value
        # keeps the original lookup semantics.
        for member in PdsDataEnum:
            if str(member.value) == name:
                return member
        raise ValueError(f"Unknown enum value for {name}")
class ETL(ABC):
    """Abstract Extract/Transform/Load interface implemented by concrete ETLs."""

    @abstractmethod
    def extract(self, source: AbstractSourceEnum, *args, **kwargs):
        """Extract the data identified by *source* from its origin."""
        raise NotImplementedError()

    @abstractmethod
    def transform(self, data: AbstractDataEnum, *args, **kwargs):
        """Transform the previously extracted *data*."""
        raise NotImplementedError()

    @abstractmethod
    def load(self, data: PdsDataEnum, target: Any, *args, **kwargs):
        """Load the transformed *data* into *target*."""
        raise NotImplementedError()
168class StacETL(ETL):
169 """ETL to extract and transform PDS information to STAC"""
171 def __init__(self, full_path_database_name: str) -> None:
172 """Initialize the class.
174 Args:
175 full_path_database_name (str): the full path to the database
176 """
177 # Create a new instance of the Database class
178 db = Database(full_path_database_name)
179 self.__report = CrawlerReport(db)
180 self.__pds_registry = PdsRegistry(db, report=self.__report)
181 self.__pds_records = PdsRecordsWs(db, report=self.__report)
182 self.__pds_ode_catalogs = PDSCatalogsDescription(
183 db, report=self.__report
184 )
185 self.__stac_catalog_transformer = StacCatalogTransformer(
186 db, report=self.__report
187 )
188 self.__stac_records_transformer = StacRecordsTransformer(
189 db, report=self.__report
190 )
191 self.__body: Optional[str] = None
193 self.__dataset_id: Optional[str] = None
194 self.__nb_workers: int = 3
195 self.__time_sleep: int = 1
196 self.__progress_bar: bool = True
197 self.__is_sample: bool = False
198 self.__nb_records_per_page: int = 5000
199 self.__parser_timeout: int = 60
201 @property
202 def nb_records_per_page(self) -> int:
203 """Getter for the number of records per page."""
204 return self.__nb_records_per_page
206 @nb_records_per_page.setter
207 def nb_records_per_page(self, value: int):
208 """Setter for the number of records per page.
210 Args:
211 value (int): the new value for the number of records per page
212 """
213 self.__nb_records_per_page = value
215 @property
216 def report(self) -> CrawlerReport:
217 """Getter for the CrawlerReport instance."""
218 return self.__report
220 @report.setter
221 def report(self, value: CrawlerReport):
222 """Setter for the CrawlerReport instance.
224 Args:
225 value (CrawlerReport): the new CrawlerReport instance
226 """
227 self.__report = value
229 @property
230 def pds_registry(self) -> PdsRegistry:
231 """Getter for the PdsRegistry instance."""
232 return self.__pds_registry
234 @property
235 def pds_records(self) -> PdsRecordsWs:
236 """Getter for the PdsRecordsWs instance."""
237 return self.__pds_records
239 @property
240 def pds_ode_catalogs(self) -> PDSCatalogsDescription:
241 """Getter for the PDSCatalogsDescription instance."""
242 return self.__pds_ode_catalogs
244 @property
245 def stac_catalog_transformer(self) -> StacCatalogTransformer:
246 """Getter for the StacCatalogTransformer instance."""
247 return self.__stac_catalog_transformer
249 @property
250 def stac_records_transformer(self) -> StacRecordsTransformer:
251 """Getter for the StacRecordsTransformer instance."""
252 return self.__stac_records_transformer
254 @property
255 def body(self) -> Optional[str]:
256 """Getter for the body attribute."""
257 return self.__body
259 @body.setter
260 def body(self, name: str):
261 """Setter for the body attribute.
263 Args:
264 name (str): the new value for the solar body attribute
265 """
266 self.__body = name
268 @property
269 def dataset_id(self) -> Optional[str]:
270 """Getter for the dataset_id attribute."""
271 return self.__dataset_id
273 @dataset_id.setter
274 def dataset_id(self, value: str):
275 self.__dataset_id = value
277 @property
278 def nb_workers(self) -> int:
279 """Getter for the nbumber of parallel workers."""
280 return self.__nb_workers
282 @nb_workers.setter
283 def nb_workers(self, value: int):
284 self.__nb_workers = value
286 @property
287 def time_sleep(self) -> int:
288 """Getter for the time sleep attribute."""
289 return self.__time_sleep
291 @time_sleep.setter
292 def time_sleep(self, value: int):
293 self.__time_sleep = value
295 @property
296 def progress_bar(self) -> bool:
297 """Getter for the progress_bar attribute."""
298 return self.__progress_bar
300 @progress_bar.setter
301 def progress_bar(self, value: bool):
302 self.__progress_bar = value
304 @property
305 def is_sample(self) -> bool:
306 """Getter for the is_sample attribute."""
307 return self.__is_sample
309 @is_sample.setter
310 def is_sample(self, value: bool):
311 self.__is_sample = value
313 @property
314 def parser_timeout(self) -> int:
315 """Getter for the parser timeout attribute."""
316 return self.__parser_timeout
318 @parser_timeout.setter
319 def parser_timeout(self, value: int):
320 self.__parser_timeout = value
322 @UtilsMonitoring.timeit
323 def extract(self, source: PdsSourceEnum, *args, **kwargs):
324 """Extract the PDS information.
326 It exists different types of extraction:
328 * COLLECTIONS_INDEX : query the PDS to get the georefereced collections
329 * COLLECTIONS_INDEX_SAVE : like COLLECTIONS_INDEX but save the result in cache
330 * PDS_RECORDS : Load the PDS collections from cache, build and cache all the URLs to get all pages of the PdsRecordsWs and download all pages
331 * PDS_CATALOGS : Load the PDS collections from cache, and download the PDS3 objects
333 Args:
334 source (PdsSourceEnum): Type of extraction
336 Raises:
337 NotImplementedError: Extraction type is not implemented
338 """
339 match source:
340 case PdsSourceEnum.COLLECTIONS_INDEX:
341 (
342 stats,
343 collections_pds,
344 ) = self.pds_registry.get_pds_collections(
345 self.body, self.dataset_id
346 )
347 for collection in collections_pds:
348 print(collection)
349 case PdsSourceEnum.COLLECTIONS_INDEX_SAVE:
350 (
351 stats,
352 collections_pds,
353 ) = self.pds_registry.get_pds_collections(
354 self.body, self.dataset_id
355 )
356 self.pds_registry.cache_pds_collections(collections_pds)
357 case PdsSourceEnum.PDS_RECORDS:
358 pds_collections: List[
359 PdsRegistryModel
360 ] = self.pds_registry.load_pds_collections_from_cache(
361 self.body, self.dataset_id
362 )
363 self.pds_records.generate_urls_for_all_collections(
364 pds_collections=pds_collections,
365 limit=self.nb_records_per_page,
366 )
367 limit: Optional[int] = 1 if self.is_sample else None
368 self.pds_records.download_pds_records_for_all_collections(
369 pds_collections=pds_collections,
370 progress_bar=self.progress_bar,
371 limit=limit,
372 )
373 case PdsSourceEnum.PDS_CATALOGS:
374 pds_collections: List[
375 PdsRegistryModel
376 ] = self.pds_registry.load_pds_collections_from_cache(
377 self.body, self.dataset_id
378 )
379 self.pds_ode_catalogs.download(
380 pds_collections=pds_collections,
381 nb_workers=self.nb_workers,
382 time_sleep=self.time_sleep,
383 progress_bar=self.progress_bar,
384 )
385 case _:
386 raise NotImplementedError(
387 f"Extraction is not implemented for {source}"
388 )
390 def _check_collections_to_ingest(
391 self, cached_pds_collections: List[PdsRegistryModel]
392 ) -> int:
393 """This method checks the number of PDS collections that need to be ingested into the
394 database from the cache.
396 Args:
397 cached_pds_collections (List[PdsRegistryModel]): List of PDS collections in the cache.
399 Returns:
400 int: Number of collections to ingest from cache.
401 """
402 db: Database = self.stac_records_transformer.database
403 root_stac_catalog: Catalog = cast(
404 Catalog, db.stac_storage.root_catalog
405 )
406 nb_to_ingest: int = 0
407 for pds_collection in cached_pds_collections:
408 coll = cast(
409 Collection,
410 root_stac_catalog.get_child(
411 pds_collection.get_collection_id(), recursive=True
412 ),
413 )
414 if coll is None:
415 nb_to_ingest += 1
416 logger.info(f"{pds_collection} was not ingested")
417 return nb_to_ingest
419 def _check_updates_from_PDS(
420 self,
421 pds_collections: List[PdsRegistryModel],
422 pds_collections_cache: List[PdsRegistryModel],
423 ) -> int:
424 """
425 This method checks the number of PDS collections that need to be updated in the database from the PDS.
427 Args:
428 pds_collections (List[PdsRegistryModel]): List of PDS collections.
429 pds_collections_cache (List[PdsRegistryModel]): List of PDS collections in the cache.
431 Returns:
432 int: Number of collections to update in the database.
433 """
434 nb_to_update: int = 0
435 for pds_collection_cache in pds_collections_cache:
436 try:
437 pds_collections.index(pds_collection_cache)
438 except ValueError:
439 nb_to_update += 1
440 logger.info(f"{pds_collections_cache} has been updated at PDS")
441 return nb_to_update
443 def check_collections_to_ingest_from_pds(
444 self,
445 pds_collections: List[PdsRegistryModel],
446 pds_collections_cache: List[PdsRegistryModel],
447 ) -> Tuple[int, int]:
448 """
449 This method checks the number of PDS collections that need to be ingested into the database from the PDS.
451 Args:
452 pds_collections (List[PdsRegistryModel]): List of PDS collections.
453 pds_collections_cache (List[PdsRegistryModel]): List of PDS collections in the cache.
455 Returns:
456 Tuple[int, int]: Tuple containing the number of collections to ingest and the total number of records.
457 """
458 nb_to_ingest: int = 0
459 nb_records: int = 0
460 for pds_collection in pds_collections:
461 try:
462 pds_collections_cache.index(pds_collection)
463 except ValueError:
464 nb_records += pds_collection.NumberProducts
465 nb_to_ingest += 1
466 logger.info(f"{pds_collection} must be ingested")
467 return (nb_to_ingest, nb_records)
469 @UtilsMonitoring.timeit
470 def check_update(self, source: CheckUpdateEnum, *args, **kwargs):
471 """
472 This method checks if there are any updates from the PDS or in the cache.
474 Args:
475 source (CheckUpdateEnum): The source of the check (cache or PDS).
476 *args: Variable length argument list.
477 **kwargs: Arbitrary keyword arguments.
479 Raises:
480 NotImplementedError: If the source is not supported.
481 """
482 match source:
483 case CheckUpdateEnum.CHECK_CACHE:
484 pds_collections: List[
485 PdsRegistryModel
486 ] = self.pds_registry.load_pds_collections_from_cache(
487 self.body, self.dataset_id
488 )
489 nb_to_ingest: int = self._check_collections_to_ingest(
490 pds_collections
491 )
492 logger.info(
493 f"""Summary:
494 {nb_to_ingest} collection(s) to ingest from cache"""
495 )
497 case CheckUpdateEnum.CHECK_PDS:
498 pds_collections_cache: List[
499 PdsRegistryModel
500 ] = self.pds_registry.load_pds_collections_from_cache(
501 self.body, self.dataset_id
502 )
503 _, pds_collections = self.pds_registry.get_pds_collections(
504 self.body, self.dataset_id
505 )
506 nb_to_update: int = self._check_updates_from_PDS(
507 pds_collections, pds_collections_cache
508 )
509 (
510 nb_to_ingest,
511 nb_records,
512 ) = self.check_collections_to_ingest_from_pds(
513 pds_collections, pds_collections_cache
514 )
515 logger.info(
516 f"""Summary:
517 - {nb_to_ingest} collection(s) to ingest from PDS, i.e {nb_records} products
518 - {nb_to_update} collection(s) to update"""
519 )
521 case _:
522 raise NotImplementedError(
523 f"Check update is not implemented for {source}"
524 )
526 @UtilsMonitoring.timeit
527 def transform(self, data: PdsDataEnum, *args, **kwargs):
528 """Transform the downloaded information as STAC
530 It exists different types of extraction:
532 * PDS_RECORDS : Load the PDS collections from cache, convert to STAC items and parents if parents are not available
533 * PDS_CATALOGS : Load the PDS collections from cache, convert/update to STAC the various catalogs/collections
535 Args:
536 data (PdsDataEnum): Type of transformation
538 Raises:
539 NotImplementedError: Transformation type is not implemented
540 """
541 pds_collections: List[
542 PdsRegistryModel
543 ] = self.pds_registry.load_pds_collections_from_cache(
544 self.body, self.dataset_id
545 )
546 match data:
547 case PdsDataEnum.PDS_CATALOGS:
548 self.report.name = PdsDataEnum.PDS_CATALOGS.name
549 self.report.start_report()
550 self.stac_catalog_transformer.init()
551 self.stac_catalog_transformer.to_stac(
552 self.pds_ode_catalogs,
553 pds_collections,
554 timeout=self.parser_timeout,
555 )
556 self.stac_catalog_transformer.save()
557 self.report.close_report()
558 case PdsDataEnum.PDS_RECORDS:
559 self.report.name = PdsDataEnum.PDS_RECORDS.name
560 self.report.start_report()
561 self.stac_records_transformer.init()
562 self.stac_records_transformer.load_root_catalog()
563 self.stac_records_transformer.to_stac(
564 self.pds_records, pds_collections
565 )
566 self.stac_records_transformer.save()
567 self.report.close_report()
568 case _:
569 raise NotImplementedError(
570 f"Transformation is not implemented for {data}"
571 )
573 def load(self, data: PdsDataEnum, target: str, *args, **kwargs):
574 pass