Coverage for pds_crawler/extractor/pds_ode_website.py: 76%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

334 statements  

1# -*- coding: utf-8 -*- 

2# pds-crawler - ETL to index PDS data to pdssp 

3# Copyright (C) 2023 - CNES (Jean-Christophe Malapert for Pôle Surfaces Planétaires) 

4# This file is part of pds-crawler <https://github.com/pdssp/pds_crawler> 

5# SPDX-License-Identifier: LGPL-3.0-or-later 

6""" 

7Module Name: 

8 pds_ode_website 

9 

10Description: 

11 the pds_ode_website module parses the PDS3 Dataset explorer to get the different catalogs 

12 to download them. 

13 

14Classes: 

15 Crawler : 

16 Crawls the content of the Dataset explorer web site. 

17 PDSCatalogDescription : 

18 Parses the content of the PDS3 catalogs for a given PDS collection. 

19 PDSCatalogsDescription : 

20 Downloads the PDS3 objects (catalogs) on the local storage and parses the 

21 PDS3 objects from the local storage 

22 

23Author: 

24 Jean-Christophe Malapert 

25""" 

26import logging 

27import time 

28from contextlib import closing 

29from json.decoder import JSONDecodeError 

30from string import Template 

31from typing import Any 

32from typing import cast 

33from typing import Dict 

34from typing import Iterator 

35from typing import List 

36from typing import Optional 

37from typing import Union 

38from urllib.parse import parse_qs 

39from urllib.parse import ParseResult 

40from urllib.parse import urlparse 

41 

42from bs4 import BeautifulSoup 

43from bs4 import element 

44from lark.exceptions import UnexpectedCharacters 

45from requests.exceptions import ConnectionError 

46 

47from ..exception import NoFileExistInFolder 

48from ..exception import PdsCatalogDescriptionError 

49from ..load import Database 

50from ..load import PdsCollectionStorage 

51from ..load import PdsParserFactory 

52from ..models import CatalogModel 

53from ..models import PdsRecordModel 

54from ..models import PdsRecordsModel 

55from ..models import PdsRegistryModel 

56from ..models import VolumeModel 

57from ..report import MessageModel 

58from ..utils import Observable 

59from ..utils import parallel_requests 

60from ..utils import requests_retry_session 

61from .pds_ode_ws import PdsRecordsWs 

62 

63logger = logging.getLogger(__name__) 

64 

65 

