Coverage for pds_crawler/transformer/pds3_objects.py: 78%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# -*- coding: utf-8 -*-
2# pds-crawler - ETL to index PDS data to pdssp
3# Copyright (C) 2023 - CNES (Jean-Christophe Malapert for Pôle Surfaces Planétaires)
4# This file is part of pds-crawler <https://github.com/pdssp/pds_crawler>
5# SPDX-License-Identifier: LGPL-3.0-or-later
6"""
7Module Name:
8 pds3_objects
10Description:
11 Create a STAC output of the PDS collection by the use of PDS3 catalogs.
12 The code defines an implementation of the Chain of Responsibility design
13 pattern using the abstract base class Handler to select the handler (mission,
14 plateform, instrument, dataset) to convert to STAC
16Classes:
17 Handler:
18 Handler interface.
19 AbstractHandler:
20 Default chaining behavior.
21 MissionHandler :
22 Mission handler.
23 PlateformHandler :
24 Plateform handler.
25 InstrumentHandler :
26 Instrument handler.
27 DatasetHandler :
28 Dataset handler.
29 StacPdsCollection :
30 Converts PDS3 object from ODE archive to PDS STAC catalog (without items)
32Author:
33 Jean-Christophe Malapert
34"""
35from __future__ import annotations
37import logging
38import os
39from abc import ABC
40from abc import abstractmethod
41from typing import Any
42from typing import cast
43from typing import Dict
44from typing import List
45from typing import Optional
46from typing import Union
48import pystac
50from ..load import PdsParserFactory
51from ..models import DataSetModel
52from ..models import InstrumentHostModel
53from ..models import InstrumentModel
54from ..models import MissionModel
55from ..models import PdsRegistryModel
56from ..models import ReferencesModel
57from ..models import VolumeModel
58from ..models.pds_models import DataProducerModel
59from ..models.pds_models import DataSupplierModel
61logger = logging.getLogger(__name__)
64class Handler(ABC):
65 """
66 The Handler interface declares a method for building the chain of handlers.
67 It also declares a method for executing a request.
68 """
70 @abstractmethod
71 def set_next(self, handler: Handler) -> Handler:
72 pass
74 @abstractmethod
75 def handle(self, request) -> None:
76 pass
79class AbstractHandler(Handler):
80 """
81 The default chaining behavior can be implemented inside a base handler
82 class.
83 """
85 _next_handler: Optional[Handler] = None
87 def set_next(self, handler: Handler) -> Handler:
88 self._next_handler = handler
89 return handler
91 def _is_exists(
92 self, mission_stac: Union[pystac.Catalog, pystac.Collection]
93 ) -> bool:
94 """Checks if the catalog or collection exists
96 Args:
97 catalog (pystac.Catalog): Stac Catalog or collection
99 Returns:
100 bool: True when the catalog or collection ID is in the STAC catalog
101 """
102 return mission_stac is not None
104 @abstractmethod
105 def handle(self, request: Any) -> None:
106 if self._next_handler:
107 self._next_handler.handle(request)
110class MissionHandler(AbstractHandler):
111 """A handler for adding mission to a STAC catalog."""
113 def __init__(
114 self,
115 catalog: pystac.Catalog,
116 body_id: str,
117 mission_id,
118 citations: Optional[ReferencesModel],
119 ):
120 self.__catalog: pystac.Catalog = catalog
121 self.__body_id: str = body_id
122 self.__mission_id: str = mission_id
123 self.__citations: Optional[ReferencesModel] = citations
125 @property
126 def body_id(self) -> str:
127 return self.__body_id
129 @property
130 def mission_id(self) -> str:
131 return self.__mission_id
133 @property
134 def catalog(self) -> pystac.Catalog:
135 return self.__catalog
137 @property
138 def citations(self) -> Optional[ReferencesModel]:
139 return self.__citations
141 def _is_must_be_updated(
142 self, mission_stac: pystac.Catalog, mission: MissionModel
143 ) -> bool:
144 """Check if mission_stac must be updated.
146 To check if mission_stac must be updated, we need to check :
147 - mission_stac is alreay on disk
148 - the description is shorter than the mission description
150 Args:
151 mission_stac (pystac.Catalog): mission in memory or in disk
152 mission (MissionModel): new mission information
154 Returns:
155 bool: True of the mission_stac on disk must be updated by mission information
156 """
157 return os.path.exists(mission_stac.self_href) and len(
158 mission_stac.description
159 ) < len(mission.MISSION_INFORMATION.MISSION_DESC)
161 def _update(self, mission_stac: pystac.Catalog, mission: MissionModel):
162 """Update the STAC catalog with mission model.
164 Args:
165 mission_stac (pystac.Catalog): STAC catalog to update
166 mission (MissionModel): New information
167 """
168 mission_stac_new = mission.create_stac_catalog(
169 self.body_id, self.citations
170 )
171 mission_stac.title = mission_stac_new.title
172 mission_stac.description = mission_stac_new.description
173 mission_stac.stac_extensions = mission_stac_new.stac_extensions
174 mission_stac.extra_fields = mission_stac_new.extra_fields
175 mission_stac.save_object(include_self_link=False)
177 def handle(self, request: Any) -> None:
178 if isinstance(request, MissionModel):
179 # Get the mission from the STAC catalog
180 mission: MissionModel = request
181 mission_stac = cast(
182 pystac.Catalog,
183 self.catalog.get_child(self.mission_id, recursive=True),
184 )
186 if not self._is_exists(mission_stac):
187 # mission is not found, so create it
188 stac_mission = mission.create_stac_catalog(
189 self.body_id, self.citations
190 )
192 # use mission_id of PDS collection to avoid interop problem
193 stac_mission.id = self.mission_id
195 # Get the parent of mission: the solar body
196 body_cat = cast(
197 pystac.Catalog, self.catalog.get_child(self.body_id)
198 )
200 # Add the mission to the planet
201 body_cat.add_child(stac_mission)
202 logger.info(f"{stac_mission.id} added to {body_cat.id}")
203 elif self._is_must_be_updated(mission_stac, mission):
204 logger.info(f"{mission_stac.self_href} has been updated")
205 self._update(mission_stac, mission)
206 else:
207 super().handle(request)
210class PlateformHandler(AbstractHandler):
211 """A handler for adding instrument host platforms to a STAC catalog.
213 Attributes:
214 catalog (pystac.Catalog): The root catalog to add platforms to.
215 body_id (str): The ID of the celestial body the platforms are associated with.
216 mission_id (str): The ID of the mission the platforms are associated with.
217 citations (Optional[ReferencesModel]): An optional model of references to add to the platforms.
218 """
220 def __init__(
221 self,
222 catalog: pystac.Catalog,
223 body_id: str,
224 mission_id: str,
225 citations: Optional[ReferencesModel],
226 ):
227 self.__catalog: pystac.Catalog = catalog
228 self.__body_id: str = body_id
229 self.__mission_id: str = mission_id
230 self.__citations: Optional[ReferencesModel] = citations
232 @property
233 def catalog(self) -> pystac.Catalog:
234 """Gets the pystac Catalog object associated with this handler.
236 Returns:
237 A pystac Catalog object.
238 """
239 return self.__catalog
241 @property
242 def body_id(self) -> str:
243 """Gets the ID of the celestial body associated with this handler.
245 Returns:
246 A string containing the celestial body ID.
247 """
248 return self.__body_id
250 @property
251 def mission_id(self) -> str:
252 """Gets the ID of the mission associated with this handler.
254 Returns:
255 A string containing the mission ID.
256 """
257 return self.__mission_id
259 @property
260 def citations(self) -> Optional[ReferencesModel]:
261 """Gets the ReferencesModel object containing references for the mission and the platform.
263 Returns:
264 An optional ReferencesModel object.
265 """
266 return self.__citations
268 def _is_must_be_updated(
269 self, plateform_stac: pystac.Catalog, plateform: InstrumentHostModel
270 ) -> bool:
271 """Determines whether the STAC catalog for the platform must be updated.
273 Args:
274 plateform_stac: The pystac Catalog object associated with the platform.
275 plateform: An InstrumentHostModel object representing the platform.
277 Returns:
278 A boolean value indicating whether the STAC catalog for the platform must be updated.
279 """
280 # Check if the STAC catalog for the platform exists and has a shorter description than
281 # the one in the InstrumentHostModel object.
282 return os.path.exists(plateform_stac.self_href) and len(
283 plateform_stac.description
284 ) < len(plateform.INSTRUMENT_HOST_INFORMATION.INSTRUMENT_HOST_DESC)
286 def _update(
287 self, plateform_stac: pystac.Catalog, plateform: InstrumentHostModel
288 ):
289 """Updates the STAC catalog for the platform.
291 Args:
292 plateform_stac: The pystac Catalog object associated with the platform.
293 plateform: An InstrumentHostModel object representing the platform.
294 """
295 # Create a new STAC catalog for the platform using the InstrumentHostModel object.
296 plateform_stac_new = plateform.create_stac_catalog(
297 self.body_id, self.citations
298 )
300 # Update the STAC catalog for the platform with the information from the new catalog.
301 plateform_stac.title = plateform_stac_new.title
302 plateform_stac.description = plateform_stac_new.description
303 plateform_stac.stac_extensions = plateform_stac_new.stac_extensions
304 plateform_stac.extra_fields = plateform_stac_new.extra_fields
305 plateform_stac.save_object(include_self_link=False)
307 def _add_plateform_to_mission(self, plateform: InstrumentHostModel):
308 """Adds a platform to the STAC catalog.
310 Args:
311 plateform (InstrumentHostModel): The platform to be added to the STAC catalog.
312 """
313 # Get the platform ID
314 plateform_id: str = plateform.get_plateform_id()
316 # Get the platform STAC catalog, if it exists
317 plateform_stac = cast(
318 pystac.Catalog,
319 self.catalog.get_child(plateform_id, recursive=True),
320 )
322 # Check if the platform exists in the catalog
323 if not self._is_exists(plateform_stac):
324 # Get the mission STAC catalog
325 stac_mission = cast(
326 pystac.Catalog,
327 self.catalog.get_child(self.mission_id, recursive=True),
328 )
329 logger.debug(f"Looking for {self.mission_id}: {stac_mission}")
331 # Create a STAC catalog for the platform
332 stac_plateform = plateform.create_stac_catalog(
333 self.body_id, self.citations
334 )
336 # Add the platform STAC catalog as a child of the mission STAC catalog
337 stac_mission.add_child(stac_plateform)
338 logger.debug(f"{stac_plateform.id} added to {stac_mission.id}")
339 elif self._is_must_be_updated(plateform_stac, plateform):
340 logger.info(f"{plateform_stac.self_href} has been updated")
341 self._update(plateform_stac, plateform)
343 def _add_plateforms_to_mission(
344 self, plateforms: List[InstrumentHostModel]
345 ):
346 """
347 Adds a list of platforms to the STAC catalog.
349 Args:
350 plateforms (List[InstrumentHostModel]): The list of platforms to be added to the STAC catalog.
351 """
352 # Iterate over the list of platforms and add each platform to the mission STAC catalog
354 for plateform in plateforms:
355 self._add_plateform_to_mission(plateform)
357 def handle(self, request: Any) -> None:
358 """
359 Handles the request to add platforms to the STAC catalog.
361 Args:
362 request (Any): The request to add platforms to the STAC catalog.
363 """
364 if isinstance(request, InstrumentHostModel):
365 # If the request is a single platform, add it to the mission STAC catalog
366 plateform: InstrumentHostModel = request
367 self._add_plateform_to_mission(plateform)
368 elif isinstance(request, list) and isinstance(
369 request[0], InstrumentHostModel
370 ):
371 # If the request is a list of platforms, add all of them to the mission STAC catalog
372 plateforms: List[InstrumentHostModel] = request
373 self._add_plateforms_to_mission(plateforms)
374 else:
375 # If the request is neither a single platform nor a list of platforms, call the base class's handle method
376 super().handle(request)
379class InstrumentHandler(AbstractHandler):
380 """A handler for adding instrument to a STAC catalog.
382 Attributes:
383 catalog (pystac.Catalog): The root catalog to add platforms to.
384 body_id (str): The ID of the celestial body the platforms are associated with.
385 citations (Optional[ReferencesModel]): An optional model of references to add to the platforms.
386 """
388 def __init__(
389 self,
390 catalog: pystac.Catalog,
391 body_id: str,
392 citations: Optional[ReferencesModel],
393 ):
394 """A class representing an Instrument handler that can add Instruments to a STAC catalog.
396 Args:
397 catalog (pystac.Catalog): A Catalog object representing the STAC catalog.
398 body_id (str): The ID of the celestial body.
399 citations (Optional[ReferencesModel]): A ReferencesModel object representing the citations for the mission.
400 """
401 self.__catalog: pystac.Catalog = catalog
402 self.__body_id: str = body_id
403 self.__citations: Optional[ReferencesModel] = citations
405 @property
406 def catalog(self) -> pystac.Catalog:
407 """Get the STAC catalog."""
408 return self.__catalog
410 @property
411 def body_id(self) -> str:
412 """Get the ID of the solar body."""
413 return self.__body_id
415 @property
416 def citations(self) -> Optional[ReferencesModel]:
417 """Get the citations for the mission."""
418 return self.__citations
420 def _is_must_be_updated(
421 self, instrument_stac: pystac.Catalog, instrument: InstrumentModel
422 ) -> bool:
423 """Check if the given STAC Catalog for the Instrument must be updated.
425 Args:
426 instrument_stac (pystac.Catalog): A Catalog object representing the STAC catalog for the Instrument.
427 instrument (InstrumentModel): An InstrumentModel object representing the Instrument.
429 Returns:
430 bool: True if the given STAC Catalog for the Instrument must be updated, False otherwise.
431 """
432 return os.path.exists(instrument_stac.self_href) and len(
433 instrument_stac.description
434 ) < len(instrument.INSTRUMENT_INFORMATION.INSTRUMENT_DESC)
436 def _update(
437 self, instrument_stac: pystac.Catalog, instrument: InstrumentModel
438 ):
439 """Update the given STAC Catalog for the Instrument with the provided InstrumentModel.
441 Args:
442 instrument_stac (pystac.Catalog): A Catalog object representing the STAC catalog for the Instrument.
443 instrument (InstrumentModel): An InstrumentModel object representing the Instrument.
444 """
445 instrument_stac_new = instrument.create_stac_catalog(
446 self.body_id, self.citations
447 )
448 instrument_stac.title = instrument_stac_new.title
449 instrument_stac.description = instrument_stac_new.description
450 instrument_stac.stac_extensions = instrument_stac_new.stac_extensions
451 instrument_stac.extra_fields = instrument_stac_new.extra_fields
452 instrument_stac.save_object(include_self_link=False)
454 def _add_instrument_to_mission(self, instrument: InstrumentModel):
455 """Add an InstrumentModel to the STAC Catalog of the mission.
457 Args:
458 instrument (InstrumentModel): The InstrumentModel instance to add.
459 """
460 # Retrieve the ID of the instrument
461 instrument_id: str = instrument.get_instrument_id()
462 # Retrieve the STAC Catalog for this instrument
463 instrument_stac = cast(
464 pystac.Catalog,
465 self.catalog.get_child(instrument_id, recursive=True),
466 )
467 # If the instrument doesn't exist yet in the Catalog, create it
468 if not self._is_exists(instrument_stac):
469 # Retrieve the ID of the platform on which the instrument is mounted
470 plateform_id: str = instrument.get_plateform_id()
472 # Retrieve the STAC Catalog for this platform
473 stac_plateform = cast(
474 pystac.Catalog,
475 self.catalog.get_child(plateform_id, recursive=True),
476 )
477 logger.debug(f"Looking for {plateform_id}: {stac_plateform}")
479 # Create the STAC Catalog for the instrument
480 stac_instrument = instrument.create_stac_catalog(
481 self.body_id, self.citations
482 )
484 # Add the instrument Catalog to the platform Catalog
485 stac_plateform.add_child(stac_instrument)
486 logger.debug(f"{stac_instrument.id} added to {stac_plateform.id}")
487 elif self._is_must_be_updated(instrument_stac, instrument):
488 # If the instrument already exists in the Catalog, check if it needs to be updated
489 logger.info(f"{instrument_stac.self_href} has been updated")
490 self._update(instrument_stac, instrument)
492 def _add_instruments_to_mission(self, instruments: List[InstrumentModel]):
493 """Add multiple InstrumentModel instances to the STAC Catalog of the mission.
495 Args:
496 instruments (List[InstrumentModel]): A list of InstrumentModel instances to add.
497 """
498 # Iterate over each InstrumentModel instance and add them to the Catalog
499 for instrument in instruments:
500 self._add_instrument_to_mission(instrument)
502 def handle(self, request: Any) -> None:
503 """Handle the request to add an InstrumentModel or multiple InstrumentModel instances to the mission.
505 Args:
506 request (Any): The request object.
507 """
508 if isinstance(request, InstrumentModel):
509 instrument: InstrumentModel = request
510 self._add_instrument_to_mission(instrument)
511 elif isinstance(request, list) and isinstance(
512 request[0], InstrumentModel
513 ):
514 instruments: List[InstrumentModel] = request
515 self._add_instruments_to_mission(instruments)
516 else:
517 super().handle(request)
520class DatasetHandler(AbstractHandler):
521 """A handler for adding collection to a STAC catalog.
523 Attributes:
524 catalog (pystac.Catalog): The root catalog to add platforms to.
525 body_id (str): The ID of the celestial body the platforms are associated with.
526 citations (Optional[ReferencesModel]): An optional model of references to add to the platforms.
527 data_supplier (Optional[DataSupplierModel]): An optional model of data supplier to add to the platforms.
528 data_producer (Optional[DataProducerModel]): An optional model of data producer to add to the platforms.
529 """
531 def __init__(
532 self,
533 catalog: pystac.Catalog,
534 body_id: str,
535 volume_desc: VolumeModel,
536 citations: Optional[ReferencesModel],
537 ):
538 """Initializes DatasetHandler.
540 Args:
541 catalog: The root catalog to add collections to.
542 body_id: The ID of the celestial body the collections are associated with.
543 volume_desc: A model of the volume description for the collections.
544 citations: An optional model of references to add to the collections.
545 """
546 self.__catalog: pystac.Catalog = catalog
547 self.__body_id: str = body_id
548 self.__citations: Optional[ReferencesModel] = citations
549 self.__data_supplier: Optional[
550 DataSupplierModel
551 ] = volume_desc.DATA_SUPPLIER
552 self.__data_producer: DataProducerModel = volume_desc.DATA_PRODUCER
554 @property
555 def catalog(self) -> pystac.Catalog:
556 """pystac.Catalog: The root catalog to add collections to."""
557 return self.__catalog
559 @property
560 def body_id(self) -> str:
561 """The ID of the celestial body, the collections are associated with."""
562 return self.__body_id
564 @property
565 def data_supplier(self) -> Optional[DataSupplierModel]:
566 """A model of the data supplier for the collections."""
567 return self.__data_supplier
569 @property
570 def data_producer(self) -> DataProducerModel:
571 """A model of the data producer for the collections."""
572 return self.__data_producer
574 @property
575 def citations(self) -> Optional[ReferencesModel]:
576 """A model of references to add to the collections."""
577 return self.__citations
579 def _is_must_be_updated(
580 self, dataset_stac: pystac.Collection, dataset: DataSetModel
581 ) -> bool:
582 """Check whether a dataset in the catalog must be updated based on its description.
584 Args:
585 dataset_stac: A pystac Collection object representing the dataset.
586 dataset: A DataSetModel object representing the dataset.
588 Returns:
589 True if the dataset's description has changed and the dataset must be updated, False otherwise.
590 """
591 # Get the description of the dataset from its DATA_SET_INFORMATION object
592 description: Optional[
593 str
594 ] = dataset.DATA_SET_INFORMATION._get_description()
595 # Check if the STAC collection already exists and if the dataset's description is longer than the existing one
596 return (
597 os.path.exists(dataset_stac.self_href)
598 and description is not None
599 and len(dataset_stac.description) < len(description)
600 )
602 def _update(self, dataset_stac: pystac.Collection, dataset: DataSetModel):
603 """
604 Update a dataset in the catalog with new metadata.
606 Args:
607 dataset_stac: A pystac Collection object representing the dataset to be updated.
608 dataset: A DataSetModel object representing the new metadata for the dataset.
609 """
610 # Create a new STAC collection for the dataset
611 dataset_stac_new = dataset.create_stac_collection(
612 self.body_id,
613 self.citations,
614 self.data_supplier,
615 self.data_producer,
616 )
618 # Update the existing STAC collection with the new information
619 dataset_stac.title = dataset_stac_new.title
620 dataset_stac.description = dataset_stac_new.description
621 dataset_stac.stac_extensions = dataset_stac_new.stac_extensions
622 dataset_stac.extra_fields = dataset_stac_new.extra_fields
623 dataset_stac.save_object(include_self_link=False)
625 def _add_dataset_to_instrument(self, dataset: DataSetModel):
626 """
627 Add a dataset to the appropriate instrument catalog in the overall catalog.
629 Args:
630 dataset: A DataSetModel object representing the dataset to be added.
631 """
632 # Get the ID of the dataset's STAC collection
633 dataset_id: str = dataset.get_collection_id()
635 # Get the existing STAC collection for the dataset (if it exists)
636 dataset_stac = cast(
637 pystac.Collection,
638 self.catalog.get_child(dataset_id, recursive=True),
639 )
641 # If the dataset doesn't exist in the catalog, create a new STAC collection for it and add it to the appropriate instrument(s)
642 if not self._is_exists(dataset_stac):
643 stac_dataset = dataset.create_stac_collection(
644 self.body_id,
645 self.citations,
646 self.data_supplier,
647 self.data_producer,
648 )
649 # Get the ID(s) of the instrument(s) associated with the dataset
650 instrument_ids: Union[
651 str, List[str]
652 ] = dataset.DATA_SET_HOST.get_instrument_id()
654 # If there is only one instrument ID, add the dataset to that instrument's catalog
655 if isinstance(instrument_ids, str):
656 instrument_id: str = cast(str, instrument_ids)
657 stac_instrument = cast(
658 pystac.Catalog,
659 self.catalog.get_child(instrument_id, recursive=True),
660 )
661 logger.debug(f"Looking for {instrument_id}: {stac_instrument}")
662 stac_instrument.add_child(stac_dataset)
663 logger.debug(
664 f"{stac_dataset.id} added to {stac_instrument.id}"
665 )
667 # If there are multiple instrument IDs, add the dataset to each instrument's catalog
668 else:
669 for instrument_id in instrument_ids:
670 stac_instrument = cast(
671 pystac.Catalog,
672 self.catalog.get_child(instrument_id, recursive=True),
673 )
674 logger.debug(
675 f"Looking for {instrument_id}: {stac_instrument}"
676 )
677 stac_instrument.add_child(stac_dataset)
678 logger.debug(
679 f"{stac_dataset.id} added to {stac_instrument.id}"
680 )
682 # If the dataset already exists in the catalog and needs to be updated, update it
683 elif self._is_must_be_updated(dataset_stac, dataset):
684 logger.info(f"{dataset_stac.self_href} has been updated")
685 self._update(dataset_stac, dataset)
687 def _add_datasets_to_instrument(self, datasets: List[DataSetModel]):
688 """Add multiple DataSetModel instances to the STAC Catalog of the instrument.
690 Args:
691 datasets (List[DataSetModel]): A list of DataSetModel instances to add.
692 """
693 # Iterate over each DataSetModel instance and add them to the Catalog
694 for dataset in datasets:
695 self._add_dataset_to_instrument(dataset)
697 def handle(self, request: Any) -> None:
698 """Handle the request to add an DataSetModel or multiple DataSetModel instances to the instrument.
700 Args:
701 request (Any): The request object.
702 """
703 if isinstance(request, DataSetModel):
704 dataset: DataSetModel = request
705 self._add_dataset_to_instrument(dataset)
706 elif isinstance(request, list) and isinstance(
707 request[0], DataSetModel
708 ):
709 datasets: List[DataSetModel] = request
710 self._add_datasets_to_instrument(datasets)
711 else:
712 super().handle(request)
715class StacPdsCollection:
716 # Dictionary that defines the sort order of PDS catalog files
717 SORT_ORDER = {
718 PdsParserFactory.FileGrammary.MISSION_CATALOG.name: 0,
719 PdsParserFactory.FileGrammary.INSTRUMENT_HOST_CATALOG.name: 1,
720 PdsParserFactory.FileGrammary.INSTRUMENT_CATALOG.name: 2,
721 PdsParserFactory.FileGrammary.DATA_SET_CATALOG.name: 3,
722 PdsParserFactory.FileGrammary.DATA_SET_MAP_PROJECTION_CATALOG.name: 4,
723 PdsParserFactory.FileGrammary.PERSONNEL_CATALOG.name: 5,
724 PdsParserFactory.FileGrammary.REFERENCE_CATALOG.name: 6,
725 PdsParserFactory.FileGrammary.VOL_DESC.name: 7,
726 "collection": 8,
727 }
729 def __init__(self, root_stac: pystac.Catalog):
730 # Initializes the class with a root STAC catalog
731 self.__root_stac: pystac.Catalog = root_stac
732 self.__catalogs: Dict[str, Any] = dict()
734 def _is_already_exists(self, id: str) -> bool:
735 """Checks if the catalog or collection ID is in the STAC catalog
737 Args:
738 id (str): catalog or collection ID
740 Returns:
741 bool: True when the catalog or collection ID is in the STAC catalog
742 """
743 # Returns a boolean indicating if a catalog or collection ID exists in the STAC catalog
744 return self.root_stac.get_child(id, recursive=True) is not None
746 @property
747 def catalogs(self) -> Dict[str, Any]:
748 return self.__catalogs
750 @catalogs.setter
751 def catalogs(self, value: Dict[str, Any] = dict()):
752 self.__catalogs = value
754 @property
755 def root_stac(self) -> pystac.Catalog:
756 return self.__root_stac
758 def to_stac(self):
759 # Get the PDS3 reference catalog
760 ref_catalog_name: str = (
761 PdsParserFactory.FileGrammary.REFERENCE_CATALOG.name
762 )
763 citations: ReferencesModel = cast(
764 ReferencesModel, self.catalogs.get(ref_catalog_name)
765 )
767 # Get the PDS3 collection
768 pds_collection: PdsRegistryModel = cast(
769 PdsRegistryModel, self.catalogs.get("collection")
770 )
772 # Get the volume description catalog that contains a reference
773 # to others catalogs
774 vol_catalog_name: str = PdsParserFactory.FileGrammary.VOL_DESC.name
775 volume_desc: VolumeModel = cast(
776 VolumeModel, self.catalogs.get(vol_catalog_name)
777 )
778 if volume_desc is None:
779 # If volume description is not available, return
780 return
782 # Get the body ID and mission ID from the PDS collection
783 body_id: str = pds_collection.get_body_id()
784 mission_id: str = pds_collection.get_mission_id()
786 if not self._is_already_exists(body_id):
787 # If body catalog does not exist, create it and add it to the root catalog
788 pystac_body_cat = pds_collection.create_stac_body_catalog()
789 self.root_stac.add_child(pystac_body_cat)
791 mission = MissionHandler(
792 self.root_stac, body_id, mission_id, citations
793 )
794 plateform = PlateformHandler(
795 self.root_stac, body_id, mission_id, citations
796 )
797 instrument = InstrumentHandler(self.root_stac, body_id, citations)
798 dataset = DatasetHandler(
799 self.root_stac, body_id, volume_desc, citations
800 )
801 mission.set_next(plateform).set_next(instrument).set_next(dataset)
803 catalogs = list(self.catalogs.keys())
804 catalogs.sort(key=lambda val: StacPdsCollection.SORT_ORDER[val])
805 for catalog_name in catalogs:
806 logger.info(f"\tSTAC transformation of {catalog_name}")
807 catalog = self.catalogs[catalog_name]
808 mission.handle(catalog)