Coverage for pds_crawler/load/database.py: 90%
# -*- coding: utf-8 -*-
# pds-crawler - ETL to index PDS data to pdssp
# Copyright (C) 2023 - CNES (Jean-Christophe Malapert for Pôle Surfaces Planétaires)
# This file is part of pds-crawler <https://github.com/pdssp/pds_crawler>
# SPDX-License-Identifier: LGPL-3.0-or-later
"""
Module Name:
    database

Description:
    The database module provides capabilities to store and load the PDS information.
    The stored information comes from the models:
    - PdsRegistryModel : collection information
    - List[str] : list of precomputed URLs to download all PDS data from the ODE webservice
    - ...

Classes:
    Database :
        Database implementation
    StacStorage:
        Storage for STAC
    PdsStorage:
        PDS storage
    PdsCollectionStorage:
        PDS collection storage

.. uml::

    class Database {
        - __base_directory: str
        - __pds_dir: str
        - __stac_dir: str
        - __hdf5_name: str
        - __stac_storage: StacStorage
        - __pds_storage: PdsStorage
        - __hdf5_storage: Hdf5Storage

        + base_directory: str
        + pds_dir: str
        + stac_dir: str
        + hdf5_name: str
        + stac_storage: StacStorage
        + pds_storage: PdsStorage
        + hdf5_storage: Hdf5Storage

        + __init__(base_directory: str) -> None
        + init_storage() -> None
        + reset_storage() -> None
        + __repr__() -> str
    }

    class PdsCollectionStorage {
        - __directory: str
        + __init__(self, directory: str) -> None
        + directory: str
        + list_files() : List[str]
        + get_volume_description() : VolumeModel
        + list_catalogs() : Dict[str, Any]
        + get_catalog(file: str, catalogue_type: PdsParserFactory.FileGrammary) : Any
        + download(urls: List[str], nb_workers: int = 3, timeout: int = 180, time_sleep: int = 1) -> None
        + __repr__() : str
    }

    class PdsStorage {
        - __directory: str
        + __init__(base_directory: str) -> None
        + init_storage_directory()
        + reset_storage()
        + get_pds_storage_for(pds_collection: PdsRegistryModel) -> PdsCollectionStorage
        + directory: str
        + __repr__() -> str
    }

    class Hdf5Storage {
        - HDF_SEP: str = "/"
        - DS_URLS: str = "urls"
        - __name
        + name
        + init_storage(name: str)
        + reset_storage()
        - _has_changed(store_db: Any, pds_collection: PdsRegistryModel): bool
        - _has_attribute_in_group(node: Any): bool
        - _save_collection(pds_collection: PdsRegistryModel, f: Any): bool
        - _read_and_convert_attributes(node: Any): Dict[str, Any]
        - _save_urls_in_new_dataset(self, pds_collection: PdsRegistryModel, urls: List[str])
        - _save_urls_in_existing_dataset(self, pds_collection: PdsRegistryModel, urls: List[str])
        + save_collection(pds_collection: PdsRegistryModel): bool
        + save_collections(collections_pds: List[PdsRegistryModel]): bool
        + load_collections(body: Optional[str] = None, dataset_id: Optional[str] = None): List[PdsRegistryModel]
        + save_urls(pds_collection: PdsRegistryModel, urls: List[str])
        + load_urls(pds_collection: PdsRegistryModel) -> List[str]
        + static define_group_from(words: List[str]) -> str
    }

    class StacStorage {
        - __directory: str
        - __root_catalog: pystac.Catalog
        - __layout: LargeDataVolumeStrategy
        + directory: str
        + root_catalog: pystac.Catalog
        + __init__(directory: str)
        - _load_root_catalog()
        + init_storage_directory()
        + reset_storage()
        + refresh()
        + item_exists(record: PdsRecordModel) -> bool
        + catalog_normalize_and_save()
        + root_normalize_and_save(catalog: pystac.Catalog)
        + normalize_and_save(cat_or_coll: Union[pystac.Collection, pystac.Catalog])
        + __repr__() -> str
    }

    Database *-- PdsStorage
    Database *-- Hdf5Storage
    Database *-- StacStorage
    PdsStorage *-- PdsCollectionStorage

Author:
    Jean-Christophe Malapert
"""
import logging
import os
import re
import shutil
from typing import Any
from typing import cast
from typing import Dict
from typing import List
from typing import Optional
from typing import Union

import h5py
import numpy as np
import pystac

from ..models import Labo
from ..models import PdsRecordModel
from ..models import PdsRegistryModel
from ..models import PdsspModel
from ..models import VolumeModel
from ..utils import Locking
from ..utils import parallel_requests
from ..utils import UtilsMonitoring
from .pds_objects_parser import PdsParserFactory
from .strategy import LargeDataVolumeStrategy

logger = logging.getLogger(__name__)


class StacStorage:
    """STAC storage."""

    def __init__(self, directory: str):
        """Initializes several private properties, including a pystac.Catalog
        object representing the root catalog.

        Args:
            directory (str): base directory
        """
        self.__directory = directory
        self.__layout = LargeDataVolumeStrategy()
        logger.debug(
            f"[StacStorage] Initialize StacStorage with directory={self.__directory}"
        )
        self.init_storage_directory()
        self._load_root_catalog()

    @property
    def directory(self) -> str:
        """Returns the directory path.

        Returns:
            str: the directory path
        """
        return self.__directory

    @property
    def root_catalog(self) -> Optional[pystac.Catalog]:
        """Returns the root catalog as a pystac.Catalog object.

        Returns:
            Optional[pystac.Catalog]: the root catalog as a pystac.Catalog object
        """
        return self.__root_catalog

    def _load_root_catalog(self):
        """Load the root STAC catalog, creating a new one when none exists on disk."""
        try:
            self.__root_catalog = pystac.Catalog.from_file(
                os.path.join(self.directory, "catalog.json")
            )
            logger.debug("[StacStorage] root_catalog found")
        except FileNotFoundError:
            self.__root_catalog = None
            logger.debug("[StacStorage] root_catalog not found, create one")
            self.__root_catalog = pystac.Catalog(
                id=PdsspModel.create_lab_id(Labo.ID),
                title="Planetary Data System",
                description="Georeferenced data extracted from ode.rsl.wustl.edu",
            )
            self.__root_catalog.add_link(
                pystac.Link(
                    rel=pystac.RelType.PREVIEW,
                    target="https://pdsmgmt.gsfc.nasa.gov/images/PDS_Logo.png",
                    title="PDS logo",
                    media_type=pystac.MediaType.PNG,
                )
            )
            self.root_normalize_and_save(self.__root_catalog)

    def init_storage_directory(self):
        """Creates the storage directory if it doesn't exist."""
        os.makedirs(self.directory, exist_ok=True)

    def reset_storage(self):
        """Removes the storage directory and all its contents."""
        shutil.rmtree(self.directory)

    def refresh(self):
        """Reloads the root catalog from the catalog.json file."""
        self._load_root_catalog()

    @UtilsMonitoring.io_display(level=logging.DEBUG)
    def item_exists(self, record: PdsRecordModel) -> bool:
        """Returns True if an item (represented by a PdsRecordModel object) exists in
        the storage directory, otherwise False.

        Args:
            record (PdsRecordModel): item

        Returns:
            bool: True if the item exists in the storage directory, otherwise False
        """
        directory: str = os.path.join(
            self.directory,
            record.get_body_id(),
            record.get_mission_id(),
            record.get_plateform_id(),
            record.get_instrument_id(),
            record.get_collection_id(),
            record.get_id(),
        )
        logger.debug(f"[StacStorage] item_exists in {directory}")
        return os.path.exists(directory)

    @UtilsMonitoring.io_display(level=logging.DEBUG)
    def catalog_normalize_and_save(self):
        """Normalizes the root catalog and saves it to disk."""
        if self.root_catalog is None:
            logger.debug(
                "[StacStorage] catalog_normalize_and_save - root_catalog not found"
            )
            self.refresh()
        if self.root_catalog is None:
            logger.debug(
                "[StacStorage] catalog_normalize_and_save - root_catalog not found"
            )
            raise ValueError("Cannot load STAC root")
        self.root_catalog.normalize_and_save(
            self.directory,
            catalog_type=pystac.CatalogType.SELF_CONTAINED,
            strategy=self.__layout.get_strategy(),
        )

    @UtilsMonitoring.io_display(level=logging.DEBUG)
    def root_normalize_and_save(self, catalog: pystac.Catalog):
        """Normalizes the given catalog and saves it to disk using the root
        directory as the output directory."""
        catalog.normalize_and_save(
            self.directory,
            catalog_type=pystac.CatalogType.SELF_CONTAINED,
            strategy=self.__layout.get_strategy(),
        )

    @UtilsMonitoring.io_display(level=logging.DEBUG)
    def normalize_and_save(
        self, cat_or_coll: Union[pystac.Collection, pystac.Catalog]
    ):
        """Normalizes the given catalog or collection and saves it to disk."""
        cat_or_coll.normalize_hrefs(
            cat_or_coll.self_href,
            strategy=self.__layout.get_strategy(),
        )
        cat_or_coll.save()

    def __repr__(self) -> str:
        """Returns a string representation of the StacStorage object.

        Returns:
            str: a string representation of the StacStorage object.
        """
        return f"StacStorage({self.directory})"
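
# Minimal usage sketch for StacStorage (the path below is a hypothetical
# example, not part of the module): init loads catalog.json when present,
# otherwise creates and saves a new root catalog.
#
#   stac = StacStorage("/tmp/pds/stac")
#   print(stac.root_catalog.id)
#   stac.catalog_normalize_and_save()   # writes a self-contained STAC tree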


class PdsCollectionStorage:
    """Storage for a Planetary Data System (PDS) collection."""

    def __init__(self, directory: str):
        """Initializes the class instance with the provided directory string argument.

        This directory is created if it does not already exist.

        Args:
            directory (str): directory
        """
        self.__directory = directory
        os.makedirs(directory, exist_ok=True)

    @property
    def directory(self) -> str:
        """Returns the directory path.

        Returns:
            str: the directory path
        """
        return self.__directory

    def list_files(self) -> List[str]:
        """Returns a list of filenames in the directory that are regular files.

        Returns:
            List[str]: list of filenames
        """
        files = os.listdir(self.directory)
        return [
            f for f in files if os.path.isfile(os.path.join(self.directory, f))
        ]

    def list_records_files(self) -> List[str]:
        """Returns a list of filenames in the directory that are regular files coming from PdsRecordsWs.

        Returns:
            List[str]: list of filenames
        """
        files = os.listdir(self.directory)
        return [
            f
            for f in files
            if os.path.isfile(os.path.join(self.directory, f))
            and f.endswith(".json")
        ]

    @UtilsMonitoring.io_display(level=logging.DEBUG)
    def get_volume_description(self, timeout: int = 30) -> VolumeModel:
        """Returns a VolumeModel object containing the parsed contents of the "voldesc.cat" file in the directory.

        Args:
            timeout (int, optional): parser timeout in seconds. Defaults to 30

        Returns:
            VolumeModel: the parsed voldesc.cat content
        """
        voldesc: str = os.path.join(self.directory, "voldesc.cat")
        return PdsParserFactory.parse(
            uri=voldesc,
            type_file=PdsParserFactory.FileGrammary.VOL_DESC,
            timeout=timeout,
        )

    @UtilsMonitoring.io_display(level=logging.DEBUG)
    def list_catalogs(self) -> Dict[str, Any]:
        """Returns a dictionary containing the contents of the CATALOG object
        in the VolumeModel returned by get_volume_description.

        Returns:
            Dict[str, Any]: a dictionary containing the contents of the CATALOG
            object in the VolumeModel
        """
        volume: VolumeModel = self.get_volume_description()
        return {
            key: volume.CATALOG.__dict__[key]
            for key in volume.CATALOG.__dict__.keys()
        }

    @UtilsMonitoring.io_display(level=logging.DEBUG)
    def get_catalog(
        self,
        file: str,
        catalogue_type: PdsParserFactory.FileGrammary,
        timeout: int = 30,
    ) -> Any:
        """Returns the parsed contents of a PDS catalogue file specified by
        the file argument, using the catalogue_type argument to specify the
        type of catalogue parser to use.

        Args:
            file (str): PDS catalog
            catalogue_type (PdsParserFactory.FileGrammary): information about the catalog
            timeout (int, optional): parser timeout. Defaults to 30

        Returns:
            Any: the parsed contents of a PDS catalogue file
        """
        filename: str = os.path.join(self.directory, file.lower())
        return PdsParserFactory.parse(
            uri=filename, type_file=catalogue_type, timeout=timeout
        )

    @UtilsMonitoring.io_display(level=logging.DEBUG)
    def download(
        self,
        urls: List[str],
        nb_workers: int = 3,
        timeout: int = 180,
        time_sleep: int = 1,
        progress_bar: bool = True,
    ):
        """Download URLs in parallel into the collection storage.

        Args:
            urls (List[str]): list of URLs to download
            nb_workers (int, optional): number of workers. Defaults to 3.
            timeout (int, optional): timeout in seconds. Defaults to 180
            time_sleep (int, optional): sleep time in seconds. Defaults to 1.
            progress_bar (bool, optional): set progress_bar. Defaults to True.
        """
        parallel_requests(
            self.directory,
            urls,
            nb_workers=nb_workers,
            timeout=timeout,
            time_sleep=time_sleep,
            progress_bar=progress_bar,
        )

    def __repr__(self) -> str:
        """Returns a string representation of the class instance."""
        return f"PdsCollectionStorage({self.directory})"
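
# Minimal usage sketch for PdsCollectionStorage (hypothetical directory and
# URL; get_volume_description() assumes a voldesc.cat file is present in the
# directory):
#
#   collection = PdsCollectionStorage("/tmp/pds/files/mars/MRO/CTX/DATASET_ID")
#   collection.download(["https://example.org/voldesc.cat"], nb_workers=2)
#   volume = collection.get_volume_description()   # parsed voldesc.cat
#   catalogs = collection.list_catalogs()          # CATALOG object as a dict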


class PdsStorage:
    """Main PDS storage."""

    def __init__(self, directory: str):
        """Initializes the storage with the given directory and creates it on disk.

        Args:
            directory (str): base directory of the PDS storage
        """
        self.__directory: str = directory
        self.init_storage_directory()

    def init_storage_directory(self):
        """Creates the storage directory if it doesn't exist."""
        os.makedirs(self.directory, exist_ok=True)

    def reset_storage(self):
        """Removes the storage directory and all its contents."""
        shutil.rmtree(self.directory)

    @UtilsMonitoring.io_display(level=logging.DEBUG)
    def get_pds_storage_for(
        self, pds_collection: PdsRegistryModel
    ) -> PdsCollectionStorage:
        """Returns the PdsCollectionStorage for the given PDS collection.

        Args:
            pds_collection (PdsRegistryModel): PDS collection

        Returns:
            PdsCollectionStorage: storage rooted at the collection directory
        """
        relative_dir: str = Hdf5Storage.define_group_from(
            [
                pds_collection.ODEMetaDB.lower(),
                pds_collection.IHID,
                pds_collection.IID,
                pds_collection.PT,
                pds_collection.DataSetId,
            ]
        )
        directory = os.path.join(self.directory, relative_dir)
        return PdsCollectionStorage(directory)

    @property
    def directory(self) -> str:
        """Returns the directory path.

        Returns:
            str: the directory path
        """
        return self.__directory

    def __repr__(self) -> str:
        return f"PdsStorage({self.directory})"
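
# Minimal usage sketch for PdsStorage (assumes a PdsRegistryModel instance
# `pds_collection` providing the attributes used above):
#
#   pds_storage = PdsStorage("/tmp/pds/files")
#   collection_storage = pds_storage.get_pds_storage_for(pds_collection)
#   # -> PdsCollectionStorage rooted at
#   #    /tmp/pds/files/<odemetadb>/<IHID>/<IID>/<PT>/<DataSetId>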


class Hdf5Storage:
    """Storage of the PDS collections metadata and URLs in a HDF5 file."""

    HDF_SEP: str = "/"
    DS_URLS: str = "urls"

    def __init__(self, name: str):
        self.__name = name
        self.init_storage(self.__name)

    @property
    def name(self):
        """Returns the HDF5 file name."""
        return self.__name

    def init_storage(self, name: str):
        """Creates the HDF5 file with a `metadata` group if it doesn't exist."""
        Locking.lock_file(name)
        with h5py.File(name, "a") as f:
            metadata = f.require_group("metadata")
            if "author" not in metadata.attrs.keys():
                metadata.attrs["author"] = "Jean-Christophe Malapert"
        Locking.unlock_file(name)

    def reset_storage(self):
        """Removes the HDF5 file."""
        os.remove(self.name)

    @UtilsMonitoring.io_display(level=logging.DEBUG)
    def _has_changed(
        self, store_db: Any, pds_collection: PdsRegistryModel
    ) -> bool:
        """Check if the same version of the PDS collection has already been stored.

        To check whether the version is an update or not, the comparison is based on
        the number of products and the existing `NumberProducts` keyword in store_db.

        Args:
            store_db (Any): node in HDF5
            pds_collection (PdsRegistryModel): PDS collection

        Returns:
            bool: True when the collection is new or must be updated, otherwise False
        """
        return not (
            "NumberProducts" in store_db.attrs
            and pds_collection.NumberProducts
            == store_db.attrs["NumberProducts"]
        )
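
    # Worked example (a sketch with made-up numbers): if the stored group has
    # attrs["NumberProducts"] == 1000 and the incoming collection also reports
    # NumberProducts == 1000, _has_changed() returns False and the record is
    # skipped; any other combination (missing attribute or different count)
    # returns True and triggers a save or an update.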

    def _has_attribute_in_group(self, node: Any) -> bool:
        """Check whether attributes exist in the node.

        Args:
            node (Any): the HDF5 node

        Returns:
            bool: True when the node is a h5py.Group and the node has attributes
        """
        return isinstance(node, h5py.Group) and bool(node.attrs)

    @UtilsMonitoring.io_display(level=logging.DEBUG)
    def _save_collection(
        self, pds_collection: PdsRegistryModel, f: Any
    ) -> bool:
        """Save the PDS collection in the opened HDF5 file, creating or updating its group.

        Args:
            pds_collection (PdsRegistryModel): the PDS collection
            f (Any): opened HDF5 file

        Returns:
            bool: True if the collection is saved, otherwise False
        """
        is_saved: bool
        group_path: str = Hdf5Storage.define_group_from(
            [
                pds_collection.ODEMetaDB.lower(),
                pds_collection.IHID,
                pds_collection.IID,
                pds_collection.PT,
                pds_collection.DataSetId,
            ]
        )
        store_hdf = f.get(group_path, None)
        if store_hdf is None:
            logger.debug(
                "[Hdf5Storage] _save_collection - store_hdf does not exist"
            )
            store_hdf = f.create_group(group_path)
            pds_collection.to_hdf5(store_hdf)
            is_saved = True
        elif self._has_changed(store_hdf, pds_collection):
            logger.info("[Hdf5Storage] _save_collection - Update HDF5")
            pds_collection.to_hdf5(store_hdf)
            is_saved = True
        else:
            logger.warning(
                f"{pds_collection} has not changed, skip the record in the database"
            )
            is_saved = False
        return is_saved

    def _read_and_convert_attributes(self, node: Any) -> Dict[str, Any]:
        """Reads and converts the attributes of a node.

        Args:
            node (Any): HDF5 group or dataset

        Returns:
            Dict[str, Any]: attributes as a dictionary
        """
        group_attributes = dict(node.attrs)
        for key in group_attributes.keys():
            if isinstance(group_attributes[key], np.bytes_):
                # bytes attributes are decoded, then evaluated back into Python objects
                group_attributes[key] = eval(
                    group_attributes[key].decode("utf-8")
                )
        return group_attributes

    @UtilsMonitoring.io_display(level=logging.DEBUG)
    def save_collection(self, pds_collection: PdsRegistryModel) -> bool:
        """Save the PDS collection in the database.

        Args:
            pds_collection (PdsRegistryModel): the PDS collection

        Returns:
            bool: True if the collection is saved, otherwise False
        """
        is_saved: bool
        Locking.lock_file(self.name)
        with h5py.File(self.name, "a") as f:
            is_saved = self._save_collection(pds_collection, f)
        Locking.unlock_file(self.name)
        return is_saved

    @UtilsMonitoring.io_display(level=logging.DEBUG)
    def save_collections(
        self, collections_pds: List[PdsRegistryModel]
    ) -> bool:
        """Save all the PDS collections in the database.

        Args:
            collections_pds (List[PdsRegistryModel]): the collections.

        Returns:
            bool: True if all the collections are saved, otherwise False
        """
        is_saved = True
        Locking.lock_file(self.name)
        with h5py.File(self.name, "a") as f:
            for pds_collection in collections_pds:
                is_saved = is_saved & self._save_collection(pds_collection, f)
        Locking.unlock_file(self.name)
        return is_saved

    @UtilsMonitoring.io_display(level=logging.DEBUG)
    def load_collections(
        self, body: Optional[str] = None, dataset_id: Optional[str] = None
    ) -> List[PdsRegistryModel]:
        """Load all collections metadata from the database.

        Args:
            body (Optional[str]): name of the body to get. Defaults to None
            dataset_id (Optional[str]): dataset ID, used to filter the collections. Defaults to None

        Returns:
            List[PdsRegistryModel]: all PDS collections metadata
        """
        pds_collections: List[PdsRegistryModel] = list()

        def extract_attributes(name: str, node: Any):
            if name != "metadata" and self._has_attribute_in_group(node):
                dico: Dict[str, Any] = self._read_and_convert_attributes(node)
                pds_collections.append(PdsRegistryModel.from_dict(dico))

        Locking.lock_file(self.name)
        with h5py.File(self.name, "r") as f:
            f.visititems(extract_attributes)
        Locking.unlock_file(self.name)

        # filter pds_collections by body name
        pds_registry_models = [
            pds_registry_model
            for pds_registry_model in pds_collections
            if body is None
            or pds_registry_model.get_body().upper() == body.upper()
        ]

        # filter pds_collections by dataset ID
        pds_registry_models = [
            pds_registry_model
            for pds_registry_model in pds_registry_models
            if dataset_id is None
            or pds_registry_model.DataSetId.upper() == dataset_id.upper()
        ]

        return pds_registry_models
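
    # Filtering sketch (the body and dataset ID below are hypothetical values,
    # not shipped with the module):
    #
    #   all_collections = storage.load_collections()
    #   mars_collections = storage.load_collections(body="mars")
    #   one_dataset = storage.load_collections(
    #       body="mars", dataset_id="MRO-M-CTX-2-EDR-L0-V1.0"
    #   )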

    def _save_urls_in_new_dataset(
        self, pds_collection: PdsRegistryModel, urls: List[str]
    ):
        """Save URLs in a new dataset, which is represented by pds_collection.

        Args:
            pds_collection (PdsRegistryModel): PDS collection, used to define the name of the dataset
            urls (List[str]): URLs to save
        """
        Locking.lock_file(self.name)
        with h5py.File(self.name, mode="a") as f:
            group_path: str = Hdf5Storage.define_group_from(
                [
                    pds_collection.ODEMetaDB.lower(),
                    pds_collection.IHID,
                    pds_collection.IID,
                    pds_collection.PT,
                    pds_collection.DataSetId,
                ]
            )

            max_shape = (None,)
            shape = (len(urls),)
            dset = f.create_dataset(
                name=group_path + Hdf5Storage.HDF_SEP + Hdf5Storage.DS_URLS,
                shape=shape,
                maxshape=max_shape,
                dtype=h5py.special_dtype(vlen=str),
                chunks=True,
            )
            dset[:] = urls
            logger.info(f"Writing {len(urls)} URLs in hdf5:{group_path}/urls")
        Locking.unlock_file(self.name)

    def _save_urls_in_existing_dataset(
        self, pds_collection: PdsRegistryModel, urls: List[str]
    ):
        """Save URLs in an existing dataset, which is represented by pds_collection.

        Args:
            pds_collection (PdsRegistryModel): PDS collection, used to define the name of the dataset to load
            urls (List[str]): URLs to save
        """
        Locking.lock_file(self.name)
        with h5py.File(self.name, mode="r+") as f:
            group_path: str = Hdf5Storage.define_group_from(
                [
                    pds_collection.ODEMetaDB.lower(),
                    pds_collection.IHID,
                    pds_collection.IID,
                    pds_collection.PT,
                    pds_collection.DataSetId,
                ]
            )
            dset_name: str = (
                group_path + Hdf5Storage.HDF_SEP + Hdf5Storage.DS_URLS
            )
            dset: h5py.Dataset = cast(h5py.Dataset, f[dset_name])

            # Update the URLs
            dset.resize((len(urls),))
            dset[:] = urls
            logger.info(f"Writing {len(urls)} URLs in hdf5:{group_path}/urls")
        Locking.unlock_file(self.name)

    @UtilsMonitoring.io_display(level=logging.DEBUG)
    def save_urls(self, pds_collection: PdsRegistryModel, urls: List[str]):
        """Save URLs in the "urls" dataset of the group whose name is built from pds_collection.
        The dataset can be an existing dataset or a new one.

        Args:
            pds_collection (PdsRegistryModel): PDS collection
            urls (List[str]): all pregenerated URLs to download the data
        """
        old_urls: List[str] = self.load_urls(pds_collection)
        if len(old_urls) == 0:
            self._save_urls_in_new_dataset(pds_collection, urls)
        elif sorted(urls) == sorted(old_urls):
            logger.debug(
                f"These urls {urls} are already stored for {pds_collection}, skip to save the URLs dataset"
            )
        else:
            logger.info(
                f"""
                Updates the URLs in the dataset for {pds_collection}:
                old_urls: {sorted(old_urls)}
                new_urls: {sorted(urls)}
                """
            )
            self._save_urls_in_existing_dataset(pds_collection, urls)

    @UtilsMonitoring.io_display(level=logging.DEBUG)
    def load_urls(self, pds_collection: PdsRegistryModel) -> List[str]:
        """Loads pregenerated URLs for a given PDS collection.

        Args:
            pds_collection (PdsRegistryModel): PDS collection

        Returns:
            List[str]: list of URLs
        """
        urls: List[str] = list()
        Locking.lock_file(self.name)
        with h5py.File(self.name, "r") as f:
            group_path: str = Hdf5Storage.define_group_from(
                [
                    pds_collection.ODEMetaDB.lower(),
                    pds_collection.IHID,
                    pds_collection.IID,
                    pds_collection.PT,
                    pds_collection.DataSetId,
                ]
            )
            dset = f.get(
                group_path + Hdf5Storage.HDF_SEP + Hdf5Storage.DS_URLS, None
            )
            if dset is not None:
                urls = [item.decode("utf-8") for item in list(dset)]  # type: ignore
        Locking.unlock_file(self.name)
        return urls
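
    # Round-trip sketch (hypothetical URLs; `storage` and `pds_collection` are
    # assumed to exist): save_urls() creates the "urls" dataset on the first
    # call and resizes it on later calls; load_urls() reads it back.
    #
    #   urls = ["https://example.org/a.json", "https://example.org/b.json"]
    #   storage.save_urls(pds_collection, urls)
    #   assert storage.load_urls(pds_collection) == urls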

    @staticmethod
    def define_group_from(words: List[str]) -> str:
        """Create a valid name for a HDF5 node based on words.

        Args:
            words (List[str]): words

        Returns:
            str: valid name for a HDF5 node
        """
        return Hdf5Storage.HDF_SEP.join(
            [re.sub(r"[^a-zA-Z0-9_]", "_", word) for word in words]
        )
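
    # Deterministic example: every character outside [a-zA-Z0-9_] is replaced
    # by "_" in each word, and the words are joined with HDF_SEP ("/"):
    #
    #   Hdf5Storage.define_group_from(["mars", "MRO", "CTX", "EDR", "DATA-SET.ID"])
    #   -> "mars/MRO/CTX/EDR/DATA_SET_ID"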

    def __repr__(self) -> str:
        return f"Hdf5({self.name})"


class Database:
    """Provides the database implementation using a HDF5 file."""

    PDS_STORAGE_DIR = "files"
    STAC_STORAGE_DIR = "stac"
    HDF5_STORAGE_NAME = "pds.h5"

    def __init__(self, base_directory: str) -> None:
        self.__base_directory = base_directory
        self.__pds_dir: str = Database.PDS_STORAGE_DIR
        self.__stac_dir: str = Database.STAC_STORAGE_DIR
        self.__hdf5_name: str = Database.HDF5_STORAGE_NAME
        self.init_storage()

        logger.debug(
            f"""
            Path to the database : {base_directory}
            Path to HDF5 Storage : {self.hdf5_storage}
            Path to PDS Storage : {self.pds_storage}
            Path to STAC Storage : {self.stac_storage}"""
        )

    @property
    def base_directory(self) -> str:
        return self.__base_directory

    @property
    def pds_dir(self) -> str:
        return self.__pds_dir

    @pds_dir.setter
    def pds_dir(self, value: str):
        self.__pds_dir = value

    @property
    def stac_dir(self) -> str:
        return self.__stac_dir

    @stac_dir.setter
    def stac_dir(self, value: str):
        self.__stac_dir = value

    @property
    def hdf5_name(self) -> str:
        return self.__hdf5_name

    @hdf5_name.setter
    def hdf5_name(self, value: str):
        self.__hdf5_name = value

    @property
    def stac_storage(self) -> StacStorage:
        return self.__stac_storage

    @property
    def pds_storage(self) -> PdsStorage:
        return self.__pds_storage

    @property
    def hdf5_storage(self) -> Hdf5Storage:
        return self.__hdf5_storage

    def init_storage(self):
        """Creates the base directory and initializes the three storages."""
        os.makedirs(self.base_directory, exist_ok=True)
        files_directory = os.path.join(self.base_directory, self.pds_dir)
        stac_directory = os.path.join(self.base_directory, self.stac_dir)
        hdf5_file = os.path.join(self.base_directory, self.hdf5_name)
        self.__stac_storage = StacStorage(stac_directory)
        self.__pds_storage = PdsStorage(files_directory)
        self.__hdf5_storage = Hdf5Storage(hdf5_file)

    def reset_storage(self):
        """Resets the STAC, PDS and HDF5 storages."""
        self.stac_storage.reset_storage()
        self.pds_storage.reset_storage()
        self.hdf5_storage.reset_storage()

    def __repr__(self) -> str:
        return f"Database({self.base_directory}, {self.hdf5_storage}, {self.pds_storage}, {self.stac_storage})"
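
# Putting it together (a minimal sketch; the base directory and `collections`
# list are hypothetical): Database wires the three storages under one base
# directory, so a typical flow is:
#
#   db = Database("/tmp/pds_database")
#   db.hdf5_storage.save_collections(collections)   # collections: List[PdsRegistryModel]
#   storage = db.pds_storage.get_pds_storage_for(collections[0])
#   db.stac_storage.catalog_normalize_and_save()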