Coverage for pds_crawler/extractor/pds_ode_ws.py: 95%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

166 statements  

1# -*- coding: utf-8 -*- 

2# pds-crawler - ETL to index PDS data to pdssp 

3# Copyright (C) 2023 - CNES (Jean-Christophe Malapert for Pôle Surfaces Planétaires) 

4# This file is part of pds-crawler <https://github.com/pdssp/pds_crawler> 

5# SPDX-License-Identifier: LGPL-3.0-or-later 

6""" 

7Module Name: 

8 pds_ode_ws 

9 

10Description: 

11 the pds_ode_ws provides the metadata for the observations by querying ODE web services. 

12 

13Classes: 

14 PdsRegistry : 

15 Provides the list of georeferenced PDS collections by retrieving 

16 a list of Planetary Data System (PDS) data collections from the 

17 PDS ODE (Orbital Data Explorer) web service. 

18 PdsRecordsWs : 

19 Handles the PDS records (download, store and load the records for 

20 one or several PDS collections). 

21 

22Author: 

23 Jean-Christophe Malapert 

24""" 

25import json 

26import logging 

27import os 

28import urllib.parse 

29from contextlib import closing 

30from typing import Any 

31from typing import cast 

32from typing import Dict 

33from typing import Iterator 

34from typing import List 

35from typing import Optional 

36from typing import Set 

37from typing import Tuple 

38 

39from ..exception import PdsCollectionAttributeError 

40from ..load import Database 

41from ..load import PdsCollectionStorage 

42from ..models import PdsRecordsModel 

43from ..models import PdsRegistryModel 

44from ..report import MessageModel 

45from ..utils import Observable 

46from ..utils import ProgressLogger 

47from ..utils import requests_retry_session 

48 

# Module-level logger, named after this module so that it can be configured
# through the standard logging hierarchy.
logger: logging.Logger = logging.getLogger(name=__name__)

50 

51 

