Coverage for pds_crawler/transformer/pds_to_stac.py: 78%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

143 statements  

1# -*- coding: utf-8 -*- 

2# pds-crawler - ETL to index PDS data to pdssp 

3# Copyright (C) 2023 - CNES (Jean-Christophe Malapert for Pôle Surfaces Planétaires) 

4# This file is part of pds-crawler <https://github.com/pdssp/pds_crawler> 

5# SPDX-License-Identifier: LGPL-3.0-or-later 

6""" 

7Module Name: 

8 pds_to_stac 

9 

10Description: 

11 The pds_to_stac module converts the PDS3 objects and records from the ODE web service into a unique 

12 STAC PDS catalog. 

13 

14Classes: 

15 StacTransformer: 

16 Abstract class. 

17 StacRecordsTransformer : 

18 Converts records from ODE webservice to PDS STAC catalog. 

19 StacCatalogTransformer : 

20 Converts PDS3 objects from the ODE archive to the PDS STAC catalog (without items) 

21 

22Author: 

23 Jean-Christophe Malapert 

24""" 

25import logging 

26from abc import ABC 

27from typing import Any 

28from typing import cast 

29from typing import Dict 

30from typing import Iterable 

31from typing import Iterator 

32from typing import List 

33from typing import Optional 

34from typing import Union 

35 

36import pystac 

37 

38from ..exception import CrawlerError 

39from ..extractor import PDSCatalogsDescription 

40from ..extractor import PdsRecordsWs 

41from ..load import Database 

42from ..models import PdsRecordModel 

43from ..models import PdsRecordsModel 

44from ..models import PdsRegistryModel 

45from ..report import MessageModel 

46from ..utils import Observable 

47from ..utils import ProgressLogger 

48from .pds3_objects import StacPdsCollection 

49 

50logger = logging.getLogger(__name__) 

51 

52 

class StacTransformer(ABC, Observable):
    """Abstract base class for STAC transformation.

    It validates and stores the :class:`Database` giving access to the STAC
    storage. It also inherits from ``Observable`` so that subclasses can
    subscribe observers and notify them.
    """

    def __init__(self, database: Database):
        """Initialise the transformer with its database.

        Args:
            database (Database): database giving access to the storages

        Raises:
            TypeError: when ``database`` is not an instance of :class:`Database`
        """
        super().__init__()
        if not isinstance(database, Database):
            raise TypeError(
                "[StacTransformer] must be initialized with database attribute "
                f"of type Database, got {type(database).__name__}."
            )
        self.__database = database

    @property
    def database(self) -> Database:
        """Returns the database

        Returns:
            Database: Database
        """
        return self.__database

72 

73 

