Coverage for pds_crawler/transformer/pds3_objects.py: 78%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

283 statements  

1# -*- coding: utf-8 -*- 

2# pds-crawler - ETL to index PDS data to pdssp 

3# Copyright (C) 2023 - CNES (Jean-Christophe Malapert for Pôle Surfaces Planétaires) 

4# This file is part of pds-crawler <https://github.com/pdssp/pds_crawler> 

5# SPDX-License-Identifier: LGPL-3.0-or-later 

6""" 

7Module Name: 

8 pds3_objects 

9 

10Description: 

11 Create a STAC output of the PDS collection by the use of PDS3 catalogs. 

12 The code defines an implementation of the Chain of Responsibility design 

13 pattern using the abstract base class Handler to select the handler (mission, 

14 platform, instrument, dataset) to convert to STAC

15 

16Classes: 

17 Handler: 

18 Handler interface. 

19 AbstractHandler: 

20 Default chaining behavior. 

21 MissionHandler : 

22 Mission handler. 

23 PlateformHandler : 

24 Plateform handler. 

25 InstrumentHandler : 

26 Instrument handler. 

27 DatasetHandler : 

28 Dataset handler. 

29 StacPdsCollection : 

30 Converts PDS3 object from ODE archive to PDS STAC catalog (without items) 

31 

32Author: 

33 Jean-Christophe Malapert 

34""" 

35from __future__ import annotations 

36 

37import logging 

38import os 

39from abc import ABC 

40from abc import abstractmethod 

41from typing import Any 

42from typing import cast 

43from typing import Dict 

44from typing import List 

45from typing import Optional 

46from typing import Union 

47 

48import pystac 

49 

50from ..load import PdsParserFactory 

51from ..models import DataSetModel 

52from ..models import InstrumentHostModel 

53from ..models import InstrumentModel 

54from ..models import MissionModel 

55from ..models import PdsRegistryModel 

56from ..models import ReferencesModel 

57from ..models import VolumeModel 

58from ..models.pds_models import DataProducerModel 

59from ..models.pds_models import DataSupplierModel 

60 

# Module-level logger, named after this module (``__name__``).
logger = logging.getLogger(__name__)

62 

63 

class Handler(ABC):
    """Contract shared by every link of the chain of handlers.

    A handler exposes one method to link a successor to it and one method
    to execute a request.
    """

    @abstractmethod
    def set_next(self, handler: Handler) -> Handler:
        """Link ``handler`` to this handler when building the chain."""
        ...

    @abstractmethod
    def handle(self, request) -> None:
        """Execute ``request``."""
        ...

77 

78 

class AbstractHandler(Handler):
    """Base handler that implements the default chaining behaviour.

    Subclasses override ``handle`` and call ``super().handle(request)`` to
    forward a request to the next handler of the chain.
    """

    # Successor in the chain; stays ``None`` until ``set_next`` is called.
    _next_handler: Optional[Handler] = None

    def set_next(self, handler: Handler) -> Handler:
        """Store ``handler`` as the successor of this handler.

        Args:
            handler (Handler): Handler that receives forwarded requests.

        Returns:
            Handler: The ``handler`` argument itself, which allows calls such
            as ``a.set_next(b).set_next(c)``.
        """
        self._next_handler = handler
        return handler

    def _is_exists(
        self, mission_stac: Union[pystac.Catalog, pystac.Collection]
    ) -> bool:
        """Tell whether a catalog or collection lookup returned something.

        Args:
            mission_stac (Union[pystac.Catalog, pystac.Collection]): Result of
                a lookup in the STAC catalog.

        Returns:
            bool: False when ``mission_stac`` is None, True otherwise.
        """
        found = mission_stac is not None
        return found

    @abstractmethod
    def handle(self, request: Any) -> None:
        """Forward ``request`` to the next handler when there is one.

        Args:
            request (Any): The request to forward.
        """
        successor = self._next_handler
        if not successor:
            # End of the chain: the request is dropped.
            return
        successor.handle(request)

108 

109 

