Coverage for pds_crawler/extractor/pds_ode_ws.py: 95%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# -*- coding: utf-8 -*-
2# pds-crawler - ETL to index PDS data to pdssp
3# Copyright (C) 2023 - CNES (Jean-Christophe Malapert for Pôle Surfaces Planétaires)
4# This file is part of pds-crawler <https://github.com/pdssp/pds_crawler>
5# SPDX-License-Identifier: LGPL-3.0-or-later
6"""
Module Name:
    pds_ode_ws

Description:
    The pds_ode_ws module provides the metadata for the observations by
    querying the ODE web services.

Classes:
    PdsRegistry :
        Provides the list of georeferenced PDS collections by retrieving
        a list of Planetary Data System (PDS) data collections from the
        PDS ODE (Orbital Data Explorer) web service.
    PdsRecordsWs :
        Handles the PDS records (download, store and load the records for
        one or several PDS collections).

Author:
    Jean-Christophe Malapert
24"""
25import json
26import logging
27import os
28import urllib.parse
29from contextlib import closing
30from typing import Any
31from typing import cast
32from typing import Dict
33from typing import Iterator
34from typing import List
35from typing import Optional
36from typing import Set
37from typing import Tuple
39from ..exception import PdsCollectionAttributeError
40from ..load import Database
41from ..load import PdsCollectionStorage
42from ..models import PdsRecordsModel
43from ..models import PdsRegistryModel
44from ..report import MessageModel
45from ..utils import Observable
46from ..utils import ProgressLogger
47from ..utils import requests_retry_session
# Module-level logger, named after this module so that the application can
# configure its verbosity independently of the other extractors.
logger = logging.getLogger(__name__)
class PdsRegistry(Observable):
    """Provides the list of georeferenced PDS collections.

    The Planetary Data System (PDS) data collections are retrieved from the
    PDS ODE (Orbital Data Explorer) REST web service. They can be saved in,
    and reloaded from, the database given at construction time.

    .. uml::

        class PdsRegistry {
            +Database database
            - _build_request_params(target: str = None) Dict[str, str]
            - _get_response(params: Dict[str, str]) Dict[str, Any]
            - _parse_collection_response(response: Dict[str, Any], dataset_id: str = None) Tuple[Dict[str, int], List[PdsRegistryModel]]
            + get_pds_collections(body: str = None, dataset_id: str = None) Tuple[Dict[str, int], List[PdsRegistryModel]]
            + cache_pds_collections(pds_collections: List[PdsRegistryModel])
            + load_pds_collections_from_cache(body: str = None, dataset_id: str = None) List[PdsRegistryModel]
            + query_cache(dataset_id: str) Optional[PdsRegistryModel]
            + distinct_dataset_values() Set
        }
    """

    SERVICE_ODE_END_POINT = "https://oderest.rsl.wustl.edu/live2/?"

    def __init__(self, database: Database, *args, **kwargs):
        """Initializes the registry with the database used to store the
        content that has been retrieved from the PDS.

        Args:
            database (Database): database
            **kwargs: when a truthy ``report`` keyword is given, it is
                subscribed as an observer of this object.
        """
        super().__init__()
        report = kwargs.get("report")
        if report:
            self.__report = report
            self.subscribe(report)
        self.__database = database

    @property
    def database(self) -> Database:
        """Returns the database

        Returns:
            Database: database
        """
        return self.__database

    def _build_request_params(
        self, target: Optional[str] = None
    ) -> Dict[str, str]:
        """Builds the query parameters of the ``iipt`` request.

        Args:
            target (str, optional): value of the ``odemetadb`` parameter.
                When None, the parameter is omitted. Defaults to None.

        Returns:
            Dict[str, str]: Query parameters
        """
        params: Dict[str, str] = {"query": "iipt", "output": "json"}
        if target is not None:
            params["odemetadb"] = target
        return params

    def _get_response(self, params: Dict[str, str]) -> Dict[str, Any]:
        """Returns the decoded JSON content of the web service response
        designated by SERVICE_ODE_END_POINT and the query parameters.

        Args:
            params (Dict[str, str]): Query parameters

        Raises:
            Exception: PDS ODE REST API query error (the HTTP status is
                not a success)

        Returns:
            Dict[str, Any]: the JSON document of the response, decoded
        """
        with closing(
            requests_retry_session().get(
                PdsRegistry.SERVICE_ODE_END_POINT,
                params=params,
                timeout=(180, 1800),
            )
        ) as response:
            url: str = response.url
            logger.debug(
                "Requesting %s to get the collections: %s",
                url,
                params.get("odemetadb", "All"),
            )
            if not response.ok:
                raise Exception(
                    f"PDS ODE REST API query {response.status_code} "
                    f"error: url={url}"
                )
            # Decode while the connection is still open.
            return response.json()

    def _parse_collection_response(
        self, response: Dict[str, Any], dataset_id: Optional[str] = None
    ) -> Tuple[Dict[str, int], List[PdsRegistryModel]]:
        """Parses the JSON response and returns a tuple that contains a
        statistic dictionary and a list of PdsRegistryModel objects.

        Args:
            response (Dict[str, Any]): decoded JSON response
            dataset_id (Optional[str]): dataset ID used to filter the
                response (case-insensitive comparison). Defaults to None

        Returns:
            Tuple[Dict[str, int], List[PdsRegistryModel]]: statistic
            dictionary (keys ``skip``, ``errors``, ``count``,
            ``nb_records``) and the list of PdsRegistryModel objects
        """
        iiptsets = response["ODEResults"]["IIPTSets"]["IIPTSet"]
        # Same normalization as for the products in PdsRecordsWs: a lone
        # element may be serialized as an object instead of a list.
        # NOTE(review): assumed to apply to IIPTSet too - confirm with ODE.
        if not isinstance(iiptsets, list):
            iiptsets = [iiptsets]

        total: int = len(iiptsets)
        errors: int = 0
        nb_records: int = 0
        pds_collections: List[PdsRegistryModel] = []
        for iiptset in iiptsets:
            try:
                pds_collection = PdsRegistryModel.from_dict(iiptset)
                if pds_collection is None:
                    continue
                if (
                    dataset_id is not None
                    and pds_collection.DataSetId.upper() != dataset_id.upper()
                ):
                    continue
                nb_records += pds_collection.NumberProducts
                pds_collections.append(pds_collection)
            except PdsCollectionAttributeError as err:
                errors += 1
                logger.error(err)

        # Whatever is neither kept nor in error has been skipped.
        skip: int = total - (len(pds_collections) + errors)
        stats: Dict[str, int] = {
            "skip": skip,
            "errors": errors,
            "count": total,
            "nb_records": nb_records,
        }
        return (stats, pds_collections)

    def get_pds_collections(
        self, body: Optional[str] = None, dataset_id: Optional[str] = None
    ) -> Tuple[Dict[str, int], List[PdsRegistryModel]]:
        """Retrieves a list of Planetary Data System (PDS) data collections
        from the PDS ODE (Orbital Data Explorer) web service.

        The method takes an optional body argument that specifies the name
        of the body for which to retrieve collections. If no argument is
        provided, the method retrieves collections for all bodies.
        The method sends an HTTP request to the PDS ODE web service with the
        parameters built from the body argument and parses the JSON response
        to extract the data collections. A summary is logged at INFO level.

        Args:
            body (str, optional): body. Defaults to None.
            dataset_id (str, optional): dataset ID. Defaults to None.

        Returns:
            Tuple[Dict[str, int], List[PdsRegistryModel]]: a dictionary of
            statistics and a list of PdsRegistryModel objects representing
            the data collections
        """
        request_params: Dict[str, str] = self._build_request_params(body)
        response = self._get_response(request_params)
        stats, pds_collections = self._parse_collection_response(
            response, dataset_id
        )
        summary = "\n".join(
            (
                "",
                "ODE Summary",
                "-----------",
                f"Nb records = {stats['nb_records']}",
                f"Nb collections : {len(pds_collections)}/{stats['count']}",
                f"Nb errors : {stats['errors']}",
                f"Nb skip : {stats['skip']}",
            )
        )
        logger.info(summary)
        return (stats, pds_collections)

    def cache_pds_collections(self, pds_collections: List[PdsRegistryModel]):
        """Caches the PDS collections information by saving the PDS
        collections in the database.

        Args:
            pds_collections (List[PdsRegistryModel]): the PDS collections information
        """
        if not self.database.hdf5_storage.save_collections(pds_collections):
            logger.info("[PdsRegistry] No new collection to process")

    def load_pds_collections_from_cache(
        self, body: Optional[str] = None, dataset_id: Optional[str] = None
    ) -> List[PdsRegistryModel]:
        """Loads the PDS collections information from the cache by loading
        the information from the database.

        Args:
            body (Optional[str], optional): name of the body to get. Defaults to None.
            dataset_id (Optional[str], optional): Dataset ID parameter, used to filter the collection. Defaults to None.

        Returns:
            List[PdsRegistryModel]: the PDS collections information
        """
        return self.database.hdf5_storage.load_collections(body, dataset_id)

    def query_cache(self, dataset_id: str) -> Optional[PdsRegistryModel]:
        """Queries the local cache of PDS data collections for a specific
        dataset identified by its ID (exact, case-sensitive match).

        Args:
            dataset_id (str): ID of the dataset

        Returns:
            Optional[PdsRegistryModel]: the first matching PDS data
            collection, or None when there is no match
        """
        return next(
            (
                collection
                for collection in self.load_pds_collections_from_cache()
                if collection.DataSetId == dataset_id
            ),
            None,
        )

    def distinct_dataset_values(self) -> Set[str]:
        """Gets the set of distinct values for the DataSetId attribute of
        the PdsRegistryModel objects stored in the local cache.

        Returns:
            Set[str]: Distinct values for the DataSetId attribute
        """
        return {
            pds_collection.DataSetId
            for pds_collection in self.load_pds_collections_from_cache()
        }