class StacRecordsTransformer(StacTransformer):
    """Convert the records of the ODE web service to STAC.

    The records of each PDS collection become STAC items. These items are
    attached to the hierarchy root > body > mission > platform > instrument >
    collection, and the missing levels of this hierarchy are created on the fly.
    """

    def __init__(
        self,
        database: Database,
        *args,
        **kwargs,
    ):
        """Init the transformer of :class:`pds_crawler.extractor.PdsRecordModel`
        provided by the response :class:`pds_crawler.extractor.PdsRecordsWs`

        In addition, an observer can be passed with the *report* keyword.
        It is subscribed to this transformer and is notified with a
        :class:`pds_crawler.report.MessageModel` for each record whose
        conversion raises a :class:`CrawlerError`.

        Args:
            database (Database): Database
        """
        super().__init__(database)
        if kwargs.get("report"):
            self.__report = kwargs.get("report")
            self.subscribe(self.__report)
        self.init()

    def init(self):
        """Reset the root catalog.

        The catalog is (re)loaded by :meth:`load_root_catalog`, which is called
        either explicitly or on the first access to :attr:`catalog`.
        """
        # Explicit assignment: an annotation alone would not create the
        # attribute, and reading ``catalog`` would raise an AttributeError.
        self.__catalog: Optional[pystac.Catalog] = None

    @property
    def catalog(self) -> pystac.Catalog:
        """Return the pySTAC root catalog, loading it on first access.

        Returns:
            pystac.Catalog: pySTAC catalog
        """
        if self.__catalog is None:
            self.load_root_catalog()
        return cast(pystac.Catalog, self.__catalog)

    def load_root_catalog(self):
        """Refresh the STAC storage and load its root catalog."""
        self.database.stac_storage.refresh()
        self.__catalog = cast(
            pystac.Catalog, self.database.stac_storage.root_catalog
        )

    def _create_items_stac(
        self,
        pds_records: PdsRecordsWs,
        pds_collection: PdsRegistryModel,
        progress_bar: bool = True,
    ) -> pystac.ItemCollection:
        """Creates a collection of STAC items of records from a PDS collection.

        The records are loaded from the local cache through
        ``pds_records.parse_pds_collection_from_cache``. Records whose item
        already exists in the STAC storage are skipped.

        Args:
            pds_records (PdsRecordsWs): Object that handle Records
            pds_collection (PdsRegistryModel): PDS collection data
            progress_bar (bool, optional) : Set progress bar. Defaults to True

        Returns:
            pystac.ItemCollection: Collection of items
        """

        def create_items(
            pages: Iterator[PdsRecordsModel],
            pds_collection: PdsRegistryModel,
            progress_bar: bool = True,
        ) -> Iterable[pystac.Item]:
            """Lazily create the STAC items of every page of the response.

            Args:
                pages (Iterator[PdsRecordsModel]): the different pages of the web service response
                pds_collection (PdsRegistryModel): information about the collection
                progress_bar (bool, optional) : Set progress bar. Defaults to True

            Yields:
                pystac.Item: one item per record that is not already stored
                and that could be converted
            """
            for page in pages:
                pds_records_model = page.pds_records_model
                with ProgressLogger(
                    total=len(pds_records_model),
                    iterable=pds_records_model,
                    logger=logger,
                    description="STAC Item objects creation",
                    position=2,
                    leave=False,
                    disable_tqdm=not progress_bar,
                ) as pds_records_model_pbar:
                    for record in cast(
                        List[PdsRecordModel], pds_records_model_pbar
                    ):
                        if self.database.stac_storage.item_exists(record):
                            logger.warning(
                                f"this {record} exists in STAC directory, skip it"
                            )
                            continue
                        try:
                            yield record.to_stac_item(pds_collection)
                        except CrawlerError as err:
                            # A failing record is reported, not fatal.
                            self.notify_observers(
                                MessageModel(record.ode_id, err)
                            )

        pages: Iterator[
            PdsRecordsModel
        ] = pds_records.parse_pds_collection_from_cache(pds_collection)
        return pystac.ItemCollection(
            create_items(pages, pds_collection, progress_bar=progress_bar)
        )

    def _is_exist(
        self, catlog_or_collection: Union[pystac.Catalog, pystac.Collection]
    ) -> bool:
        """Check if catlog_or_collection exists.

        Args:
            catlog_or_collection (Union[pystac.Catalog, pystac.Collection]): STAC catalog or collection

        Returns:
            bool: True when the catalog or the collection exists otherwise False
        """
        return catlog_or_collection is not None

    def _find_child(
        self, child_id: str, recursive: bool = True
    ) -> Optional[Union[pystac.Catalog, pystac.Collection]]:
        """Look up a child of the root catalog by its identifier.

        Args:
            child_id (str): identifier of the STAC catalog/collection
            recursive (bool, optional): search the whole tree. Defaults to True

        Returns:
            Optional[Union[pystac.Catalog, pystac.Collection]]: the child or None
        """
        return self.catalog.get_child(child_id, recursive=recursive)

    def _add_items_to_stac_tree(
        self,
        pds_collection: PdsRegistryModel,
        items_stac: pystac.ItemCollection,
    ) -> None:
        """Attach the items to their STAC collection, then save on disk.

        The levels body > mission > platform > instrument > collection are
        looked up first. Each missing level is created and added as a child
        of the previous level. Only the highest newly created level is
        normalized and saved, or the collection itself when no level was
        created. Its parent is then saved without self link.

        Args:
            pds_collection (PdsRegistryModel): PDS collection data
            items_stac (pystac.ItemCollection): items to add to the collection
        """
        # Look up every existing level before mutating the tree. The body is
        # a direct child of the root, hence the non-recursive search.
        stac_collection = self._find_child(pds_collection.get_collection_id())
        stac_instru = self._find_child(pds_collection.get_instrument_id())
        stac_host = self._find_child(pds_collection.get_plateform_id())
        stac_mission = self._find_child(pds_collection.get_mission_id())
        stac_body = self._find_child(
            pds_collection.get_body_id(), recursive=False
        )

        # Levels from the top to the bottom of the hierarchy, with the factory
        # used when the level does not exist yet.
        levels = [
            (stac_body, pds_collection.create_stac_body_catalog),
            (stac_mission, pds_collection.create_stac_mission_catalog),
            (stac_host, pds_collection.create_stac_platform_catalog),
            (stac_instru, pds_collection.create_stac_instru_catalog),
            (stac_collection, pds_collection.create_stac_collection),
        ]

        new_catalog: Optional[Union[pystac.Catalog, pystac.Collection]] = None
        parent: Union[pystac.Catalog, pystac.Collection] = self.catalog
        for existing, create in levels:
            if self._is_exist(existing):
                node = existing
            else:
                node = create()
                if new_catalog is None:
                    # Highest created level: saving it saves its subtree.
                    new_catalog = node
                parent.add_child(node)
            parent = node

        collection = cast(pystac.Collection, parent)
        collection.add_items(items_stac)

        to_save = collection if new_catalog is None else new_catalog
        self.database.stac_storage.normalize_and_save(to_save)
        saved_parent = cast(pystac.Catalog, to_save.get_parent())
        saved_parent.save_object(include_self_link=False)

    def to_stac(
        self,
        pds_records: PdsRecordsWs,
        pds_collections: List[PdsRegistryModel],
        progress_bar: bool = True,
    ):
        """Create STAC catalogs with its children for all collections.

        Collections without any new item are skipped.

        Args:
            pds_records (PdsRecordsWs): Web service that handles the query to get the responses for a given collection
            pds_collections (List[PdsRegistryModel]): All PDS collections data
            progress_bar (bool, optional): Set progress bar. Defaults to True
        """
        with ProgressLogger(
            total=len(pds_collections),
            iterable=pds_collections,
            logger=logger,
            description="Processing collection",
            position=0,
            disable_tqdm=not progress_bar,
        ) as progress_logger:
            for pds_collection in cast(
                List[PdsRegistryModel], progress_logger
            ):
                progress_logger.write_msg(
                    f"Processing the collection {pds_collection}"
                )

                items_stac = self._create_items_stac(
                    pds_records, pds_collection, progress_bar=progress_bar
                )
                if not items_stac.items:
                    progress_logger.write_msg(
                        "No new item, skip the STAC catalogs creation"
                    )
                    continue

                progress_logger.write_msg(
                    f"{len(items_stac.items)} items to add"
                )
                self._add_items_to_stac_tree(pds_collection, items_stac)

    def describe(self):
        """Describes the STAC catalog and its children as a tree"""
        self.catalog.describe()

    def save(self):
        """Nothing happens.

        Everything is saved in the to_stac method."""
        pass

