21 from __future__
import annotations
23 __all__ = [
"CalibRepo",
"ConvertRepoConfig",
"ConvertRepoTask",
"ConvertRepoSkyMapConfig",
"Rerun"]
27 from dataclasses
import dataclass
28 from multiprocessing
import Pool
29 from typing
import Iterable, Optional, List, Tuple
31 from lsst.daf.butler
import (
37 from lsst.pex.config import Config, ConfigurableField, ConfigDictField, DictField, ListField, Field
41 from ..ingest
import RawIngestTask
42 from ..defineVisits
import DefineVisitsTask
43 from .repoConverter
import ConversionSubset
44 from .rootRepoConverter
import RootRepoConverter
45 from .calibRepoConverter
import CalibRepoConverter
46 from .standardRepoConverter
import StandardRepoConverter
47 from .._instrument
import Instrument
52 """Struct containing information about a skymap that may appear in a Gen2
57 """Name of the skymap used in Gen3 data IDs.
61 """Hash computed by `BaseSkyMap.getSha1`.
65 """The `BaseSkyMap` instance constructed from this skymap's configuration.
69 """Whether this skymap has been found in at least one repository being
74 def _dropPrefix(s: str, prefix: str) -> Tuple[str, bool]:
75 """If ``s`` starts with ``prefix``, return the rest of ``s`` and `True`.
76 Otherwise return ``s`` and `False`.
78 if s.startswith(prefix):
79 return s[len(prefix):],
True
85 """Specification for a Gen2 processing-output repository to convert.
89 """Absolute or relative (to the root repository) path to the Gen2
93 runName: Optional[str]
94 """Name of the `~lsst.daf.butler.CollectionType.RUN` collection datasets
95 will be inserted into (`str` or `None`).
97 If `None`, a name will be guessed by calling `guessCollectionNames`.
100 chainName: Optional[str]
101 """Name of a `~lsst.daf.butler.CollectionType.CHAINED` collection that will
102 combine this repository's datasets with those of its parent repositories
105 If `None`, a name will be guessed by calling `guessCollectionNames`.
109 """Collection names associated with parent repositories, used to define the
110 chained collection (`list` [ `str` ]).
112 Ignored if `chainName` is `None`. Runs used in the root repo are
113 automatically included.
117 """Update `runName` and `chainName` with guesses that match Gen3 naming
120 If `chainName` is not `None`, and `runName` is, `runName` will be set
121 from it. If `runName` is already set, nothing will be changed, and
122 if `chainName` is `None`, no chained collection will be created.
126 instrument : `Instrument`
127 Instrument object for the repository being converted.
129 Path to the root repository. If this is present at the start of
130 ``self.path``, it will be stripped as part of generating the run
136 Raised if the appropriate collection names cannot be inferred.
138 if self.
runNamerunName
is not None:
141 if os.path.isabs(self.path):
142 rerunURI = ButlerURI(self.path)
143 rootURI = ButlerURI(root)
144 chainName = rerunURI.relative_to(rootURI)
145 if chainName
is None:
147 f
"Cannot guess run name collection for rerun at '{self.path}': "
148 f
"no clear relationship to root '{root}'."
151 chainName = self.path
152 chainName, _ = _dropPrefix(chainName,
"rerun/")
153 chainName, isPersonal = _dropPrefix(chainName,
"private/")
155 chainName = f
"u/{chainName}"
157 chainName, _ = _dropPrefix(chainName,
"shared/")
158 chainName = instrument.makeCollectionName(
"runs", chainName)
160 self.
runNamerunName = f
"{self.chainName}/direct"
165 """Specification for a Gen2 calibration repository to convert.
169 """Absolute or relative (to the root repository) path to the Gen2
170 repository (`str` or `None`).
172 If `None`, no calibration datasets will be converted from Gen2, but
173 curated calibrations may still be written.
177 """If `True`, write curated calibrations into the associated
178 ``CALIBRATION`` collection (`bool`).
181 labels: Tuple[str, ...] = ()
182 """Extra strings to insert into collection names, including both the
183 ``RUN`` collections that datasets are ingested directly into and the
184 ``CALIBRATION`` collection that associates them with validity ranges.
186 An empty tuple will directly populate the default calibration collection
187 for this instrument with the converted datasets, and is incompatible with
188 ``default=False``. This is a good choice for test data repositories where
189 only one ``CALIBRATION`` collection will ever exist. In other cases, this
190 should be a non-empty tuple, so the default calibration collection can
191 actually be a ``CHAINED`` collection pointer that points to the current
192 recommended ``CALIBRATION`` collection.
196 """If `True`, the created ``CALIBRATION`` collection should be the default
199 This field may only be `True` for one converted calibration collection if
200 more than one is passed to `ConvertRepoTask.run`. It defaults to `True`
201 because the vast majority of the time only one calibration collection is
202 being converted. If ``labels`` is not empty, ``default=True`` will cause
203 a ``CHAINED`` collection that points to the converted ``CALIBRATION``
204 collection to be defined. If ``labels`` is empty, ``default`` *must* be
205 `True` and no ``CHAINED`` collection pointer is necessary.
209 if not self.labels
and not self.default:
210 raise ValueError(
"labels=() requires default=True")
214 """Sub-config used to hold the parameters of a SkyMap.
218 This config only needs to exist because we can't put a
219 `~lsst.pex.config.RegistryField` directly inside a
220 `~lsst.pex.config.ConfigDictField`.
222 It needs to have its only field named "skyMap" for compatibility with the
223 configuration of `lsst.pipe.tasks.MakeSkyMapTask`, which we want so we can
224 use one config file in an obs package to configure both.
226 This name leads to unfortunate repetition with the field named
227 "skymap" that holds it - "skyMap[name].skyMap" - but that seems
230 skyMap = skyMapRegistry.makeField(
231 doc=
"Type and parameters for the SkyMap itself.",
238 "Configuration for subtask responsible for ingesting raws and adding "
239 "exposure dimension entries.",
240 target=RawIngestTask,
243 "Configuration for the subtask responsible for defining visits from "
245 target=DefineVisitsTask,
248 "Mapping from Gen3 skymap name to the parameters used to construct a "
249 "BaseSkyMap instance. This will be used to associate names with "
250 "existing skymaps found in the Gen2 repo.",
252 itemtype=ConvertRepoSkyMapConfig,
256 "Name of a Gen3 skymap (an entry in ``self.skyMaps``) to assume for "
257 "datasets in the root repository when no SkyMap is found there. ",
263 "A mapping from dataset type name to the RUN collection they should "
264 "be inserted into. This must include all datasets that can be found "
265 "in the root repository; other repositories will use per-repository "
272 "Like ``runs``, but is used even when the dataset is present in a "
273 "non-root repository (i.e. rerun), overriding the non-root "
274 "repository's main collection.",
278 "brightObjectMask":
"masks",
282 "Mapping from dataset type name or Gen2 policy entry (e.g. 'python' "
283 "or 'persistable') to the Gen3 StorageClass name.",
290 "defects":
"Defects",
291 "crosstalk":
"CrosstalkCalib",
292 "BaseSkyMap":
"SkyMap",
293 "BaseCatalog":
"Catalog",
294 "BackgroundList":
"Background",
296 "MultilevelParquetTable":
"DataFrame",
297 "ParquetTable":
"DataFrame",
302 "Mapping from dataset type name to formatter class. "
303 "By default these are derived from the formatters listed in the"
304 " Gen3 datastore configuration.",
310 "Mapping from dataset type name to target handler class.",
316 "If True (default), add dimension records for the Instrument and its "
317 "filters and detectors to the registry instead of assuming they are "
323 "The names of reference catalogs (subdirectories under ref_cats) to "
329 "Filename globs that should be ignored instead of being treated as "
332 default=[
"README.txt",
"*~?",
"butler.yaml",
"gen3.sqlite3",
333 "registry.sqlite3",
"calibRegistry.sqlite3",
"_mapper",
334 "_parent",
"repositoryCfg.yaml"]
337 "Gen2 dataset type to use for raw data.",
342 "Glob-style patterns for dataset type names that should be converted.",
347 "Glob-style patterns for dataset type names that should not be "
348 "converted despite matching a pattern in datasetIncludePatterns.",
353 "Key used for the Gen2 equivalent of 'detector' in data IDs.",
358 "If True (default), only convert datasets that are related to the "
359 "ingested visits. Ignored unless a list of visits is passed to "
365 "If True (default), define an '<instrument>/defaults' CHAINED "
366 "collection that includes everything found in the root repo as well "
367 "as the default calibration collection.",
372 "Additional child collections to include in the umbrella collection. "
373 "Ignored if doMakeUmbrellaCollection=False.",
380 return self.
rawsraws.transfer
384 self.
rawsraws.transfer = value
391 """A task that converts one or more related Gen2 data repositories to a
392 single Gen3 data repository (with multiple collections).
396 config: `ConvertRepoConfig`
397 Configuration for this task.
398 butler3: `lsst.daf.butler.Butler`
399 A writeable Gen3 Butler instance that represents the data repository
400 that datasets will be ingested into. If the 'raw' dataset is
401 configured to be included in the conversion, ``butler3.run`` should be
402 set to the name of the collection raws should be ingested into, and
403 ``butler3.collections`` should include a calibration collection from
404 which the ``camera`` dataset can be loaded, unless a calibration repo
405 is converted and ``doWriteCuratedCalibrations`` is `True`.
406 instrument : `lsst.obs.base.Instrument`
407 The Gen3 instrument that should be used for this conversion.
409 Other keyword arguments are forwarded to the `Task` constructor.
413 Most of the work of converting repositories is delegated to instances of
414 the `RepoConverter` hierarchy. The `ConvertRepoTask` instance itself holds
415 only state that is relevant for all Gen2 repositories being ingested, while
416 each `RepoConverter` instance holds only state relevant for the conversion
417 of a single Gen2 repository. Both the task and the `RepoConverter`
418 instances are single use; `ConvertRepoTask.run` and most `RepoConverter`
419 methods may only be called once on a particular instance.
422 ConfigClass = ConvertRepoConfig
424 _DefaultName =
"convertRepo"
426 def __init__(self, config=None, *, butler3: Butler3, instrument: Instrument, **kwargs):
437 self.
makeSubtaskmakeSubtask(
"raws", butler=butler3)
438 self.
makeSubtaskmakeSubtask(
"defineVisits", butler=butler3)
445 for name, config
in self.
configconfig.skyMaps.items():
446 instance = config.skyMap.apply()
452 def _reduce_kwargs(self):
454 return dict(**super()._reduce_kwargs(), butler3=self.
butler3butler3, instrument=self.
instrumentinstrument)
456 def _populateSkyMapDicts(self, name, instance):
457 struct =
ConfiguredSkyMap(name=name, sha1=instance.getSha1(), instance=instance)
462 """Return `True` if configuration indicates that the given dataset type
465 This method is intended to be called primarily by the
466 `RepoConverter` instances used internally by the task.
471 Name of the dataset type.
476 Whether the dataset should be included in the conversion.
479 any(fnmatch.fnmatchcase(datasetTypeName, pattern)
480 for pattern
in self.
configconfig.datasetIncludePatterns)
481 and not any(fnmatch.fnmatchcase(datasetTypeName, pattern)
482 for pattern
in self.
configconfig.datasetIgnorePatterns)
485 def useSkyMap(self, skyMap: BaseSkyMap, skyMapName: str) -> str:
486 """Indicate that a repository uses the given SkyMap.
488 This method is intended to be called primarily by the
489 `RepoConverter` instances used internally by the task.
493 skyMap : `lsst.skymap.BaseSkyMap`
494 SkyMap instance being used, typically retrieved from a Gen2
497 The name of the gen2 skymap, for error reporting.
502 The name of the skymap in Gen3 data IDs.
507 Raised if the specified skymap cannot be found.
509 sha1 = skyMap.getSha1()
514 except KeyError
as err:
515 msg = f
"SkyMap '{skyMapName}' with sha1={sha1} not included in configuration."
516 raise LookupError(msg)
from err
521 """Register all skymaps that have been marked as used.
523 This method is intended to be called primarily by the
524 `RepoConverter` instances used internally by the task.
528 subset : `ConversionSubset`, optional
529 Object that will be used to filter converted datasets by data ID.
530 If given, it will be updated with the tracts of this skymap that
531 overlap the visits in the subset.
535 struct.instance.register(struct.name, self.
butler3butler3)
536 if subset
is not None and self.
configconfig.relatedOnly:
537 subset.addSkyMap(self.
registryregistry, struct.name)
540 """Indicate that a repository uses the given SkyPix dimension.
542 This method is intended to be called primarily by the
543 `RepoConverter` instances used internally by the task.
547 dimension : `lsst.daf.butler.SkyPixDimension`
548 Dimension representing a pixelization of the sky.
553 """Register all SkyPix dimensions that have been marked as used.
555 This method is intended to be called primarily by the
556 `RepoConverter` instances used internally by the task.
560 subset : `ConversionSubset`, optional
561 Object that will be used to filter converted datasets by data ID.
562 If given, it will be updated with the pixelization IDs that
563 overlap the visits in the subset.
565 if subset
is not None and self.
configconfig.relatedOnly:
567 subset.addSkyPix(self.
registryregistry, dimension)
569 def run(self, root: str, *,
570 calibs: Optional[List[CalibRepo]] =
None,
571 reruns: Optional[List[Rerun]] =
None,
572 visits: Optional[Iterable[int]] =
None,
573 pool: Optional[Pool] =
None,
575 """Convert a group of related data repositories.
580 Complete path to the root Gen2 data repository. This should be
581 a data repository that includes a Gen2 registry and any raw files
582 and/or reference catalogs.
583 calibs : `list` of `CalibRepo`
584 Specifications for Gen2 calibration repos to convert. If `None`
585 (default), curated calibrations only will be written to the default
586 calibration collection for this instrument; set to ``()`` explicitly
588 reruns : `list` of `Rerun`
589 Specifications for rerun (processing output) repos to convert. If
590 `None` (default), no reruns are converted.
591 visits : iterable of `int`, optional
592 The integer IDs of visits to convert. If not provided, all visits
593 in the Gen2 root repository will be converted.
594 pool : `multiprocessing.Pool`, optional
595 If not `None`, a process pool with which to parallelize some
597 processes : `int`, optional
598 The number of processes to use for conversion.
600 if pool
is None and processes > 1:
601 pool = Pool(processes)
604 if visits
is not None:
607 if self.
configconfig.relatedOnly:
608 self.
loglog.
warn(
"config.relatedOnly is True but all visits are being ingested; "
609 "no filtering will be done.")
614 defaultCalibRepos = [c.path
for c
in calibs
if c.default]
615 if len(defaultCalibRepos) > 1:
616 raise ValueError(f
"Multiple calib repos marked as default: {defaultCalibRepos}.")
623 converters.append(rootConverter)
626 calibRoot = spec.path
627 if calibRoot
is not None:
628 if not os.path.isabs(calibRoot):
629 calibRoot = os.path.join(rootConverter.root, calibRoot)
633 mapper=rootConverter.mapper,
634 subset=rootConverter.subset)
635 converters.append(converter)
644 if not os.path.isabs(runRoot):
645 runRoot = os.path.join(rootConverter.root, runRoot)
646 spec.guessCollectionNames(self.
instrumentinstrument, rootConverter.root)
648 instrument=self.
instrumentinstrument, subset=rootConverter.subset)
649 converters.append(converter)
650 rerunConverters[spec.runName] = converter
653 if self.
configconfig.doRegisterInstrument:
658 rootConverter.runRawIngest(pool=pool)
667 if spec.default
and spec.labels:
670 defaultCalibName = self.
instrumentinstrument.makeCalibrationCollectionName()
671 self.
butler3butler3.registry.registerCollection(defaultCalibName, CollectionType.CHAINED)
672 recommendedCalibName = self.
instrumentinstrument.makeCalibrationCollectionName(*spec.labels)
673 self.
butler3butler3.registry.registerCollection(recommendedCalibName, CollectionType.CALIBRATION)
674 self.
butler3butler3.registry.setCollectionChain(defaultCalibName, [recommendedCalibName])
678 rootConverter.runDefineVisits(pool=pool)
681 for converter
in converters:
695 for converter
in converters:
696 converter.findDatasets()
699 for converter
in converters:
700 converter.expandDataIds()
703 for converter
in converters:
707 for converter
in converters:
711 if self.
configconfig.doMakeUmbrellaCollection:
712 umbrella = self.
instrumentinstrument.makeUmbrellaCollectionName()
713 self.
registryregistry.registerCollection(umbrella, CollectionType.CHAINED)
714 children =
list(self.
registryregistry.getCollectionChain(umbrella))
715 children.extend(rootConverter.getCollectionChain())
716 children.append(self.
instrumentinstrument.makeCalibrationCollectionName())
717 if BaseSkyMap.SKYMAP_RUN_COLLECTION_NAME
not in children:
720 self.
registryregistry.registerRun(BaseSkyMap.SKYMAP_RUN_COLLECTION_NAME)
721 children.append(BaseSkyMap.SKYMAP_RUN_COLLECTION_NAME)
722 children.extend(self.
configconfig.extraUmbrellaChildren)
723 self.
loglog.
info(
"Defining %s from chain %s.", umbrella, children)
724 self.
registryregistry.setCollectionChain(umbrella, children)
728 if spec.chainName
is not None:
729 self.
butler3butler3.registry.registerCollection(spec.chainName, type=CollectionType.CHAINED)
730 chain = [spec.runName]
731 chain.extend(rerunConverters[spec.runName].getCollectionChain())
732 for parent
in spec.parents:
734 parentConverter = rerunConverters.get(parent)
735 if parentConverter
is not None:
736 chain.extend(parentConverter.getCollectionChain())
737 chain.extend(rootConverter.getCollectionChain())
741 chain.append(self.
instrumentinstrument.makeCalibrationCollectionName(*calibs[0].labels))
742 self.
loglog.
info(
"Defining %s from chain %s.", spec.chainName, chain)
743 self.
butler3butler3.registry.setCollectionChain(spec.chainName, chain)
def transfer(self, value)
def __init__(self, config=None, *Butler3 butler3, Instrument instrument, **kwargs)
def isDatasetTypeIncluded(self, str datasetTypeName)
def registerUsedSkyPix(self, Optional[ConversionSubset] subset)
def run(self, str root, *Optional[List[CalibRepo]] calibs=None, Optional[List[Rerun]] reruns=None, Optional[Iterable[int]] visits=None, Optional[Pool] pool=None, int processes=1)
str useSkyMap(self, BaseSkyMap skyMap, str skyMapName)
def registerUsedSkyMaps(self, Optional[ConversionSubset] subset)
def _populateSkyMapDicts(self, name, instance)
def useSkyPix(self, SkyPixDimension dimension)
None guessCollectionNames(self, Instrument instrument, str root)
def makeSubtask(self, name, **keyArgs)
bool any(CoordinateExpr< N > const &expr) noexcept
Return true if any elements are true.
def writeCuratedCalibrations(repo, instrument, collection, labels)
daf::base::PropertyList * list
daf::base::PropertySet * set