Coverage for pds_crawler/extractor/pds_ode_website.py: 76%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# -*- coding: utf-8 -*-
2# pds-crawler - ETL to index PDS data to pdssp
3# Copyright (C) 2023 - CNES (Jean-Christophe Malapert for Pôle Surfaces Planétaires)
4# This file is part of pds-crawler <https://github.com/pdssp/pds_crawler>
5# SPDX-License-Identifier: LGPL-3.0-or-later
6"""
7Module Name:
8 pds_ode_website
10Description:
11 the pds_ode_website module parses the PDS3 Dataset explorer to get the different catalogs
12 to download them.
14Classes:
15 Crawler :
        Crawls the content of the Dataset explorer web site.
17 PDSCatalogDescription :
18 Parses the content of the PDS3 catalogs for a given PDS collection.
19 PDSCatalogsDescription :
20 Downloads the PDS3 objects (catalogs) on the local storage and parses the
21 PDS3 objects from the local storage
23Author:
24 Jean-Christophe Malapert
25"""
26import logging
27import time
28from contextlib import closing
29from json.decoder import JSONDecodeError
30from string import Template
31from typing import Any
32from typing import cast
33from typing import Dict
34from typing import Iterator
35from typing import List
36from typing import Optional
37from typing import Union
38from urllib.parse import parse_qs
39from urllib.parse import ParseResult
40from urllib.parse import urlparse
42from bs4 import BeautifulSoup
43from bs4 import element
44from lark.exceptions import UnexpectedCharacters
45from requests.exceptions import ConnectionError
47from ..exception import NoFileExistInFolder
48from ..exception import PdsCatalogDescriptionError
49from ..load import Database
50from ..load import PdsCollectionStorage
51from ..load import PdsParserFactory
52from ..models import CatalogModel
53from ..models import PdsRecordModel
54from ..models import PdsRecordsModel
55from ..models import PdsRegistryModel
56from ..models import VolumeModel
57from ..report import MessageModel
58from ..utils import Observable
59from ..utils import parallel_requests
60from ..utils import requests_retry_session
61from .pds_ode_ws import PdsRecordsWs
# Module-level logger named after the module, per the stdlib convention.
logger = logging.getLogger(__name__)
class Crawler:
    """Crawls the content of the Dataset explorer web site.

    The main purpose of the class is to retrieve the links and subdirectories
    from the given web page and return them as a list. The class also checks
    if a given URL is a file or a directory and raises an exception if no files
    exist in the folder.

    .. uml::

        class Crawler {
            + str url
            + str host
            + str fragment
            - get_subdirs_file(soup) List[Dict[str, str]]
            - get_content(host: str, query: str) Optional[List[Dict[str, str]]]
            + static is_file(url: str) bool
            + static query(url: str) str
            + parse() -> Optional[List[Dict[str, str]]]
        }
    """

    def __init__(self, url: str):
        """Initializes the Crawler with an URL.

        Args:
            url (str): URL to crawl
        """
        super().__init__()
        self.__url: str = url
        # Split the URL once: everything before the last "/" is kept as the
        # host part; the last segment is the query fragment re-appended by
        # parse().
        url_split: List[str] = url.split("/")
        self._host: str = "/".join(url_split[:-1])
        self._fragment: str = url_split[-1]

    @staticmethod
    def is_file(url: str) -> bool:
        """Tests whether the URL points to a file or not.

        The method extracts the last fragment of the path, which is usually
        the name of the file or directory being pointed to. A URL carrying a
        query string is never considered a file. If the last fragment
        contains a period (".") and the characters after the last period are
        not numeric (so ".txt" is a file extension but ".123" is not), the
        URL is considered to point to a file.

        Args:
            url (str): URL

        Returns:
            bool: True if the URL points to a file otherwise False
        """
        url_parse: ParseResult = urlparse(url)
        if url_parse.query:
            # Query strings identify dynamic pages, never plain files.
            return False
        last_fragment: str = url_parse.path.split("/")[-1]
        # A file name has an extension that is not purely numeric.
        return (
            "." in last_fragment
            and not last_fragment.split(".")[-1].isnumeric()
        )

    @staticmethod
    def query(url: str) -> str:
        """Extracts the content of the URL, retrying until it succeeds.

        Each attempt uses a 5s timeout (plus the internal retries of
        ``requests_retry_session``); on failure the method logs a warning,
        sleeps 1s and tries again indefinitely.

        Args:
            url (str): URL

        Returns:
            str: the content of the URL
        """
        while True:
            try:
                # NOTE(review): verify=False disables TLS certificate
                # checking; kept for backward compatibility with the
                # original behavior — confirm whether it is still needed.
                # closing() guarantees the response is released on both the
                # success and the error path.
                with closing(
                    requests_retry_session().get(
                        url, stream=True, verify=False, timeout=5
                    )
                ) as response:
                    response.encoding = "utf-8"
                    return response.text
            except Exception:  # noqa: BLE001 - retry on any request error
                # Narrowed from a bare ``except:`` so that KeyboardInterrupt
                # and SystemExit can still stop the retry loop.
                logger.warning(f"Error when trying to query {url}, try again")
                time.sleep(1)

    def _get_subdirs_file(self, soup) -> List[Dict[str, str]]:
        """Parses the HTML content of a web page, and extracts
        links to subdirectories and files from the page.

        Only the last "table" element of the page is scanned. Within it,
        every "a" element that has an "href" attribute and no "title"
        attribute contributes one entry: "url" is the value of the "href"
        attribute and "name" is the text content of the link, which is
        usually the name of the subdirectory or file.

        Args:
            soup (_type_): Soup

        Returns:
            List[Dict[str, str]]: links
        """
        tables: List = soup.findAll("table")
        return [
            {"url": a["href"], "name": a.text}
            for a in tables[-1].findAll("a", href=True, attrs={"title": None})
        ]

    def _get_content(self, host: str, query: str) -> List[Dict[str, str]]:
        """Get the content of an URL based on the host and the query.

        Args:
            host (str): host
            query (str): query

        Raises:
            ValueError: When the URL points to a file instead of a directory
            NoFileExistInFolder: When there is no file in the folder

        Returns:
            List[Dict[str, str]]: links (url/name)
        """
        url = host + "/" + query
        if Crawler.is_file(url):
            raise ValueError(f"URL {url} is a file")

        content: str = Crawler.query(url)
        if "No files exist in this folder" in content:
            raise NoFileExistInFolder(url)
        soup = BeautifulSoup(content, features="html.parser")
        return self._get_subdirs_file(soup)

    def parse(self) -> List[Dict[str, str]]:
        """Parse the URL.

        Returns:
            List[Dict[str, str]]: links (url/name)
        """
        return self._get_content(self.host, self.fragment)

    @property
    def url(self) -> str:
        """Returns the URL to parse.

        Returns:
            str: URL
        """
        return self.__url

    @property
    def host(self) -> str:
        """Returns the host of the query.

        Returns:
            str: host
        """
        return self._host

    @property
    def fragment(self) -> str:
        """Returns the fragment of the URL.

        Returns:
            str: the fragment of the URL
        """
        return self._fragment
