# Source code for crawler.datastore

"""PDSSP Crawler data store module."""

from pydantic import BaseModel
from typing import List, Union, Optional

from .registry import Service, ExternalService

from pathlib import Path
from datetime import datetime
import json

import copy

COLLECTIONS_JSON_TYPE = 'SourceCollections'
"""JSON Source Collections file type"""

class SourceCollectionModel(BaseModel):
    """State of one source collection tracked by the data store.

    Records where the collection comes from (service, schema, target) and how far
    it has progressed through the extract / transform / ingest pipeline.
    """
    collection_id: str
    # service the collection is harvested from (internal or external registry service)
    service: Optional[Union[Service, ExternalService]]
    source_schema: Optional[str]
    target: Optional[str]
    stac_extensions: Optional[list[str]]
    n_products: Optional[int]
    # extraction state
    extracted: Optional[bool] = False
    extracted_files: Optional[list] = []  # should be changed/renamed to `source_dir`
    # transformation state
    transformed: Optional[bool] = False
    stac_dir: Optional[str] = ''
    # ingestion state
    ingested: Optional[bool] = False
    stac_url: Optional[str] = ''
class DataStore:
    """DataStore class."""

    def __init__(self, source_data_dir='', stac_data_dir='', collections=None):
        """Initialize the data store from its source collections index file.

        :param source_data_dir: directory holding source data and the `collections_index.json` file.
        :param stac_data_dir: directory holding transformed STAC data.
        :param collections: optional list of collections used to create the index file
            when none exists; ignored when an index file is found on disk.
        """
        # TODO: check that source and STAC data directories exist.
        self.source_data_dir = source_data_dir
        self.stac_data_dir = stac_data_dir
        self.collections_index_file = Path(source_data_dir, 'collections_index.json')

        # load data store collections if collections index file exists
        if self.collections_index_file.is_file():
            print('Loading data store source collections...')
            loaded_collections = self.load_collections(self.collections_index_file)
            print(f'{len(loaded_collections)} source collections loaded in the data store.')
            self.source_collections = loaded_collections
        else:
            print('Source collections index file not found.')
            if collections:
                print('Creating collections index file from input collections...')
            else:
                print('Creating empty collections index file...')
            self.reset_source_collections(collections=collections)