class PdsRegistry(Observable):
    """Provides the list of georeferenced PDS collections by retrieving
    a list of Planetary Data System (PDS) data collections from the
    PDS ODE (Orbital Data Explorer) web service.

    .. uml::

        class PdsRegistry {
            +Database database
            - _build_request_params(target: str = None) Dict[str, str]
            - _get_response(params: Dict[str, str]) Dict[str, Any]
            - _parse_collection_response(response: Dict[str, Any], dataset_id: str = None) Tuple[Dict[str, int], List[PdsRegistryModel]]
            + get_pds_collections(body: str = None, dataset_id: str = None) Tuple[Dict[str, int], List[PdsRegistryModel]]
            + cache_pds_collections(pds_collections: List[PdsRegistryModel])
            + load_pds_collections_from_cache(body: str = None, dataset_id: str = None) List[PdsRegistryModel]
            + query_cache(dataset_id: str) Optional[PdsRegistryModel]
            + distinct_dataset_values() Set
        }
    """

    SERVICE_ODE_END_POINT = "https://oderest.rsl.wustl.edu/live2/?"

    def __init__(self, database: Database, *args, **kwargs):
        """Initializes the registry by setting a database to store the content
        that has been retrieved from the PDS.

        Args:
            database (Database): database
            *args: unused
            **kwargs: an optional ``report`` observer; when it is truthy it is
                subscribed to the notifications sent by this instance.
        """
        super().__init__()
        if kwargs.get("report"):
            self.__report = kwargs.get("report")
            self.subscribe(self.__report)
        self.__database = database

    @property
    def database(self) -> Database:
        """Returns the database

        Returns:
            Database: database
        """
        return self.__database

    def _build_request_params(
        self, target: Optional[str] = None
    ) -> Dict[str, str]:
        """Builds the query parameters of the ``iipt`` query.

        Args:
            target (str, optional): value of the ``odemetadb`` parameter
                (the body). Defaults to None, meaning no ``odemetadb`` filter.

        Returns:
            Dict[str, str]: Query parameters
        """
        params = {"query": "iipt", "output": "json"}
        if target is not None:
            params["odemetadb"] = target
        return params

    def _get_response(self, params: Dict[str, str]) -> Dict[str, Any]:
        """Queries SERVICE_ODE_END_POINT with the given parameters and
        returns the decoded JSON body of the response.

        Args:
            params (Dict[str, str]): Query parameters

        Raises:
            Exception: PDS ODE REST API query error (non-OK HTTP status)

        Returns:
            Dict[str, Any]: the decoded JSON content of the web service response
        """
        with closing(
            requests_retry_session().get(
                PdsRegistry.SERVICE_ODE_END_POINT,
                params=params,
                # presumably a requests Session: (connect, read) timeouts in s
                timeout=(180, 1800),
            )
        ) as response:
            url: str = response.url
            logger.debug(
                "Requesting %s to get the collections: %s",
                url,
                params.get("odemetadb", "All"),
            )
            if not response.ok:
                raise Exception(
                    f"PDS ODE REST API query {response.status_code} error: "
                    f"url={url}"
                )
            return response.json()

    def _parse_collection_response(
        self, response: Dict[str, Any], dataset_id: Optional[str] = None
    ) -> Tuple[Dict[str, int], List[PdsRegistryModel]]:
        """Parses the JSON response and returns a tuple that contains a
        statistic dictionary and a list of PdsRegistryModel objects.

        Args:
            response (Dict[str, Any]): decoded JSON response
            dataset_id (Optional[str]): dataset ID used to filter the response
                (case-insensitive comparison). Defaults to None (no filter).

        Returns:
            Tuple[Dict[str, int], List[PdsRegistryModel]]: statistic
            dictionary (``skip``, ``errors``, ``count``, ``nb_records``) and
            the list of PdsRegistryModel objects
        """
        iiptset_dicts = response["ODEResults"]["IIPTSets"]["IIPTSet"]
        # A single IIPTSet may come back as a bare object instead of a
        # one-element list (the ``Product`` entries are normalized the same
        # way when parsing records); iterating a dict would walk its keys.
        if not isinstance(iiptset_dicts, list):
            iiptset_dicts = [iiptset_dicts]

        wanted_id: Optional[str] = (
            dataset_id.upper() if dataset_id is not None else None
        )
        nb_records: int = 0
        errors: int = 0
        pds_collections: List[PdsRegistryModel] = list()
        for iiptset_dict in iiptset_dicts:
            try:
                pds_collection: PdsRegistryModel = PdsRegistryModel.from_dict(
                    iiptset_dict
                )
                if pds_collection is not None and (
                    wanted_id is None
                    or pds_collection.DataSetId.upper() == wanted_id
                ):
                    nb_records += pds_collection.NumberProducts
                    pds_collections.append(pds_collection)
            except PdsCollectionAttributeError as err:
                errors += 1
                logger.error(err)

        total: int = len(iiptset_dicts)
        stats = {
            # entries that were neither kept nor in error (filtered out/None)
            "skip": total - (len(pds_collections) + errors),
            "errors": errors,
            "count": total,
            "nb_records": nb_records,
        }
        return (stats, pds_collections)

    def get_pds_collections(
        self, body: Optional[str] = None, dataset_id: Optional[str] = None
    ) -> Tuple[Dict[str, int], List[PdsRegistryModel]]:
        """Retrieves a list of Planetary Data System (PDS) data collections
        from the PDS ODE (Orbital Data Explorer) web service.

        The optional ``body`` argument specifies the name of the body for
        which to retrieve collections. If it is not provided, the
        collections for all bodies are retrieved. An HTTP request is sent to
        the PDS ODE web service with the parameters built from ``body`` and
        the JSON response is parsed to extract the data collections.

        Args:
            body (str, optional): body. Defaults to None.
            dataset_id (str, optional): dataset ID. Defaults to None.

        Raises:
            Exception: PDS ODE REST API query error

        Returns:
            Tuple[Dict[str, int], List[PdsRegistryModel]]: a dictionary of
            statistics and a list of PdsRegistryModel objects representing
            the data collections
        """
        request_params: Dict[str, str] = self._build_request_params(body)
        response: Dict[str, Any] = self._get_response(request_params)
        (stats, pds_collection) = self._parse_collection_response(
            response, dataset_id
        )
        logger.info(
            "\n"
            "ODE Summary\n"
            "-----------\n"
            f"Nb records = {stats['nb_records']}\n"
            f"Nb collections : {len(pds_collection)}/{stats['count']}\n"
            f"Nb errors : {stats['errors']}\n"
            f"Nb skip : {stats['skip']}"
        )
        return (stats, pds_collection)

    def cache_pds_collections(self, pds_collections: List[PdsRegistryModel]):
        """Caches the PDS collections information by saving the PDS
        collections in the database.

        An info message is logged when the storage's ``save_collections``
        returns a falsy value.

        Args:
            pds_collections (List[PdsRegistryModel]): the PDS collections information
        """
        if not self.database.hdf5_storage.save_collections(pds_collections):
            logger.info("[PdsRegistry] No new collection to process")

    def load_pds_collections_from_cache(
        self, body: Optional[str] = None, dataset_id: Optional[str] = None
    ) -> List[PdsRegistryModel]:
        """Loads the PDS collections information from the cache by loading
        the information from the database.

        Args:
            body (Optional[str], optional): name of the body to get. Defaults to None.
            dataset_id (Optional[str], optional): Dataset ID parameter, used to filter the collection. Defaults to None.

        Returns:
            List[PdsRegistryModel]: the PDS collections information
        """
        return self.database.hdf5_storage.load_collections(body, dataset_id)

    def query_cache(self, dataset_id: str) -> Optional[PdsRegistryModel]:
        """Queries the local cache of PDS data collections for a specific
        dataset identified by its ID (exact, case-sensitive match).

        Args:
            dataset_id (str): ID of the dataset

        Returns:
            Optional[PdsRegistryModel]: the first matching PDS data
            collection, or None when there is no match
        """
        return next(
            (
                collection
                for collection in self.load_pds_collections_from_cache()
                if collection.DataSetId == dataset_id
            ),
            None,
        )

    def distinct_dataset_values(self) -> Set:
        """Gets the set of distinct values of the DataSetId attribute of the
        PdsRegistryModel objects in the local cache of PDS data collections.

        Returns:
            Set: Distinct values for the DataSetId attribute
        """
        return {
            pds_collection.DataSetId
            for pds_collection in self.load_pds_collections_from_cache()
        }