class Crawler:
    """Crawls the content of the Dataset explorer web site.

    The main purpose of the class is to retrieve the links and subdirectories
    from the given web page and return them as a list. The class also checks
    if a given URL is a file or a directory and raises an exception if no files
    exist in the folder.

    .. uml::

        class Crawler {
            + str url
            + str host
            + str fragment
            - get_subdirs_file(soup) List[Dict[str, str]]
            - get_content(host: str, query: str) Optional[List[Dict[str, str]]]
            + static is_file(url: str) bool
            + static query(url: str) str
            + parse() -> Optional[List[Dict[str, str]]]
        }
    """

    def __init__(self, url: str):
        """Initializes the Crawler with an URL.

        Args:
            url (str): URL
        """
        super().__init__()
        self.__url: str = url
        url_split: List[str] = url.split("/")
        # The host is everything up to (excluding) the last path fragment.
        self._host: str = "/".join(url_split[:-1])
        self._fragment: str = url_split[-1]

    @staticmethod
    def is_file(url: str) -> bool:
        """Tests whether the URL points to a file or not.

        A URL carrying a query string is considered a directory listing,
        not a file. Otherwise the last fragment of the path is examined:
        if it contains a period (".") and the characters after the last
        period are not numeric (a file such as ".txt" or ".mp3", but not a
        numeric extension such as ".001"), the URL points to a file.

        Args:
            url (str): URL

        Returns:
            bool: True if the URL points to a File otherwise False
        """
        url_parse: ParseResult = urlparse(url)
        if url_parse.query:
            return False
        last_fragment: str = url_parse.path.split("/")[-1]
        # Fix: return the boolean expression directly instead of the
        # redundant ``True if ... else False`` construct.
        return (
            "." in last_fragment
            and not last_fragment.split(".")[-1].isnumeric()
        )

    @staticmethod
    def query(url: str) -> str:
        """Extracts the content of the URL with 3 retries with a timeout of 5s.

        The request is retried forever: on any failure a warning is logged,
        the method sleeps 1 second and tries again until a response is
        obtained.

        Args:
            url (str): URL

        Returns:
            str: the content of the URL
        """
        while True:
            response = None
            try:
                with closing(
                    requests_retry_session().get(
                        url, stream=True, verify=False, timeout=5
                    )
                ) as response:
                    response.encoding = "utf-8"
                    content = response.text
                    return content
            # Fix: catch Exception instead of a bare ``except`` so that
            # KeyboardInterrupt/SystemExit can still break the retry loop.
            except Exception:
                logger.warning(f"Error when trying to query {url}, try again")
                time.sleep(1)
            finally:
                # ``closing`` handles the normal path; this also covers a
                # failure occurring between ``get`` and entering ``with``.
                if response is not None:
                    response.close()

    def _get_subdirs_file(self, soup) -> List[Dict[str, str]]:
        """Parses the HTML content of a web page, and extracts
        links to subdirectories and files from the page.

        Only the last "table" element of the page is scanned. Anchors must
        have an "href" attribute and no "title" attribute; for each matching
        anchor a dictionary is built with two key-value pairs: "url" (the
        value of the "href" attribute) and "name" (the text content of the
        anchor, usually the name of the subdirectory or file).

        Args:
            soup (_type_): Soup

        Returns:
            List[Dict[str, str]]: links
        """
        tables: List[element.Tag] = soup.findAll("table")
        links: List[Dict[str, str]] = list()
        for a in tables[-1].findAll("a", href=True, attrs={"title": None}):
            links.append({"url": a["href"], "name": a.text})
        return links

    def _get_content(self, host: str, query: str) -> List[Dict[str, str]]:
        """Get the content of an URL based on the host and the query.

        Args:
            host (str): host
            query (str): query

        Raises:
            ValueError: when the built URL points to a file
            NoFileExistInFolder: When there is no file in the folder

        Returns:
            List[Dict[str, str]]: links (url/name)
        """
        url = host + "/" + query
        if Crawler.is_file(url):
            raise ValueError(f"URL {url} is a file")

        content: str = Crawler.query(url)
        if "No files exist in this folder" in content:
            raise NoFileExistInFolder(url)
        soup = BeautifulSoup(content, features="html.parser")
        links = self._get_subdirs_file(soup)
        return links

    def parse(self) -> List[Dict[str, str]]:
        """Parse the URL.

        Returns:
            List[Dict[str, str]]: links (url/name)
        """
        return self._get_content(self.host, self.fragment)

    @property
    def url(self) -> str:
        """Returns the URL to parse.

        Returns:
            str: URL
        """
        return self.__url

    @property
    def host(self) -> str:
        """Returns the host of the query.

        Returns:
            str: host
        """
        return self._host

    @property
    def fragment(self) -> str:
        """Returns the fragment of the URL.

        Returns:
            str: the fragment of the URL
        """
        return self._fragment

248 

249 