[docs] def reset_source_collections(self, collections=None): """Reset source collections index to an empty collections or an input collections list. """ self.source_collections = [] if collections: self.source_collections = collections self.save_source_collections(overwrite=True)
[docs] def list_source_collections(self, collection_id='', service_type=None, target=None, extracted=None, transformed=None, ingested=None): """Display all, or a filtered list of source collections indexed in the data store. """ print('ds list ingested', ingested) collections = self.get_source_collections(collection_id=collection_id, service_type=service_type, target=target, extracted=extracted, transformed=transformed, ingested=ingested) if len(collections) == 0: print('No collections matching input filters.') return print(f'{len(collections)} collections matching input filters:') print() print(f'{"ID":<30} {"service type":<12} {"source schema":<13} {"nb of products":<15} {"extracted":<10} {"transformed":<12} {"ingested":<9} {"target":<12}') # {"metadata schema":<18} {"stac extensions":<20}') print(f'{"-" * 30} {"-" * 12} {"-" * 13} {"-" * 15} {"-" * 10} {"-" * 12} {"-" * 9} {"-" * 9}') for collection in collections: extracted_str = 'Y' if collection.extracted else 'N' transformed_str = 'Y' if collection.transformed else 'N' ingested_str = 'Y' if collection.ingested else 'N' source_schema_str = collection.source_schema if collection.source_schema else 'UNDEFINED' print(f'{collection.collection_id:<30} {collection.service.type.name:<12} {source_schema_str:<13} ' f'{collection.n_products:<15} {extracted_str:<10} {transformed_str:<12} {ingested_str:<9} {collection.target.lower():<12}') print()
[docs] def get_source_collection(self, collection_id: str) -> SourceCollectionModel: """Returns the source collection corresponding to input identifier. """ collection = None for source_collection in self.source_collections: if source_collection.collection_id == collection_id: return source_collection return collection
[docs] def get_source_collections(self, collection_id='', service_type=None, target=None, extracted=None, transformed=None, ingested=None) -> [SourceCollectionModel]: """Returns source collections matching input filters. """ # set initial list of filtered source collections filtered_collections = copy.copy(self.source_collections) if collection_id: source_collections = [] for collection in filtered_collections: if collection_id.lower() in collection.collection_id.lower(): source_collections.append(collection) filtered_collections = source_collections if service_type: source_collections = [] for collection in filtered_collections: if service_type == collection.service.type.name: source_collections.append(collection) filtered_collections = source_collections if target: source_collections = [] for collection in filtered_collections: if target.lower() in collection.target.lower(): source_collections.append(collection) # add if extracted filtered_collections = source_collections if extracted is not None: if extracted: source_collections = [] for collection in filtered_collections: if collection.extracted: source_collections.append(collection) # add if not extracted filtered_collections = source_collections else: source_collections = [] for collection in filtered_collections: if not collection.extracted: source_collections.append(collection) filtered_collections = source_collections if transformed is not None: if transformed: source_collections = [] for collection in filtered_collections: if collection.transformed: source_collections.append(collection) # add if transformed filtered_collections = source_collections else: source_collections = [] for collection in filtered_collections: if not collection.transformed: source_collections.append(collection) # add if not transformed filtered_collections = source_collections if ingested is not None: if ingested: source_collections = [] for collection in filtered_collections: if collection.ingested: 
source_collections.append(collection) # add if ingested filtered_collections = source_collections else: source_collections = [] for collection in filtered_collections: if not collection.ingested: source_collections.append(collection) # add if not ingested filtered_collections = source_collections return filtered_collections
# TODO: add_source_collections
[docs] def add_source_collections(self, collections: [SourceCollectionModel]): """Add collections to the source collections index table""" pass
# TODO: update_source_collections
[docs] def update_source_collections(self, collections: [SourceCollectionModel]): """Update collections in the source collections index table""" pass
# TODO: delete_source_collections
[docs] def delete_source_collections(self, collections: [SourceCollectionModel]): """Delete collections to the source collections index table""" pass
[docs] def save_source_collections(self, overwrite=False): """Save loaded source collections into the JSON collections index file. """ if not Path.is_file(self.collections_index_file) or overwrite: self.save_collections(self.source_collections, filepath=self.collections_index_file)
[docs] def save_collections(self, collections, basename='', filepath=None): if filepath: collections_filepath = filepath else: if not basename: basename = 'collections' # set output JSON Source Collections file path collections_filename = f'{basename}_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json' # datetime tag collections_filepath = Path(self.source_data_dir, collections_filename) json_dict = { 'type': COLLECTIONS_JSON_TYPE, 'collections': [] } for collection in collections: json_dict['collections'].append(json.loads(collection.json(by_alias=True))) with open(collections_filepath, 'w') as f: f.write(json.dumps(json_dict))
# print(f'{len(collections)} collections saved in {collections_filepath} file.') TODO: To be logged instead.
[docs] def load_collections(self, filepath): with open(filepath, 'r') as f: data = json.load(f) if 'type' in data.keys(): if data['type'] != COLLECTIONS_JSON_TYPE: raise Exception(f'Error loading input {filepath} file: not a JSON `{COLLECTIONS_JSON_TYPE}` file ("type"="{data["type"]}".') else: raise Exception(f'Error loading input {filepath} file: missing JSON "type" attribute.') if 'collections' not in data.keys(): raise Exception(f'Error loading input {filepath} file: missing JSON "collections" attribute.') # get collections collections_dicts = data['collections'] collections = [] for collection_dict in collections_dicts: try: # attempt to load collection dict to a SourceCollectionModel object collection = SourceCollectionModel(**collection_dict) except Exception as e: print(e) collection = None # add to list if valid if collection: collections.append(collection) else: print(f'[WARNING] The following collection dictionary could not be loaded in a SourceCollectionModel object: {collection_dict}') print() return collections