287 

class PdsRecordsWs(Observable):
    """Handles the PDS records web service.

    Responsible for downloading the records from the web service and storing
    the JSON responses in the database. This class is also responsible for
    parsing the stored JSON and converting each record into a
    PdsRecordsModel.

    .. uml::

        class PdsRecordsWs {
            +Database database
            - _build_request_params(target: str, ihid: str, iid: str, pt: str, offset: int, limit: int = 1000) Dict[str, Any]
            - _generate_all_pagination_params(target: str, ihid: str, iid: str, pt: str, total: int, offset: int = 1, limit: int = 1000) List[Tuple[Any, ...]]
            - _generate_urls_for_pages(all_pagination_params: List[Tuple[Any, ...]]) List[str]
            - __repr__() str
            + generate_urls_for_one_collection(pds_collection: PdsRegistryModel, offset: int = 1, limit: int = 5000)
            + generate_urls_for_all_collections(pds_collections: List[PdsRegistryModel], offset: int = 1, limit: int = 5000)
            + download_pds_records_for_one_collection(pds_collection: PdsRegistryModel, limit: Optional[int] = None, nb_workers: int = 3, time_sleep: int = 1, progress_bar: bool = True)
            + download_pds_records_for_all_collections(pds_collections: List[PdsRegistryModel], limit: Optional[int] = None, nb_workers: int = 3, time_sleep: int = 1, progress_bar: bool = True)
            + parse_pds_collection_from_cache(pds_collection: PdsRegistryModel, progress_bar: bool = True) Iterator[PdsRecordsModel]
        }
    """

    SERVICE_ODE_END_POINT = "https://oderest.rsl.wustl.edu/live2/?"

    def __init__(self, database: Database, *args, **kwargs):
        """Initializes the handler with the database used to store the
        downloaded responses.

        Args:
            database (Database): database
            *args: unused
            **kwargs: an optional ``report`` observer; when it is truthy it is
                subscribed to the notifications sent by this instance.
        """
        super().__init__()
        self.__database = database
        if kwargs.get("report"):
            self.__report = kwargs.get("report")
            self.subscribe(self.__report)

    @property
    def database(self) -> Database:
        """Returns the database

        Returns:
            Database: database
        """
        return self.__database

    def _build_request_params(
        self,
        target: str,
        ihid: str,
        iid: str,
        pt: str,
        offset: int,
        limit: int = 1000,
    ) -> Dict[str, Any]:
        """Builds the query parameters of the ``product`` query.

        Args:
            target (str): body
            ihid (str): platform
            iid (str): instrument
            pt (str): product type
            offset (int): record number where the pagination starts
            limit (int, optional): number of records in the response. Defaults to 1000.

        Returns:
            Dict[str, Any]: key/value pairs to prepare the query (the key
            order is kept, so it also fixes the order in the encoded URL)
        """
        return {
            "query": "product",
            "target": target,
            "results": "copmf",
            "ihid": ihid,
            "iid": iid,
            "pt": pt,
            "offset": offset,
            "limit": limit,
            "output": "json",
        }

    def _generate_all_pagination_params(
        self,
        target: str,
        ihid: str,
        iid: str,
        pt: str,
        total: int,
        offset: int = 1,
        limit: int = 1000,
    ) -> List[Tuple[Any, ...]]:
        """Generates the pagination parameters of every page.

        Pages start at ``offset`` and advance by ``limit`` while the start
        is lower than or equal to ``total``.

        Args:
            target (str): body
            ihid (str): platform
            iid (str): instrument
            pt (str): product type
            total (int): total number of records that we want
            offset (int, optional): record number where the pagination starts. Defaults to 1.
            limit (int, optional): maximum number of records in the response. Defaults to 1000.

        Raises:
            ValueError: if ``limit`` is not strictly positive (the pagination
                would otherwise never end)

        Returns:
            List[Tuple[Any, ...]]: List of
            (target, ihid, iid, pt, total, pagination_start, limit)
        """
        if limit <= 0:
            # A non-positive step never moves past ``total``: the loop below
            # would never terminate.
            raise ValueError(f"limit must be strictly positive, got {limit}")
        params_paginations: List[Tuple[Any, ...]] = list()
        pagination_start = offset
        while pagination_start <= total:
            params_paginations.append(
                (target, ihid, iid, pt, total, pagination_start, limit)
            )
            pagination_start += limit
        return params_paginations

    def _generate_urls_for_pages(
        self, all_pagination_params: List[Tuple[Any, ...]]
    ) -> List[str]:
        """Generates all URLs based on the pagination parameters of the pages.

        Args:
            all_pagination_params (List[Tuple[Any, ...]]): pagination
                parameters of all pages, as produced by
                ``_generate_all_pagination_params``

        Returns:
            List[str]: all URLs
        """
        urls: List[str] = list()
        for (
            target,
            ihid,
            iid,
            pt,
            _total,  # not part of the query
            pagination_start,
            limit,
        ) in all_pagination_params:
            request_params: Dict[str, Any] = self._build_request_params(
                target, ihid, iid, pt, pagination_start, limit
            )
            urls.append(
                PdsRecordsWs.SERVICE_ODE_END_POINT
                + urllib.parse.urlencode(request_params)
            )
        return urls

    def generate_urls_for_one_collection(
        self,
        pds_collection: PdsRegistryModel,
        offset: int = 1,
        limit: int = 5000,
    ):
        """Generates the URLs for one collection and saves them in the database.

        These URLs are later used to download the records in bulk.

        Args:
            pds_collection (PdsRegistryModel): pds collection
            offset (int, optional): record number from which the response starts. Defaults to 1.
            limit (int, optional): maximum number of records per page. Defaults to 5000.

        Raises:
            ValueError: if ``limit`` is not strictly positive
        """
        all_pagination_params: List[
            Tuple[Any, ...]
        ] = self._generate_all_pagination_params(
            pds_collection.ODEMetaDB.lower(),  # type: ignore
            pds_collection.IHID,
            pds_collection.IID,
            pds_collection.PT,
            pds_collection.NumberProducts,
            offset,
            limit,
        )
        urls = self._generate_urls_for_pages(all_pagination_params)
        self.database.hdf5_storage.save_urls(pds_collection, urls)

    def generate_urls_for_all_collections(
        self,
        pds_collections: List[PdsRegistryModel],
        offset: int = 1,
        limit: int = 5000,
    ):
        """Generates the URLs for all the collections and saves them in the database.

        These URLs are later used to download the records in bulk.

        Args:
            pds_collections (List[PdsRegistryModel]): PDS collections
            offset (int, optional): record number. Defaults to 1.
            limit (int, optional): limit per page. Defaults to 5000.
        """
        for pds_collection in pds_collections:
            self.generate_urls_for_one_collection(
                pds_collection, offset, limit
            )

    def download_pds_records_for_one_collection(
        self,
        pds_collection: PdsRegistryModel,
        limit: Optional[int] = None,
        nb_workers: int = 3,
        time_sleep: int = 1,
        progress_bar: bool = True,
    ):
        """Downloads the records of a given PDS collection based on the set of
        URLs loaded from the database.

        When no URL is cached for the collection, the URLs are generated
        (with the default pagination) and reloaded first.

        Args:
            pds_collection (PdsRegistryModel): PDS collection
            limit (int, optional): Number of pages to download (sample). Defaults to None (all pages).
            nb_workers (int, optional): Number of workers in parallel. Defaults to 3.
            time_sleep (int, optional): Time to wait between two download series. Defaults to 1.
            progress_bar (bool, optional): Set progress_bar. Defaults to True.
        """
        urls: List[str] = self.database.hdf5_storage.load_urls(pds_collection)
        # len() rather than truthiness: the storage may hand back an
        # array-like object (assumption, not verified here).
        if len(urls) == 0:
            # No URL in the cache, generate now
            self.generate_urls_for_one_collection(pds_collection)
            urls = self.database.hdf5_storage.load_urls(pds_collection)

        # if a sample is needed
        if limit is not None:
            urls = urls[0:limit]

        # Get the storage for the collection
        file_storage: PdsCollectionStorage = (
            self.database.pds_storage.get_pds_storage_for(pds_collection)
        )

        # Download files in the storage
        file_storage.download(
            urls=urls,
            time_sleep=time_sleep,
            nb_workers=nb_workers,
            progress_bar=progress_bar,
        )

    def download_pds_records_for_all_collections(
        self,
        pds_collections: List[PdsRegistryModel],
        limit: Optional[int] = None,
        nb_workers: int = 3,
        time_sleep: int = 1,
        progress_bar: bool = True,
    ):
        """Downloads the PDS records of all the given collections.

        Args:
            pds_collections (List[PdsRegistryModel]): PDS collections to download
            limit (Optional[int], optional): Number of pages per collection. Defaults to None.
            nb_workers (int, optional): Number of workers in parallel. Defaults to 3.
            time_sleep (int, optional): Time to wait between two download series. Defaults to 1.
            progress_bar (bool, optional): Set progress bar. Defaults to True.
        """
        with ProgressLogger(
            total=len(pds_collections),
            iterable=pds_collections,
            logger=logger,
            description="Getting collections",
            position=0,
            disable_tqdm=not progress_bar,
        ) as progress_logger:
            for pds_collection in progress_logger:
                self.download_pds_records_for_one_collection(
                    cast(PdsRegistryModel, pds_collection),
                    limit,
                    nb_workers=nb_workers,
                    time_sleep=time_sleep,
                    progress_bar=progress_bar,
                )

    def parse_pds_collection_from_cache(
        self, pds_collection: PdsRegistryModel, progress_bar: bool = True
    ) -> Iterator[PdsRecordsModel]:
        """Parses the PDS records from the cache.

        Each downloaded response file of the collection's storage is decoded.
        A response whose ``ODEResults`` ``Count`` is ``"0"`` yields nothing;
        otherwise its products are converted with
        ``PdsRecordsModel.from_dict`` and the result is yielded. When a file
        is not valid JSON, an error is logged and the observers are notified
        with a MessageModel containing the file and the error; parsing then
        continues with the next file.

        Args:
            pds_collection (PdsRegistryModel): PDS collection of the registry
            progress_bar (bool, optional): use progress bar. Defaults to True.

        Yields:
            Iterator[PdsRecordsModel]: Iterator on the records of each file
        """

        def _records_decoder(dct: Dict[str, Any]) -> Any:
            # json object_hook: called on every decoded object; only the
            # top-level object holding "ODEResults" is converted.
            if "ODEResults" not in dct:
                return dct
            ode_results = dct["ODEResults"]
            if ode_results["Count"] == "0":
                return None
            products = ode_results["Products"]["Product"]
            # A single product is not wrapped in a list in the response.
            if not isinstance(products, list):
                products = [products]
            return PdsRecordsModel.from_dict(products)

        collection_storage: PdsCollectionStorage = (
            self.database.pds_storage.get_pds_storage_for(pds_collection)
        )
        records_files: List[str] = collection_storage.list_records_files()
        with ProgressLogger(
            total=len(records_files),
            iterable=records_files,
            logger=logger,
            description=f"Downloaded responses from {pds_collection.DataSetId}",
            position=1,
            leave=False,
            disable_tqdm=not progress_bar,
        ) as progress_logger:
            for file_in_records in progress_logger:
                file = os.path.join(
                    collection_storage.directory, cast(str, file_in_records)
                )
                with open(file, encoding="utf8", errors="ignore") as f:
                    content: str = f.read()
                try:
                    result = json.loads(
                        content, object_hook=_records_decoder
                    )
                except json.JSONDecodeError as err:
                    message = f"{file} must be deleted !"
                    logger.error(message)
                    self.notify_observers(MessageModel(file, err))
                    continue
                if result is not None:
                    yield result

    def __repr__(self) -> str:
        return f"PdsRecordsWs({self.database})"