class PDSCatalogDescription(Observable):
    """Class that handles the PDS catalogs, based on the PDS3 objects.

    This class can :

    * load the URLs of all PDS catalogs for a given collection from the ODE web site.
    * get ODE catalogs objects from local storage

    Note : The download of the PDS catalogs in the local storage is done
    by the PDSCatalogsDescription object, which performs a massive download
    in the local storage

    .. uml::

        class PDSCatalogDescription {
            - Any report
            + str url
            + VolumeModel vol_desc_cat
            + str volume_desc_url
            + PdsRecordsWs pds_records
            + PdsRegistryModel pds_collection
            + database Database
            + PdsRecordsModel record
            + List[str] catalogs_urls
            - build_url_ode_collection()
            - find_volume_desc_url() str
            - parse_volume_desc_cat() VolumeModel
            - load_volume_description()
            - find_catalogs_urls() -> List[Dict[str, str]]
            - is_catalog_exists(catalog_name: Any) -> bool
            - get_url_for_multiple_catalogs(catalogs: List[str], catalogs_from_desc_cat: Dict[str, str]) List[str]
            - get_url_for_simple_catalog(catalog_name: str, catalogs_from_desc_cat: Dict[str, str]) List[str]
            - get_urls_from_catalog_type(catalog_name: Union[str, List[str]], catalogs_from_desc_cat: Dict[str, str]) List[str]
            - parse_catalog(file_storage: PdsStorage, catalog_name: str, cat_type: str, result: Dict[str, Union[str, List[str]]])
            + load_catalogs_urls() List[str]
            + get_ode_catalogs(pds_collection: PdsRegistryModel) Dict[str, Any]
            + __repr__(self) str
        }
    """

    # Template of the ODE Dataset explorer URL; substituted with the
    # collection's metadata database, instrument host/id and dataset id.
    DATASET_EXPLORER = Template(
        "https://ode.rsl.wustl.edu/$ODEMetaDB/DataSetExplorer.aspx?target=$ODEMetaDB&instrumenthost=$ihid&instrumentid=$iid&datasetid=$Data_Set_Id"
    )

    def __init__(self, database: Database, *args, **kwargs):
        """Initialize the object with a database to store the information.

        Args:
            database (Database): database

        Keyword Args:
            report: optional observer subscribed to this Observable
        """
        super().__init__()
        report = kwargs.get("report")
        if report:
            self.__report = report
            self.subscribe(self.__report)
        self.__database: Database = database
        self.__pds_records = PdsRecordsWs(self.__database)
        self._initialize_values()

    def _initialize_values(self):
        """Reset the per-collection state.

        Fix: the previous implementation used bare annotations
        (``self.__x: T``) which are no-ops at runtime, so attribute values
        from a previously processed collection leaked into the next run.
        The attributes are now really reset.
        """
        self.__pds_collection: Optional[PdsRegistryModel] = None
        self.__record: Optional[PdsRecordModel] = None
        self.__url: Optional[str] = None
        self.__volume_desc_url: Optional[str] = None
        self.__vol_desc_cat: Optional[VolumeModel] = None
        self.__catalogs_urls: List[str] = list()

    @property
    def url(self) -> str:
        """ODE URL that hosts the PDS catalogs.

        Returns:
            str: URL (None until a collection has been loaded)
        """
        return self.__url

    @property
    def vol_desc_cat(self) -> VolumeModel:
        """Returns the volume description catalog.

        Returns:
            VolumeModel: the volume description catalog
        """
        return self.__vol_desc_cat

    @property
    def volume_desc_url(self) -> str:
        """The volume description URL.

        Returns:
            str: the volume description URL
        """
        return self.__volume_desc_url

    @property
    def pds_records(self) -> PdsRecordsWs:
        """Returns the PDS records object to access to the data from the local cache.

        Returns:
            PdsRecordsWs: PDS records
        """
        return self.__pds_records

    @property
    def pds_collection(self) -> PdsRegistryModel:
        """PDS collection that contains the PDS catalogs.

        Returns:
            PdsRegistryModel: the current PDS collection
        """
        return self.__pds_collection

    @property
    def database(self) -> Database:
        """Database used to store/retrieve the collection data."""
        return self.__database

    @property
    def record(self) -> PdsRecordModel:
        """First record of the collection, used to build the ODE URL."""
        return self.__record

    @property
    def catalogs_urls(self) -> List[str]:
        """URLs of the PDS catalogs loaded by `load_catalogs_urls`."""
        return self.__catalogs_urls

    def _build_url_ode_collection(self, volume_id: Optional[str] = None):
        """Computes the ODE URL.

        This ODE URL is used to parse the web page to get the PDS objects.
        Sometimes the volume_id value is renamed. In this case, the web page
        must be parsed by getting the volume_id label to find the correct
        volume_id.

        Args:
            volume_id (str, optional): volume_id to set. Defaults to None.
        """
        url_build: str = PDSCatalogDescription.DATASET_EXPLORER.substitute(
            ODEMetaDB=self.pds_collection.ODEMetaDB.lower(),
            ihid=self.record.ihid,
            iid=self.record.iid,
            Data_Set_Id=self.record.Data_Set_Id,
        )
        if volume_id:
            self.__url = url_build + f"&volumeid={volume_id}"
        else:
            self.__url = url_build

    def _find_volume_desc_url(self) -> str:
        """Find the URL volume description by parsing the ODE URL.

        The volume description contains all the references to the
        interesting catalogs to parse.

        Raises:
            NoFileExistInFolder: voldesc.cat file not found in PDS catalog

        Returns:
            str: the Volume description URL
        """
        crawler = Crawler(self.url)
        links = crawler.parse()
        vol_desc_url = None
        for link in links:
            if link["name"] == "voldesc.cat":
                vol_desc_url = link["url"]
                break
        if vol_desc_url is None:
            raise NoFileExistInFolder(
                f"voldesc.cat file not found in {self.pds_collection}"
            )
        logger.info(f"voldesc.cat found in {vol_desc_url}")
        return vol_desc_url

    def _find_volume_id(self) -> str:
        """Find volume_id in web page.

        The ODE page of the collection (without volume id) is crawled and
        the link whose name matches the record's PDSVolume_Id is located;
        the real volume id is then extracted from the link's query string.

        Raises:
            NoFileExistInFolder: Volume_id not found

        Returns:
            str: volume_id
        """
        self._build_url_ode_collection()
        crawler = Crawler(self.url)
        links = crawler.parse()
        volume_id = None
        for link in links:
            if link["name"] == self.record.PDSVolume_Id:
                url: str = link["url"]
                parsed_url = urlparse(url)
                volume_id = parse_qs(parsed_url.query)["volumeid"][0]
                break
        if volume_id is None:
            raise NoFileExistInFolder(f"volumeid not found in {self.url}")
        logger.info(f"volume_id found : {volume_id}")
        return volume_id

    def _parse_volume_desc_cat(self) -> VolumeModel:
        """Set the volume description file by parsing the ODE URL.

        Raises:
            PdsCatalogDescriptionError: Error when getting or parsing the volume description file

        Returns:
            VolumeModel: the Volume description object
        """
        with closing(
            requests_retry_session().get(
                self.volume_desc_url, stream=True, verify=False, timeout=5
            )
        ) as request:
            if request.ok:
                content = request.text
                vol_desc_cat = PdsParserFactory.parse(
                    uri=content,
                    type_file=PdsParserFactory.FileGrammary.VOL_DESC,
                )
                return vol_desc_cat
            else:
                raise PdsCatalogDescriptionError(
                    f"Error when getting or parsing {self.volume_desc_url}"
                )

    def _load_volume_description(self):
        """Load the volume description.

        When voldesc.cat cannot be found with the raw volume id, the real
        volume id is looked up on the web page and the URL is rebuilt.
        """
        try:
            self.__volume_desc_url: str = self._find_volume_desc_url()
        except NoFileExistInFolder:
            # The volume id was probably renamed on ODE: resolve it and retry.
            volume_id = self._find_volume_id()
            self._build_url_ode_collection(volume_id=volume_id)
            self.__volume_desc_url: str = self._find_volume_desc_url()
        self.__vol_desc_cat: VolumeModel = self._parse_volume_desc_cat()

    def _find_catalogs_urls(self) -> List[Dict[str, str]]:
        """Retrieve the URL of the PDS object by parsing the ODE URL.

        Returns:
            List[Dict[str, str]]: Catalogs name and its URL
        """
        url = self.url + "&pathtovol=catalog/"
        result: List[Dict[str, str]]
        try:
            crawler = Crawler(url)
            result = crawler.parse()
        except NoFileExistInFolder as err:
            self.notify_observers(MessageModel(url, err))
            logger.error(f"[NoFileExistInFolder]: {url}")
            result = list()
        return result

    def _is_catalog_exists(self, catalog_name: Any) -> bool:
        """Checks if the catalog_name is set.

        Args:
            catalog_name (Any): object to test

        Returns:
            bool: True if the catalog_name is not None otherwise False
        """
        return catalog_name is not None

    def _get_url_for_multiple_catalogs(
        self, catalogs: List[str], catalogs_from_desc_cat: Dict[str, str]
    ) -> List[str]:
        """Get the URLs for all the PDS objects.

        Args:
            catalogs (List[str]): PDS object names
            catalogs_from_desc_cat (Dict[str, str]): mapping catalog name -> URL

        Returns:
            List[str]: Returns the URLs of the PDS objects
        """
        url_list: List[str] = list()
        # Consistency fix: reuse the single-catalog lookup instead of
        # duplicating its body (lookup + error logging) here.
        for catalog_name in catalogs:
            url_list.extend(
                self._get_url_for_simple_catalog(
                    catalog_name, catalogs_from_desc_cat
                )
            )
        return url_list

    def _get_url_for_simple_catalog(
        self, catalog_name: str, catalogs_from_desc_cat: Dict[str, str]
    ) -> List[str]:
        """Returns the URL of the catalog name that is contained in catalogs_from_desc_cat.

        Args:
            catalog_name (str): catalog name
            catalogs_from_desc_cat (Dict[str, str]): list of catalogs

        Returns:
            List[str]: the URL of the catalog name (empty list when not found)
        """
        url_list: List[str] = list()
        catalog_name_lower: str = catalog_name.lower()
        if catalog_name_lower in catalogs_from_desc_cat:
            url: str = catalogs_from_desc_cat[catalog_name_lower]
            url_list.append(url)
        else:
            logger.error(f"Cannot find {catalog_name_lower} catalog")
        return url_list

    def _get_urls_from_catalog_type(
        self,
        catalog_type: Union[str, List[str]],
        catalogs_from_desc_cat: Dict[str, str],
    ) -> List[str]:
        """Returns the URLs of the catalog type that is contained in catalogs_from_desc_cat.

        A catalog type can be associated to one or several catalogs.
        The list of catalogs (URL included) is provided by
        catalogs_from_desc_cat.

        Args:
            catalog_type (Union[str, List[str]]): catalog type
            catalogs_from_desc_cat (Dict[str, str]): list of catalogs

        Returns:
            List[str]: URLs of the catalogs for this catalog type
        """
        url_list: List[str] = list()
        if self._is_catalog_exists(catalog_type):
            if isinstance(catalog_type, list):
                url_list.extend(
                    self._get_url_for_multiple_catalogs(
                        catalog_type, catalogs_from_desc_cat
                    )
                )
            else:
                url_list.extend(
                    self._get_url_for_simple_catalog(
                        catalog_type, catalogs_from_desc_cat
                    )
                )
        return url_list

    def _get_urls_from_volume_catalog(self) -> List[str]:
        """Get catalog URLs associated of the catalogs in the Volume catalog.

        Returns:
            List[str]: List of URLs
        """
        self._build_url_ode_collection(volume_id=self.record.PDSVolume_Id)

        # Extract the Volume description catalog
        # that contains an index of all catalogs
        self._load_volume_description()

        # Find all the catalogs in the catalog directory of ODE
        catalog_urls: List[Dict[str, str]] = self._find_catalogs_urls()
        mapping_file_url: Dict[str, str] = {
            catalog["name"]: catalog["url"] for catalog in catalog_urls
        }

        # Make the mapping between catalog_type from Volume description
        # catalog and the catalogs found in catalog directory.
        # In the volume description catalog, it is possible to have several
        # catalogs related to one catalog type.
        url_list: List[str] = list()
        catalog: CatalogModel = self.vol_desc_cat.CATALOG
        catalog_dict: Dict[str, str] = catalog.__dict__
        for key in catalog_dict.keys():
            url_list.extend(
                self._get_urls_from_catalog_type(
                    catalog_dict[key], mapping_file_url
                )
            )
        return url_list

    def _notify_parse_error(
        self, message: str, catalog_name: str, err: Exception
    ) -> None:
        """Log a catalog parsing problem and notify the observers."""
        logger.error(message)
        logger.exception(err)
        self.notify_observers(MessageModel(catalog_name, Exception(message)))

    def _parse_catalog(
        self,
        file_storage: PdsCollectionStorage,
        catalog_name: str,
        cat_type: str,
        result: Dict[str, Union[str, List[str]]],
        timeout: int = 30,
    ):
        """Parses the PDS object (`catalog_name`), represented by a catalog type and stored
        on the file storage with a specific implementation associated to the catalog_type.

        The catalog is parsed by using the `get_catalog` method from the
        PdsStorage object. The result is then stored in the result variable
        where the key is the `catalog_type`. At each `catalog_type` is
        associated one or several catalogs.

        If the parsing is not successful, the error message is notified by
        the use of a MessageModel object.

        Args:
            file_storage (PdsCollectionStorage): storage where the PDS objects have been downloaded
            catalog_name (str): catalog name that must be parsed
            cat_type (str): Type of catalog where an implementation is associated
            result (Dict[str, Union[str, List[str]]]): the catalogs in the Volume description
            timeout (int, optional): parser timeout in seconds. Defaults to 30
        """
        try:
            cat_obj = file_storage.get_catalog(
                file=catalog_name,
                catalogue_type=PdsParserFactory.FileGrammary.get_enum_from(cat_type),  # type: ignore
                timeout=timeout,
            )
            if cat_type in result:
                # Several catalogs share this catalog type: append to the list.
                cast(List, result[cat_type]).append(cat_obj)
            else:
                result[cat_type] = cat_obj
        # Fix: KeyError and UnicodeDecodeError had two byte-identical
        # handlers; they are merged into a single except clause.
        except (KeyError, UnicodeDecodeError) as err:
            self._notify_parse_error(
                f"Unable to find {catalog_name} in {file_storage.directory}",
                catalog_name,
                err,
            )
        except Exception as err:
            self._notify_parse_error(
                f"Unable to parse {catalog_name} in {file_storage.directory}",
                catalog_name,
                err,
            )

    def load_catalogs_urls(
        self, pds_collection: PdsRegistryModel, progress_bar: bool = True
    ):
        """Loads the catalogs URLs from cache for a given
        `pds_collection` collection.

        Args:
            pds_collection (PdsRegistryModel): PDS collection
            progress_bar (bool, True): Set progress_bar. Defaults to True.
        """
        self._initialize_values()
        self.__pds_collection = pds_collection
        records_iter: Iterator[
            PdsRecordsModel
        ] = self.pds_records.parse_pds_collection_from_cache(
            pds_collection,
            progress_bar=progress_bar,
        )
        try:
            records: PdsRecordsModel = next(records_iter)
            # The first record is enough to build the ODE collection URL.
            self.__record = records.pds_records_model[0]
            try:
                self.__catalogs_urls: List[
                    str
                ] = self._get_urls_from_volume_catalog()
            except NoFileExistInFolder as err:
                logger.exception(f"[NoFileExistInFolder]: {err}")
                self.notify_observers(MessageModel(str(pds_collection), err))
            except UnexpectedCharacters as err:
                logger.exception(f"[ParserError]: {err}")
                self.notify_observers(MessageModel(str(pds_collection), err))
            except ConnectionError as err:
                logger.exception(f"[ConnectionError]: {err}")
                self.notify_observers(MessageModel(str(pds_collection), err))
        except StopIteration:
            logger.error(
                f"No record for {pds_collection}. Please download them"
            )
            self.notify_observers(
                MessageModel(str(pds_collection), Exception("No record"))
            )
        except JSONDecodeError:
            logger.error(
                f"[CorruptedFile] Please remove the file corresponding to this collection {pds_collection}"
            )
            self.notify_observers(
                MessageModel(
                    str(pds_collection),
                    Exception("[CorruptedFile] Please remove the file"),
                )
            )

    def get_ode_catalogs(
        self, pds_collection: PdsRegistryModel, timeout: int = 30
    ) -> Dict[str, Any]:
        """Returns the PDS objects for a given space mission collection.

        The function retrieves the PdsStorage object associated
        with the PdsRegistryModel using get_pds_storage_for(), and then
        retrieves the description of the volume containing the PDS objects
        with get_volume_description(). It then lists the different types of
        catalogs in the directory using list_catalogs(), and for each
        catalog, it uses _parse_catalog() to retrieve information on each
        catalog.

        Args:
            pds_collection (PdsRegistryModel): the space mission collection
            timeout (int): parser timeout in seconds. Default to 30

        Raises:
            TypeError: Illegal datatype for catalog

        Returns:
            Dict[str, Any]: list of PDS Object name and its object
        """
        result = dict()
        result["collection"] = pds_collection
        try:
            file_storage: PdsCollectionStorage = (
                self.database.pds_storage.get_pds_storage_for(pds_collection)
            )
            result[
                PdsParserFactory.FileGrammary.VOL_DESC.name
            ] = file_storage.get_volume_description(timeout)
            catalogs = file_storage.list_catalogs()
            for cat_type in catalogs.keys():
                catalog_value: Union[str, List[str]] = catalogs[cat_type]
                if catalog_value is None:
                    continue
                elif isinstance(catalog_value, str):
                    catalog_name: str = catalog_value
                    self._parse_catalog(
                        file_storage, catalog_name, cat_type, result, timeout
                    )
                elif isinstance(catalog_value, list):
                    result[cat_type] = list()
                    for catalog_name in catalog_value:
                        self._parse_catalog(
                            file_storage,
                            catalog_name,
                            cat_type,
                            result,
                            timeout,
                        )
                else:
                    raise TypeError(
                        f"Illegal datatype for catalog : {type(catalog_value)}"
                    )

            return result
        except FileNotFoundError as err:
            # Best effort: return the partial result when the storage
            # directory (or a file in it) is missing.
            logger.exception(err)
            return result

    def __repr__(self) -> str:
        return f"PDSCatalogDescription({self.pds_records})"