class PdsRecordsWs(Observable):
    """Handles the PDS records web service.

    Responsible for generating the paginated URLs of the web service,
    downloading the JSON responses into the collection storage, and parsing
    the stored JSON files into PdsRecordsModel objects.

    .. uml::

        class PdsRecordsWs {
            +Database database
            - _build_request_params(target: str, ihid: str, iid: str, pt: str, offset: int, limit: int = 1000) Dict[str, Any]
            - _generate_all_pagination_params(target: str, ihid: str, iid: str, pt: str, total: int, offset: int = 1, limit: int = 1000) List[Tuple]
            - _generate_urls_for_pages(all_pagination_params: List[Tuple]) List[str]
            + generate_urls_for_one_collection(pds_collection: PdsRegistryModel, offset: int = 1, limit: int = 5000)
            + generate_urls_for_all_collections(pds_collections: List[PdsRegistryModel], offset: int = 1, limit: int = 5000)
            + download_pds_records_for_one_collection(pds_collection: PdsRegistryModel, limit: Optional[int] = None)
            + download_pds_records_for_all_collections(pds_collections: List[PdsRegistryModel])
            + parse_pds_collection_from_cache(pds_collection: PdsRegistryModel, progress_bar: bool = True) Iterator[PdsRecordsModel]
        }
    """

    SERVICE_ODE_END_POINT = "https://oderest.rsl.wustl.edu/live2/?"

    def __init__(self, database: Database, *args, **kwargs):
        """Initializes the handler with the database used to store the
        URLs and the downloaded records.

        Args:
            database (Database): database
            **kwargs: when a truthy ``report`` keyword is given, it is
                subscribed as an observer of this object.
        """
        super().__init__()
        self.__database = database
        report = kwargs.get("report")
        if report:
            self.__report = report
            self.subscribe(report)

    @property
    def database(self) -> Database:
        """Returns the database

        Returns:
            Database: database
        """
        return self.__database

    def _build_request_params(
        self,
        target: str,
        ihid: str,
        iid: str,
        pt: str,
        offset: int,
        limit: int = 1000,
    ) -> Dict[str, Any]:
        """Builds the query parameters of the ``product`` request.

        Args:
            target (str): body
            ihid (str): platform
            iid (str): instrument
            pt (str): product type
            offset (int): record number where the pagination starts
            limit (int, optional): number of records in the response. Defaults to 1000.

        Returns:
            Dict[str, Any]: key/value to prepare the query
        """
        return {
            "query": "product",
            "target": target,
            "results": "copmf",
            "ihid": ihid,
            "iid": iid,
            "pt": pt,
            "offset": offset,
            "limit": limit,
            "output": "json",
        }

    def _generate_all_pagination_params(
        self,
        target: str,
        ihid: str,
        iid: str,
        pt: str,
        total: int,
        offset: int = 1,
        limit: int = 1000,
    ) -> List[Tuple[str, str, str, str, int, int, int]]:
        """Generates the pagination parameters of every page.

        Args:
            target (str): body
            ihid (str): platform
            iid (str): instrument
            pt (str): product type
            total (int): total number of records that we want
            offset (int, optional): record number where the pagination starts. Defaults to 1.
            limit (int, optional): maximum number of records in the response. Defaults to 1000.

        Raises:
            ValueError: limit is not strictly positive (the pagination
                would never reach ``total``)

        Returns:
            List[Tuple]: List of (target, ihid, iid, pt, total, pagination_start, limit),
            one tuple per page; empty when offset is greater than total
        """
        if limit <= 0:
            raise ValueError(
                f"limit must be a positive integer, got {limit}"
            )
        # One page every `limit` records, from `offset` up to `total` included.
        return [
            (target, ihid, iid, pt, total, pagination_start, limit)
            for pagination_start in range(offset, total + 1, limit)
        ]

    def _generate_urls_for_pages(
        self,
        all_pagination_params: List[Tuple[str, str, str, str, int, int, int]],
    ) -> List[str]:
        """Generates all URLs based on the pagination parameters of the pages

        Args:
            all_pagination_params (List[Tuple]): pagination parameters for
                all pages, as (target, ihid, iid, pt, total, offset, limit)

        Returns:
            List[str]: all URLs
        """
        urls: List[str] = []
        # `total` is not a query parameter: it only drives the pagination.
        for target, ihid, iid, pt, _total, offset, limit in all_pagination_params:
            request_params = self._build_request_params(
                target, ihid, iid, pt, offset, limit
            )
            urls.append(
                PdsRecordsWs.SERVICE_ODE_END_POINT
                + urllib.parse.urlencode(request_params)
            )
        return urls

    def generate_urls_for_one_collection(
        self,
        pds_collection: PdsRegistryModel,
        offset: int = 1,
        limit: int = 5000,
    ):
        """Generates the URLs for one collection and saves them in the database.

        These URLs will be used to download massively the records.

        Args:
            pds_collection (PdsRegistryModel): pds collection
            offset (int, optional): record number from which the response starts. Defaults to 1.
            limit (int, optional): maximum number of records per page. Defaults to 5000.
        """
        all_pagination_params = self._generate_all_pagination_params(
            pds_collection.ODEMetaDB.lower(),
            pds_collection.IHID,
            pds_collection.IID,
            pds_collection.PT,
            pds_collection.NumberProducts,
            offset,
            limit,
        )
        urls: List[str] = self._generate_urls_for_pages(all_pagination_params)
        self.database.hdf5_storage.save_urls(pds_collection, urls)

    def generate_urls_for_all_collections(
        self,
        pds_collections: List[PdsRegistryModel],
        offset: int = 1,
        limit: int = 5000,
    ):
        """Generates the URLs for all the collections and saves them in the database.

        These URLs will be used to download massively the records.

        Args:
            pds_collections (List[PdsRegistryModel]): PDS collections
            offset (int, optional): record number. Defaults to 1.
            limit (int, optional): limit per page. Defaults to 5000.
        """
        for pds_collection in pds_collections:
            self.generate_urls_for_one_collection(
                pds_collection, offset, limit
            )

    def download_pds_records_for_one_collection(
        self,
        pds_collection: PdsRegistryModel,
        limit: Optional[int] = None,
        nb_workers: int = 3,
        time_sleep: int = 1,
        progress_bar: bool = True,
    ):
        """Downloads the records of a given PDS collection based on the set
        of URLs loaded from the database.

        When no URL is stored for the collection, the URLs are generated
        (with the default pagination) and stored first.

        Args:
            pds_collection (PdsRegistryModel): PDS collection
            limit (int, optional): Number of pages to download; None means all pages. Defaults to None.
            nb_workers (int, optional): Number of workers in parallel. Defaults to 3.
            time_sleep (int, optional): Time to wait between two download series. Defaults to 1.
            progress_bar (bool, optional): Set progress_bar. Defaults to True.
        """
        urls: List[str] = self.database.hdf5_storage.load_urls(pds_collection)
        if not urls:
            # No URL in the cache, generate now
            self.generate_urls_for_one_collection(pds_collection)
            urls = self.database.hdf5_storage.load_urls(pds_collection)

        # if a sample is needed
        if limit is not None:
            urls = urls[0:limit]

        # Get the storage for the collection
        file_storage: PdsCollectionStorage = (
            self.database.pds_storage.get_pds_storage_for(pds_collection)
        )

        # Download files in the storage
        file_storage.download(
            urls=urls,
            time_sleep=time_sleep,
            nb_workers=nb_workers,
            progress_bar=progress_bar,
        )

    def download_pds_records_for_all_collections(
        self,
        pds_collections: List[PdsRegistryModel],
        limit: Optional[int] = None,
        nb_workers: int = 3,
        time_sleep: int = 1,
        progress_bar: bool = True,
    ):
        """Downloads the PDS records for all collections

        Args:
            pds_collections (List[PdsRegistryModel]): PDS collections to download
            limit (Optional[int], optional): Number of pages per collection. Defaults to None.
            nb_workers (int, optional): Number of workers in parallel. Defaults to 3.
            time_sleep (int, optional): Time to wait between two download series. Defaults to 1.
            progress_bar (bool, optional): Set progress bar. Defaults to True.
        """
        with ProgressLogger(
            total=len(pds_collections),
            iterable=pds_collections,
            logger=logger,
            description="Getting collections",
            position=0,
            disable_tqdm=not progress_bar,
        ) as progress_logger:
            for pds_collection in progress_logger:
                self.download_pds_records_for_one_collection(
                    cast(PdsRegistryModel, pds_collection),
                    limit,
                    nb_workers=nb_workers,
                    time_sleep=time_sleep,
                    progress_bar=progress_bar,
                )

    def parse_pds_collection_from_cache(
        self, pds_collection: PdsRegistryModel, progress_bar: bool = True
    ) -> Iterator[PdsRecordsModel]:
        """Parses the PDS records from cache.

        Each records file listed by the storage of the collection is read
        and decoded as JSON. The object holding the ``ODEResults`` key is
        converted with PdsRecordsModel.from_dict; a response whose ``Count``
        is "0" is converted to None and is not yielded.

        When a file is not valid JSON, an error is logged and the observers
        are notified with a MessageModel built from the file path and the
        decoding error; the parsing then continues with the next file.

        Args:
            pds_collection (PdsRegistryModel): PDS collection of the registry
            progress_bar (bool, optional): use progress bar. Defaults to True.

        Yields:
            Iterator[PdsRecordsModel]: the decoded content of each records file
        """

        def _records_model_decoder(dct):
            # json.loads calls this hook for every decoded JSON object;
            # only the one holding "ODEResults" is converted.
            if "ODEResults" not in dct:
                return dct
            ode_results = dct["ODEResults"]
            if ode_results["Count"] == "0":
                return None
            products = ode_results["Products"]["Product"]
            # A single product is serialized as an object, not as a list.
            if not isinstance(products, list):
                products = [products]
            return PdsRecordsModel.from_dict(products)

        collection_storage: PdsCollectionStorage = (
            self.database.pds_storage.get_pds_storage_for(pds_collection)
        )
        records_files: List[str] = collection_storage.list_records_files()
        with ProgressLogger(
            total=len(records_files),
            iterable=records_files,
            logger=logger,
            description=f"Downloaded responses from {pds_collection.DataSetId}",
            position=1,
            leave=False,
            disable_tqdm=not progress_bar,
        ) as progress_logger:
            for file_in_records in progress_logger:
                file = os.path.join(
                    collection_storage.directory, cast(str, file_in_records)
                )
                # The file is fully read and closed before yielding.
                with open(file, encoding="utf8", errors="ignore") as f:
                    content: str = f.read()
                try:
                    result = json.loads(
                        content, object_hook=_records_model_decoder
                    )
                    if result is not None:
                        yield result
                except json.JSONDecodeError as err:
                    message = f"{file} must be deleted !"
                    logger.error(message)
                    self.notify_observers(MessageModel(file, err))

    def __repr__(self) -> str:
        return f"PdsRecordsWs({self.database})"