class MissionHandler(AbstractHandler):
    """A handler for adding a mission to a STAC catalog.

    Attributes:
        catalog (pystac.Catalog): The root catalog in which the mission is searched.
        body_id (str): The ID of the solar body catalog that is the parent of the mission.
        mission_id (str): The ID given to the mission catalog.
        citations (Optional[ReferencesModel]): Optional references passed to the STAC creation.
    """

    def __init__(
        self,
        catalog: pystac.Catalog,
        body_id: str,
        mission_id: str,
        citations: Optional[ReferencesModel],
    ):
        """Initializes MissionHandler.

        Args:
            catalog (pystac.Catalog): The root STAC catalog.
            body_id (str): The ID of the solar body.
            mission_id (str): The ID of the mission.
            citations (Optional[ReferencesModel]): Optional references.
        """
        self.__catalog: pystac.Catalog = catalog
        self.__body_id: str = body_id
        self.__mission_id: str = mission_id
        self.__citations: Optional[ReferencesModel] = citations

    @property
    def body_id(self) -> str:
        """The ID of the solar body."""
        return self.__body_id

    @property
    def mission_id(self) -> str:
        """The ID of the mission."""
        return self.__mission_id

    @property
    def catalog(self) -> pystac.Catalog:
        """The root STAC catalog."""
        return self.__catalog

    @property
    def citations(self) -> Optional[ReferencesModel]:
        """The optional references."""
        return self.__citations

    def _is_must_be_updated(
        self, mission_stac: pystac.Catalog, mission: MissionModel
    ) -> bool:
        """Check if mission_stac must be updated.

        mission_stac must be updated when:
            - mission_stac is already on disk
            - its description is shorter than the mission description

        Args:
            mission_stac (pystac.Catalog): mission in memory or on disk
            mission (MissionModel): new mission information

        Returns:
            bool: True if the mission_stac on disk must be updated with the mission information
        """
        # A catalog without self link cannot be on disk: answer False
        # instead of failing on the missing href.
        href = mission_stac.get_self_href()
        if href is None or not os.path.exists(href):
            return False
        return len(mission_stac.description) < len(
            mission.MISSION_INFORMATION.MISSION_DESC
        )

    def _update(self, mission_stac: pystac.Catalog, mission: MissionModel):
        """Update the STAC catalog with the mission model and save it.

        Args:
            mission_stac (pystac.Catalog): STAC catalog to update
            mission (MissionModel): New information
        """
        mission_stac_new = mission.create_stac_catalog(
            self.body_id, self.citations
        )
        # Only the descriptive fields are copied; the ID and the links of
        # the existing catalog are left as they are.
        mission_stac.title = mission_stac_new.title
        mission_stac.description = mission_stac_new.description
        mission_stac.stac_extensions = mission_stac_new.stac_extensions
        mission_stac.extra_fields = mission_stac_new.extra_fields
        mission_stac.save_object(include_self_link=False)

    def handle(self, request: Any) -> None:
        """Add or update the mission when the request is a MissionModel.

        Any other request is forwarded to the next handler.

        Args:
            request (Any): The request object.
        """
        if not isinstance(request, MissionModel):
            super().handle(request)
            return

        mission: MissionModel = request

        # Get the mission from the STAC catalog (searched recursively)
        mission_stac = cast(
            pystac.Catalog,
            self.catalog.get_child(self.mission_id, recursive=True),
        )

        if not self._is_exists(mission_stac):
            # mission is not found, so create it
            stac_mission = mission.create_stac_catalog(
                self.body_id, self.citations
            )

            # use mission_id of PDS collection to avoid interop problem
            stac_mission.id = self.mission_id

            # Get the parent of mission: the solar body, looked up among
            # the direct children of the root catalog
            body_cat = cast(
                pystac.Catalog, self.catalog.get_child(self.body_id)
            )

            # Add the mission to the solar body
            body_cat.add_child(stac_mission)
            logger.info(f"{stac_mission.id} added to {body_cat.id}")
        elif self._is_must_be_updated(mission_stac, mission):
            logger.info(f"{mission_stac.self_href} has been updated")
            self._update(mission_stac, mission)

208 

209 