795 

796 

class PDSCatalogsDescription(Observable):
    """Provides the means to download the PDS catalogs (PDS objects).

    .. uml::

        class PDSCatalogsDescription {
            - Any report
            + Database database
            + PDSCatalogDescription pds_object_cats
            - build_all_urls(pds_collection: PdsRegistryModel) List[str]
            + download(pds_collections: List[PdsRegistryModel])
            + get_ode_catalogs(pds_collections: List[PdsRegistryModel]) -> Iterator[Dict[str, Any]]
            + __repr__(self) str
        }
    """

    def __init__(self, database: Database, *args, **kwargs):
        """Initialize the means to download by using a database to store the results.

        Args:
            database (Database): database

        Keyword Args:
            report: optional observer subscribed to this Observable
        """
        super().__init__()
        if kwargs.get("report"):
            self.__report = kwargs.get("report")
            self.subscribe(self.__report)
        self.__pds_object_cats = PDSCatalogDescription(
            database, *args, **kwargs
        )
        self.__database = database

    @property
    def pds_object_cats(self) -> PDSCatalogDescription:
        """Helper that resolves the catalog URLs of a single collection."""
        return self.__pds_object_cats

    @property
    def database(self) -> Database:
        """Database backing the local storage of the catalogs."""
        return self.__database

    def _build_all_urls(
        self, pds_collection: PdsRegistryModel, progress_bar: bool = True
    ) -> List[str]:
        """Builds all the PDS objects URLs for a collection of space missions.

        These URLs are used to retrieve all PDS objects. When catalog URLs
        are found, the volume description URL is appended as well so it gets
        downloaded alongside the catalogs.

        Args:
            pds_collection (PdsRegistryModel): the collection of space missions
            progress_bar (bool, True): Set progress_bar. Defaults to True.

        Returns:
            List[str]: List of URLs
        """
        logger.info(f"Fetching Catalogs URLs from {pds_collection}")
        self.pds_object_cats.load_catalogs_urls(pds_collection, progress_bar)
        catalog_urls: List[str] = self.pds_object_cats.catalogs_urls
        collected: List[str] = list()
        if catalog_urls:
            collected.extend(catalog_urls)
            collected.append(self.pds_object_cats.volume_desc_url)
        return collected

    def download(
        self,
        pds_collections: List[PdsRegistryModel],
        nb_workers: int = 3,
        time_sleep: int = 1,
        progress_bar: bool = True,
    ):
        """Downloads the PDS objects for the collections of space missions.

        For each collection the list of PDS object URLs is built, the
        storage area of the collection is looked up in the database, and
        every URL is fetched through the storage's parallel download
        facility. Parsing and connection errors are logged and the loop
        moves on to the next collection.

        Args:
            pds_collections (List[PdsRegistryModel]): the collections of space missions
            nb_workers (int, optional): Number of workers in parallel. Defaults to 3.
            time_sleep (int, optional): Time to way between two download series. Defaults to 1.
            progress_bar (bool, True): Set progress_bar. Defaults to True.
        """
        for collection in pds_collections:
            urls: List[str] = self._build_all_urls(collection, progress_bar)
            try:
                storage: PdsCollectionStorage = (
                    self.database.pds_storage.get_pds_storage_for(collection)
                )
                storage.download(
                    urls=urls,
                    nb_workers=nb_workers,
                    timeout=5,
                    time_sleep=time_sleep,
                    progress_bar=progress_bar,
                )
            except UnexpectedCharacters as err:
                logger.exception(f"[ParserError]: {err}")
            except ConnectionError as err:
                logger.exception(f"[ConnectionError]: {err}")

    def get_ode_catalogs(
        self, pds_collections: List[PdsRegistryModel], timeout: int = 30
    ) -> Iterator[Dict[str, Any]]:
        """Get all the PDS objects for the `pds_collections`.

        Lazily delegates to PDSCatalogDescription.get_ode_catalogs() for
        each collection: every yielded dictionary maps the PDS object names
        of one collection to their parsed objects (plus the "collection"
        key holding the PdsRegistryModel itself).

        Args:
            pds_collections (List[PdsRegistryModel]): the collections of the space mission.
            timeout (int, optional): parser timeout in seconds. Defaults to 30

        Yields:
            Iterator[Dict[str, Any]]: PDS object name and its object
        """
        for collection in pds_collections:
            yield self.pds_object_cats.get_ode_catalogs(collection, timeout)