from __future__ import annotations

__all__ = ["RepoConverter"]

import os
import fnmatch

from dataclasses import dataclass
from collections import defaultdict
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Generic, TypeVar, List, Tuple, Optional, Iterator, Set, Any, Callable, Dict

from lsst.daf.butler import DatasetRef, Butler as Butler3, DataCoordinate, FileDataset
from lsst.sphgeom import RangeSet, Region

from .filePathParser import FilePathParser

if TYPE_CHECKING:
    from ..mapping import Mapping as CameraMapperMapping
    from .dataIdExtractor import DataIdExtractor
    from .convertRepo import ConvertRepoTask
    from lsst.daf.butler import StorageClass, Registry, SkyPixDimension

REPO_ROOT_FILES = ("registry.sqlite3", "_mapper", "repositoryCfg.yaml",
                   "calibRegistry.sqlite3", "_parent")
51 """A simple container that maintains a most-recently-used ordering. 64 def apply(self, func: Callable[[T], Any]) -> Any:
65 """Apply a function to elements until it returns a value that coerces 66 to `True`, and move the corresponding element to the front of the 77 The first value returned by ``func`` that coerces to `True`. 79 for n, element
in enumerate(self):
80 result = func(element)
96 """Add a new element to the front of the stack. 103 """A helper class for `ConvertRepoTask` and `RepoConverter` that maintains 104 lists of related data ID values that should be included in the conversion. 109 Instrument name used in Gen3 data IDs. 110 visits : `set` of `int` 111 Visit IDs that define the filter. 114 def __init__(self, instrument: str, visits: Set[int]):
122 """Populate the included tract IDs for the given skymap from those that 123 overlap the visits the `ConversionSubset` was initialized with. 127 registry : `lsst.daf.butler.Registry` 128 Registry that can be queried for visit/tract overlaps. 130 SkyMap name used in Gen3 data IDs. 133 self.
tracts[name] = tracts
135 for dataId
in self.registry.queryDimensions([
"tract"], expand=
False,
136 dataId={
"skymap": name,
"visit": visit}):
137 tracts.add(dataId[
"tract"])
138 self.task.log.info(
"Limiting datasets defined on skymap %s to %s tracts.", name, len(tracts))

    def addSkyPix(self, registry: Registry, dimension: SkyPixDimension):
        """Populate the included skypix IDs for the given dimension from those
        that overlap the visits the `ConversionSubset` was initialized with.

        Parameters
        ----------
        registry : `lsst.daf.butler.Registry`
            Registry that can be queried for visit regions.
        dimension : `lsst.daf.butler.SkyPixDimension`
            SkyPix dimension used in Gen3 data IDs.
        """
        if self.regions is None:
            # Cache the per-visit regions on first use.
            self.regions = []
            for visit in self.visits:
                dataId = registry.expandDataId(instrument=self.instrument, visit=visit)
                self.regions.append(dataId.region)
        ranges = RangeSet()
        for region in self.regions:
            ranges = ranges.join(dimension.pixelization.envelope(region))
        self.skypix[dimension] = ranges
162 """Test whether the given data ID is related to this subset and hence 163 should be included in a repository conversion. 167 dataId : `lsst.daf.butler.DataCoordinate` 173 `True` if this data ID should be included in a repository 178 More formally, this tests that the given data ID is not unrelated; 179 if a data ID does not involve tracts, visits, or skypix dimensions, 180 we always include it. 185 if "visit" in dataId.graph
and dataId[
"visit"]
not in self.
visits:
187 if "tract" in dataId.graph
and dataId[
"tract"]
not in self.
tracts[dataId[
"skymap"]]:
190 if dimension
in dataId.graph
and not ranges.intersects(dataId[dimension]):
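
    # Illustrative usage sketch (comment only; the registry, skymap name, and
    # visit IDs below are hypothetical):
    #
    #     subset = ConversionSubset(instrument="HSC", visits={903334, 903336})
    #     subset.addSkyMap(registry, "hsc_rings_v1")  # fills subset.tracts
    #     subset.isRelated(dataId)  # True unless dataId names an excluded
    #                               # visit, tract, or skypix index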
198 """The name of the instrument, as used in Gen3 data IDs (`str`). 202 """The set of visit IDs that should be included in the conversion (`set` 206 regions: Optional[List[Region]]
207 """Regions for all visits (`list` of `lsst.sphgeom.Region`). 209 Set to `None` before it has been initialized. Any code that attempts to 210 use it when it is `None` has a logic bug. 213 tracts: Dict[str, Set[int]]
214 """Tracts that should be included in the conversion, grouped by skymap 215 name (`dict` mapping `str` to `set` of `int`). 218 skypix: Dict[SkyPixDimension, RangeSet]
219 """SkyPix ranges that should be included in the conversion, grouped by 220 dimension (`dict` mapping `SkyPixDimension` to `lsst.sphgeom.RangeSet`). 225 """An abstract base class for objects that help `ConvertRepoTask` convert 226 datasets from a single Gen2 repository. 230 task : `ConvertRepoTask` 231 Task instance that is using this helper object. 233 Root of the Gen2 repo being converted. 234 collections : `list` of `str` 235 Gen3 collections with which all converted datasets should be 237 subset : `ConversionSubset, optional 238 Helper object that implements a filter that restricts the data IDs that 243 `RepoConverter` defines the only public API users of its subclasses should 244 use (`prep`, `insertDimensionRecords`, and `ingest`). These delegate to 245 several abstract methods that subclasses must implement. In some cases, 246 subclasses may reimplement the public methods as well, but are expected to 247 delegate to ``super()`` either at the beginning or end of their own 251 def __init__(self, *, task: ConvertRepoTask, root: str, collections: List[str],
252 subset: Optional[ConversionSubset] =
None):
262 """Test whether the given dataset is handled specially by this 263 converter and hence should be ignored by generic base-class logic that 264 searches for dataset types to convert. 268 datasetTypeName : `str` 269 Name of the dataset type to test. 274 `True` if the dataset type is special. 276 raise NotImplementedError()
280 """Test whether the given directory is handled specially by this 281 converter and hence should be ignored by generic base-class logic that 282 searches for datasets to convert. 287 Subdirectory. This is only ever a single subdirectory, and it 288 could appear anywhere within a repo root. (A full path relative 289 to the repo root might be more useful, but it is harder to 290 implement, and we don't currently need it to identify any special 296 `True` if the direct is special. 298 raise NotImplementedError()
302 """Iterate over all `CameraMapper` `Mapping` objects that should be 303 considered for conversion by this repository. 305 This this should include any datasets that may appear in the 306 repository, including those that are special (see 307 `isDatasetTypeSpecial`) and those that are being ignored (see 308 `ConvertRepoTask.isDatasetTypeIncluded`); this allows the converter 309 to identify and hence skip these datasets quietly instead of warning 310 about them as unrecognized. 314 datasetTypeName: `str` 315 Name of the dataset type. 316 mapping : `lsst.obs.base.mapping.Mapping` 317 Mapping object used by the Gen2 `CameraMapper` to describe the 320 raise NotImplementedError()

    @abstractmethod
    def makeDataIdExtractor(self, datasetTypeName: str, parser: FilePathParser,
                            storageClass: StorageClass) -> DataIdExtractor:
        """Construct a `DataIdExtractor` instance appropriate for a particular
        dataset type.

        Parameters
        ----------
        datasetTypeName : `str`
            Name of the dataset type; typically forwarded directly to
            the `DataIdExtractor` constructor.
        parser : `FilePathParser`
            Object that parses filenames into Gen2 data IDs; typically
            forwarded directly to the `DataIdExtractor` constructor.
        storageClass : `lsst.daf.butler.StorageClass`
            Storage class for this dataset type in the Gen3 butler; typically
            forwarded directly to the `DataIdExtractor` constructor.

        Returns
        -------
        extractor : `DataIdExtractor`
            A new `DataIdExtractor` instance.
        """
        raise NotImplementedError()
348 """Iterate over all datasets in the repository that should be 349 ingested into the Gen3 repository. 351 Subclasses may override this method, but must delegate to the base 352 class implementation at some point in their own logic. 356 dataset : `FileDataset` 357 Structures representing datasets to be ingested. Paths should be 359 ref : `lsst.daf.butler.DatasetRef` 360 Reference for the Gen3 datasets, including a complete `DatasetType` 363 for dirPath, subdirNamesInDir, fileNamesInDir
in os.walk(self.
root, followlinks=
True):
366 def isRepoRoot(dirName):
367 return any(os.path.exists(os.path.join(dirPath, dirName, f))
368 for f
in REPO_ROOT_FILES)
369 subdirNamesInDir[:] = [d
for d
in subdirNamesInDir
374 dirPathInRoot = dirPath[len(self.
root) + len(os.path.sep):]
375 for fileNameInDir
in fileNamesInDir:
376 if any(fnmatch.fnmatchcase(fileNameInDir, pattern)
377 for pattern
in self.
task.config.fileIgnorePatterns):
379 fileNameInRoot = os.path.join(dirPathInRoot, fileNameInDir)
380 if fileNameInRoot
in REPO_ROOT_FILES:
384 if self.
subset is None or self.
subset.isRelated(ref.dataId):
385 yield FileDataset(path=os.path.join(self.
root, fileNameInRoot), ref=ref)
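
    # Subclasses that yield additional datasets must still delegate to this
    # implementation, e.g. (sketch; ``_iterSpecialDatasets`` is hypothetical):
    #
    #     def iterDatasets(self):
    #         yield from self._iterSpecialDatasets()
    #         yield from super().iterDatasets()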
390 """Prepare the repository by identifying the dataset types to be 391 converted and building `DataIdExtractor` instance for them. 393 Subclasses may override this method, but must delegate to the base 394 class implementation at some point in their own logic. More often, 395 subclasses will specialize the behavior of `prep` simply by overriding 396 `iterMappings`, `isDatasetTypeSpecial`, and `makeDataIdExtractor`, to 397 which the base implementation delegates. 399 This should not perform any write operations to the Gen3 repository. 400 It is guaranteed to be called before `insertDimensionData` and 403 self.
task.log.info(f
"Preparing other datasets from root {self.root}.")
406 parser = FilePathParser.fromMapping(mapping)
413 if (
not self.
task.isDatasetTypeIncluded(datasetTypeName)
or 417 self._skipParsers.push((parser, datasetTypeName,
None))
420 if storageClass
is None:
425 self._skipParsers.push((parser, datasetTypeName,
"no storage class found."))
430 """Insert any dimension records uniquely derived from this repository 433 Subclasses may override this method, but may not need to; the default 434 implementation does nothing. 436 SkyMap and SkyPix dimensions should instead be handled by calling 437 `ConvertRepoTask.useSkyMap` or `ConvertRepoTask.useSkyPix`, because 438 these dimensions are in general shared by multiple Gen2 repositories. 440 This method is guaranteed to be called between `prep` and `ingest`. 445 """Insert converted datasets into the Gen3 repository. 447 Subclasses may override this method, but must delegate to the base 448 class implementation at some point in their own logic. More often, 449 subclasses will specialize the behavior of `ingest` simply by 450 overriding `iterDatasets` and `isDirectorySpecial`, to which the base 451 implementation delegates. 453 This method is guaranteed to be called after both `prep` and 454 `insertDimensionData`. 456 self.task.log.info(
"Finding datasets in repo %s.", self.root)
457 datasetsByType = defaultdict(list)
458 for dataset
in self.iterDatasets():
459 datasetsByType[dataset.ref.datasetType].
append(dataset)
460 for datasetType, datasetsForType
in datasetsByType.items():
461 self.task.registry.registerDatasetType(datasetType)
462 self.task.log.info(
"Ingesting %s %s datasets.", len(datasetsForType), datasetType.name)
464 butler3, collections = self.getButler(datasetType.name)
465 except LookupError
as err:
466 self.task.log.warn(str(err))
469 butler3.ingest(*datasetsForType, transfer=self.task.config.transfer)
470 except LookupError
as err:
471 raise LookupError(f
"Error expanding data ID for dataset type {datasetType.name}.")
from err
472 for collection
in collections:
473 self.task.registry.associate(collection, [dataset.ref
for dataset
in datasetsForType])
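
    # The guaranteed ordering documented above lets a driver run converters
    # in three passes (sketch; ``converters`` is a hypothetical sequence of
    # `RepoConverter` instances):
    #
    #     for converter in converters:
    #         converter.prep()                 # read-only preparation
    #     for converter in converters:
    #         converter.insertDimensionData()  # dimension records first
    #     for converter in converters:
    #         converter.ingest()               # datasets last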

    def getButler(self, datasetTypeName: str) -> Tuple[Butler3, List[str]]:
        """Create a new Gen3 Butler appropriate for a particular dataset type.

        This should be used exclusively by subclasses when obtaining a butler
        to use for dataset ingest (`ConvertRepoTask.butler3` should never be
        used directly).

        Parameters
        ----------
        datasetTypeName : `str`
            Name of the dataset type.

        Returns
        -------
        butler : `lsst.daf.butler.Butler`
            Gen3 Butler instance appropriate for ingesting the given dataset
            type.
        collections : `list` of `str`
            Collections the dataset should be associated with, in addition to
            the one used to define the `lsst.daf.butler.Run` used in
            ``butler``.
        """
        if datasetTypeName in self.task.config.collections:
            return (
                Butler3(butler=self.task.butler3, run=self.task.config.collections[datasetTypeName]),
                self._collections,
            )
        elif self._collections:
            # Use the first converter-level collection as the run and
            # associate the dataset with the rest.
            return (
                Butler3(butler=self.task.butler3, run=self._collections[0]),
                self._collections[1:],
            )
        raise LookupError(f"No collection configured for dataset type {datasetTypeName}.")

    def _extractDatasetRef(self, fileNameInRoot: str) -> Optional[DatasetRef]:
        """Extract a `DatasetRef` from a file name.

        This method is for internal use by `RepoConverter` itself (not its
        subclasses).

        Parameters
        ----------
        fileNameInRoot : `str`
            Name of the file to be ingested, relative to the repository root.

        Returns
        -------
        ref : `lsst.daf.butler.DatasetRef` or `None`
            Reference for the Gen3 datasets, including a complete `DatasetType`
            and data ID.  `None` if the converter does not recognize the
            file as one to be converted.
        """
        def closure(extractor):
            try:
                dataId = extractor.apply(fileNameInRoot)
            except LookupError as err:
                raise RuntimeError(f"Error extracting data ID for {extractor.datasetType.name} "
                                   f"on file {fileNameInRoot}.") from err
            if dataId is None:
                return None
            return DatasetRef(extractor.datasetType, dataId=dataId)
        return self._extractors.apply(closure)

    def _handleUnrecognizedFile(self, fileNameInRoot: str):
        """Generate appropriate warnings (or not) for files not matched by
        `_extractDatasetRef`.

        This method is for internal use by `RepoConverter` itself (not its
        subclasses).

        Parameters
        ----------
        fileNameInRoot : `str`
            Name of the file, relative to the repository root.
        """
        def closure(skipTuple):
            parser, datasetTypeName, message = skipTuple
            if parser(fileNameInRoot) is not None:
                if message is not None:
                    self.task.log.warn("Skipping dataset %s file %s: %s", datasetTypeName,
                                       fileNameInRoot, message)
                return True
            return False
        if not self._skipParsers.apply(closure):
            self.task.log.warn("Skipping unrecognized file %s.", fileNameInRoot)

    def _guessStorageClass(self, datasetTypeName: str, mapping: CameraMapperMapping
                           ) -> Optional[StorageClass]:
        """Infer the Gen3 `StorageClass` for a dataset from a combination of
        configuration and Gen2 dataset type information.

        Parameters
        ----------
        datasetTypeName : `str`
            Name of the dataset type.
        mapping : `lsst.obs.base.mapping.Mapping`
            Mapping object used by the Gen2 `CameraMapper` to describe the
            dataset type.
        """
        storageClassName = self.task.config.storageClasses.get(datasetTypeName)
        if storageClassName is None and mapping.python is not None:
            storageClassName = self.task.config.storageClasses.get(mapping.python, None)
        if storageClassName is None and mapping.persistable is not None:
            storageClassName = self.task.config.storageClasses.get(mapping.persistable, None)
        if storageClassName is None and mapping.python is not None:
            unqualified = mapping.python.split(".")[-1]
            storageClassName = self.task.config.storageClasses.get(unqualified, None)
        if storageClassName is not None:
            storageClass = self.task.butler3.storageClasses.getStorageClass(storageClassName)
        else:
            storageClass = None
        if storageClass is None and mapping.persistable is not None:
            try:
                storageClass = self.task.butler3.storageClasses.getStorageClass(mapping.persistable)
            except KeyError:
                pass
        if storageClass is None and mapping.python is not None:
            unqualified = mapping.python.split(".")[-1]
            try:
                storageClass = self.task.butler3.storageClasses.getStorageClass(unqualified)
            except KeyError:
                pass
        if storageClass is None:
            self.task.log.debug("No StorageClass found for %s; skipping.", datasetTypeName)
        else:
            self.task.log.debug("Using StorageClass %s for %s.", storageClass.name, datasetTypeName)
        return storageClass

    task: ConvertRepoTask
    """The parent task that constructed and uses this converter
    (`ConvertRepoTask`).
    """

    root: str
    """Root path to the Gen2 repository this converter manages (`str`).

    This is a complete path, not relative to some other repository root.
    """

    subset: Optional[ConversionSubset]
    """An object that represents a filter to be applied to the datasets that
    are converted (`ConversionSubset` or `None`).
    """
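
# A skeletal subclass, sketching the public API described in the
# `RepoConverter` docstring (names are illustrative, not part of this module;
# see the per-method sketches above for `iterMappings` and
# `makeDataIdExtractor`):
#
#     class MyRepoConverter(RepoConverter):
#
#         def isDatasetTypeSpecial(self, datasetTypeName):
#             return datasetTypeName == "raw"   # converted by other logic
#
#         def isDirectorySpecial(self, subdirectory):
#             return subdirectory == "CALIB"    # a nested calib repo
#
#         def iterMappings(self):
#             yield from self.mapper.mappings.items()
#
#         def makeDataIdExtractor(self, datasetTypeName, parser, storageClass):
#             return DataIdExtractor(datasetTypeName, storageClass,
#                                    filePathParser=parser)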