class PlateformHandler(AbstractHandler):
    """A handler for adding instrument host platforms to a STAC catalog.

    Attributes:
        catalog (pystac.Catalog): The root catalog to add platforms to.
        body_id (str): The ID of the celestial body the platforms are associated with.
        mission_id (str): The ID of the mission the platforms are associated with.
        citations (Optional[ReferencesModel]): An optional model of references to add to the platforms.
    """

    def __init__(
        self,
        catalog: pystac.Catalog,
        body_id: str,
        mission_id: str,
        citations: Optional[ReferencesModel],
    ):
        """Initializes PlateformHandler.

        Args:
            catalog (pystac.Catalog): The root STAC catalog.
            body_id (str): The ID of the celestial body.
            mission_id (str): The ID of the mission.
            citations (Optional[ReferencesModel]): Optional references.
        """
        self.__catalog: pystac.Catalog = catalog
        self.__body_id: str = body_id
        self.__mission_id: str = mission_id
        self.__citations: Optional[ReferencesModel] = citations

    @property
    def catalog(self) -> pystac.Catalog:
        """Gets the pystac Catalog object associated with this handler.

        Returns:
            A pystac Catalog object.
        """
        return self.__catalog

    @property
    def body_id(self) -> str:
        """Gets the ID of the celestial body associated with this handler.

        Returns:
            A string containing the celestial body ID.
        """
        return self.__body_id

    @property
    def mission_id(self) -> str:
        """Gets the ID of the mission associated with this handler.

        Returns:
            A string containing the mission ID.
        """
        return self.__mission_id

    @property
    def citations(self) -> Optional[ReferencesModel]:
        """Gets the ReferencesModel object containing the references.

        Returns:
            An optional ReferencesModel object.
        """
        return self.__citations

    def _is_must_be_updated(
        self, plateform_stac: pystac.Catalog, plateform: InstrumentHostModel
    ) -> bool:
        """Determines whether the STAC catalog for the platform must be updated.

        Args:
            plateform_stac: The pystac Catalog object associated with the platform.
            plateform: An InstrumentHostModel object representing the platform.

        Returns:
            True when the catalog file exists on disk and its description is
            shorter than the one in the InstrumentHostModel object.
        """
        # A catalog without self link cannot be on disk: answer False
        # instead of failing on the missing href.
        href = plateform_stac.get_self_href()
        if href is None or not os.path.exists(href):
            return False
        return len(plateform_stac.description) < len(
            plateform.INSTRUMENT_HOST_INFORMATION.INSTRUMENT_HOST_DESC
        )

    def _update(
        self, plateform_stac: pystac.Catalog, plateform: InstrumentHostModel
    ):
        """Updates the STAC catalog for the platform and saves it.

        Args:
            plateform_stac: The pystac Catalog object associated with the platform.
            plateform: An InstrumentHostModel object representing the platform.
        """
        # Create a new STAC catalog for the platform using the InstrumentHostModel object.
        plateform_stac_new = plateform.create_stac_catalog(
            self.body_id, self.citations
        )

        # Copy the descriptive fields of the new catalog into the existing one.
        plateform_stac.title = plateform_stac_new.title
        plateform_stac.description = plateform_stac_new.description
        plateform_stac.stac_extensions = plateform_stac_new.stac_extensions
        plateform_stac.extra_fields = plateform_stac_new.extra_fields
        plateform_stac.save_object(include_self_link=False)

    def _add_plateform_to_mission(self, plateform: InstrumentHostModel):
        """Adds a platform to the mission catalog, or updates it if needed.

        Args:
            plateform (InstrumentHostModel): The platform to be added to the STAC catalog.
        """
        # Get the platform ID
        plateform_id: str = plateform.get_plateform_id()

        # Get the platform STAC catalog, if it exists
        plateform_stac = cast(
            pystac.Catalog,
            self.catalog.get_child(plateform_id, recursive=True),
        )

        if not self._is_exists(plateform_stac):
            # Get the mission STAC catalog
            stac_mission = cast(
                pystac.Catalog,
                self.catalog.get_child(self.mission_id, recursive=True),
            )
            logger.debug(f"Looking for {self.mission_id}: {stac_mission}")

            # Create a STAC catalog for the platform
            stac_plateform = plateform.create_stac_catalog(
                self.body_id, self.citations
            )

            # Add the platform STAC catalog as a child of the mission STAC catalog
            stac_mission.add_child(stac_plateform)
            logger.debug(f"{stac_plateform.id} added to {stac_mission.id}")
        elif self._is_must_be_updated(plateform_stac, plateform):
            logger.info(f"{plateform_stac.self_href} has been updated")
            self._update(plateform_stac, plateform)

    def _add_plateforms_to_mission(
        self, plateforms: List[InstrumentHostModel]
    ):
        """Adds a list of platforms to the STAC catalog.

        Args:
            plateforms (List[InstrumentHostModel]): The list of platforms to be added to the STAC catalog.
        """
        for plateform in plateforms:
            self._add_plateform_to_mission(plateform)

    def handle(self, request: Any) -> None:
        """Handles the request to add platforms to the STAC catalog.

        A single InstrumentHostModel or a non-empty list whose first element
        is an InstrumentHostModel is processed here. Any other request,
        including an empty list, is forwarded to the next handler.

        Args:
            request (Any): The request to add platforms to the STAC catalog.
        """
        if isinstance(request, InstrumentHostModel):
            self._add_plateform_to_mission(request)
        elif (
            isinstance(request, list)
            # An empty list has no first element to inspect.
            and len(request) > 0
            and isinstance(request[0], InstrumentHostModel)
        ):
            self._add_plateforms_to_mission(request)
        else:
            super().handle(request)

