import pystac
import requests
import os
from pathlib import Path
# Model name added to a collection payload when the collection dict has no `model` key.
COLLECTION_DEFAULT_MODEL = 'DefaultModel'

# Each strategy maps to two switches read by `Ingestor.ingest`:
#   - `ingest_catalog`: POST catalogs and collections;
#   - `ingest_feature`: POST features (items) and follow `item`/`items` links.
# The `feature` strategy previously duplicated `catalog` (copy-paste slip), so it
# never ingested any feature; it now ingests features only.
INGEST_STRATEGIES = {
    'catalog': {'ingest_catalog': True, 'ingest_feature': False},
    'feature': {'ingest_catalog': False, 'ingest_feature': True},
    'both': {'ingest_catalog': True, 'ingest_feature': True},
    'none': {'ingest_catalog': False, 'ingest_feature': False},
}
"""Ingest strategies define what is ingested i.e. "catalog", "feature", "both" or "none".
"""
class Ingestor:
    """Ingestion of STAC catalog or collection into destination STAC API service (eg: PDSSP RESTO).

    An ingestor is created with exactly one of `stac_api_parent_url` (parent URL of the
    destination STAC API) or `source_collection` (an object exposing `ingested` and
    `stac_url` attributes, which are mirrored on the ingestor and can be written back
    with `update_source_collection`).

    Attributes set by the constructor:
        stac_api_parent_url: parent URL the per-target endpoints are appended to.
        stac_api_url: target endpoint URL; empty until `set_stac_api_url` is called.
        ingested: ingestion status flag.
        stac_url: URL of the last ingested catalog/collection.
        do_not_split_geom: when true, features are posted with `_splitGeom=0`.
        source_collection: the source collection object, if any.
        processed_features: IDs of features already posted by `ingest`.
        headers: HTTP headers (including the bearer token) sent with every request.
    """

    def __init__(self, stac_api_parent_url='', auth_token='', source_collection=None):
        """Set up the ingestor state and the HTTP headers.

        Args:
            stac_api_parent_url: parent URL of the destination STAC API.
            auth_token: admin token; when empty, the `RESTO_ADMIN_AUTH_TOKEN`
                environment variable is used instead.
            source_collection: object with `ingested` and `stac_url` attributes.

        Raises:
            ValueError: if both or neither of `stac_api_parent_url` and
                `source_collection` are provided.
            Exception: if no auth token is available.
        """
        self.stac_api_parent_url = stac_api_parent_url
        self.stac_api_url = ''
        self.ingested = False
        self.stac_url = ''
        self.do_not_split_geom = True
        self.source_collection = None
        self.processed_features = []  # stac2resto `lookup_table`

        # Exactly one of the two inputs must be provided.
        if bool(stac_api_parent_url) == bool(source_collection):
            raise ValueError('Only one of the `stac_api_parent_url` or `source_collection` input keyword arguments can be used.')
        if source_collection:
            self.set_source_collection(source_collection)

        # The admin auth token is required for the POST/PUT/DELETE requests.
        auth_token = auth_token or os.environ.get('RESTO_ADMIN_AUTH_TOKEN')
        if not auth_token:
            raise Exception('RESTO_ADMIN_AUTH_TOKEN enviroment variable, or `auth_token` input argument, is required for ingestion.')

        self.headers = {
            'Content-Type': 'application/json',
            'Accept': 'application/json',
            'User-Agent': 'pdssp-crawler',
            'Authorization': 'Bearer ' + auth_token
        }

    @staticmethod
    def _response_details(response):
        """Return the response body as text for error messages, JSON-decoded when possible."""
        try:
            return str(response.json())
        except ValueError:
            # Non-JSON error body (e.g. an HTML error page): fall back to the raw
            # text so the HTTP failure is reported instead of a decoding error.
            return response.text

    @staticmethod
    def _collection_payload(collection_dict):
        """Return a shallow copy of a collection dict prepared for POST/PUT.

        The default model is added when no `model` key is present, and the
        `summaries` and `links` keys are discarded. The input dict is not modified.
        """
        payload = collection_dict.copy()
        payload.setdefault('model', COLLECTION_DEFAULT_MODEL)
        payload.pop('summaries', None)
        payload.pop('links', None)
        return payload

    def get_target_stac_api_url(self, target):
        """Return STAC API endpoint for a given target name.

        Returns an empty string when the target name is not one of the known targets.
        """
        target_stac_api_paths = {
            'mars': '/catalogs/mars',
            'moon': '/catalogs/moon',
            'titan': '/catalogs/titan'
        }
        path = target_stac_api_paths.get(target)
        if path is None:
            return ''
        return self.stac_api_parent_url + path

    def set_stac_api_url(self, stac_object_dict):
        """Set `stac_api_url` from the first `ssys:targets` entry of a STAC object dict.

        The field is read from the top level of the dict and, when absent there,
        from its `properties` (where item/feature dicts carry extension fields).

        Raises:
            KeyError: if `ssys:targets` is found in neither location.
        """
        targets = stac_object_dict.get('ssys:targets')
        if targets is None:
            targets = (stac_object_dict.get('properties') or {})['ssys:targets']
        self.stac_api_url = self.get_target_stac_api_url(targets[0])

    def set_source_collection(self, source_collection):
        """Attach a source collection and copy its `ingested` and `stac_url` values."""
        self.source_collection = source_collection
        self.ingested = source_collection.ingested
        self.stac_url = source_collection.stac_url

    def update_source_collection(self):
        """Write `ingested` and `stac_url` back to the source collection, if one is attached."""
        if self.source_collection:
            self.source_collection.ingested = self.ingested
            self.source_collection.stac_url = self.stac_url

    def post_catalog(self, catalog_dict):
        """POST a catalog to `{stac_api_url}/catalogs`.

        On HTTP 200 the ingestor status is updated, propagated to the source
        collection, and the decoded JSON response is returned.

        Raises:
            Exception: if the response status is not 200.
        """
        catalog_id = catalog_dict['id']
        url = f'{self.stac_api_url}/catalogs'
        # No parent catalog is used for now: `pid` is sent as None and the parent
        # path segment stays empty.
        parent_id = None
        parent_path = '' if not parent_id else f'/{parent_id}'

        response = requests.post(url, json=catalog_dict, params={"pid": parent_id}, headers=self.headers, verify=True)

        if response.status_code != 200:
            raise Exception('Catalog ingestion failed: ' + self._response_details(response))

        print(f'{catalog_id} catalog ingested successfully.')
        self.ingested = True
        self.stac_url = f'{url}{parent_path}/{catalog_id}'  # ?
        self.update_source_collection()
        # TODO: all collections belonging to the ingested catalog should be updated !
        return response.json()

    def update_catalog(self):
        """Placeholder: catalog update is not implemented; returns None."""
        return None

    def delete_catalog(self, catalog_id):
        """Placeholder: catalog deletion is not implemented; returns None."""
        return None

    def post_collection(self, collection_dict, update_if_exists=False):
        """POST a collection to `{stac_api_url}/collections`.

        Args:
            collection_dict: STAC collection dict (left unmodified).
            update_if_exists: when the collection already exists (HTTP 409),
                update it with a PUT request instead of giving up.

        Returns:
            The HTTP response of the POST (or of the PUT fallback), or None when
            the collection already exists and `update_if_exists` is false.
        """
        payload = self._collection_payload(collection_dict)
        collection_id = payload['id']
        url = f'{self.stac_api_url}/collections'

        response = requests.post(url, json=payload, headers=self.headers, verify=True)

        if response.status_code == 200:
            print(f'{collection_id} collection created successfully.')
            return response

        if response.status_code == 409:
            # HTTP 409 => collection exists.
            if update_if_exists:
                # `update_collection` reports its own outcome; returning here avoids
                # printing a misleading "POST failed" message for a failed PUT.
                return self.update_collection(collection_dict, update_if_exists=update_if_exists)
            print(f'{collection_id} already exists. Use `update_if_exists=True` to update collection.')
            return None

        print(f'{collection_id} collection POST failed: ' + self._response_details(response))
        return response

    def update_collection(self, collection_dict, update_if_exists=False):
        """PUT a collection to `{stac_api_url}/collections/{id}`.

        Args:
            collection_dict: STAC collection dict (left unmodified).
            update_if_exists: kept for backward compatibility; it has no effect
                (retrying the same PUT on HTTP 409 would recurse without end).

        Returns:
            The HTTP response of the PUT request, whatever its status.
        """
        payload = self._collection_payload(collection_dict)
        collection_id = payload['id']
        url = f'{self.stac_api_url}/collections/{collection_id}'

        print(f'Updating existing {collection_id} collection using {COLLECTION_DEFAULT_MODEL} model to {url} ...')
        response = requests.put(url, json=payload, headers=self.headers, verify=True)

        if response.status_code == 200:
            print(f'{collection_id} collection updated successfully.')
        else:
            print(f'{collection_id} collection PUT failed: ' + self._response_details(response))
        return response

    def delete_collection(self, collection_id):
        """Send a DELETE request for `{stac_api_url}/collections/{collection_id}`.

        Returns:
            The HTTP response on HTTP 200, None otherwise.
        """
        url = f'{self.stac_api_url}/collections/{collection_id}'

        print(f'Deleting {collection_id} collection ...')
        response = requests.delete(url, headers=self.headers, verify=True)

        if response.status_code != 200:
            print(f'{collection_id} collection DELETE failed: {self._response_details(response)}')
            return None

        print(f'{collection_id} collection deleted successfully.')
        return response

    def post_feature(self, feature_dict, update_if_exists=False):
        """POST a feature to `{stac_api_url}/collections/{collection}/items`.

        Features whose ID is already listed in `processed_features` are skipped.

        Args:
            feature_dict: STAC feature dict with `id` and `collection` keys.
            update_if_exists: when the feature already exists (HTTP 409), update
                it with a PUT request instead of giving up.

        Returns:
            The HTTP response on success (of the POST, or of the PUT fallback),
            None when the feature is skipped, already exists without
            `update_if_exists`, or the request failed.
        """
        feature_id = feature_dict['id']
        collection_id = feature_dict['collection']
        if feature_id in self.processed_features:
            return None

        url = f'{self.stac_api_url}/collections/{collection_id}/items'
        # use or not ST_SplitDateLine
        params = {'_splitGeom': 0} if self.do_not_split_geom else {}

        response = requests.post(url, json=feature_dict, params=params, headers=self.headers, verify=True)

        if response.status_code == 409:
            # HTTP 409 => feature exists.
            if update_if_exists:
                # `update_feature` returns None on failure, so its result is returned
                # as-is rather than inspected for a status code.
                return self.update_feature(feature_dict)
            print(f'{feature_id} already exists. Use `update_if_exists=True` to update feature.')
            return None

        if response.status_code != 200:
            print(f'{feature_id} feature POST failed: {self._response_details(response)}')
            return None

        print(f'{feature_id} feature created successfully.')
        return response

    def update_feature(self, feature_dict):
        """PUT a feature to `{stac_api_url}/collections/{collection}/items/{id}`.

        Returns:
            The HTTP response on HTTP 200, None otherwise.
        """
        feature_id = feature_dict['id']
        collection_id = feature_dict['collection']
        url = f'{self.stac_api_url}/collections/{collection_id}/items/{feature_id}'

        print(f'Updating {feature_id} feature in {collection_id} collection ...')
        response = requests.put(url, json=feature_dict, headers=self.headers, verify=True)

        if response.status_code != 200:
            print(f'{feature_id} feature PUT failed: {self._response_details(response)}')
            return None

        print(f'{feature_id} feature updated successfully.')
        return response

    def delete_feature(self, feature_id):
        """Placeholder: feature deletion is not implemented; returns None."""
        return None

    def delete(self, stac_file='', collection_id='', feature_id='', catalog_id=''):
        """Delete a STAC object identified by a STAC file or by an explicit ID.

        When `stac_file` is given, the object type and ID are read from it;
        otherwise the first non-empty of `collection_id`, `feature_id` and
        `catalog_id` is used.

        Raises:
            Exception: if the STAC file content has no `type`.
            ValueError: if no input is given, or the object type is not one of
                `Collection`, `Feature` or `Catalog`.
        """
        if stac_file:
            stac_object_dict = pystac.read_file(stac_file).to_dict()
            if not stac_object_dict or 'type' not in stac_object_dict:
                raise Exception(f'Invalid STAC file: {stac_file}.')
            stac_object_type = stac_object_dict['type']
            stac_object_id = stac_object_dict['id']
            print(f'Found `{stac_object_id}` {stac_object_type.lower()}.')
        elif collection_id:
            stac_object_type, stac_object_id = 'Collection', collection_id
        elif feature_id:
            stac_object_type, stac_object_id = 'Feature', feature_id
        elif catalog_id:
            stac_object_type, stac_object_id = 'Catalog', catalog_id
        else:
            raise ValueError('At least one of the input optional argument is required.')

        if stac_object_type == 'Collection':
            self.delete_collection(stac_object_id)
        elif stac_object_type == 'Feature':
            self.delete_feature(stac_object_id)
        elif stac_object_type == 'Catalog':
            self.delete_catalog(stac_object_id)
        else:
            raise ValueError('Invalid STAC object type.')

    def ingest(self, stac_file='', stac_api_url='', update_if_exists=False, ingest_strategy='catalog'):  # dest_catalog_name='',
        """Ingest input STAC file into destination STAC API catalog service.

        Input STAC file can be a catalog, a collection or an item JSON file. Different ingestion strategies define
        what is ingested i.e. 'catalog', 'feature', 'both' or 'none'.

        A feature is posted and the method returns. A catalog or a collection is
        posted, then its `child` links (and `item`/`items` links, when features are
        ingested) are resolved relative to `stac_file` and ingested recursively.

        ~ stac2resto ("process_stuff(url, lookup_table"))

        Args:
            stac_file: path of the STAC file to ingest.
            stac_api_url: accepted for interface compatibility; not used by this
                method (the endpoint is derived from the STAC object targets).
            update_if_exists: update collections/features that already exist.
            ingest_strategy: one of the `INGEST_STRATEGIES` keys.

        Raises:
            Exception: if the STAC file content has no `type`, or the ingestion
                strategy is unknown.
        """
        # read input STAC file
        stac_object_dict = pystac.read_file(stac_file).to_dict()  # "stuff"
        if not stac_object_dict or 'type' not in stac_object_dict:
            raise Exception(f'Invalid STAC file: {stac_file}.')

        # Set destination STAC URL endpoint from the object SSYS targets, once.
        if self.stac_api_url == '':
            self.set_stac_api_url(stac_object_dict)
            print(f'Set STAC API URL: {self.stac_api_url}')

        if ingest_strategy not in INGEST_STRATEGIES:
            raise Exception(f'Invalid `{ingest_strategy}` ingestion strategy. Allowed values are: {list(INGEST_STRATEGIES.keys())}')
        strategy = INGEST_STRATEGIES[ingest_strategy]
        ingest_feature = strategy['ingest_feature']
        ingest_catalog = strategy['ingest_catalog']

        stac_object_type = stac_object_dict['type']

        if stac_object_type == 'Feature':
            if ingest_feature:
                response = self.post_feature(stac_object_dict, update_if_exists=update_if_exists)
                if response:
                    self.processed_features.append(stac_object_dict['id'])  # append feature ID
            return

        if stac_object_type == 'Collection':
            print(f'Found `{stac_object_dict["id"]}` collection.')
            if ingest_catalog:
                self.post_collection(stac_object_dict, update_if_exists=update_if_exists)

        if stac_object_type == 'Catalog':
            print(f'Found `{stac_object_dict["id"]}` catalog.')
            if ingest_catalog:
                self.post_catalog(stac_object_dict)

        # no links skip (a missing `links` key is treated like a non-list value)
        links = stac_object_dict.get('links')
        if not isinstance(links, list):
            print('   No links to process - skipping')
            return

        # recursively process/ingest links
        print("   Found %s links" % str(len(links)))
        for link in links:
            # derive the child path relative to the directory of the current STAC file
            child_path = str(Path(Path(stac_file).parent, link['href']))
            if link['rel'] in ['item', 'items'] and ingest_feature:
                self.ingest(stac_file=child_path, ingest_strategy=ingest_strategy, update_if_exists=update_if_exists)
            elif link['rel'] == 'child':
                print("------------------------------------------------------------------------------------")
                print("Process %s" % child_path)
                print("------------------------------------------------------------------------------------")
                self.ingest(stac_file=child_path, ingest_strategy=ingest_strategy, update_if_exists=update_if_exists)

        if stac_object_type == 'Collection':
            self.ingested = True
            self.stac_url = f'{self.stac_api_url}/collections/{stac_object_dict["id"]}'