Coverage for pds_crawler/etl.py: 56%

254 statements  

# -*- coding: utf-8 -*-
# pds-crawler - ETL to index PDS data to pdssp
# Copyright (C) 2023 - CNES (Jean-Christophe Malapert for Pôle Surfaces Planétaires)
# This file is part of pds-crawler <https://github.com/pdssp/pds_crawler>
# SPDX-License-Identifier: LGPL-3.0-or-later
"""Provides an ETL API.

.. code:: python

    from pds_crawler.etl import PdsSourceEnum, PdsDataEnum, StacETL
    etl = StacETL("work/database")
    etl.extract(PdsSourceEnum.COLLECTIONS_INDEX)
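    # Sketch of a fuller pipeline (assumes the collections index was first
    # cached with etl.extract(PdsSourceEnum.COLLECTIONS_INDEX_SAVE)):
    etl.extract(PdsSourceEnum.PDS_CATALOGS)
    etl.transform(PdsDataEnum.PDS_CATALOGS)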

"""

import logging
from abc import ABC
from abc import abstractmethod
from typing import Any
from typing import cast
from typing import List
from typing import Optional
from typing import Tuple

from pystac import Catalog
from pystac import Collection

from .extractor import PDSCatalogsDescription
from .extractor import PdsRecordsWs
from .extractor import PdsRegistry
from .load import Database
from .models import PdsRegistryModel
from .report import CrawlerReport
from .transformer import StacCatalogTransformer
from .transformer import StacRecordsTransformer
from .utils import DocEnum
from .utils import UtilsMonitoring

logger = logging.getLogger(__name__)

class AbstractSourceEnum(DocEnum):
    """Base class for enums that describe a source to extract."""


class AbstractDataEnum(DocEnum):
    """Base class for enums that describe data to transform."""

class CheckUpdateEnum(AbstractSourceEnum):
    CHECK_PDS = (
        "pds",
        "Check updates at PDS",
    )
    CHECK_CACHE = (
        "cache",
        "Check if some collections in cache are not transformed",
    )

    @staticmethod
    def find_enum(name: str):
        """Find the enum member based on its value.

        Args:
            name (str): enum value

        Raises:
            ValueError: Unknown value

        Returns:
            CheckUpdateEnum: Enum
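        Example (a minimal sketch, assuming DocEnum exposes the first tuple
        element as the member value):

        .. code:: python

            CheckUpdateEnum.find_enum("pds")  # -> CheckUpdateEnum.CHECK_PDS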

        """
        result = None
        for pf_name in CheckUpdateEnum.__members__:
            val = str(CheckUpdateEnum[pf_name].value)
            if val == name:
                result = CheckUpdateEnum[pf_name]
                break
        if result is None:
            raise ValueError(f"Unknown enum value for {name}")
        return result

class PdsSourceEnum(AbstractSourceEnum):
    COLLECTIONS_INDEX = (
        "ode_collections",
        "Get the georeferenced products",
    )
    COLLECTIONS_INDEX_SAVE = (
        "ode_collections_save",
        "Get and save the PDS collections for the georeferenced products",
    )
    PDS_CATALOGS = (
        "pds_objects",
        "PDS objects describing a catalog (mission, instrument, platform, ...)",
    )
    PDS_RECORDS = ("ode_records", "Records metadata for a given collection")

    @staticmethod
    def find_enum(name: str):
        """Find the enum member based on its value.

        Args:
            name (str): enum value

        Raises:
            ValueError: Unknown value

        Returns:
            PdsSourceEnum: Enum
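        Example (a minimal sketch, assuming DocEnum exposes the first tuple
        element as the member value):

        .. code:: python

            PdsSourceEnum.find_enum("ode_records")  # -> PdsSourceEnum.PDS_RECORDS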

        """
        result = None
        for pf_name in PdsSourceEnum.__members__:
            val = str(PdsSourceEnum[pf_name].value)
            if val == name:
                result = PdsSourceEnum[pf_name]
                break
        if result is None:
            raise ValueError(f"Unknown enum value for {name}")
        return result

class PdsDataEnum(AbstractDataEnum):
    PDS_CATALOGS = (
        "pds_objects",
        "PDS objects describing a catalog (mission, instrument, platform, ...)",
    )
    PDS_RECORDS = ("ode_records", "Records metadata for a given collection")

    @staticmethod
    def find_enum(name: str):
        """Find the enum member based on its value.

        Args:
            name (str): enum value

        Raises:
            ValueError: Unknown value

        Returns:
            PdsDataEnum: Enum
        """
        result = None
        for pf_name in PdsDataEnum.__members__:
            val = str(PdsDataEnum[pf_name].value)
            if val == name:
                result = PdsDataEnum[pf_name]
                break
        if result is None:
            raise ValueError(f"Unknown enum value for {name}")
        return result

class ETL(ABC):
    """Abstract base class that defines the Extract/Transform/Load contract."""

    @abstractmethod
    def extract(self, source: AbstractSourceEnum, *args, **kwargs):
        raise NotImplementedError()

    @abstractmethod
    def transform(self, data: AbstractDataEnum, *args, **kwargs):
        raise NotImplementedError()

    @abstractmethod
    def load(self, data: PdsDataEnum, target: Any, *args, **kwargs):
        raise NotImplementedError()

class StacETL(ETL):
    """ETL to extract and transform PDS information to STAC"""

    def __init__(self, full_path_database_name: str) -> None:
        """Initialize the class.

        Args:
            full_path_database_name (str): the full path to the database
        """
        # Create a new instance of the Database class
        db = Database(full_path_database_name)
        self.__report = CrawlerReport(db)
        self.__pds_registry = PdsRegistry(db, report=self.__report)
        self.__pds_records = PdsRecordsWs(db, report=self.__report)
        self.__pds_ode_catalogs = PDSCatalogsDescription(
            db, report=self.__report
        )
        self.__stac_catalog_transformer = StacCatalogTransformer(
            db, report=self.__report
        )
        self.__stac_records_transformer = StacRecordsTransformer(
            db, report=self.__report
        )
        self.__body: Optional[str] = None
        self.__dataset_id: Optional[str] = None
        self.__nb_workers: int = 3
        self.__time_sleep: int = 1
        self.__progress_bar: bool = True
        self.__is_sample: bool = False
        self.__nb_records_per_page: int = 5000
        self.__parser_timeout: int = 60

    @property
    def nb_records_per_page(self) -> int:
        """Getter for the number of records per page."""
        return self.__nb_records_per_page

    @nb_records_per_page.setter
    def nb_records_per_page(self, value: int):
        """Setter for the number of records per page.

        Args:
            value (int): the new value for the number of records per page
        """
        self.__nb_records_per_page = value

    @property
    def report(self) -> CrawlerReport:
        """Getter for the CrawlerReport instance."""
        return self.__report

    @report.setter
    def report(self, value: CrawlerReport):
        """Setter for the CrawlerReport instance.

        Args:
            value (CrawlerReport): the new CrawlerReport instance
        """
        self.__report = value

    @property
    def pds_registry(self) -> PdsRegistry:
        """Getter for the PdsRegistry instance."""
        return self.__pds_registry

    @property
    def pds_records(self) -> PdsRecordsWs:
        """Getter for the PdsRecordsWs instance."""
        return self.__pds_records

    @property
    def pds_ode_catalogs(self) -> PDSCatalogsDescription:
        """Getter for the PDSCatalogsDescription instance."""
        return self.__pds_ode_catalogs

    @property
    def stac_catalog_transformer(self) -> StacCatalogTransformer:
        """Getter for the StacCatalogTransformer instance."""
        return self.__stac_catalog_transformer

    @property
    def stac_records_transformer(self) -> StacRecordsTransformer:
        """Getter for the StacRecordsTransformer instance."""
        return self.__stac_records_transformer

    @property
    def body(self) -> Optional[str]:
        """Getter for the body attribute."""
        return self.__body

    @body.setter
    def body(self, name: str):
        """Setter for the body attribute.

        Args:
            name (str): the new value for the solar body attribute
        """
        self.__body = name

    @property
    def dataset_id(self) -> Optional[str]:
        """Getter for the dataset_id attribute."""
        return self.__dataset_id

    @dataset_id.setter
    def dataset_id(self, value: str):
        self.__dataset_id = value

    @property
    def nb_workers(self) -> int:
        """Getter for the number of parallel workers."""
        return self.__nb_workers

    @nb_workers.setter
    def nb_workers(self, value: int):
        self.__nb_workers = value

    @property
    def time_sleep(self) -> int:
        """Getter for the time sleep attribute."""
        return self.__time_sleep

    @time_sleep.setter
    def time_sleep(self, value: int):
        self.__time_sleep = value

    @property
    def progress_bar(self) -> bool:
        """Getter for the progress_bar attribute."""
        return self.__progress_bar

    @progress_bar.setter
    def progress_bar(self, value: bool):
        self.__progress_bar = value

    @property
    def is_sample(self) -> bool:
        """Getter for the is_sample attribute."""
        return self.__is_sample

    @is_sample.setter
    def is_sample(self, value: bool):
        self.__is_sample = value

    @property
    def parser_timeout(self) -> int:
        """Getter for the parser timeout attribute."""
        return self.__parser_timeout

    @parser_timeout.setter
    def parser_timeout(self, value: int):
        self.__parser_timeout = value

    @UtilsMonitoring.timeit
    def extract(self, source: PdsSourceEnum, *args, **kwargs):
        """Extract the PDS information.

        Several types of extraction are available:

        * COLLECTIONS_INDEX : query the PDS to get the georeferenced collections
        * COLLECTIONS_INDEX_SAVE : like COLLECTIONS_INDEX, but save the result in the cache
        * PDS_RECORDS : load the PDS collections from the cache, build and cache the URLs for all pages of the PdsRecordsWs web service, then download all pages
        * PDS_CATALOGS : load the PDS collections from the cache, then download the PDS3 objects

        Args:
            source (PdsSourceEnum): Type of extraction

        Raises:
            NotImplementedError: Extraction type is not implemented
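        Example (a sketch; "mars" is an illustrative body name):

        .. code:: python

            etl = StacETL("work/database")
            etl.body = "mars"
            etl.extract(PdsSourceEnum.COLLECTIONS_INDEX_SAVE)
            etl.extract(PdsSourceEnum.PDS_RECORDS)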

        """
        match source:
            case PdsSourceEnum.COLLECTIONS_INDEX:
                (
                    stats,
                    collections_pds,
                ) = self.pds_registry.get_pds_collections(
                    self.body, self.dataset_id
                )
                for collection in collections_pds:
                    print(collection)
            case PdsSourceEnum.COLLECTIONS_INDEX_SAVE:
                (
                    stats,
                    collections_pds,
                ) = self.pds_registry.get_pds_collections(
                    self.body, self.dataset_id
                )
                self.pds_registry.cache_pds_collections(collections_pds)
            case PdsSourceEnum.PDS_RECORDS:
                pds_collections: List[
                    PdsRegistryModel
                ] = self.pds_registry.load_pds_collections_from_cache(
                    self.body, self.dataset_id
                )
                self.pds_records.generate_urls_for_all_collections(
                    pds_collections=pds_collections,
                    limit=self.nb_records_per_page,
                )
                limit: Optional[int] = 1 if self.is_sample else None
                self.pds_records.download_pds_records_for_all_collections(
                    pds_collections=pds_collections,
                    progress_bar=self.progress_bar,
                    limit=limit,
                )
            case PdsSourceEnum.PDS_CATALOGS:
                pds_collections: List[
                    PdsRegistryModel
                ] = self.pds_registry.load_pds_collections_from_cache(
                    self.body, self.dataset_id
                )
                self.pds_ode_catalogs.download(
                    pds_collections=pds_collections,
                    nb_workers=self.nb_workers,
                    time_sleep=self.time_sleep,
                    progress_bar=self.progress_bar,
                )
            case _:
                raise NotImplementedError(
                    f"Extraction is not implemented for {source}"
                )

    def _check_collections_to_ingest(
        self, cached_pds_collections: List[PdsRegistryModel]
    ) -> int:
        """Check the number of PDS collections that need to be ingested into the
        database from the cache.

        Args:
            cached_pds_collections (List[PdsRegistryModel]): List of PDS collections in the cache.

        Returns:
            int: Number of collections to ingest from cache.
        """
        db: Database = self.stac_records_transformer.database
        root_stac_catalog: Catalog = cast(
            Catalog, db.stac_storage.root_catalog
        )
        nb_to_ingest: int = 0
        for pds_collection in cached_pds_collections:
            coll = cast(
                Collection,
                root_stac_catalog.get_child(
                    pds_collection.get_collection_id(), recursive=True
                ),
            )
            if coll is None:
                nb_to_ingest += 1
                logger.info(f"{pds_collection} was not ingested")
        return nb_to_ingest

    def _check_updates_from_PDS(
        self,
        pds_collections: List[PdsRegistryModel],
        pds_collections_cache: List[PdsRegistryModel],
    ) -> int:
        """Check the number of PDS collections that need to be updated in the database from the PDS.

        Args:
            pds_collections (List[PdsRegistryModel]): List of PDS collections.
            pds_collections_cache (List[PdsRegistryModel]): List of PDS collections in the cache.

        Returns:
            int: Number of collections to update in the database.
        """
        nb_to_update: int = 0
        for pds_collection_cache in pds_collections_cache:
            try:
                pds_collections.index(pds_collection_cache)
            except ValueError:
                nb_to_update += 1
                logger.info(f"{pds_collection_cache} has been updated at PDS")
        return nb_to_update

    def check_collections_to_ingest_from_pds(
        self,
        pds_collections: List[PdsRegistryModel],
        pds_collections_cache: List[PdsRegistryModel],
    ) -> Tuple[int, int]:
        """Check the number of PDS collections that need to be ingested into the database from the PDS.

        Args:
            pds_collections (List[PdsRegistryModel]): List of PDS collections.
            pds_collections_cache (List[PdsRegistryModel]): List of PDS collections in the cache.

        Returns:
            Tuple[int, int]: Tuple containing the number of collections to ingest and the total number of records.
        """
        nb_to_ingest: int = 0
        nb_records: int = 0
        for pds_collection in pds_collections:
            try:
                pds_collections_cache.index(pds_collection)
            except ValueError:
                nb_records += pds_collection.NumberProducts
                nb_to_ingest += 1
                logger.info(f"{pds_collection} must be ingested")
        return (nb_to_ingest, nb_records)

    @UtilsMonitoring.timeit
    def check_update(self, source: CheckUpdateEnum, *args, **kwargs):
        """Check whether there are any updates from the PDS or in the cache.

        Args:
            source (CheckUpdateEnum): The source of the check (cache or PDS).
            *args: Variable length argument list.
            **kwargs: Arbitrary keyword arguments.

        Raises:
            NotImplementedError: If the source is not supported.
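        Example (a sketch; assumes collections were already cached):

        .. code:: python

            etl.check_update(CheckUpdateEnum.CHECK_PDS)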

        """
        match source:
            case CheckUpdateEnum.CHECK_CACHE:
                pds_collections: List[
                    PdsRegistryModel
                ] = self.pds_registry.load_pds_collections_from_cache(
                    self.body, self.dataset_id
                )
                nb_to_ingest: int = self._check_collections_to_ingest(
                    pds_collections
                )
                logger.info(
                    f"""Summary:
                    {nb_to_ingest} collection(s) to ingest from cache"""
                )

            case CheckUpdateEnum.CHECK_PDS:
                pds_collections_cache: List[
                    PdsRegistryModel
                ] = self.pds_registry.load_pds_collections_from_cache(
                    self.body, self.dataset_id
                )
                _, pds_collections = self.pds_registry.get_pds_collections(
                    self.body, self.dataset_id
                )
                nb_to_update: int = self._check_updates_from_PDS(
                    pds_collections, pds_collections_cache
                )
                (
                    nb_to_ingest,
                    nb_records,
                ) = self.check_collections_to_ingest_from_pds(
                    pds_collections, pds_collections_cache
                )
                logger.info(
                    f"""Summary:
                    - {nb_to_ingest} collection(s) to ingest from PDS, i.e. {nb_records} products
                    - {nb_to_update} collection(s) to update"""
                )

            case _:
                raise NotImplementedError(
                    f"Check update is not implemented for {source}"
                )

    @UtilsMonitoring.timeit
    def transform(self, data: PdsDataEnum, *args, **kwargs):
        """Transform the downloaded information to STAC.

        Several types of transformation are available:

        * PDS_RECORDS : load the PDS collections from the cache and convert them to STAC items, creating the parent catalogs when they are not available
        * PDS_CATALOGS : load the PDS collections from the cache and convert/update the various catalogs/collections to STAC

        Args:
            data (PdsDataEnum): Type of transformation

        Raises:
            NotImplementedError: Transformation type is not implemented
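        Example (a sketch; assumes the matching extract step already ran):

        .. code:: python

            etl.transform(PdsDataEnum.PDS_CATALOGS)
            etl.transform(PdsDataEnum.PDS_RECORDS)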

        """
        pds_collections: List[
            PdsRegistryModel
        ] = self.pds_registry.load_pds_collections_from_cache(
            self.body, self.dataset_id
        )
        match data:
            case PdsDataEnum.PDS_CATALOGS:
                self.report.name = PdsDataEnum.PDS_CATALOGS.name
                self.report.start_report()
                self.stac_catalog_transformer.init()
                self.stac_catalog_transformer.to_stac(
                    self.pds_ode_catalogs,
                    pds_collections,
                    timeout=self.parser_timeout,
                )
                self.stac_catalog_transformer.save()
                self.report.close_report()
            case PdsDataEnum.PDS_RECORDS:
                self.report.name = PdsDataEnum.PDS_RECORDS.name
                self.report.start_report()
                self.stac_records_transformer.init()
                self.stac_records_transformer.load_root_catalog()
                self.stac_records_transformer.to_stac(
                    self.pds_records, pds_collections
                )
                self.stac_records_transformer.save()
                self.report.close_report()
            case _:
                raise NotImplementedError(
                    f"Transformation is not implemented for {data}"
                )

    def load(self, data: PdsDataEnum, target: str, *args, **kwargs):
        """Load the STAC data to a target. Currently a no-op."""
        pass