377 

378 

class InstrumentHandler(AbstractHandler):
    """A handler for adding instruments to a STAC catalog.

    Attributes:
        catalog (pystac.Catalog): The root catalog to add instruments to.
        body_id (str): The ID of the celestial body the instruments are associated with.
        citations (Optional[ReferencesModel]): An optional model of references to add to the instruments.
    """

    def __init__(
        self,
        catalog: pystac.Catalog,
        body_id: str,
        citations: Optional[ReferencesModel],
    ):
        """Initializes InstrumentHandler.

        Args:
            catalog (pystac.Catalog): A Catalog object representing the STAC catalog.
            body_id (str): The ID of the celestial body.
            citations (Optional[ReferencesModel]): A ReferencesModel object representing the citations.
        """
        self.__catalog: pystac.Catalog = catalog
        self.__body_id: str = body_id
        self.__citations: Optional[ReferencesModel] = citations

    @property
    def catalog(self) -> pystac.Catalog:
        """Get the STAC catalog."""
        return self.__catalog

    @property
    def body_id(self) -> str:
        """Get the ID of the solar body."""
        return self.__body_id

    @property
    def citations(self) -> Optional[ReferencesModel]:
        """Get the citations."""
        return self.__citations

    def _is_must_be_updated(
        self, instrument_stac: pystac.Catalog, instrument: InstrumentModel
    ) -> bool:
        """Check if the given STAC Catalog for the Instrument must be updated.

        Args:
            instrument_stac (pystac.Catalog): A Catalog object representing the STAC catalog for the Instrument.
            instrument (InstrumentModel): An InstrumentModel object representing the Instrument.

        Returns:
            bool: True when the catalog file exists on disk and its description
            is shorter than the instrument description, False otherwise.
        """
        # A catalog without self link cannot be on disk: answer False
        # instead of failing on the missing href.
        href = instrument_stac.get_self_href()
        if href is None or not os.path.exists(href):
            return False
        return len(instrument_stac.description) < len(
            instrument.INSTRUMENT_INFORMATION.INSTRUMENT_DESC
        )

    def _update(
        self, instrument_stac: pystac.Catalog, instrument: InstrumentModel
    ):
        """Update the given STAC Catalog with the provided InstrumentModel and save it.

        Args:
            instrument_stac (pystac.Catalog): A Catalog object representing the STAC catalog for the Instrument.
            instrument (InstrumentModel): An InstrumentModel object representing the Instrument.
        """
        instrument_stac_new = instrument.create_stac_catalog(
            self.body_id, self.citations
        )
        # Copy the descriptive fields of the new catalog into the existing one.
        instrument_stac.title = instrument_stac_new.title
        instrument_stac.description = instrument_stac_new.description
        instrument_stac.stac_extensions = instrument_stac_new.stac_extensions
        instrument_stac.extra_fields = instrument_stac_new.extra_fields
        instrument_stac.save_object(include_self_link=False)

    def _add_instrument_to_mission(self, instrument: InstrumentModel):
        """Add an instrument under its platform catalog, or update it if needed.

        Args:
            instrument (InstrumentModel): The InstrumentModel instance to add.
        """
        # Retrieve the ID of the instrument
        instrument_id: str = instrument.get_instrument_id()

        # Retrieve the STAC Catalog for this instrument
        instrument_stac = cast(
            pystac.Catalog,
            self.catalog.get_child(instrument_id, recursive=True),
        )

        if not self._is_exists(instrument_stac):
            # Retrieve the ID of the platform on which the instrument is mounted
            plateform_id: str = instrument.get_plateform_id()

            # Retrieve the STAC Catalog for this platform
            stac_plateform = cast(
                pystac.Catalog,
                self.catalog.get_child(plateform_id, recursive=True),
            )
            logger.debug(f"Looking for {plateform_id}: {stac_plateform}")

            # Create the STAC Catalog for the instrument
            stac_instrument = instrument.create_stac_catalog(
                self.body_id, self.citations
            )

            # Add the instrument Catalog to the platform Catalog
            stac_plateform.add_child(stac_instrument)
            logger.debug(f"{stac_instrument.id} added to {stac_plateform.id}")
        elif self._is_must_be_updated(instrument_stac, instrument):
            # The instrument already exists in the Catalog and needs an update
            logger.info(f"{instrument_stac.self_href} has been updated")
            self._update(instrument_stac, instrument)

    def _add_instruments_to_mission(self, instruments: List[InstrumentModel]):
        """Add multiple InstrumentModel instances to the STAC Catalog.

        Args:
            instruments (List[InstrumentModel]): A list of InstrumentModel instances to add.
        """
        for instrument in instruments:
            self._add_instrument_to_mission(instrument)

    def handle(self, request: Any) -> None:
        """Handle the request to add one or several InstrumentModel instances.

        A single InstrumentModel or a non-empty list whose first element is
        an InstrumentModel is processed here. Any other request, including
        an empty list, is forwarded to the next handler.

        Args:
            request (Any): The request object.
        """
        if isinstance(request, InstrumentModel):
            self._add_instrument_to_mission(request)
        elif (
            isinstance(request, list)
            # An empty list has no first element to inspect.
            and len(request) > 0
            and isinstance(request[0], InstrumentModel)
        ):
            self._add_instruments_to_mission(request)
        else:
            super().handle(request)

