Source code for crawler.transformer

"""Transformer module."""

# import
from pydantic import BaseModel
from typing import Dict, List, Union, Optional

import crawler.schemas as schemas
from .extractor import Extractor
from .datastore import SourceCollectionModel

from pathlib import Path

import shapely.geometry
import shapely.wkt
from datetime import datetime

import pystac

from pymarsseason import PyMarsSeason, Hemisphere
from astropy.time import Time


def utc_to_iso(utc_time, timespec='auto'):
    """Convert a UTC time string to an ISO format string (STAC standard)."""
    # set valid datetime formats
    valid_formats = [
        '%Y-%m-%dT%H:%M:%S',
        '%Y-%m-%dT%H:%M:%S.%f',
        '%Y-%m-%dT%H:%M:%S.%fZ',
        '%Y-%m-%dT%H:%M:%SZ',
    ]
    for valid_format in valid_formats:
        try:
            return datetime.strptime(utc_time, valid_format).isoformat(timespec=timespec)
        except ValueError:
            continue
    return None
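# Illustrative usage of `utc_to_iso` (a minimal sketch; the timestamp below is a made-up value,
# not taken from an actual PDS record):
#
#   >>> utc_to_iso('2006-12-04T03:35:47.856', timespec='milliseconds')
#   '2006-12-04T03:35:47.856'
#   >>> utc_to_iso('not a timestamp') is None
#   True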
def Transformer(collection: SourceCollectionModel = None, source_schema=None, destination_schema='PDSSP_STAC'):
    """Factory function returning the Transformer object matching the source schema."""
    # set source_schema_str
    if collection:
        source_schema_str = collection.source_schema
    elif source_schema:
        source_schema_str = source_schema
    else:
        raise ValueError(
            'At least one of the `collection` (SourceCollectionModel) or `source_schema` (str) keyword arguments is required as input.')

    # check that the source schema is valid and create the corresponding Transformer object.
    if source_schema_str in SOURCE_TRANSFORMERS:
        TransformerClass = SOURCE_TRANSFORMERS[source_schema_str]
        return TransformerClass(collection=collection, source_schema=source_schema, destination_schema=destination_schema)
    else:
        raise TransformerSchemaInputError(
            value=source_schema_str,
            message=f'Unknown source schema `{source_schema_str}`. Allowed schema names: {list(SOURCE_TRANSFORMERS.keys())}')
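# Illustrative usage of the factory (a minimal sketch; `my_collection` stands for a hypothetical
# SourceCollectionModel instance obtained from the datastore, not defined here):
#
#   transformer = Transformer(source_schema='PDSODE')    # returns a PDSODE_STAC transformer
#   transformer = Transformer(collection=my_collection)  # source schema taken from the collection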
class TransformerSchemaInputError(Exception):
    """Custom error raised when an invalid schema name is passed."""
    def __init__(self, value: str, message: str) -> None:
        self.value = value
        self.message = message
        super().__init__(message)
class InvalidModelObjectTypeError(Exception):
    """Custom error raised when an invalid STAC object type is passed."""
    def __init__(self, object_type: str) -> None:
        self.object_type = object_type
        self.message = f"Invalid `{object_type}` type. Allowed object types: 'collection' or 'item'."
        super().__init__(self.message)
class AbstractTransformer:
    def __init__(self, collection=None, source_schema=None, destination_schema='PDSSP_STAC'):
        # automatically set the transformer source schema name from the inheriting Transformer class.
        self.source_schema = None
        self.schema_name = None
        class_name = self.__class__.__name__
        for schema_name in SOURCE_TRANSFORMERS.keys():
            if class_name == SOURCE_TRANSFORMERS[schema_name].__name__:
                self.schema_name = schema_name

        # if source_schema not in SOURCE_TRANSFORMERS.keys():
        #     raise TransformerSchemaInputError(value=source_schema, message=f'Allowed schema names: {list(SOURCE_TRANSFORMERS.keys())}')

        # init transformer properties
        self.destination_schema = destination_schema
        self.collection = None
        self.transformed = False
        self.stac_dir = ''

        # set source_schema and collection properties
        if source_schema and not collection:
            self.set_source_schema(source_schema)
        elif collection and not source_schema:
            self.set_source_collection(collection)
        else:
            raise ValueError('Exactly one of the `source_schema` or `collection` input keyword arguments must be provided.')

    def __repr__(self):
        return (
            f"<{self.__class__.__name__}> "
            f"source_schema: {self.source_schema} | "
            f"destination_schema: {self.destination_schema} | "
            f"transformed: {self.transformed} | "
            f"stac_dir: {self.stac_dir}"
        )
    def set_source_schema(self, source_schema):
        # check that the input source schema matches this transformer's schema name.
        if source_schema == self.schema_name:
            # check that a schema is defined for the input source schema.
            schema_names = schemas.get_schema_names()
            if source_schema in schema_names:
                self.source_schema = source_schema
            else:
                raise ValueError(
                    f'There is no schema defined for input `{source_schema}`. Defined schemas are: {schema_names}')
        else:
            raise ValueError(f'Input `{source_schema}` does not match any Transformer source schema.')
    def set_source_collection(self, collection: SourceCollectionModel):
        """Set the source collection from the input SourceCollectionModel object and derive Transformer properties from it."""
        # set extracted collection
        self.collection = collection
        # set source metadata schema
        self.source_schema = collection.source_schema
        # set output STAC file path
        self.stac_dir = collection.stac_dir
    def get_id(self, source_metadata: BaseModel, object_type='item') -> str:
        pass

    def get_stac_version(self) -> str:
        return schemas.STAC_VERSION

    def get_links(self, source_metadata: BaseModel, object_type='item') -> list[BaseModel]:
        # referenced by `get_stac_collection_dict` and `get_stac_item_dict`; to be implemented by subclasses.
        pass

    def get_assets(self, source_metadata: BaseModel, object_type='item') -> Dict[str, schemas.PDSSP_STAC_Asset]:
        return {}

    def get_stac_extensions(self, source_metadata: BaseModel) -> list[str]:  # collection only
        pass

    def get_title(self, source_metadata: BaseModel) -> str:
        pass

    def get_description(self, source_metadata: BaseModel) -> str:
        pass

    def get_keywords(self, source_metadata: BaseModel) -> list[str]:
        pass

    def get_geometry(self, source_metadata: BaseModel) -> list[BaseModel]:  # GeoJSON Geometry ??
        pass

    def get_extent(self, source_metadata: BaseModel) -> list[BaseModel]:
        pass

    def get_bbox(self, source_metadata: BaseModel) -> list[float]:
        pass

    def get_providers(self, source_metadata: BaseModel) -> list[BaseModel]:
        pass

    def get_licence(self, source_metadata: BaseModel) -> str:
        return 'Default CC-BY-SA-4.0 license ???'

    def get_summaries(self, source_metadata: BaseModel) -> dict:
        pass

    def get_properties(self, source_metadata: BaseModel, stac_extensions=[]) -> BaseModel:  # PDSSP_STAC_Properties
        # `stac_extensions` keyword argument matches the call made in `get_stac_item_dict`.
        pass
    # EXTENSIONS
    def get_extension_properties(self, source_metadata: BaseModel, stac_extension, object_type='item') -> BaseModel:
        if stac_extension == 'ssys':
            return self.get_ssys_properties(source_metadata, object_type=object_type)
        elif stac_extension == 'proj':
            return self.get_proj_properties(source_metadata, object_type=object_type)
        elif stac_extension == 'processing':
            return self.get_processing_properties(source_metadata, object_type=object_type)
        else:
            raise Exception(f'Undefined {stac_extension} STAC extension.')

    def get_extension_fields(self, source_metadata: BaseModel, stac_extension, object_type='item') -> BaseModel:
        if stac_extension == 'ssys':
            return self.get_ssys_fields(source_metadata, object_type=object_type)
        elif stac_extension == 'proj':
            return self.get_proj_fields(source_metadata, object_type=object_type)
        elif stac_extension == 'processing':
            return self.get_processing_fields(source_metadata, object_type=object_type)
        else:
            raise Exception(f'Undefined {stac_extension} STAC extension.')

    def _default_ext_properties(self, source_metadata: BaseModel, object_type='item') -> BaseModel:
        if object_type == 'item':
            return {}
        elif object_type == 'collection':
            return {}
        else:
            raise InvalidModelObjectTypeError(object_type)
    def get_ssys_properties(self, source_metadata: BaseModel, object_type='item') -> BaseModel:
        return self._default_ext_properties(source_metadata, object_type=object_type)

    def get_ssys_fields(self, source_metadata: BaseModel, object_type='item') -> dict:
        return {}

    def get_proj_properties(self, source_metadata: BaseModel, object_type='item') -> BaseModel:
        return self._default_ext_properties(source_metadata, object_type=object_type)

    def get_proj_fields(self, source_metadata: BaseModel, object_type='item') -> dict:
        return {}

    def get_processing_properties(self, source_metadata: BaseModel, object_type='item') -> BaseModel:
        return self._default_ext_properties(source_metadata, object_type=object_type)

    def get_processing_fields(self, source_metadata: BaseModel, object_type='item') -> dict:
        return {}

    def get_resto_keywords(self, source_metadata: BaseModel) -> dict:
        """Returns RESTO-specific STAC Items keywords."""
        return {}
    def get_stac_collection_dict(self, source_metadata, stac_extensions=[]) -> dict:
        if not stac_extensions:
            # from collection metadata, retrieved from the collection service `extra_params` JSON attribute.
            stac_extensions = self.get_stac_extensions(source_metadata)

        stac_collection_dict = {
            'type': 'Collection',  # REQUIRED
            'stac_version': self.get_stac_version(),  # REQUIRED
            'stac_extensions': stac_extensions,
            'id': self.get_id(source_metadata, object_type='collection'),  # REQUIRED
            'title': self.get_title(source_metadata),
            'description': self.get_description(source_metadata),  # REQUIRED
            'keywords': self.get_keywords(source_metadata),
            'licence': self.get_licence(source_metadata),  # REQUIRED
            'providers': self.get_providers(source_metadata),
            'extent': self.get_extent(source_metadata),  # REQUIRED
            'summaries': self.get_summaries(source_metadata),  # STRONGLY RECOMMENDED
            'links': self.get_links(source_metadata, object_type='collection'),  # REQUIRED
            'assets': self.get_assets(source_metadata, object_type='collection'),
            'extra_fields': {}
        }
        for stac_extension in stac_extensions:
            stac_collection_dict['extra_fields'].update(
                self.get_extension_fields(source_metadata, stac_extension, object_type='collection'))
            # For example: {'ssys_targets': ['Mars']}
        return stac_collection_dict
    def get_stac_item_dict(self, source_metadata, stac_extensions=[]) -> dict:
        stac_item_dict = {
            'type': 'Feature',  # REQUIRED
            'stac_version': self.get_stac_version(),  # REQUIRED
            'stac_extensions': stac_extensions,
            'id': self.get_id(source_metadata, object_type='item'),  # REQUIRED
            'geometry': self.get_geometry(source_metadata),  # REQUIRED
            'bbox': self.get_bbox(source_metadata),  # REQUIRED
            'properties': self.get_properties(source_metadata, stac_extensions=stac_extensions),  # REQUIRED
            'links': self.get_links(source_metadata, object_type='item'),  # REQUIRED
            'assets': self.get_assets(source_metadata, object_type='item'),  # REQUIRED
            'collection': '',
            'extra_fields': {}
        }
        # add STAC extensions extra fields
        for stac_extension in stac_extensions:
            stac_item_dict['extra_fields'].update(
                self.get_extension_fields(source_metadata, stac_extension, object_type='item'))
        return stac_item_dict
    def transform_source_metadata(self, source_metadata: BaseModel, object_type='item', stac_extensions=[]) -> Union[schemas.PDSSP_STAC_Item, schemas.PDSSP_STAC_Collection]:
        """Transform input source metadata into an output PDSSP STAC metadata schema object."""
        # TODO: `object_type` should be derived from the `source_metadata` object class.
        if object_type == 'item':
            stac_dict = self.get_stac_item_dict(source_metadata, stac_extensions=stac_extensions)
        elif object_type == 'collection':
            stac_dict = self.get_stac_collection_dict(source_metadata, stac_extensions=stac_extensions)
        else:
            raise InvalidModelObjectTypeError(object_type)

        # attempt to create the destination STAC metadata object
        # print(stac_dict)
        stac_metadata = schemas.create_schema_object(stac_dict, self.destination_schema, object_type)
        # print(stac_metadata)
        return stac_metadata
    def transform(self, source_collection_file_path='', output_dir_path='', stac_extensions=[], overwrite=False) -> None:
        """Transform (extracted) source collection files into a PDSSP STAC catalog.

        The destination STAC catalog may contain one or several collections, related to only one reference target.
        """
        # TODO: Some methods currently require a source collection model object.
        #   - properly implement this,
        #   - implement `source_collection_file_path` and `output_dir_path`.

        # set destination STAC catalog path, based on the source collection's target.
        if not output_dir_path:
            output_dir_path = self.stac_dir
        stac_catalog_dirpath = Path(output_dir_path, self.collection.target.lower())

        # set destination STAC catalog file
        stac_catalog_filepath = Path(stac_catalog_dirpath, 'catalog.json')

        # if the destination STAC catalog file exists, create the catalog STAC object from file...
        if stac_catalog_filepath.is_file():
            stac_catalog = pystac.Catalog.from_file(str(stac_catalog_filepath))
        else:
            # ...else create the STAC catalog from scratch.
            stac_catalog = pystac.Catalog(
                id=f'pdssp-{self.collection.target.lower()}-catalog',
                title=f'{self.collection.target.title()} STAC Catalog holding one or several PDSSP-compliant data products collections.',
                description=f'This catalog was generated by the PDSSP Crawler on {datetime.utcnow()}.',
                stac_extensions=['ssys'],
                extra_fields={'ssys:targets': [self.collection.target.lower()]}
            )

        # check whether the input source collection has already been transformed and exists in the destination STAC catalog.
        stac_catalog_collections = stac_catalog.get_all_collections()
        for stac_catalog_collection in stac_catalog_collections:
            if stac_catalog_collection.id == self.collection.collection_id:
                if overwrite:
                    # remove destination STAC catalog collection
                    print(f'removing child: {self.collection.collection_id}')
                    stac_catalog.remove_child(self.collection.collection_id)
                else:
                    raise Exception(f'{stac_catalog.id} destination STAC catalog already holds a collection with the {self.collection.collection_id} collection ID.')

        # set extractor
        extractor = Extractor(self.collection)

        # read and transform source collection metadata into destination `PDSSP_STAC_Collection` metadata.
        source_collection_metadata = extractor.read_collection_metadata()
        if source_collection_metadata:
            stac_collection_metadata = self.transform_source_metadata(source_collection_metadata, object_type='collection', stac_extensions=stac_extensions)
        else:
            print(f'WARNING: Could not transform `{self.collection.collection_id}` source collection metadata.')
            return

        # create destination PySTAC Collection object
        #
        # set STAC collection ID
        stac_collection_id = stac_collection_metadata.id

        # set empty collection spatial and temporal extent
        # stac_extent = pystac.Extent(
        #     pystac.SpatialExtent(bboxes=stac_collection_metadata.extent.spatial.bbox),
        #     pystac.TemporalExtent(intervals=stac_collection_metadata.extent.temporal.interval)
        # )
        stac_extent = pystac.Extent(pystac.SpatialExtent(bboxes=[[]]), pystac.TemporalExtent(intervals=[[]]))

        # set STAC collection and items extensions
        if not stac_extensions:
            stac_extensions = stac_collection_metadata.stac_extensions

        stac_collection = pystac.Collection(
            id=stac_collection_id,
            stac_extensions=stac_extensions,
            title=stac_collection_metadata.title,
            description=stac_collection_metadata.description,
            extent=stac_extent,
            license=stac_collection_metadata.licence,
            extra_fields=stac_collection_metadata.extra_fields  # e.g. {'ssys:targets': 'MARS'}
        )

        # read and transform source collection products metadata into destination `PDSSP_STAC_Item` metadata, then
        # create and add the corresponding PySTAC Item objects to the PySTAC Collection.
        #
        while extractor.file_idx < extractor.n_extracted_files:  # TODO: improve mechanism to loop over all products.
            source_product_metadata = extractor.read_product_metadata()
            if source_product_metadata:
                stac_item_metadata = self.transform_source_metadata(source_product_metadata, object_type='item', stac_extensions=stac_extensions)
            else:
                print(f'WARNING: Could not transform product metadata in `{self.collection.collection_id}` source collection.')
                continue

            # create PySTAC Item
            stac_item = pystac.Item(
                id=stac_item_metadata.id,
                stac_extensions=stac_extensions,
                geometry=stac_item_metadata.geometry,
                bbox=stac_item_metadata.bbox,
                datetime=datetime.fromisoformat(stac_item_metadata.properties['datetime']),
                properties=stac_item_metadata.properties,
                extra_fields=stac_item_metadata.extra_fields,  # e.g. {'ssys:targets': stac_item_metadata.ssys_targets}
                collection=stac_collection_id
            )

            # add assets to PySTAC Item
            for key in stac_item_metadata.assets:
                asset_metadata = stac_item_metadata.assets[key]
                stac_item.add_asset(
                    key=key,
                    asset=pystac.Asset(
                        href=asset_metadata.href,
                        title=asset_metadata.title,
                        description=asset_metadata.description,
                        media_type=asset_metadata.type,
                        roles=asset_metadata.roles
                    )
                )

            # add item to PySTAC Collection
            stac_collection.add_item(stac_item)

        # return if there are no STAC items in the collection
        n_items = 0
        for link in stac_collection.links:
            if link.rel == 'item':
                n_items += 1
        if n_items == 0:
            print(f'WARNING: No valid STAC Items in {stac_collection_id} collection.')
            return

        # update collection extent from items
        stac_collection.update_extent_from_items()

        # add collection to the output STAC catalog
        stac_catalog.add_child(stac_collection)

        # save STAC catalog files
        print(f'Writing STAC JSON files in {stac_catalog_dirpath} directory...')
        stac_catalog_dirpath.mkdir(parents=True, exist_ok=True)  # exist_ok=overwrite ?
        stac_catalog.normalize_hrefs(str(stac_catalog_dirpath))
        stac_catalog.save(catalog_type=pystac.CatalogType.SELF_CONTAINED)

        # set transformer status attributes
        for link in stac_collection.links:
            if link.rel == 'self':
                stac_dir = Path(link.target).parent
        self.transformed = True
        self.stac_dir = stac_dir
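    # Illustrative end-to-end usage of `transform` (a minimal sketch; `my_collection` is a
    # hypothetical, already extracted SourceCollectionModel with `target` and `stac_dir` set):
    #
    #   transformer = Transformer(collection=my_collection)
    #   transformer.transform(stac_extensions=['ssys', 'processing'], overwrite=True)
    #   print(transformer.transformed, transformer.stac_dir)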
    def _geometry_from_wkt(self, wkt):
        # convert a WKT footprint string into a GeoJSON-like geometry mapping
        # (same approach as `PDSODE_STAC.get_geometry`).
        if not wkt:
            return None
        return shapely.geometry.mapping(shapely.wkt.loads(wkt))
class PDSODE_STAC(AbstractTransformer):
    def __init__(self, collection=None, source_schema=None, destination_schema='PDSSP_STAC'):
        super().__init__(collection=collection, source_schema=source_schema, destination_schema=destination_schema)

    def get_id(self, source_metadata: BaseModel, object_type='item') -> str:
        if object_type == 'item':
            return source_metadata.pdsid
        elif object_type == 'collection':
            return f'{source_metadata.iiptset.IHID}_{source_metadata.iiptset.IID}_{source_metadata.iiptset.PT}'
        else:
            raise InvalidModelObjectTypeError(object_type)

    # def get_links(self, source_metadata: BaseModel, object_type='item') -> list[BaseModel]:
    #     pass
    def get_assets(self, source_metadata: BaseModel, object_type='item') -> Dict[str, schemas.PDSSP_STAC_Asset]:
        if object_type == 'item':
            assets = {}
            for product_file in source_metadata.Product_files.Product_file:
                # set PDSSP_STAC_Asset object input arguments
                href = product_file.URL
                title = product_file.FileName
                description = product_file.Description
                media_type = ''
                roles = []
                ext = product_file.FileName.split('.')[-1].upper()
                if ext == 'LBL':
                    media_type = 'text/plain'
                    roles = ['metadata']
                elif ext == 'JP2':
                    media_type = 'image/jp2'
                    roles = ['data']
                elif ext == 'PNG':
                    media_type = 'image/png'
                elif ext == 'JPG' or ext == 'JPEG':
                    media_type = 'image/jpeg'
                    # for JPG/JPEG files, the role depends on the ODE product file type
                    if product_file.Type == 'Product':
                        roles = ['data']
                    elif product_file.Type == 'Browse':
                        roles = ['thumbnail']
                    else:
                        roles = [product_file.Type]
                # create PDSSP_STAC_Asset object
                asset = schemas.PDSSP_STAC_Asset(
                    href=href,
                    title=title,
                    description=description,
                    type=media_type,
                    roles=roles
                )
                # add PDSSP_STAC_Asset object to assets dictionary
                key = title
                assets.update({key: asset})
            return assets
        elif object_type == 'collection':
            return {}
        else:
            raise InvalidModelObjectTypeError(object_type)
    def get_stac_extensions(self, source_metadata: BaseModel) -> list[str]:
        # for PDSODE collection metadata only
        return source_metadata.stac_extensions

    def get_title(self, source_metadata: BaseModel) -> str:
        title = f'{source_metadata.iiptset.IHID}/{source_metadata.iiptset.IID} {source_metadata.iiptset.PTName}'
        return title

    def get_description(self, source_metadata: BaseModel) -> str:
        description = (f'Collection of {source_metadata.iiptset.IHID}/{source_metadata.iiptset.IID}/{source_metadata.iiptset.PT} '
                       f'({source_metadata.iiptset.PTName}) PDS data products.')
        return description

    def get_keywords(self, source_metadata: BaseModel) -> list[str]:
        pass
    def get_geometry(self, source_metadata: BaseModel) -> list[BaseModel]:  # GeoJSON Geometry ??
        footprint_wkt = source_metadata.Footprint_C0_geometry
        if not footprint_wkt:
            return None
        footprint_shape = shapely.wkt.loads(footprint_wkt)
        footprint_geometry = shapely.geometry.mapping(footprint_shape)
        return footprint_geometry

    def get_extent(self, source_metadata: BaseModel) -> schemas.PDSSP_STAC_Extent:
        collection_extent = schemas.PDSSP_STAC_Extent(
            spatial=schemas.PDSSP_STAC_SpatialExtent(bbox=[[]]),
            temporal=schemas.PDSSP_STAC_TemporalExtent(interval=[[]])
        )
        return collection_extent

    def get_bbox(self, source_metadata: BaseModel) -> list[float]:
        footprint_wkt = source_metadata.Footprint_C0_geometry
        if not footprint_wkt:
            return None
        footprint_shape = shapely.wkt.loads(footprint_wkt)
        footprint_bbox = list(footprint_shape.bounds)
        return footprint_bbox

    def get_providers(self, source_metadata: BaseModel) -> list[BaseModel]:
        name = f'{source_metadata.iiptset.IID} team via PDS ODE'
        providers = [schemas.PDSSP_STAC_Provider(name=name)]
        return providers

    def get_licence(self, source_metadata: BaseModel) -> str:
        return 'Default CC-BY-SA-4.0 license for PDS ODE collections [TO BE DEFINED]'

    def get_summaries(self, source_metadata: BaseModel) -> dict:
        pass
    def get_properties(self, source_metadata: BaseModel, stac_extensions=['ssys']) -> dict:
        properties = schemas.PDSSP_STAC_Properties(
            datetime=utc_to_iso(source_metadata.UTC_start_time, timespec='milliseconds'),
            created=utc_to_iso(source_metadata.Product_creation_time, timespec='milliseconds'),
            start_datetime=utc_to_iso(source_metadata.UTC_start_time, timespec='milliseconds'),
            end_datetime=utc_to_iso(source_metadata.UTC_stop_time, timespec='milliseconds'),
            platform=source_metadata.ihid,
            instruments=[source_metadata.iid],
            gsd=source_metadata.Map_scale
        )
        properties_dict = properties.dict(exclude_unset=True)
        for stac_extension in stac_extensions:
            properties_dict.update(self.get_extension_properties(source_metadata, stac_extension, object_type='item'))

        # add RESTO-specific "_keywords" property (Mars season)
        keywords_dict = self.get_resto_keywords(source_metadata)
        if len(keywords_dict) > 0:
            properties_dict.update(keywords_dict)

        # Add source metadata fields for which the value type is either a float, int or str
        # (field names are prefixed using the lower-case schema name, to avoid possible conflicts with STAC Common Metadata).
        source_metadata_dict = source_metadata.dict(exclude_unset=True, exclude_none=True)
        filtered_metadata_dict = {}
        for key in source_metadata_dict.keys():
            if isinstance(source_metadata_dict[key], (float, int, str)):
                filtered_metadata_dict['pdsode:' + key] = source_metadata_dict[key]
        properties_dict.update(filtered_metadata_dict)
        return properties_dict
    def get_ssys_properties(self, source_metadata: BaseModel, object_type='item') -> dict:
        if object_type == 'item':
            # Mars solar longitude: use the source value if available, otherwise derive it from the UTC start time.
            solar_longitude = None
            if 'Solar_longitude' in source_metadata.__fields__.keys():
                if source_metadata.Solar_longitude:
                    # set to source solar longitude, if available
                    solar_longitude = float(source_metadata.Solar_longitude)
            if not solar_longitude:
                # derive solar longitude from UTC start time
                utc_time = utc_to_iso(source_metadata.UTC_start_time, timespec='milliseconds')
                season = PyMarsSeason().compute_season_from_time(Time(utc_time, format='isot', scale='utc'))
                solar_longitude = season['ls']

            ssys_properties = schemas.PDSSP_STAC_SSYS_Properties(
                **{
                    'ssys:targets': [source_metadata.Target_name.lower()],
                    'ssys:solar_longitude': solar_longitude,
                    'ssys:solar_distance': source_metadata.Solar_distance,
                    'ssys:incidence_angle': source_metadata.Incidence_angle,
                    'ssys:emission_angle': source_metadata.Emission_angle,
                    'ssys:phase_angle': source_metadata.Phase_angle,
                    # 'ssys:season': season_str.title()
                    # 'ssys:product_type': source_metadata.pt,
                    # 'ssys:spatial_resolution': source_metadata.Map_scale,
                }
            )
        elif object_type == 'collection':
            ssys_properties = schemas.PDSSP_STAC_SSYS_Properties(**{'ssys:targets': [source_metadata.iiptset.ODEMetaDB.lower()]})
        else:
            raise InvalidModelObjectTypeError(object_type)
        return ssys_properties.dict(by_alias=True, exclude_unset=True, exclude_none=True)
    def get_ssys_fields(self, source_metadata: BaseModel, object_type='item') -> dict:
        if object_type == 'item':
            ssys_fields = {}
        elif object_type == 'collection':
            ssys_fields = {
                'ssys:targets': [source_metadata.iiptset.ODEMetaDB.lower()]
            }
        else:
            raise InvalidModelObjectTypeError(object_type)
        return ssys_fields
    def get_processing_properties(self, source_metadata: BaseModel, object_type='item') -> BaseModel:
        if object_type == 'item':
            # derive processing level from the PDSODE Data_Set_Id field
            processing_level = ''
            dataset_id = source_metadata.Data_Set_Id
            for token in dataset_id.split('-'):
                try:
                    int(token)
                    processing_level = token
                    break
                except ValueError:
                    # handle mixed processing level description, e.g. 2/3
                    subtokens = token.split('/')
                    if len(subtokens) > 1:
                        try:
                            int(subtokens[0])
                            processing_level = token
                            break
                        except ValueError:
                            pass
            processing_properties = schemas.PDSSP_STAC_Processing_Properties(
                **{
                    # 'processing:expression': None,
                    # 'processing:lineage': '',
                    'processing:level': processing_level,
                    # 'processing:facility': '',
                    # 'processing:software': {}
                }
            )
        elif object_type == 'collection':
            processing_properties = schemas.PDSSP_STAC_Processing_Properties()
        else:
            raise InvalidModelObjectTypeError(object_type)
        return processing_properties.dict(by_alias=True, exclude_unset=True, exclude_none=True)
    def get_processing_fields(self, source_metadata: BaseModel, object_type='item') -> dict:
        return {}

    def get_resto_keywords(self, source_metadata: BaseModel) -> dict:
        """Returns the RESTO-specific STAC item `_keywords` property. For example::

            "_keywords": [
                {
                    "id": "season:summer",
                    "title": "Summer",
                    "type": "season"
                }
            ]
        """
        # Add season keyword only if the target is 'mars'.
        target = source_metadata.Target_name.lower()
        if target != 'mars':
            print('WARNING: Not computing Mars season.')
            return {}

        # init season keyword dict
        season_keyword = {'id': '', 'title': '', 'type': 'season'}

        # compute season
        utc_time = utc_to_iso(source_metadata.UTC_start_time, timespec='milliseconds')
        season = PyMarsSeason().compute_season_from_time(Time(utc_time, format='isot', scale='utc'))
        season_str = season[Hemisphere.NORTH].value  # spring, summer, autumn, winter
        season_keyword['id'] = f'season:{season_str}'
        season_keyword['title'] = season_str.title()  # or 'Northern Hemisphere ' +

        # add season keyword to "_keywords" dict
        keywords = [season_keyword]
        keywords_dict = {'_keywords': keywords}
        return keywords_dict
class EPNTAP_STAC(AbstractTransformer):
    def __init__(self, collection=None, source_schema=None, destination_schema='PDSSP_STAC'):
        super().__init__(collection=collection, source_schema=source_schema, destination_schema=destination_schema)

    def get_id(self, source_metadata: BaseModel, object_type='item') -> str:
        if object_type == 'item':
            return source_metadata.granule_uid
        elif object_type == 'collection':
            return source_metadata.collection_id  # not defined yet in schema
        else:
            raise InvalidModelObjectTypeError(object_type)

    # def get_stac_extensions(self, source_metadata: BaseModel, object_type='item') -> list[str]:
    #     pass
    def get_assets(self, source_metadata: BaseModel, object_type='item') -> list[BaseModel]:
        pass

    def get_title(self, source_metadata: BaseModel) -> str:
        pass

    def get_description(self, source_metadata: BaseModel) -> str:
        pass

    def get_keywords(self, source_metadata: BaseModel) -> list[str]:
        pass

    def get_geometry(self, source_metadata: BaseModel) -> list[BaseModel]:  # GeoJSON Geometry ??
        return self._geometry_from_wkt(source_metadata.s_region)

    def get_extent(self, source_metadata: BaseModel) -> list[BaseModel]:
        pass

    def get_bbox(self, source_metadata: BaseModel) -> list[float]:
        return [source_metadata.c1min, source_metadata.c2min, source_metadata.c1max, source_metadata.c2max]

    def get_summaries(self, source_metadata: BaseModel) -> dict:
        pass

    def get_properties(self, source_metadata: BaseModel) -> BaseModel:  # PDSSP_STAC_Properties
        pass

    def get_providers(self, source_metadata: BaseModel) -> list[BaseModel]:
        pass

    def get_licence(self, source_metadata: BaseModel) -> str:
        pass

    def get_collection(self, source_metadata: BaseModel) -> str:
        pass
class MARSSI_STAC(AbstractTransformer):
    def __init__(self, collection=None, source_schema=None, destination_schema='PDSSP_STAC'):
        super().__init__(collection=collection, source_schema=source_schema, destination_schema=destination_schema)

    def get_id(self, source_metadata: BaseModel, object_type='item') -> str:
        pass

    def get_stac_extensions(self, source_metadata: BaseModel, object_type='item') -> list[str]:
        pass

    def get_assets(self, source_metadata: BaseModel, object_type='item') -> list[BaseModel]:
        pass

    def get_title(self, source_metadata: BaseModel) -> str:
        pass

    def get_description(self, source_metadata: BaseModel) -> str:
        pass

    def get_keywords(self, source_metadata: BaseModel) -> list[str]:
        pass

    def get_geometry(self, source_metadata: BaseModel) -> list[BaseModel]:  # GeoJSON Geometry ??
        pass

    def get_extent(self, source_metadata: BaseModel) -> list[BaseModel]:
        pass

    def get_bbox(self, source_metadata: BaseModel) -> list[float]:
        pass

    def get_summaries(self, source_metadata: BaseModel) -> dict:
        pass

    def get_properties(self, source_metadata: BaseModel) -> BaseModel:  # PDSSP_STAC_Properties
        pass

    def get_providers(self, source_metadata: BaseModel) -> list[BaseModel]:
        pass

    def get_licence(self, source_metadata: BaseModel) -> str:
        pass

    def get_collection(self, source_metadata: BaseModel) -> str:
        pass
SOURCE_TRANSFORMERS = {
    'PDSODE': PDSODE_STAC,
    'EPNTAP': EPNTAP_STAC,
    'MARSSI_WFS': MARSSI_STAC
}
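# The registry above maps source schema names to their transformer classes; the `Transformer`
# factory at the top of this module looks schema names up here. Illustrative sketch (assumes the
# 'PDSODE' schema is also defined in `crawler.schemas`, so that `set_source_schema` accepts it):
#
#   >>> type(Transformer(source_schema='PDSODE')).__name__
#   'PDSODE_STAC'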