21 from __future__
import annotations
23 __all__ = [
"ConvertRepoConfig",
"ConvertRepoTask",
"ConvertRepoSkyMapConfig",
"Rerun"]
27 from dataclasses
import dataclass
28 from multiprocessing
import Pool
29 from typing
import Iterable, Optional, List, Dict
31 from lsst.daf.butler
import (
36 from lsst.pex.config import Config, ConfigurableField, ConfigDictField, DictField, ListField, Field
40 from ..ingest
import RawIngestTask
41 from ..defineVisits
import DefineVisitsTask
42 from .repoConverter
import ConversionSubset
43 from .rootRepoConverter
import RootRepoConverter
44 from .calibRepoConverter
import CalibRepoConverter
45 from .standardRepoConverter
import StandardRepoConverter
46 from .._instrument
import Instrument
51 """Struct containing information about a skymap that may appear in a Gen2
56 """Name of the skymap used in Gen3 data IDs.
60 """Hash computed by `BaseSkyMap.getSha1`.
64 """The configured skymap instance itself (`lsst.skymap.BaseSkyMap`).
68 """Whether this skymap has been found in at least one repository being
75 """Specification for a Gen2 processing-output repository to convert.
79 """Absolute or relative (to the root repository) path to the Gen2
84 """Name of the `~lsst.daf.butler.CollectionType.RUN` collection datasets
85 will be inserted into (`str`).
88 chainName: Optional[str]
89 """Name of a `~lsst.daf.butler.CollectionType.CHAINED` collection that will
90 combine this repository's datasets with those of its parent repositories
95 """Collection names associated with parent repositories, used to define the
96 chained collection (`list` [ `str` ]).
98 Ignored if `chainName` is `None`. Runs used in the root repo are
99 automatically included.
104 """Sub-config used to hold the parameters of a SkyMap.
108 This config only needs to exist because we can't put a
109 `~lsst.pex.config.RegistryField` directly inside a
110 `~lsst.pex.config.ConfigDictField`.
112 It needs to have its only field named "skyMap" for compatibility with the
113 configuration of `lsst.pipe.tasks.MakeSkyMapTask`, which we want so we can
114 use one config file in an obs package to configure both.
116 This name leads to unfortunate repetition with the field named
117 "skymap" that holds it - "skyMap[name].skyMap" - but that seems
120 skyMap = skyMapRegistry.makeField(
121 doc=
"Type and parameters for the SkyMap itself.",
128 "Configuration for subtask responsible for ingesting raws and adding "
129 "exposure dimension entries.",
130 target=RawIngestTask,
133 "Configuration for the subtask responsible for defining visits from "
135 target=DefineVisitsTask,
138 "Mapping from Gen3 skymap name to the parameters used to construct a "
139 "BaseSkyMap instance. This will be used to associate names with "
140 "existing skymaps found in the Gen2 repo.",
142 itemtype=ConvertRepoSkyMapConfig,
146 "Name of a Gen3 skymap (an entry in ``self.skyMaps``) to assume for "
147 "datasets in the root repository when no SkyMap is found there. ",
153 "A mapping from dataset type name to the RUN collection they should "
154 "be inserted into. This must include all datasets that can be found "
155 "in the root repository; other repositories will use per-repository "
160 "deepCoadd_skyMap":
"skymaps",
164 "Like ``runs``, but is used even when the dataset is present in a "
165 "non-root repository (i.e. rerun), overriding the non-root "
166 "repository's main collection.",
170 "brightObjectMask":
"masks",
174 "Mapping from dataset type name or Gen2 policy entry (e.g. 'python' "
175 "or 'persistable') to the Gen3 StorageClass name.",
182 "defects":
"Defects",
183 "crosstalk":
"CrosstalkCalib",
184 "BaseSkyMap":
"SkyMap",
185 "BaseCatalog":
"Catalog",
186 "BackgroundList":
"Background",
188 "MultilevelParquetTable":
"DataFrame",
189 "ParquetTable":
"DataFrame",
194 "Mapping from dataset type name to formatter class. "
195 "By default these are derived from the formatters listed in the"
196 " Gen3 datastore configuration.",
202 "Mapping from dataset type name to target handler class.",
208 "If True (default), add dimension records for the Instrument and its "
209 "filters and detectors to the registry instead of assuming they are "
215 "If True (default), ingest human-curated calibrations directly via "
216 "the Instrument interface. Note that these calibrations are never "
217 "converted from Gen2 repositories.",
222 "The names of reference catalogs (subdirectories under ref_cats) to "
228 "Filename globs that should be ignored instead of being treated as "
231 default=[
"README.txt",
"*~?",
"butler.yaml",
"gen3.sqlite3",
232 "registry.sqlite3",
"calibRegistry.sqlite3",
"_mapper",
233 "_parent",
"repositoryCfg.yaml"]
236 "Gen2 dataset type to use for raw data.",
241 "Glob-style patterns for dataset type names that should be converted.",
246 "Glob-style patterns for dataset type names that should not be "
247 "converted despite matching a pattern in datasetIncludePatterns.",
252 "Key used for the Gen2 equivalent of 'detector' in data IDs.",
257 "If True (default), only convert datasets that are related to the "
258 "ingested visits. Ignored unless a list of visits is passed to "
266 return self.
raws.transfer
270 self.
raws.transfer = value
280 """A task that converts one or more related Gen2 data repositories to a
281 single Gen3 data repository (with multiple collections).
285 config : `ConvertRepoConfig`
286 Configuration for this task.
287 butler3 : `lsst.daf.butler.Butler`
288 A writeable Gen3 Butler instance that represents the data repository
289 that datasets will be ingested into. If the 'raw' dataset is
290 configured to be included in the conversion, ``butler3.run`` should be
291 set to the name of the collection raws should be ingested into, and
292 ``butler3.collections`` should include a calibration collection from
293 which the ``camera`` dataset can be loaded, unless a calibration repo
294 is converted and ``doWriteCuratedCalibrations`` is `True`.
295 instrument : `lsst.obs.base.Instrument`
296 The Gen3 instrument that should be used for this conversion.
298 Other keyword arguments are forwarded to the `Task` constructor.
302 Most of the work of converting repositories is delegated to instances of
303 the `RepoConverter` hierarchy. The `ConvertRepoTask` instance itself holds
304 only state that is relevant for all Gen2 repositories being ingested, while
305 each `RepoConverter` instance holds only state relevant for the conversion
306 of a single Gen2 repository. Both the task and the `RepoConverter`
307 instances are single use; `ConvertRepoTask.run` and most `RepoConverter`
308 methods may only be called once on a particular instance.
311 ConfigClass = ConvertRepoConfig
313 _DefaultName =
"convertRepo"
315 def __init__(self, config=None, *, butler3: Butler3, instrument: Instrument, **kwargs):
330 for name, config
in self.
config.skyMaps.items():
331 instance = config.skyMap.apply()
337 def _reduce_kwargs(self):
339 return dict(**super()._reduce_kwargs(), butler3=self.
butler3, instrument=self.
instrument)
341 def _populateSkyMapDicts(self, name, instance):
342 struct =
ConfiguredSkyMap(name=name, sha1=instance.getSha1(), instance=instance)
347 """Return `True` if configuration indicates that the given dataset type
350 This method is intended to be called primarily by the
351 `RepoConverter` instances used internally by the task.
356 Name of the dataset type.
361 Whether the dataset should be included in the conversion.
364 any(fnmatch.fnmatchcase(datasetTypeName, pattern)
365 for pattern
in self.
config.datasetIncludePatterns)
366 and not any(fnmatch.fnmatchcase(datasetTypeName, pattern)
367 for pattern
in self.
config.datasetIgnorePatterns)
370 def useSkyMap(self, skyMap: BaseSkyMap, skyMapName: str) -> str:
371 """Indicate that a repository uses the given SkyMap.
373 This method is intended to be called primarily by the
374 `RepoConverter` instances used internally by the task.
378 skyMap : `lsst.skymap.BaseSkyMap`
379 SkyMap instance being used, typically retrieved from a Gen2
382 The name of the Gen2 skymap, for error reporting.
387 The name of the skymap in Gen3 data IDs.
392 Raised if the specified skymap cannot be found.
394 sha1 = skyMap.getSha1()
399 except KeyError
as err:
400 msg = f
"SkyMap '{skyMapName}' with sha1={sha1} not included in configuration."
401 raise LookupError(msg)
from err
406 """Register all skymaps that have been marked as used.
408 This method is intended to be called primarily by the
409 `RepoConverter` instances used internally by the task.
413 subset : `ConversionSubset`, optional
414 Object that will be used to filter converted datasets by data ID.
415 If given, it will be updated with the tracts of this skymap that
416 overlap the visits in the subset.
420 struct.instance.register(struct.name, self.
registry)
421 if subset
is not None and self.
config.relatedOnly:
422 subset.addSkyMap(self.
registry, struct.name)
425 """Indicate that a repository uses the given SkyPix dimension.
427 This method is intended to be called primarily by the
428 `RepoConverter` instances used internally by the task.
432 dimension : `lsst.daf.butler.SkyPixDimension`
433 Dimension representing a pixelization of the sky.
438 """Update the conversion subset with all SkyPix dimensions marked as used.
440 This method is intended to be called primarily by the
441 `RepoConverter` instances used internally by the task.
445 subset : `ConversionSubset`, optional
446 Object that will be used to filter converted datasets by data ID.
447 If given, it will be updated with the pixelization IDs that
448 overlap the visits in the subset.
450 if subset
is not None and self.
config.relatedOnly:
452 subset.addSkyPix(self.
registry, dimension)
454 def run(self, root: str, *,
455 calibs: Dict[str, str] =
None,
457 visits: Optional[Iterable[int]] =
None,
458 pool: Optional[Pool] =
None,
460 """Convert a group of related data repositories.
465 Complete path to the root Gen2 data repository. This should be
466 a data repository that includes a Gen2 registry and any raw files
467 and/or reference catalogs.
469 Dictionary mapping calibration repository path to the
470 `~lsst.daf.butler.CollectionType.CALIBRATION` collection that
471 converted datasets within it should be certified into.
472 reruns : `list` of `Rerun`
473 Specifications for rerun (processing output) collections to
475 visits : iterable of `int`, optional
476 The integer IDs of visits to convert. If not provided, all visits
477 in the Gen2 root repository will be converted.
478 pool : `multiprocessing.Pool`, optional
479 If not `None`, a process pool with which to parallelize some
481 processes : `int`, optional
482 The number of processes to use for conversion.
484 if pool
is None and processes > 1:
485 pool = Pool(processes)
488 if visits
is not None:
491 if self.
config.relatedOnly:
492 self.
log.
warn(
"config.relatedOnly is True but all visits are being ingested; "
493 "no filtering will be done.")
499 converters.append(rootConverter)
500 for calibRoot, collection
in calibs.items():
501 if not os.path.isabs(calibRoot):
502 calibRoot = os.path.join(rootConverter.root, calibRoot)
505 mapper=rootConverter.mapper,
506 subset=rootConverter.subset)
507 converters.append(converter)
511 if not os.path.isabs(runRoot):
512 runRoot = os.path.join(rootConverter.root, runRoot)
514 instrument=self.
instrument, subset=rootConverter.subset)
515 converters.append(converter)
516 rerunConverters[spec.runName] = converter
519 if self.
config.doRegisterInstrument:
524 rootConverter.runRawIngest(pool=pool)
532 if self.
config.doWriteCuratedCalibrations:
533 butler3 = Butler3(butler=self.
butler3)
536 calibCollections =
set()
537 for collection
in calibs.values():
539 calibCollections.add(collection)
547 defaultCalibCollection = self.
instrument.makeCollectionName(
"calib")
548 if defaultCalibCollection
not in calibCollections:
553 rootConverter.runDefineVisits(pool=pool)
556 for converter
in converters:
570 for converter
in converters:
571 converter.findDatasets()
574 for converter
in converters:
575 converter.expandDataIds()
578 for converter
in converters:
582 for converter
in converters:
587 if spec.chainName
is not None:
588 self.
butler3.registry.registerCollection(spec.chainName, type=CollectionType.CHAINED)
589 chain = [spec.runName]
590 chain.extend(rerunConverters[spec.runName].getCollectionChain())
591 for parent
in spec.parents:
593 parentConverter = rerunConverters.get(parent)
594 if parentConverter
is not None:
595 chain.extend(parentConverter.getCollectionChain())
596 chain.extend(rootConverter.getCollectionChain())
597 self.
log.
info(
"Defining %s from chain %s.", spec.chainName, chain)
598 self.
butler3.registry.setCollectionChain(spec.chainName, chain)