518 

519 

class DatasetHandler(AbstractHandler):
    """A handler for adding collections (datasets) to a STAC catalog.

    Attributes:
        catalog (pystac.Catalog): The root catalog to add collections to.
        body_id (str): The ID of the celestial body the collections are associated with.
        citations (Optional[ReferencesModel]): An optional model of references to add to the collections.
        data_supplier (Optional[DataSupplierModel]): The DATA_SUPPLIER of the volume description.
        data_producer (DataProducerModel): The DATA_PRODUCER of the volume description.
    """

    def __init__(
        self,
        catalog: pystac.Catalog,
        body_id: str,
        volume_desc: VolumeModel,
        citations: Optional[ReferencesModel],
    ):
        """Initializes DatasetHandler.

        Args:
            catalog: The root catalog to add collections to.
            body_id: The ID of the celestial body the collections are associated with.
            volume_desc: A model of the volume description; its DATA_SUPPLIER
                and DATA_PRODUCER are kept for the collections.
            citations: An optional model of references to add to the collections.
        """
        self.__catalog: pystac.Catalog = catalog
        self.__body_id: str = body_id
        self.__citations: Optional[ReferencesModel] = citations
        self.__data_supplier: Optional[
            DataSupplierModel
        ] = volume_desc.DATA_SUPPLIER
        self.__data_producer: DataProducerModel = volume_desc.DATA_PRODUCER

    @property
    def catalog(self) -> pystac.Catalog:
        """pystac.Catalog: The root catalog to add collections to."""
        return self.__catalog

    @property
    def body_id(self) -> str:
        """The ID of the celestial body the collections are associated with."""
        return self.__body_id

    @property
    def data_supplier(self) -> Optional[DataSupplierModel]:
        """A model of the data supplier for the collections."""
        return self.__data_supplier

    @property
    def data_producer(self) -> DataProducerModel:
        """A model of the data producer for the collections."""
        return self.__data_producer

    @property
    def citations(self) -> Optional[ReferencesModel]:
        """A model of references to add to the collections."""
        return self.__citations

    def _is_must_be_updated(
        self, dataset_stac: pystac.Collection, dataset: DataSetModel
    ) -> bool:
        """Check whether a dataset in the catalog must be updated.

        Args:
            dataset_stac: A pystac Collection object representing the dataset.
            dataset: A DataSetModel object representing the dataset.

        Returns:
            True when the collection file exists on disk, the dataset has a
            description and this description is longer than the existing
            one; False otherwise.
        """
        # Get the description of the dataset from its DATA_SET_INFORMATION object
        description: Optional[
            str
        ] = dataset.DATA_SET_INFORMATION._get_description()
        if description is None:
            return False

        # A collection without self link cannot be on disk: answer False
        # instead of failing on the missing href.
        href = dataset_stac.get_self_href()
        if href is None or not os.path.exists(href):
            return False
        return len(dataset_stac.description) < len(description)

    def _update(self, dataset_stac: pystac.Collection, dataset: DataSetModel):
        """Update a dataset in the catalog with new metadata and save it.

        Args:
            dataset_stac: A pystac Collection object representing the dataset to be updated.
            dataset: A DataSetModel object representing the new metadata for the dataset.
        """
        # Create a new STAC collection for the dataset
        dataset_stac_new = dataset.create_stac_collection(
            self.body_id,
            self.citations,
            self.data_supplier,
            self.data_producer,
        )

        # Copy the descriptive fields of the new collection into the existing one
        dataset_stac.title = dataset_stac_new.title
        dataset_stac.description = dataset_stac_new.description
        dataset_stac.stac_extensions = dataset_stac_new.stac_extensions
        dataset_stac.extra_fields = dataset_stac_new.extra_fields
        dataset_stac.save_object(include_self_link=False)

    def _add_dataset_to_instrument(self, dataset: DataSetModel):
        """Add a dataset to its instrument catalog(s), or update it if needed.

        Args:
            dataset: A DataSetModel object representing the dataset to be added.
        """
        # Get the ID of the dataset's STAC collection
        dataset_id: str = dataset.get_collection_id()

        # Get the existing STAC collection for the dataset (if it exists)
        dataset_stac = cast(
            pystac.Collection,
            self.catalog.get_child(dataset_id, recursive=True),
        )

        if not self._is_exists(dataset_stac):
            stac_dataset = dataset.create_stac_collection(
                self.body_id,
                self.citations,
                self.data_supplier,
                self.data_producer,
            )

            # The host returns either one instrument ID or a list of IDs;
            # normalise to a list so both cases share the same code path.
            instrument_ids: Union[
                str, List[str]
            ] = dataset.DATA_SET_HOST.get_instrument_id()
            if isinstance(instrument_ids, str):
                instrument_ids = [instrument_ids]

            for instrument_id in instrument_ids:
                stac_instrument = cast(
                    pystac.Catalog,
                    self.catalog.get_child(instrument_id, recursive=True),
                )
                logger.debug(f"Looking for {instrument_id}: {stac_instrument}")
                stac_instrument.add_child(stac_dataset)
                logger.debug(
                    f"{stac_dataset.id} added to {stac_instrument.id}"
                )
        elif self._is_must_be_updated(dataset_stac, dataset):
            # The dataset already exists in the catalog and needs an update
            logger.info(f"{dataset_stac.self_href} has been updated")
            self._update(dataset_stac, dataset)

    def _add_datasets_to_instrument(self, datasets: List[DataSetModel]):
        """Add multiple DataSetModel instances to the STAC Catalog.

        Args:
            datasets (List[DataSetModel]): A list of DataSetModel instances to add.
        """
        for dataset in datasets:
            self._add_dataset_to_instrument(dataset)

    def handle(self, request: Any) -> None:
        """Handle the request to add one or several DataSetModel instances.

        A single DataSetModel or a non-empty list whose first element is a
        DataSetModel is processed here. Any other request, including an
        empty list, is forwarded to the next handler.

        Args:
            request (Any): The request object.
        """
        if isinstance(request, DataSetModel):
            self._add_dataset_to_instrument(request)
        elif (
            isinstance(request, list)
            # An empty list has no first element to inspect.
            and len(request) > 0
            and isinstance(request[0], DataSetModel)
        ):
            self._add_datasets_to_instrument(request)
        else:
            super().handle(request)