class PDSCatalogDescription(Observable):
    """Class that handles the PDS catalogs, based on the PDS3 objects.

    This class can :

    * load the URLs of all PDS catalogs for a given collection from the ODE web site.
    * get ODE catalogs objects from local storage

    Note : The download of the PDS catalogs in the local storage is done
    by the PDSCatalogsDescription object, which performs a massive download
    in the local storage

    .. uml::

        class PDSCatalogDescription {
            - Any report
            + str url
            + VolumeModel vol_desc_cat
            + str volume_desc_url
            + PdsRecordsWs pds_records
            + PdsRegistryModel pds_collection
            + database Database
            + PdsRecordsModel record
            + List[str] catalogs_urls
            - build_url_ode_collection()
            - find_volume_desc_url() str
            - parse_volume_desc_cat() VolumeModel
            - load_volume_description()
            - find_catalogs_urls() -> List[Dict[str, str]]
            - is_catalog_exists(catalog_name: Any) -> bool
            - get_url_for_multiple_catalogs(catalogs: List[str], catalogs_from_desc_cat: Dict[str, str]) List[str]
            - get_url_for_simple_catalog(catalog_name: str, catalogs_from_desc_cat: Dict[str, str]) List[str]
            - get_urls_from_catalog_type(catalog_name: Union[str, List[str]], catalogs_from_desc_cat: Dict[str, str]) List[str]
            - parse_catalog(file_storage: PdsStorage, catalog_name: str, cat_type: str, result: Dict[str, Union[str, List[str]]])
            + load_catalogs_urls() List[str]
            + get_ode_catalogs(pds_collection: PdsRegistryModel) Dict[str, Any]
            + __repr__(self) str
        }
    """

    # URL template of the ODE Dataset Explorer page; substituted per
    # collection in _build_url_ode_collection().
    DATASET_EXPLORER = Template(
        "https://ode.rsl.wustl.edu/$ODEMetaDB/DataSetExplorer.aspx?target=$ODEMetaDB&instrumenthost=$ihid&instrumentid=$iid&datasetid=$Data_Set_Id"
    )

    def __init__(self, database: Database, *args, **kwargs):
        """Initialize the object with a database to store the information.

        Args:
            database (Database): database

        Keyword Args:
            report (Any, optional): observer subscribed to this Observable
                so that error messages can be reported.
        """
        super().__init__()
        if kwargs.get("report"):
            self.__report = kwargs.get("report")
            self.subscribe(self.__report)
        self.__database: Database = database
        self.__pds_records = PdsRecordsWs(self.__database)
        self._initialize_values()

    def _initialize_values(self):
        """Initialize the values"""
        # NOTE: the first five lines below are bare annotations — they only
        # declare the attribute types, no value is assigned. The attributes
        # are set later by load_catalogs_urls(); accessing them before that
        # raises AttributeError.
        self.__pds_collection: PdsRegistryModel
        self.__record: PdsRecordModel
        self.__url: str
        self.__volume_desc_url: str
        self.__vol_desc_cat: VolumeModel
        self.__catalogs_urls: List[str] = list()

    @property
    def url(self) -> str:
        """ODE URL that hosts the PDS catalogs

        Returns:
            str: URL
        """
        return self.__url

    @property
    def vol_desc_cat(self) -> VolumeModel:
        """Returns the volume description catalog.

        Returns:
            VolumeModel: the volume description catalog
        """
        return self.__vol_desc_cat

    @property
    def volume_desc_url(self) -> str:
        """The volume description URL

        Returns:
            str: the volume description URL
        """
        return self.__volume_desc_url

    @property
    def pds_records(self) -> PdsRecordsWs:
        """Returns the PDS records object to access to the data from the local cache.

        Returns:
            PdsRecordsWs: PDS records
        """
        return self.__pds_records

    @property
    def pds_collection(self) -> PdsRegistryModel:
        """PDS collection that contains the PDS catalogs.

        Returns:
            PdsRegistryModel: the PDS collection
        """
        return self.__pds_collection

    @property
    def database(self) -> Database:
        """Database used to store and retrieve the PDS information.

        Returns:
            Database: the database
        """
        return self.__database

    @property
    def record(self) -> PdsRecordModel:
        """First record of the PDS collection, used to build the ODE URL.

        Returns:
            PdsRecordModel: the record
        """
        return self.__record

    @property
    def catalogs_urls(self) -> List[str]:
        """URLs of the PDS catalogs found for the loaded collection.

        Returns:
            List[str]: the catalogs URLs
        """
        return self.__catalogs_urls

    def _build_url_ode_collection(self, volume_id: Optional[str] = None):
        """Computes the ODE URL.

        This ODE URL is used to parse the web page to get the PDS objects.
        Sometimes the volume_id value is renamed. In this case, the web page
        must be parsed by getting the volume_id label to find the correct volume_id

        Args:
            volume_id (str, optional): volume_id to append to the URL. Defaults to None.
        """
        url_build: str = PDSCatalogDescription.DATASET_EXPLORER.substitute(
            ODEMetaDB=self.pds_collection.ODEMetaDB.lower(),
            ihid=self.record.ihid,
            iid=self.record.iid,
            Data_Set_Id=self.record.Data_Set_Id,
        )
        if volume_id:
            self.__url = url_build + f"&volumeid={volume_id}"
        else:
            self.__url = url_build

    def _find_volume_desc_url(self) -> str:
        """Find the URL volume description by parsing the ODE URL.

        The volume description contains all the references to the interesting catalogs to parse.

        Raises:
            NoFileExistInFolder: voldesc.cat file not found in PDS catalog

        Returns:
            str: the Volume description URL
        """
        crawler = Crawler(self.url)
        links = crawler.parse()
        vol_desc_url = None
        for link in links:
            if link["name"] == "voldesc.cat":
                vol_desc_url = link["url"]
                break
        if vol_desc_url is None:
            raise NoFileExistInFolder(
                f"voldesc.cat file not found in {self.pds_collection}"
            )
        logger.info(f"voldesc.cat found in {vol_desc_url}")
        return vol_desc_url

    def _find_volume_id(self) -> str:
        """Find volume_id in web page

        The page is crawled looking for a link whose name matches the
        record's PDSVolume_Id; the real volume_id is then extracted from
        the "volumeid" query parameter of that link.

        Raises:
            NoFileExistInFolder: Volume_id not found

        Returns:
            str: volume_id
        """
        self._build_url_ode_collection()
        crawler = Crawler(self.url)
        links = crawler.parse()
        volume_id = None
        for link in links:
            if link["name"] == self.record.PDSVolume_Id:
                url: str = link["url"]
                parsed_url = urlparse(url)
                volume_id = parse_qs(parsed_url.query)["volumeid"][0]
                break
        if volume_id is None:
            raise NoFileExistInFolder(f"volumeid not found in {self.url}")
        logger.info(f"volume_id found : {volume_id}")
        return volume_id

    def _parse_volume_desc_cat(self) -> VolumeModel:
        """Set the volume description file by parsing the ODE URL.

        Raises:
            PdsCatalogDescriptionError: Error when getting or parsing the volume description file

        Returns:
            VolumeModel: the Volume description object
        """
        with closing(
            requests_retry_session().get(
                self.volume_desc_url, stream=True, verify=False, timeout=5
            )
        ) as request:
            if request.ok:
                content = request.text
                vol_desc_cat = PdsParserFactory.parse(
                    uri=content,
                    type_file=PdsParserFactory.FileGrammary.VOL_DESC,
                )
                return vol_desc_cat
            else:
                raise PdsCatalogDescriptionError(
                    f"Error when getting or parsing {self.volume_desc_url}"
                )

    def _load_volume_description(self):
        """Load the volume description.

        First tries to locate voldesc.cat directly; when it is not found,
        falls back to resolving the real volume_id from the web page and
        rebuilding the ODE URL before retrying.
        """
        try:
            self.__volume_desc_url: str = self._find_volume_desc_url()
        except NoFileExistInFolder:
            # The volume_id may have been renamed on the ODE side: resolve
            # it from the page and retry once with the corrected URL.
            volume_id = self._find_volume_id()
            self._build_url_ode_collection(volume_id=volume_id)
            self.__volume_desc_url: str = self._find_volume_desc_url()
        self.__vol_desc_cat: VolumeModel = self._parse_volume_desc_cat()

    def _find_catalogs_urls(self) -> List[Dict[str, str]]:
        """Retrieve the URL of the PDS object by parsing the ODE URL.

        Returns:
            List[Dict[str, str]]: Catalogs name and its URL
        """
        url = self.url + "&pathtovol=catalog/"
        result: List[Dict[str, str]]
        try:
            crawler = Crawler(url)
            result = crawler.parse()
        except NoFileExistInFolder as err:
            # Best effort: report the empty folder and return no catalogs.
            self.notify_observers(MessageModel(url, err))
            logger.error(f"[NoFileExistInFolder]: {url}")
            result = list()
        return result

    def _is_catalog_exists(self, catalog_name: Any) -> bool:
        """Checks if the catalog_name is set.

        Args:
            catalog_name (Any): object to test

        Returns:
            bool: True if the catalog_name is not None otherwise False
        """
        return catalog_name is not None

    def _get_url_for_multiple_catalogs(
        self, catalogs: List[str], catalogs_from_desc_cat: Dict[str, str]
    ) -> List[str]:
        """Get the URLs for all the PDS objects

        Args:
            catalogs (List[str]): PDS object names
            catalogs_from_desc_cat (Dict[str, str]): mapping from catalog
                name (lower case) to its URL

        Returns:
            List[str]: Returns the URLs of the PDS objects
        """
        url_list: List[str] = list()
        for catalog_name in catalogs:
            catalog_name_lower: str = catalog_name.lower()
            if catalog_name_lower in catalogs_from_desc_cat:
                url: str = catalogs_from_desc_cat[catalog_name_lower]
                url_list.append(url)
            else:
                logger.error(f"Cannot find {catalog_name_lower} catalog")
        return url_list

    def _get_url_for_simple_catalog(
        self, catalog_name: str, catalogs_from_desc_cat: Dict[str, str]
    ) -> List[str]:
        """Returns the URL of the catalog name that is contained in catalogs_from_desc_cat

        Args:
            catalog_name (str): catalog name
            catalogs_from_desc_cat (Dict[str, str]): list of catalogs

        Returns:
            List[str]: the URL of the catalog name
        """
        url_list: List[str] = list()
        catalog_name_lower: str = catalog_name.lower()
        if catalog_name_lower in catalogs_from_desc_cat:
            url: str = catalogs_from_desc_cat[catalog_name_lower]
            url_list.append(url)
        else:
            logger.error(f"Cannot find {catalog_name_lower} catalog")
        return url_list

    def _get_urls_from_catalog_type(
        self,
        catalog_type: Union[str, List[str]],
        catalogs_from_desc_cat: Dict[str, str],
    ) -> List[str]:
        """Returns the URLs of the catalog type that is contained in catalogs_from_desc_cat

        A catalog type can be associated to one or several catalogs.
        The list of catalogs (URL included) is provided by catalogs_from_desc_cat

        Args:
            catalog_type (Union[str, List[str]]): catalog type
            catalogs_from_desc_cat (Dict[str, str]): list of catalogs

        Returns:
            List[str]: the URLs for the catalog type (empty when not found)
        """
        url_list: List[str] = list()
        if self._is_catalog_exists(catalog_type):
            if isinstance(catalog_type, list):
                url_list.extend(
                    self._get_url_for_multiple_catalogs(
                        catalog_type, catalogs_from_desc_cat
                    )
                )
            else:
                url_list.extend(
                    self._get_url_for_simple_catalog(
                        catalog_type, catalogs_from_desc_cat
                    )
                )
        return url_list

    def _get_urls_from_volume_catalog(self) -> List[str]:
        """Get catalog URLs associated of the catalogs in the Volume catalog.

        Returns:
            List[str]: List of URLs
        """
        self._build_url_ode_collection(volume_id=self.record.PDSVolume_Id)

        # Extract the Volume description catalog
        # that contains an index of all catalogs
        self._load_volume_description()

        # Find all the catalogs in the catalog directory of ODE
        catalog_urls: List[Dict[str, str]] = self._find_catalogs_urls()
        mapping_file_url: Dict[str, str] = {
            catalog["name"]: catalog["url"] for catalog in catalog_urls
        }

        # Make the mapping between catalog_type from Volume description catalog
        # and the catalogs found in catalog directory
        # In the volume description catalog, it is possible to have several
        # catalogs related to one catalog type.
        url_list: List[str] = list()
        catalog: CatalogModel = self.vol_desc_cat.CATALOG
        catalog_dict: Dict[str, str] = catalog.__dict__
        for key in catalog_dict.keys():
            url_list.extend(
                self._get_urls_from_catalog_type(
                    catalog_dict[key], mapping_file_url
                )
            )
        return url_list

    def _parse_catalog(
        self,
        file_storage: PdsCollectionStorage,
        catalog_name: str,
        cat_type: str,
        result: Dict[str, Union[str, List[str]]],
        timeout: int = 30,
    ):
        """Parses the PDS object (`catalog_name`), represented by a catalog type and stored
        on the file storage with a specific implementation associated to the catalog_type.

        The catalog is parsed by using the `get_catalog` method from the
        PdsCollectionStorage object. The result is then stored in the result
        variable where the key is the `catalog_type`.
        At each `catalog_type` is associated one or several catalogs.

        If the parsing is not successful, the error message is notified by the use of MessageModel
        object.

        Args:
            file_storage (PdsCollectionStorage): storage where the PDS objects have been downloaded
            catalog_name (str): catalog name that must be parsed
            cat_type (str): Type of catalog where an implementation is associated
            result (Dict[str, Union[str, List[str]]]): the catalogs in the Volume description
            timeout (int, optional): parser timeout in seconds. Defaults to 30
        """
        try:
            cat_obj = file_storage.get_catalog(
                file=catalog_name,
                catalogue_type=PdsParserFactory.FileGrammary.get_enum_from(cat_type),  # type: ignore
                timeout=timeout,
            )
            # When the caller pre-created a list for this type, append;
            # otherwise store the single object directly.
            if cat_type in result:
                cast(List, result[cat_type]).append(cat_obj)
            else:
                result[cat_type] = cat_obj
        except KeyError as err:
            message = (
                f"Unable to find {catalog_name} in {file_storage.directory}"
            )
            logger.error(message)
            logger.exception(err)
            self.notify_observers(
                MessageModel(catalog_name, Exception(message))
            )
        except UnicodeDecodeError as err:
            message = (
                f"Unable to find {catalog_name} in {file_storage.directory}"
            )
            logger.error(message)
            logger.exception(err)
            self.notify_observers(
                MessageModel(catalog_name, Exception(message))
            )
        except Exception as err:
            # Catch-all: any parsing failure is reported, never propagated.
            message = (
                f"Unable to parse {catalog_name} in {file_storage.directory}"
            )
            logger.error(message)
            logger.exception(err)
            self.notify_observers(
                MessageModel(catalog_name, Exception(message))
            )

    def load_catalogs_urls(
        self, pds_collection: PdsRegistryModel, progress_bar: bool = True
    ):
        """Loads the catalogs URLs from cache for a given
        `pds_collection` collection.

        Only the first record of the first records page is used to build the
        ODE URL. On any expected failure (no record, parse error, connection
        error, corrupted cache file) the error is logged and notified to the
        observers; no exception is propagated.

        Args:
            pds_collection (PdsRegistryModel): PDS collection
            progress_bar (bool, True): Set progress_bar. Defaults to True.
        """
        self._initialize_values()
        self.__pds_collection = pds_collection
        records_iter: Iterator[
            PdsRecordsModel
        ] = self.pds_records.parse_pds_collection_from_cache(
            pds_collection,
            progress_bar=progress_bar,
        )
        try:
            records: PdsRecordsModel = next(records_iter)
            self.__record = records.pds_records_model[0]
            try:
                self.__catalogs_urls: List[
                    str
                ] = self._get_urls_from_volume_catalog()
            except NoFileExistInFolder as err:
                logger.exception(f"[NoFileExistInFolder]: {err}")
                self.notify_observers(MessageModel(str(pds_collection), err))
            except UnexpectedCharacters as err:
                logger.exception(f"[ParserError]: {err}")
                self.notify_observers(MessageModel(str(pds_collection), err))
            except ConnectionError as err:
                logger.exception(f"[ConnectionError]: {err}")
                self.notify_observers(MessageModel(str(pds_collection), err))
        except StopIteration:
            logger.error(
                f"No record for {pds_collection}. Please download them"
            )
            self.notify_observers(
                MessageModel(str(pds_collection), Exception("No record"))
            )
        except JSONDecodeError:
            logger.error(
                f"[CorruptedFile] Please remove the file corresponding to this collection {pds_collection}"
            )
            self.notify_observers(
                MessageModel(
                    str(pds_collection),
                    Exception("[CorruptedFile] Please remove the file"),
                )
            )

    def get_ode_catalogs(
        self, pds_collection: PdsRegistryModel, timeout: int = 30
    ) -> Dict[str, Any]:
        """Returns the PDS objects for a given space mission collection.

        The function retrieves the PdsCollectionStorage object associated
        with the PdsRegistryModel using get_pds_storage_for(), and then
        retrieves the description of the volume containing the PDS objects with
        get_volume_description(). It then lists the different types of catalogs
        in the directory using list_catalogs(), and for each catalog, it uses
        _parse_catalog() to retrieve information on each catalog.

        Args:
            pds_collection (PdsRegistryModel): the space mission collection
            timeout (int): parser timeout in seconds. Default to 30

        Raises:
            TypeError: Illegal datatype for catalog

        Returns:
            Dict[str, Any]: list of PDS Object name and its object
        """
        result = dict()
        result["collection"] = pds_collection
        try:
            file_storage: PdsCollectionStorage = (
                self.database.pds_storage.get_pds_storage_for(pds_collection)
            )
            result[
                PdsParserFactory.FileGrammary.VOL_DESC.name
            ] = file_storage.get_volume_description(timeout)
            catalogs = file_storage.list_catalogs()
            for cat_type in catalogs.keys():
                catalog_value: Union[str, List[str]] = catalogs[cat_type]
                if catalog_value is None:
                    continue
                elif isinstance(catalog_value, str):
                    catalog_name: str = catalog_value
                    self._parse_catalog(
                        file_storage, catalog_name, cat_type, result, timeout
                    )
                elif isinstance(catalog_value, list):
                    # Pre-create the list so _parse_catalog appends to it.
                    result[cat_type] = list()
                    for catalog_name in catalog_value:
                        self._parse_catalog(
                            file_storage,
                            catalog_name,
                            cat_type,
                            result,
                            timeout,
                        )
                else:
                    raise TypeError(
                        f"Illegal datatype for catalog : {type(catalog_value)}"
                    )

            return result
        except FileNotFoundError as err:
            # Missing storage directory: return the partial result.
            logger.exception(err)
            return result

    def __repr__(self) -> str:
        return f"PDSCatalogDescription({self.pds_records})"
