21 from __future__
import annotations
23 __all__ = [
"RootRepoConverter"]
28 from typing
import TYPE_CHECKING, Iterator, Optional, Tuple, List, Set
31 from lsst.daf.butler
import DatasetType, DatasetRef, DimensionGraph, FileDataset
32 from .standardRepoConverter
import StandardRepoConverter
# Mapping from coadd flavor name to the dataset type name of its skymap
# ("<coaddName>Coadd_skyMap"), for the coadd flavors the converter knows about.
# NOTE(review): reconstructed from a garbled extraction that dropped the
# closing brace of this dict comprehension.
SKYMAP_DATASET_TYPES = {
    coaddName: f"{coaddName}Coadd_skyMap" for coaddName in ("deep", "goodSeeing", "dcr")
}
39 from lsst.daf.butler
import SkyPixDimension
# NOTE(review): garbled extraction — the enclosing `def` line and several
# interior lines (e.g. the `paths = set()` initializer and the `return`) are
# missing from this chunk. Do not edit without the original file. The visible
# tokens show a helper that collects `dataRef.getUri()` paths and truncates
# each at the first '[' (stripping a trailing "[HDU]" suffix per the
# docstring) into a set — confirm against upstream source.
43 """Strip HDU identifiers from paths and return a unique set of paths.
47 dataRefs : `lsst.daf.persistence.ButlerDataRef`
48 The gen2 datarefs to strip "[HDU]" values from.
53 The unique file paths without appended "[HDU]".
56 for dataRef
in dataRefs:
57 path = dataRef.getUri()
59 paths.add(path.split(
'[')[0])
# NOTE(review): garbled extraction — the `class RootRepoConverter(...)` line,
# the `__init__` signature, and many interior lines are missing from this
# chunk; do not edit without the original file. Visible tokens show the class
# docstring, an `__init__` that initializes `self._refCats` and checks
# `self.task.config.rootSkyMapName`, and part of a boolean expression that
# appears to special-case the "raw"/"ref_cat"/"ref_cat_config" dataset types
# and curated calibrations — confirm against upstream source.
64 """A specialization of `RepoConverter` for root data repositories.
66 `RootRepoConverter` adds support for raw images (mostly delegated to the
67 parent task's `RawIngestTask` subtask) and reference catalogs.
72 Keyword arguments are forwarded to (and required by) `RepoConverter`.
77 self._refCats: List[Tuple[str, SkyPixDimension]] = []
78 if self.
task.config.rootSkyMapName
is not None:
89 or datasetTypeName
in (
"raw",
"ref_cat",
"ref_cat_config")
91 or datasetTypeName
in self.
task.config.curatedCalibrations
# NOTE(review): garbled extraction — interior lines of this method (including
# the `super()` delegation that presumably produces `skyMap` and the final
# return) are missing; do not edit without the original file. The visible
# tokens show a fallback: when no skymap was matched and
# `task.config.rootSkyMapName` is configured, log an "Assuming configured
# root skymap..." message and use that configured name — confirm against
# upstream source.
98 def findMatchingSkyMap(self, datasetTypeName: str) -> Tuple[Optional[BaseSkyMap], Optional[str]]:
101 if skyMap
is None and self.
task.config.rootSkyMapName
is not None:
103 (
"Assuming configured root skymap with name '%s' for dataset %s."),
104 self.
task.config.rootSkyMapName, datasetTypeName
107 name = self.
task.config.rootSkyMapName
# NOTE(review): garbled extraction — the enclosing method `def` line(s) and
# many interior lines are missing; do not edit without the original file.
# Visible tokens show two raw-ingest-related bodies:
#  * one that early-exits when `self.task.raws is None`, logs
#    "Finding raws in root ...", optionally restricts to `self.subset.visits`
#    via itertools.chain.from_iterable, logs the ingest-into-run message, and
#    seeds `self._chain` with {raws run: {raws dataset type name}};
#  * one that early-exits when `self.task.defineVisits is None`, builds
#    exposure-only data IDs from `self._rawRefs`, and runs
#    `task.defineVisits.run(...)`.
# Confirm method boundaries against upstream source.
111 if self.
task.raws
is None:
113 self.
task.log.info(f
"Finding raws in root {self.root}.")
114 if self.
subset is not None:
115 dataRefs = itertools.chain.from_iterable(
117 visit=visit)
for visit
in self.
subset.visits
122 self.
task.log.info(
"Ingesting raws from root %s into run %s.", self.
root, self.
task.raws.butler.run)
124 self.
_chain = {self.
task.raws.butler.run: {self.
task.raws.datasetType.name}}
127 if self.
task.defineVisits
is None:
129 dimensions = DimensionGraph(self.
task.universe, names=[
"exposure"])
130 exposureDataIds =
set(ref.dataId.subset(dimensions)
for ref
in self.
_rawRefs)
131 self.
task.log.info(
"Defining visits from exposures.")
132 self.
task.defineVisits.run(exposureDataIds)
# NOTE(review): garbled extraction — the enclosing method `def` line and
# several interior lines (e.g. the `try:` opening the KeyError-guarded
# universe lookup, and `continue` statements after the existence/membership
# checks) are missing; do not edit without the original file. Visible tokens
# show ref_cat preparation: scan `<root>/ref_cats/*` directories that contain
# a config.py, skip those not listed in `task.config.refCats`, load the
# on-disk RefCatDatasetConfig, reject non-"HTM" indexers with ValueError,
# look up the `htm{level}` skypix dimension (re-raising KeyError as
# ValueError `from err`), register it via `task.useSkyPix`, and record
# (refCat, dimension) in `self._refCats`; a final visible condition guards
# brightObjectMask handling on `task.config.rootSkyMapName`. Confirm against
# upstream source.
137 if self.
task.isDatasetTypeIncluded(
"ref_cat")
and len(self.
task.config.refCats) != 0:
139 for refCat
in os.listdir(os.path.join(self.
root,
"ref_cats")):
140 path = os.path.join(self.
root,
"ref_cats", refCat)
141 configFile = os.path.join(path,
"config.py")
142 if not os.path.exists(configFile):
144 if refCat
not in self.
task.config.refCats:
146 self.
task.log.info(f
"Preparing ref_cat {refCat} from root {self.root}.")
147 onDiskConfig = RefCatDatasetConfig()
148 onDiskConfig.load(configFile)
149 if onDiskConfig.indexer.name !=
"HTM":
150 raise ValueError(f
"Reference catalog '{refCat}' uses unsupported "
151 f
"pixelization '{onDiskConfig.indexer.name}'.")
152 level = onDiskConfig.indexer[
"HTM"].depth
154 dimension = self.
task.universe[f
"htm{level}"]
155 except KeyError
as err:
156 raise ValueError(f
"Reference catalog {refCat} uses HTM level {level}, but no htm{level} "
157 f
"skypix dimension is configured for this registry.")
from err
158 self.
task.useSkyPix(dimension)
159 self._refCats.
append((refCat, dimension))
160 if self.
task.isDatasetTypeIncluded(
"brightObjectMask")
and self.
task.config.rootSkyMapName:
# NOTE(review): garbled extraction — the enclosing generator method's `def`
# line and some interior lines (e.g. the branch selecting between the
# directory scan and the `self.subset` skypix-range path, and the guard after
# `regex.match`) are missing; do not edit without the original file. Visible
# tokens show a generator yielding `FileDataset`s for prepared ref_cats:
# build a per-catalog DatasetType (storageClass "SimpleCatalog") over its
# skypix dimension, then either match "<htmId>.fits" files under
# `<root>/ref_cats/<refCat>` or iterate `self.subset.skypix[dimension]`
# (begin, end) ranges, expanding each htmId into a data ID via the registry.
# Confirm against upstream source.
167 for refCat, dimension
in self._refCats:
168 datasetType = DatasetType(refCat, dimensions=[dimension], universe=self.
task.universe,
169 storageClass=
"SimpleCatalog")
171 regex = re.compile(
r"(\d+)\.fits")
172 for fileName
in os.listdir(os.path.join(self.
root,
"ref_cats", refCat)):
173 m = regex.match(fileName)
175 htmId = int(m.group(1))
176 dataId = self.
task.registry.expandDataId({dimension: htmId})
177 yield FileDataset(path=os.path.join(self.
root,
"ref_cats", refCat, fileName),
178 refs=DatasetRef(datasetType, dataId))
180 for begin, end
in self.
subset.skypix[dimension]:
181 for htmId
in range(begin, end):
182 dataId = self.
task.registry.expandDataId({dimension: htmId})
183 yield FileDataset(path=os.path.join(self.
root,
"ref_cats", refCat, f
"{htmId}.fits"),
184 refs=DatasetRef(datasetType, dataId))
def getRun(self, datasetTypeName: str) -> str:
    """Return the run collection that datasets of the given type are
    converted into, recording the association in ``self._chain``.

    Parameters
    ----------
    datasetTypeName : `str`
        Name of the dataset type being converted.

    Returns
    -------
    run : `str`
        Run collection name looked up from ``self.task.config.runs``.
    """
    run = self.task.config.runs[datasetTypeName]
    # Remember which dataset types land in each run so a chained collection
    # over the converted root repository can be assembled later.
    self._chain.setdefault(run, set()).add(datasetTypeName)
    # NOTE(review): the extraction dropped this method's final line; the
    # ``-> str`` annotation and the lookup above imply the run is returned —
    # confirm against upstream source.
    return run
# NOTE(review): garbled extraction — the property/method `def` line preceding
# this docstring and its body are missing from this chunk; only the docstring
# text is visible. Do not edit without the original file.
194 """Return tuples of run name and associated dataset type names that
195 can be used to construct a chained collection that refers to the
196 converted root repository (`list` [ `tuple` ]).