713 

714 

class StacPdsCollection:
    """Converts the PDS3 catalogs of a PDS collection to STAC.

    The PDS3 objects are provided through the ``catalogs`` dictionary and
    are dispatched, by ``to_stac``, to a chain of handlers (mission,
    platform, instrument, dataset) that fills the root STAC catalog.
    """

    # Dictionary that defines the sort order of PDS catalog files
    SORT_ORDER = {
        PdsParserFactory.FileGrammary.MISSION_CATALOG.name: 0,
        PdsParserFactory.FileGrammary.INSTRUMENT_HOST_CATALOG.name: 1,
        PdsParserFactory.FileGrammary.INSTRUMENT_CATALOG.name: 2,
        PdsParserFactory.FileGrammary.DATA_SET_CATALOG.name: 3,
        PdsParserFactory.FileGrammary.DATA_SET_MAP_PROJECTION_CATALOG.name: 4,
        PdsParserFactory.FileGrammary.PERSONNEL_CATALOG.name: 5,
        PdsParserFactory.FileGrammary.REFERENCE_CATALOG.name: 6,
        PdsParserFactory.FileGrammary.VOL_DESC.name: 7,
        "collection": 8,
    }

    def __init__(self, root_stac: pystac.Catalog):
        """Initializes the class with a root STAC catalog.

        Args:
            root_stac (pystac.Catalog): The root STAC catalog to fill.
        """
        self.__root_stac: pystac.Catalog = root_stac
        self.__catalogs: Dict[str, Any] = dict()

    def _is_already_exists(self, id: str) -> bool:
        """Checks if the catalog or collection ID is in the STAC catalog

        Args:
            id (str): catalog or collection ID

        Returns:
            bool: True when the catalog or collection ID is in the STAC catalog
        """
        return self.root_stac.get_child(id, recursive=True) is not None

    @property
    def catalogs(self) -> Dict[str, Any]:
        """The PDS3 objects to convert, indexed by catalog name."""
        return self.__catalogs

    @catalogs.setter
    def catalogs(self, value: Optional[Dict[str, Any]] = None):
        # ``None`` replaces the former mutable ``dict()`` default, which
        # would have been one dictionary shared between all calls.
        self.__catalogs = dict() if value is None else value

    @property
    def root_stac(self) -> pystac.Catalog:
        """The root STAC catalog."""
        return self.__root_stac

    def to_stac(self):
        """Converts the PDS3 objects stored in ``catalogs`` to STAC.

        Nothing is done when the volume description or the PDS collection
        is missing from ``catalogs``. Otherwise the body catalog is created
        when it is not already in the root catalog, then every entry of
        ``catalogs`` is submitted, in ``SORT_ORDER`` order, to the chain
        mission -> platform -> instrument -> dataset.
        """
        # Get the PDS3 reference catalog (may be absent)
        ref_catalog_name: str = (
            PdsParserFactory.FileGrammary.REFERENCE_CATALOG.name
        )
        citations: Optional[ReferencesModel] = cast(
            Optional[ReferencesModel], self.catalogs.get(ref_catalog_name)
        )

        # Get the PDS3 collection
        pds_collection: Optional[PdsRegistryModel] = cast(
            Optional[PdsRegistryModel], self.catalogs.get("collection")
        )

        # Get the volume description catalog that contains a reference
        # to others catalogs
        vol_catalog_name: str = PdsParserFactory.FileGrammary.VOL_DESC.name
        volume_desc: Optional[VolumeModel] = cast(
            Optional[VolumeModel], self.catalogs.get(vol_catalog_name)
        )
        if volume_desc is None:
            # If volume description is not available, return
            return
        if pds_collection is None:
            # The body and mission IDs come from the PDS collection: without
            # it nothing can be attached to the STAC tree.
            logger.warning(
                "No PDS collection in the catalogs: STAC transformation skipped"
            )
            return

        # Get the body ID and mission ID from the PDS collection
        body_id: str = pds_collection.get_body_id()
        mission_id: str = pds_collection.get_mission_id()

        if not self._is_already_exists(body_id):
            # If body catalog does not exist, create it and add it to the root catalog
            pystac_body_cat = pds_collection.create_stac_body_catalog()
            self.root_stac.add_child(pystac_body_cat)

        # Build the chain of responsibility
        mission = MissionHandler(
            self.root_stac, body_id, mission_id, citations
        )
        plateform = PlateformHandler(
            self.root_stac, body_id, mission_id, citations
        )
        instrument = InstrumentHandler(self.root_stac, body_id, citations)
        dataset = DatasetHandler(
            self.root_stac, body_id, volume_desc, citations
        )
        mission.set_next(plateform).set_next(instrument).set_next(dataset)

        # Parents must be created before their children, hence the sort
        catalog_names = sorted(
            self.catalogs, key=lambda val: StacPdsCollection.SORT_ORDER[val]
        )
        for catalog_name in catalog_names:
            logger.info(f"\tSTAC transformation of {catalog_name}")
            mission.handle(self.catalogs[catalog_name])