Coverage for pds_crawler/load/database.py: 90%

300 statements  

# -*- coding: utf-8 -*-
# pds-crawler - ETL to index PDS data to pdssp
# Copyright (C) 2023 - CNES (Jean-Christophe Malapert for Pôle Surfaces Planétaires)
# This file is part of pds-crawler <https://github.com/pdssp/pds_crawler>
# SPDX-License-Identifier: LGPL-3.0-or-later

6""" 

7Module Name: 

8 database 

9 

10Description: 

11 The database module provides capabilities to store and load the PDS insformation. 

12 The stored information comes from the models: 

13 - PdsRegistryModel : collection information 

14 - List[str] : list of precomputed URL to download all PDS data from ODE webservice 

15 - ... 

16 

17Classes: 

18 Database : 

19 Database implementation 

20 StacStorage: 

21 Storage for STAC 

22 PdsStorage: 

23 PDS storage 

24 PdsCollectionStorage 

25 PDS collection storage 

26 

27 

28.. uml:: 

29 

30 class Database { 

31 - __base_directory: str 

32 - __pds_dir: str 

33 - __stac_dir: str 

34 - __hdf5_name: str 

35 - __stac_storage: StacStorage 

36 - __pds_storage: PdsStorage 

37 - __hdf5_storage: Hdf5Storage 

38 

39 + base_directory: str 

40 + pds_dir: str 

41 + stac_dir: str 

42 + hdf5_name: str 

43 + stac_storage: StacStorage 

44 + pds_storage: PdsStorage 

45 + hdf5_storage: Hdf5Storage 

46 

47 + __init__(base_directory: str) -> None 

48 + init_storage() -> None 

49 + reset_storage() -> None 

50 + __repr__() -> str 

51 } 

52 

53 class PdsCollectionStorage { 

54 - __directory: str 

55 +__init__(self, directory: str) -> None 

56 + directory: str 

57 + list_files() : List[str] 

58 + get_volume_description() : VolumeModel 

59 + list_catalogs() : Dict[str, Any] 

60 + get_catalog(file: str, catalogue_type: PdsParserFactory.FileGrammary) : Any 

61 + download(urls: List[str], nb_workers: int = 3, timeout: int = 180, time_sleep: int = 1) -> None 

62 + __repr__() : str 

63 } 

64 

65 class PdsStorage{ 

66 + __init__(base_directory: str) -> None 

67 -__directory: str 

68 +init_storage_directory() 

69 +reset_storage() 

70 +get_pds_storage_for(pds_collection: PdsRegistryModel) -> PdsCollectionStorage 

71 +directory: str 

72 +__repr__() -> str 

73 } 

74 

75 class Hdf5Storage{ 

76 - HDF_SEP: str = "/" 

77 - DS_URLS: str = "urls" 

78 - __name 

79 + name 

80 + init_storage(name:str) 

81 + reset_storage() 

82 - _has_changed(store_db:Any, pds_collection:PdsRegistryModel):bool 

83 - _has_attribute_in_group(node:Any):bool 

84 - _save_collection(pds_collection:PdsRegistryModel, f:Any):bool 

85 - _read_and_convert_attributes(node:Any):Dict[str,Any] 

86 - _save_urls_in_new_dataset(self, pds_collection: PdsRegistryModel, urls: List[str]) 

87 - _save_urls_in_existing_dataset(self, pds_collection: PdsRegistryModel, urls: List[str]) 

88 + save_collection(pds_collection:PdsRegistryModel): bool 

89 + save_collections(collections_pds:List[PdsRegistryModel]): bool 

90 + load_collections(body:Optional[str]=None, dataset_id:Optional[str]=None):List[PdsRegistryModel] 

91 + save_urls(pds_collection: PdsRegistryModel, urls: List[str]) 

92 + load_urls(pds_collection: PdsRegistryModel) -> List[str] 

93 + static define_group_from(words: List[str]) -> str 

94 } 

95 

96 class StacStorage { 

97 -__directory: str 

98 -__root_catalog: pystac.Catalog 

99 -__layout: LargeDataVolumeStrategy 

100 +directory: str 

101 +root_catalog: pystac.Catalog 

102 +__init__(directory: str) 

103 -_load_root_catalog() 

104 +init_storage_directory() 

105 +reset_storage() 

106 +refresh() 

107 +item_exists(record: PdsRecordModel) -> bool 

108 +catalog_normalize_and_save() 

109 +root_normalize_and_save(catalog: pystac.Catalog) 

110 +normalize_and_save(cat_or_coll: Union[pystac.Collection, pystac.Catalog]) 

111 +__repr__() -> str 

112 } 

113 

114 Database *-- PdsStorage 

115 Database *-- Hdf5Storage 

116 Database *-- StacStorage 

117 PdsStorage *-- PdsCollectionStorage 

118 

119Author: 

120 Jean-Christophe Malapert 

121""" 

import logging
import os
import re
import shutil
from typing import Any
from typing import cast
from typing import Dict
from typing import List
from typing import Optional
from typing import Union

import h5py
import numpy as np
import pystac

from ..models import Labo
from ..models import PdsRecordModel
from ..models import PdsRegistryModel
from ..models import PdsspModel
from ..models import VolumeModel
from ..utils import Locking
from ..utils import parallel_requests
from ..utils import UtilsMonitoring
from .pds_objects_parser import PdsParserFactory
from .strategy import LargeDataVolumeStrategy

logger = logging.getLogger(__name__)


class StacStorage:
    """STAC storage."""

    def __init__(self, directory: str):
        """Initializes several private properties, including a pystac.Catalog
        object representing the root catalog.

        Args:
            directory (str): base directory
        """
        self.__directory = directory
        self.__layout = LargeDataVolumeStrategy()
        logger.debug(
            f"[StacStorage] Initialize StacStorage with directory={self.__directory}"
        )
        self.init_storage_directory()
        self._load_root_catalog()

    @property
    def directory(self) -> str:
        """Returns the directory path.

        Returns:
            str: the directory path
        """
        return self.__directory

    @property
    def root_catalog(self) -> Optional[pystac.Catalog]:
        """Returns the root catalog as a pystac.Catalog object.

        Returns:
            Optional[pystac.Catalog]: the root catalog as a pystac.Catalog object
        """
        return self.__root_catalog

    def _load_root_catalog(self):
        """Load the root STAC catalog, creating it if it does not exist."""
        try:
            self.__root_catalog = pystac.Catalog.from_file(
                os.path.join(self.directory, "catalog.json")
            )
            logger.debug("[StacStorage] root_catalog found")
        except FileNotFoundError:
            logger.debug("[StacStorage] root_catalog not found, create one")
            self.__root_catalog = pystac.Catalog(
                id=PdsspModel.create_lab_id(Labo.ID),
                title="Planetary Data System",
                description="Georeferenced data extracted from ode.rsl.wustl.edu",
            )
            self.__root_catalog.add_link(
                pystac.Link(
                    rel=pystac.RelType.PREVIEW,
                    target="https://pdsmgmt.gsfc.nasa.gov/images/PDS_Logo.png",
                    title="PDS logo",
                    media_type=pystac.MediaType.PNG,
                )
            )
            self.root_normalize_and_save(self.__root_catalog)

    def init_storage_directory(self):
        """Creates the storage directory if it doesn't exist."""
        os.makedirs(self.directory, exist_ok=True)

    def reset_storage(self):
        """Removes the storage directory and all its contents."""
        shutil.rmtree(self.directory)

    def refresh(self):
        """Reloads the root catalog from the catalog.json file."""
        self._load_root_catalog()

    @UtilsMonitoring.io_display(level=logging.DEBUG)
    def item_exists(self, record: PdsRecordModel) -> bool:
        """Returns True if an item (represented by a PdsRecordModel object) exists in
        the storage directory, otherwise False.

        Args:
            record (PdsRecordModel): item

        Returns:
            bool: True if an item (represented by a PdsRecordModel object) exists in
            the storage directory, otherwise False
        """
        directory: str = os.path.join(
            self.directory,
            record.get_body_id(),
            record.get_mission_id(),
            record.get_plateform_id(),
            record.get_instrument_id(),
            record.get_collection_id(),
            record.get_id(),
        )
        logger.debug(f"[StacStorage] item_exists in {directory}")
        return os.path.exists(directory)

    @UtilsMonitoring.io_display(level=logging.DEBUG)
    def catalog_normalize_and_save(self):
        """Normalizes the root catalog and saves it to disk."""
        if self.root_catalog is None:
            logger.debug(
                "[StacStorage] catalog_normalize_and_save - root_catalog not found"
            )
            self.refresh()
        if self.root_catalog is None:
            logger.debug(
                "[StacStorage] catalog_normalize_and_save - root_catalog not found"
            )
            raise ValueError("Cannot load STAC root")
        self.root_catalog.normalize_and_save(
            self.directory,
            catalog_type=pystac.CatalogType.SELF_CONTAINED,
            strategy=self.__layout.get_strategy(),
        )

    @UtilsMonitoring.io_display(level=logging.DEBUG)
    def root_normalize_and_save(self, catalog: pystac.Catalog):
        """Normalizes the given catalog and saves it to disk using the root
        directory as the output directory."""
        catalog.normalize_and_save(
            self.directory,
            catalog_type=pystac.CatalogType.SELF_CONTAINED,
            strategy=self.__layout.get_strategy(),
        )

    @UtilsMonitoring.io_display(level=logging.DEBUG)
    def normalize_and_save(
        self, cat_or_coll: Union[pystac.Collection, pystac.Catalog]
    ):
        """Normalizes the given catalog or collection and saves it to disk."""
        cat_or_coll.normalize_hrefs(
            cat_or_coll.self_href,
            strategy=self.__layout.get_strategy(),
        )
        cat_or_coll.save()

    def __repr__(self) -> str:
        """Returns a string representation of the StacStorage object.

        Returns:
            str: a string representation of the StacStorage object.
        """
        return f"StacStorage({self.directory})"

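# Usage sketch (illustrative, not part of the original module): exercising
# StacStorage against a hypothetical scratch directory. On first use the
# class creates the directory and a root catalog.json if none exists.
def _example_stac_storage() -> None:  # pragma: no cover
    storage = StacStorage("/tmp/pds_demo/stac")
    # The root catalog was either loaded from catalog.json or freshly created.
    assert storage.root_catalog is not None
    storage.catalog_normalize_and_save()
    print(storage)  # StacStorage(/tmp/pds_demo/stac)
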

class PdsCollectionStorage:
    """Storage for a Planetary Data System (PDS) collection."""

    def __init__(self, directory: str):
        """Initializes the class instance with the provided directory string argument.

        This directory is created if it does not already exist.

        Args:
            directory (str): directory
        """
        self.__directory = directory
        os.makedirs(directory, exist_ok=True)

    @property
    def directory(self) -> str:
        """Returns the directory path.

        Returns:
            str: the directory path
        """
        return self.__directory

    def list_files(self) -> List[str]:
        """Returns a list of filenames in the directory that are regular files.

        Returns:
            List[str]: list of filenames
        """
        files = os.listdir(self.directory)
        return [
            f for f in files if os.path.isfile(os.path.join(self.directory, f))
        ]

    def list_records_files(self) -> List[str]:
        """Returns a list of filenames in the directory that are regular files coming from PdsRecordsWs.

        Returns:
            List[str]: list of filenames
        """
        files = os.listdir(self.directory)
        return [
            f
            for f in files
            if os.path.isfile(os.path.join(self.directory, f))
            and f.endswith(".json")
        ]

    @UtilsMonitoring.io_display(level=logging.DEBUG)
    def get_volume_description(self, timeout: int = 30) -> VolumeModel:
        """Returns a VolumeModel object containing the parsed contents of the
        "voldesc.cat" file in the directory.

        Args:
            timeout (int, optional): parser timeout in seconds. Defaults to 30

        Returns:
            VolumeModel: voldesc.cat object
        """
        voldesc: str = os.path.join(self.directory, "voldesc.cat")
        return PdsParserFactory.parse(
            uri=voldesc,
            type_file=PdsParserFactory.FileGrammary.VOL_DESC,
            timeout=timeout,
        )

    @UtilsMonitoring.io_display(level=logging.DEBUG)
    def list_catalogs(self) -> Dict[str, Any]:
        """Returns a dictionary containing the contents of the CATALOG object
        in the VolumeModel returned by get_volume_description.

        Returns:
            Dict[str, Any]: a dictionary containing the contents of the CATALOG
            object in the VolumeModel
        """
        volume: VolumeModel = self.get_volume_description()
        return {
            key: volume.CATALOG.__dict__[key]
            for key in volume.CATALOG.__dict__.keys()
        }

    @UtilsMonitoring.io_display(level=logging.DEBUG)
    def get_catalog(
        self,
        file: str,
        catalogue_type: PdsParserFactory.FileGrammary,
        timeout: int = 30,
    ) -> Any:
        """Returns the parsed contents of a PDS catalogue file specified by
        the file argument, using the catalogue_type argument to specify the
        type of catalogue parser to use.

        Args:
            file (str): PDS catalog
            catalogue_type (PdsParserFactory.FileGrammary): information about the catalog
            timeout (int, optional): parser timeout. Defaults to 30

        Returns:
            Any: the parsed contents of a PDS catalogue file
        """
        filename: str = os.path.join(self.directory, file.lower())
        return PdsParserFactory.parse(
            uri=filename, type_file=catalogue_type, timeout=timeout
        )

    @UtilsMonitoring.io_display(level=logging.DEBUG)
    def download(
        self,
        urls: List[str],
        nb_workers: int = 3,
        timeout: int = 180,
        time_sleep: int = 1,
        progress_bar: bool = True,
    ):
        """Download URLs in parallel into the collection storage.

        Args:
            urls (List[str]): list of URLs to download
            nb_workers (int, optional): number of workers. Defaults to 3.
            timeout (int, optional): timeout in seconds. Defaults to 180
            time_sleep (int, optional): sleep time between requests. Defaults to 1.
            progress_bar (bool, optional): show a progress bar. Defaults to True.
        """
        parallel_requests(
            self.directory,
            urls,
            nb_workers=nb_workers,
            timeout=timeout,
            time_sleep=time_sleep,
            progress_bar=progress_bar,
        )

    def __repr__(self) -> str:
        """Returns a string representation of the class instance."""
        return f"PdsCollectionStorage({self.directory})"

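# Usage sketch (illustrative, not part of the original module): downloading
# into a collection storage and listing what arrived. The directory and the
# URL below are hypothetical placeholders.
def _example_pds_collection_storage() -> None:  # pragma: no cover
    collection_storage = PdsCollectionStorage("/tmp/pds_demo/files/mars/demo")
    collection_storage.download(
        ["https://example.org/pds/voldesc.cat"], nb_workers=2
    )
    print(collection_storage.list_files())
    # Once a voldesc.cat is present, its description can be parsed:
    # volume = collection_storage.get_volume_description()
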

class PdsStorage:
    """Main PDS storage."""

    def __init__(self, directory: str):
        self.__directory: str = directory
        self.init_storage_directory()

    def init_storage_directory(self):
        """Creates the storage directory if it doesn't exist."""
        os.makedirs(self.directory, exist_ok=True)

    def reset_storage(self):
        """Removes the storage directory and all its contents."""
        shutil.rmtree(self.directory)

    @UtilsMonitoring.io_display(level=logging.DEBUG)
    def get_pds_storage_for(
        self, pds_collection: PdsRegistryModel
    ) -> PdsCollectionStorage:
        """Returns the PdsCollectionStorage for the given PDS collection."""
        relative_dir: str = Hdf5Storage.define_group_from(
            [
                pds_collection.ODEMetaDB.lower(),
                pds_collection.IHID,
                pds_collection.IID,
                pds_collection.PT,
                pds_collection.DataSetId,
            ]
        )
        directory = os.path.join(self.directory, relative_dir)
        return PdsCollectionStorage(directory)

    @property
    def directory(self) -> str:
        """Returns the directory path."""
        return self.__directory

    def __repr__(self) -> str:
        return f"PdsStorage({self.directory})"

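# Usage sketch (illustrative, not part of the original module): resolving the
# per-collection storage from collection metadata. pds_collection is assumed
# to be an existing PdsRegistryModel; only the attributes read by
# get_pds_storage_for (ODEMetaDB, IHID, IID, PT, DataSetId) matter here.
def _example_pds_storage(pds_collection: PdsRegistryModel) -> None:  # pragma: no cover
    pds_storage = PdsStorage("/tmp/pds_demo/files")
    collection_storage = pds_storage.get_pds_storage_for(pds_collection)
    # e.g. PdsCollectionStorage(/tmp/pds_demo/files/mars/MRO/CTX/.../DATASET_ID)
    print(collection_storage)
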

class Hdf5Storage:
    """HDF5-based storage for collection metadata and precomputed URLs."""

    HDF_SEP: str = "/"
    DS_URLS: str = "urls"

    def __init__(self, name: str):
        self.__name = name
        self.init_storage(self.__name)

    @property
    def name(self):
        """Returns the HDF5 file name."""
        return self.__name

    def init_storage(self, name: str):
        """Creates the HDF5 file and its metadata group if they don't exist."""
        Locking.lock_file(name)
        with h5py.File(name, "a") as f:
            metadata = f.require_group("metadata")
            if "author" not in metadata.attrs.keys():
                metadata.attrs["author"] = "Jean-Christophe Malapert"
        Locking.unlock_file(name)

    def reset_storage(self):
        """Removes the HDF5 file."""
        os.remove(self.name)

    @UtilsMonitoring.io_display(level=logging.DEBUG)
    def _has_changed(
        self, store_db: Any, pds_collection: PdsRegistryModel
    ) -> bool:
        """Check if the same version of the PDS collection has already been stored.

        To decide whether the version is an update or not, the comparison is
        performed on the number of products and the existing `NumberProducts`
        keyword in store_db.

        Args:
            store_db (Any): node in HDF5
            pds_collection (PdsRegistryModel): PDS collection

        Returns:
            bool: True when the collection is new or must be updated, otherwise False
        """
        return not (
            "NumberProducts" in store_db.attrs
            and pds_collection.NumberProducts
            == store_db.attrs["NumberProducts"]
        )

    def _has_attribute_in_group(self, node: Any) -> bool:
        """Check whether the node has attributes.

        Args:
            node (Any): the HDF5 node

        Returns:
            bool: True when the node is both a h5py.Group and has attributes
        """
        return isinstance(node, h5py.Group) and bool(node.attrs)

    @UtilsMonitoring.io_display(level=logging.DEBUG)
    def _save_collection(
        self, pds_collection: PdsRegistryModel, f: Any
    ) -> bool:
        """Save the PDS collection in the opened HDF5 file, creating or
        updating its group as needed."""
        is_saved: bool
        group_path: str = Hdf5Storage.define_group_from(
            [
                pds_collection.ODEMetaDB.lower(),
                pds_collection.IHID,
                pds_collection.IID,
                pds_collection.PT,
                pds_collection.DataSetId,
            ]
        )
        store_hdf = f.get(group_path, None)
        if store_hdf is None:
            logger.debug(
                "[Hdf5Storage] _save_collection - store_hdf does not exist"
            )
            store_hdf = f.create_group(group_path)
            pds_collection.to_hdf5(store_hdf)
            is_saved = True
        elif self._has_changed(store_hdf, pds_collection):
            logger.info("[Hdf5Storage] _save_collection - Update HDF5")
            pds_collection.to_hdf5(store_hdf)
            is_saved = True
        else:
            logger.warning(
                f"{pds_collection} has not changed, skip the record in the database"
            )
            is_saved = False
        return is_saved

    def _read_and_convert_attributes(self, node: Any) -> Dict[str, Any]:
        """Read and convert the attributes of a node.

        Args:
            node (Any): HDF5 group or dataset

        Returns:
            Dict[str, Any]: attributes as a dictionary
        """
        group_attributes = dict(node.attrs)
        for key in group_attributes.keys():
            if isinstance(group_attributes[key], np.bytes_):
                group_attributes[key] = eval(
                    group_attributes[key].decode("utf-8")
                )
        return group_attributes

    @UtilsMonitoring.io_display(level=logging.DEBUG)
    def save_collection(self, pds_collection: PdsRegistryModel) -> bool:
        """Save the PDS collection in the database.

        Args:
            pds_collection (PdsRegistryModel): the PDS collection

        Returns:
            bool: True if the collection is saved, otherwise False
        """
        is_saved: bool
        Locking.lock_file(self.name)
        with h5py.File(self.name, "a") as f:
            is_saved = self._save_collection(pds_collection, f)
        Locking.unlock_file(self.name)
        return is_saved

    @UtilsMonitoring.io_display(level=logging.DEBUG)
    def save_collections(
        self, collections_pds: List[PdsRegistryModel]
    ) -> bool:
        """Save all the PDS collections in the database.

        Args:
            collections_pds (List[PdsRegistryModel]): the collections.

        Returns:
            bool: True if all the collections are saved, otherwise False
        """
        is_saved = True
        Locking.lock_file(self.name)
        with h5py.File(self.name, "a") as f:
            for pds_collection in collections_pds:
                is_saved = is_saved & self._save_collection(pds_collection, f)
        Locking.unlock_file(self.name)
        return is_saved

    @UtilsMonitoring.io_display(level=logging.DEBUG)
    def load_collections(
        self, body: Optional[str] = None, dataset_id: Optional[str] = None
    ) -> List[PdsRegistryModel]:
        """Load all collections metadata from the database.

        Args:
            body (Optional[str]): name of the body to get. Defaults to None
            dataset_id (Optional[str]): dataset ID, used to filter the collections. Defaults to None

        Returns:
            List[PdsRegistryModel]: all PDS collections metadata
        """
        pds_collections: List[PdsRegistryModel] = list()

        def extract_attributes(name: str, node: Any):
            if name != "metadata" and self._has_attribute_in_group(node):
                dico: Dict[str, Any] = self._read_and_convert_attributes(node)
                pds_collections.append(PdsRegistryModel.from_dict(dico))

        Locking.lock_file(self.name)
        with h5py.File(self.name, "r") as f:
            f.visititems(extract_attributes)
        Locking.unlock_file(self.name)

        # filter pds_collections by body name
        pds_registry_models = [
            pds_registry_model
            for pds_registry_model in pds_collections
            if body is None
            or pds_registry_model.get_body().upper() == body.upper()
        ]

        # filter pds_collections by dataset ID
        pds_registry_models = [
            pds_registry_model
            for pds_registry_model in pds_registry_models
            if dataset_id is None
            or pds_registry_model.DataSetId.upper() == dataset_id.upper()
        ]

        return pds_registry_models

    def _save_urls_in_new_dataset(
        self, pds_collection: PdsRegistryModel, urls: List[str]
    ):
        """Save URLs in a new dataset, which is represented by pds_collection.

        Args:
            pds_collection (PdsRegistryModel): PDS collection, used to define the name of the dataset
            urls (List[str]): URLs to save
        """
        Locking.lock_file(self.name)
        with h5py.File(self.name, mode="a") as f:
            group_path: str = Hdf5Storage.define_group_from(
                [
                    pds_collection.ODEMetaDB.lower(),
                    pds_collection.IHID,
                    pds_collection.IID,
                    pds_collection.PT,
                    pds_collection.DataSetId,
                ]
            )

            max_shape = (None,)
            shape = (len(urls),)
            dset = f.create_dataset(
                name=group_path + Hdf5Storage.HDF_SEP + Hdf5Storage.DS_URLS,
                shape=shape,
                maxshape=max_shape,
                dtype=h5py.special_dtype(vlen=str),
                chunks=True,
            )
            dset[:] = urls
            logger.info(f"Writing {len(urls)} URLs in hdf5:{group_path}/urls")
        Locking.unlock_file(self.name)

    def _save_urls_in_existing_dataset(
        self, pds_collection: PdsRegistryModel, urls: List[str]
    ):
        """Save URLs in an existing dataset, which is represented by pds_collection.

        Args:
            pds_collection (PdsRegistryModel): PDS collection, used to define the name of the dataset to load
            urls (List[str]): URLs to save
        """
        Locking.lock_file(self.name)
        with h5py.File(self.name, mode="r+") as f:
            group_path: str = Hdf5Storage.define_group_from(
                [
                    pds_collection.ODEMetaDB.lower(),
                    pds_collection.IHID,
                    pds_collection.IID,
                    pds_collection.PT,
                    pds_collection.DataSetId,
                ]
            )
            dset_name: str = (
                group_path + Hdf5Storage.HDF_SEP + Hdf5Storage.DS_URLS
            )
            dset: h5py.Dataset = cast(h5py.Dataset, f[dset_name])

            # Update the URLs
            dset.resize((len(urls),))
            dset[:] = urls
            logger.info(f"Writing {len(urls)} URLs in hdf5:{group_path}/urls")
        Locking.unlock_file(self.name)

    @UtilsMonitoring.io_display(level=logging.DEBUG)
    def save_urls(self, pds_collection: PdsRegistryModel, urls: List[str]):
        """Save URLs in the "urls" dataset of a given group where the group name
        is built from pds_collection. The dataset can be an existing dataset or
        a new one.

        Args:
            pds_collection (PdsRegistryModel): PDS collection
            urls (List[str]): all pregenerated URLs to download the data
        """
        old_urls: List[str] = self.load_urls(pds_collection)
        if len(old_urls) == 0:
            self._save_urls_in_new_dataset(pds_collection, urls)
        elif sorted(urls) == sorted(old_urls):
            logger.debug(
                f"These urls {urls} are already stored for {pds_collection}, skip saving the URLs dataset"
            )
        else:
            logger.info(
                f"""
                Updating the URLs in the dataset for {pds_collection}:
                old_urls: {sorted(old_urls)}
                new_urls: {sorted(urls)}
                """
            )
            self._save_urls_in_existing_dataset(pds_collection, urls)

    @UtilsMonitoring.io_display(level=logging.DEBUG)
    def load_urls(self, pds_collection: PdsRegistryModel) -> List[str]:
        """Loads pregenerated URLs for a given PDS collection.

        Args:
            pds_collection (PdsRegistryModel): PDS collection

        Returns:
            List[str]: list of URLs
        """
        urls: List[str] = list()
        Locking.lock_file(self.name)
        with h5py.File(self.name, "r") as f:
            group_path: str = Hdf5Storage.define_group_from(
                [
                    pds_collection.ODEMetaDB.lower(),
                    pds_collection.IHID,
                    pds_collection.IID,
                    pds_collection.PT,
                    pds_collection.DataSetId,
                ]
            )
            dset = f.get(
                group_path + Hdf5Storage.HDF_SEP + Hdf5Storage.DS_URLS, None
            )
            if dset is not None:
                urls = [item.decode("utf-8") for item in list(dset)]  # type: ignore
        Locking.unlock_file(self.name)
        return urls

    @staticmethod
    def define_group_from(words: List[str]) -> str:
        """Create a valid name for a HDF5 node based on words.

        Args:
            words (List[str]): words

        Returns:
            str: valid name for a HDF5 node
        """
        return Hdf5Storage.HDF_SEP.join(
            [re.sub(r"[^a-zA-Z0-9_]", "_", word) for word in words]
        )

    def __repr__(self) -> str:
        return f"Hdf5({self.name})"

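# Usage sketch (illustrative, not part of the original module): storing and
# reloading precomputed URLs for a collection. The file name and URL are
# hypothetical; pds_collection is assumed to be an existing PdsRegistryModel.
def _example_hdf5_storage(pds_collection: PdsRegistryModel) -> None:  # pragma: no cover
    hdf5_storage = Hdf5Storage("/tmp/pds_demo/pds.h5")
    hdf5_storage.save_collection(pds_collection)
    hdf5_storage.save_urls(pds_collection, ["https://example.org/data/1.img"])
    print(hdf5_storage.load_urls(pds_collection))
    # define_group_from sanitizes words into a valid HDF5 path:
    print(Hdf5Storage.define_group_from(["mars", "MRO", "CTX"]))  # mars/MRO/CTX
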

class Database:
    """Provides the database implementation using a HDF5 file."""

    PDS_STORAGE_DIR = "files"
    STAC_STORAGE_DIR = "stac"
    HDF5_STORAGE_NAME = "pds.h5"

    def __init__(self, base_directory: str) -> None:
        self.__base_directory = base_directory
        self.__pds_dir: str = Database.PDS_STORAGE_DIR
        self.__stac_dir: str = Database.STAC_STORAGE_DIR
        self.__hdf5_name: str = Database.HDF5_STORAGE_NAME
        self.init_storage()

        logger.debug(
            f"""
            Path to the database : {base_directory}
            Path to HDF5 Storage : {self.hdf5_storage}
            Path to PDS Storage : {self.pds_storage}
            Path to STAC Storage: {self.stac_storage}"""
        )

    @property
    def base_directory(self) -> str:
        return self.__base_directory

    @property
    def pds_dir(self) -> str:
        return self.__pds_dir

    @pds_dir.setter
    def pds_dir(self, value: str):
        self.__pds_dir = value

    @property
    def stac_dir(self) -> str:
        return self.__stac_dir

    @stac_dir.setter
    def stac_dir(self, value: str):
        self.__stac_dir = value

    @property
    def hdf5_name(self) -> str:
        return self.__hdf5_name

    @hdf5_name.setter
    def hdf5_name(self, value: str):
        self.__hdf5_name = value

    @property
    def stac_storage(self) -> StacStorage:
        return self.__stac_storage

    @property
    def pds_storage(self) -> PdsStorage:
        return self.__pds_storage

    @property
    def hdf5_storage(self) -> Hdf5Storage:
        return self.__hdf5_storage

    def init_storage(self):
        """Creates the base directory and initializes the three storages."""
        os.makedirs(self.base_directory, exist_ok=True)
        files_directory = os.path.join(self.base_directory, self.pds_dir)
        stac_directory = os.path.join(self.base_directory, self.stac_dir)
        hdf5_file = os.path.join(self.base_directory, self.hdf5_name)
        self.__stac_storage = StacStorage(stac_directory)
        self.__pds_storage = PdsStorage(files_directory)
        self.__hdf5_storage = Hdf5Storage(hdf5_file)

    def reset_storage(self):
        """Resets the STAC, PDS and HDF5 storages."""
        self.stac_storage.reset_storage()
        self.pds_storage.reset_storage()
        self.hdf5_storage.reset_storage()

    def __repr__(self) -> str:
        return f"Database({self.base_directory}, {self.hdf5_storage}, {self.pds_storage}, {self.stac_storage})"
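

# Usage sketch (illustrative, not part of the original module): wiring the
# three storages together through Database. The base directory and the body
# name "mars" are hypothetical.
def _example_database() -> None:  # pragma: no cover
    database = Database("/tmp/pds_demo")
    # The layout under /tmp/pds_demo is files/ (PDS data), stac/ (STAC
    # catalogs) and pds.h5 (collection metadata and URLs).
    for pds_collection in database.hdf5_storage.load_collections(body="mars"):
        collection_storage = database.pds_storage.get_pds_storage_for(
            pds_collection
        )
        print(collection_storage)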