351 

352 

class StacCatalogTransformer(StacTransformer):
    """Turn the PDS3 objects of the ODE archive into STAC catalogs and
    collections (without items)."""

    def __init__(self, database: Database, *args, **kwargs):
        """Set up the transformer and load the root catalog.

        Args:
            database (Database): database used to reach the STAC storage

        An optional observer may be given with the ``report`` keyword. When it
        is truthy, it is subscribed to this transformer.
        """
        super().__init__(database)
        report = kwargs.get("report")
        if report:
            self.__report = report
            self.subscribe(self.__report)
        self.init()

    def init(self):
        """Load the root catalog from the STAC storage and wrap it in a
        :class:`StacPdsCollection` builder."""
        self.database.stac_storage._load_root_catalog()
        root_catalog = cast(
            pystac.Catalog, self.database.stac_storage.root_catalog
        )
        self.__catalog = root_catalog
        self.__stac_pds_collection = StacPdsCollection(root_catalog)

    @property
    def catalog(self) -> pystac.Catalog:
        """The root catalog.

        Returns:
            pystac.Catalog: the root catalog
        """
        return self.__catalog

    def _build_stac_cats_and_colls_for_all_pds_catalogs(
        self, catalogs_pds_collections: Iterator[Dict[str, Any]]
    ):
        """Feed each set of PDS catalogs to the builder and convert it to STAC.

        Args:
            catalogs_pds_collections (Iterator[Dict[str, Any]]): Catalogs for all PDS collections
        """
        builder = self.__stac_pds_collection
        for pds_catalogs in catalogs_pds_collections:
            logger.info(f"Creating STAC catalog for {pds_catalogs}")
            builder.catalogs = pds_catalogs
            builder.to_stac()

    def to_stac(
        self,
        pds_ode_catalogs: PDSCatalogsDescription,
        pds_collections: List[PdsRegistryModel],
        **kwargs,
    ):
        """Build the STAC catalog and its children from the PDS3 objects.

        Args:
            pds_ode_catalogs (PDSCatalogsDescription): PDS3 objects
            pds_collections (List[PdsRegistryModel]): PDS Collection data

        The optional ``timeout`` keyword (default 30) is forwarded to
        ``get_ode_catalogs``.
        """
        timeout = kwargs.get("timeout", 30)
        catalogs: Iterator[Dict[str, Any]] = pds_ode_catalogs.get_ode_catalogs(
            pds_collections, timeout
        )
        self._build_stac_cats_and_colls_for_all_pds_catalogs(catalogs)

    def describe(self):
        """Print the catalog tree."""
        self.catalog.describe()

    def save(self):
        """Normalize the root catalog and write it to disk."""
        self.database.stac_storage.root_normalize_and_save(self.catalog)