class PDSCatalogsDescription(Observable):
    """Provides the means to download the PDS catalogs (PDS objects).

    Wraps a :class:`PDSCatalogDescription` to collect the catalog URLs of
    each collection, download them into the local storage and parse them
    back from that storage.

    .. uml::

        class PDSCatalogsDescription {
            - Any report
            + Database database
            + PDSCatalogDescription pds_object_cats
            - build_all_urls(pds_collection: PdsRegistryModel) List[str]
            + download(pds_collections: List[PdsRegistryModel])
            + get_ode_catalogs(pds_collections: List[PdsRegistryModel]) -> Iterator[Dict[str, Any]]
            + __repr__(self) str
        }
    """

    def __init__(self, database: Database, *args, **kwargs):
        """Initialize the means to download by using a database to store the results.

        Args:
            database (Database): database

        Keyword Args:
            report (Any, optional): observer subscribed to this Observable
                so that error messages can be reported.
        """
        super().__init__()
        report = kwargs.get("report")
        if report:
            self.__report = report
            self.subscribe(self.__report)
        self.__pds_object_cats = PDSCatalogDescription(
            database, *args, **kwargs
        )
        self.__database = database

    @property
    def pds_object_cats(self) -> PDSCatalogDescription:
        """The wrapped PDSCatalogDescription instance.

        Returns:
            PDSCatalogDescription: the catalog-description helper
        """
        return self.__pds_object_cats

    @property
    def database(self) -> Database:
        """Database used to store the downloaded PDS objects.

        Returns:
            Database: the database
        """
        return self.__database

    def _build_all_urls(
        self, pds_collection: PdsRegistryModel, progress_bar: bool = True
    ) -> List[str]:
        """Builds all the PDS objects URLs for a collection of space missions.

        These URLs are used to retrieve all PDS objects. When at least one
        catalog URL is found, the volume description URL is appended as well.

        Args:
            pds_collection (PdsRegistryModel): the collection of space missions
            progress_bar (bool, True): Set progress_bar. Defaults to True.

        Returns:
            List[str]: List of URLs (empty when no catalog URL was found)
        """
        logger.info(f"Fetching Catalogs URLs from {pds_collection}")
        self.pds_object_cats.load_catalogs_urls(pds_collection, progress_bar)
        catalog_urls: List[str] = self.pds_object_cats.catalogs_urls
        if not catalog_urls:
            return []
        # The volume description itself is downloaded alongside the catalogs.
        return [*catalog_urls, self.pds_object_cats.volume_desc_url]

    def download(
        self,
        pds_collections: List[PdsRegistryModel],
        nb_workers: int = 3,
        time_sleep: int = 1,
        progress_bar: bool = True,
    ):
        """Downloads the PDS objects for the collections of space missions.

        For each collection, the list of PDS object URLs is built, the
        storage directory for the collection is resolved, and the objects
        are fetched in parallel by the storage's download method. Parser and
        connection errors are logged and do not stop the remaining
        collections.

        Args:
            pds_collections (List[PdsRegistryModel]): the collections of space missions
            nb_workers (int, optional): Number of workers in parallel. Defaults to 3.
            time_sleep (int, optional): Time to wait between two download series. Defaults to 1.
            progress_bar (bool, True): Set progress_bar. Defaults to True.
        """
        for collection in pds_collections:
            urls: List[str] = self._build_all_urls(collection, progress_bar)
            try:
                storage: PdsCollectionStorage = (
                    self.database.pds_storage.get_pds_storage_for(collection)
                )
                storage.download(
                    urls=urls,
                    nb_workers=nb_workers,
                    timeout=5,
                    time_sleep=time_sleep,
                    progress_bar=progress_bar,
                )
            except UnexpectedCharacters as err:
                logger.exception(f"[ParserError]: {err}")
            except ConnectionError as err:
                logger.exception(f"[ConnectionError]: {err}")

    def get_ode_catalogs(
        self, pds_collections: List[PdsRegistryModel], timeout: int = 30
    ) -> Iterator[Dict[str, Any]]:
        """Get all the PDS objects for the `pds_collections`.

        Lazily delegates to
        :meth:`PDSCatalogDescription.get_ode_catalogs`, yielding one
        dictionary of PDS object name -> parsed object per collection.

        Args:
            pds_collections (List[PdsRegistryModel]): the collections of the space mission.
            timeout (int, optional): parser timeout in seconds. Defaults to 30

        Yields:
            Iterator[Dict[str, Any]]: PDS object name and its object
        """
        for collection in pds_collections:
            yield self.pds_object_cats.get_ode_catalogs(collection, timeout)