from __future__ import annotations

__all__ = ["RootRepoConverter"]

import itertools
import os
import re
from typing import TYPE_CHECKING, Dict, Iterator, Mapping, Optional, Tuple, List

from lsst.daf.butler import CollectionType, DatasetType, DatasetRef, DimensionGraph, FileDataset

from .standardRepoConverter import StandardRepoConverter
# Map each Gen2 coadd flavor to the dataset type name of its skymap.
SKYMAP_DATASET_TYPES = {
    coaddName: f"{coaddName}Coadd_skyMap" for coaddName in ("deep", "goodSeeing", "dcr")
}
39 from lsst.daf.butler
import SkyPixDimension
def getDataPaths(dataRefs):
    """Strip HDU identifiers from paths and return a unique set of paths.

    Parameters
    ----------
    dataRefs : iterable of `lsst.daf.persistence.ButlerDataRef`
        The gen2 datarefs to strip "[HDU]" values from.

    Returns
    -------
    paths : `set` of `str`
        The unique file paths without appended "[HDU]".
    """
    paths = set()
    for dataRef in dataRefs:
        path = dataRef.getUri()
        # Drop any "[HDU]" suffix so multi-HDU references to the same file
        # collapse to a single path.
        paths.add(path.split('[')[0])
    return paths
64 """A specialization of `RepoConverter` for root data repositories.
66 `RootRepoConverter` adds support for raw images (mostly delegated to the
67 parent task's `RawIngestTask` subtask) and reference catalogs.
72 Keyword arguments are forwarded to (and required by) `RepoConverter`.
def __init__(self, **kwds):
    """Construct a root-repo converter.

    Parameters
    ----------
    **kwds
        Keyword arguments are forwarded to (and required by) the base
        `RepoConverter`.
    """
    # NOTE(review): this method was reconstructed from a garbled extraction;
    # the exact arguments of the base-class call were lost -- confirm
    # against version control.
    super().__init__(**kwds)
    # Raw dataset refs accumulated by runRawIngest and consumed by
    # runDefineVisits (its use there implies this initialization).
    self._rawRefs = []
    # Maps reference-catalog name -> skypix dimension its shards are indexed
    # by; filled by prep(), consumed by iterDatasets() and getRun().
    self._refCats: Dict[str, SkyPixDimension] = {}
    if self.task.config.rootSkyMapName is not None:
        self._rootSkyMap = self.task.config.skyMaps[self.task.config.rootSkyMapName].skyMap.apply()
    else:
        # NOTE(review): else-branch lost in extraction; a None fallback is
        # assumed so findMatchingSkyMap can read the attribute -- confirm.
        self._rootSkyMap = None
def isDatasetTypeSpecial(self, datasetTypeName: str) -> bool:
    # Docstring inherited from RepoConverter.
    # NOTE(review): the first operand of this disjunction was lost in
    # extraction; delegation to the base class is assumed -- confirm.
    return (
        super().isDatasetTypeSpecial(datasetTypeName)
        or datasetTypeName in ("raw", "ref_cat", "ref_cat_config")
        # Curated calibrations live in the Gen2 root repo, so they are
        # handled specially here rather than as ordinary datasets.
        or datasetTypeName in self.instrument.getCuratedCalibrationNames()
    )
def findMatchingSkyMap(self, datasetTypeName: str) -> Tuple[Optional[BaseSkyMap], Optional[str]]:
    # Docstring inherited from StandardRepoConverter.findMatchingSkyMap.
    # NOTE(review): the initial super() delegation and the return statement
    # were lost in extraction and are reconstructed here -- confirm.
    skyMap, name = super().findMatchingSkyMap(datasetTypeName)
    if skyMap is None and self.task.config.rootSkyMapName is not None:
        # Fall back to the explicitly configured root skymap when no
        # registered skymap matches the dataset type.
        self.task.log.debug(
            ("Assuming configured root skymap with name '%s' for dataset %s."),
            self.task.config.rootSkyMapName, datasetTypeName
        )
        skyMap = self._rootSkyMap
        name = self.task.config.rootSkyMapName
    return skyMap, name
def runRawIngest(self, pool=None):
    """Find and ingest raw images from the Gen2 root into the Gen3 repo.

    Parameters
    ----------
    pool : optional
        Worker pool forwarded to the parent task's raw-ingest subtask.
    """
    if self.task.raws is None:
        return
    self.task.log.info(f"Finding raws in root {self.root}.")
    if self.subset is not None:
        # Restrict the search to the visits in the conversion subset.
        # NOTE(review): the per-visit query expression was partially lost in
        # extraction; only the chain over self.subset.visits with
        # visit=visit keyword is visible -- confirm the query call.
        dataRefs = itertools.chain.from_iterable(
            self._queryRawDataRefs(visit=visit)
            for visit in self.subset.visits
        )
    else:
        # NOTE(review): else-branch lost in extraction; assumed to query all
        # raws -- confirm.
        dataRefs = self._queryRawDataRefs()
    dataPaths = getDataPaths(dataRefs)
    self.task.log.info("Ingesting raws from root %s into run %s.", self.root, self.task.raws.butler.run)
    self._rawRefs.extend(self.task.raws.run(dataPaths, pool=pool))
def runDefineVisits(self, pool=None):
    """Define visit dimension records from the exposures ingested by
    `runRawIngest`.

    Parameters
    ----------
    pool : optional
        Worker pool forwarded to the parent task's define-visits subtask.
    """
    if self.task.defineVisits is None:
        return
    dimensions = DimensionGraph(self.task.universe, names=["exposure"])
    # Deduplicate: many raw refs (one per detector) share one exposure.
    exposureDataIds = set(ref.dataId.subset(dimensions) for ref in self._rawRefs)
    self.task.log.info("Defining visits from exposures.")
    self.task.defineVisits.run(exposureDataIds, pool=pool)
def prep(self):
    # Docstring inherited from RepoConverter.
    # Discover reference catalogs present in the Gen2 root and record the
    # skypix dimension each one is indexed by.
    if self.task.isDatasetTypeIncluded("ref_cat") and len(self.task.config.refCats) != 0:
        for refCat in os.listdir(os.path.join(self.root, "ref_cats")):
            path = os.path.join(self.root, "ref_cats", refCat)
            configFile = os.path.join(path, "config.py")
            if not os.path.exists(configFile):
                # Not a ref_cat directory (no on-disk indexer config).
                continue
            if refCat not in self.task.config.refCats:
                # Not requested by configuration; skip.
                continue
            self.task.log.info(f"Preparing ref_cat {refCat} from root {self.root}.")
            # NOTE(review): the import of RefCatDatasetConfig is not visible
            # in this fragment (presumably lsst.meas.algorithms.DatasetConfig
            # aliased at module scope) -- confirm.
            onDiskConfig = RefCatDatasetConfig()
            onDiskConfig.load(configFile)
            if onDiskConfig.indexer.name != "HTM":
                raise ValueError(f"Reference catalog '{refCat}' uses unsupported "
                                 f"pixelization '{onDiskConfig.indexer.name}'.")
            level = onDiskConfig.indexer["HTM"].depth
            try:
                dimension = self.task.universe[f"htm{level}"]
            except KeyError as err:
                raise ValueError(f"Reference catalog {refCat} uses HTM level {level}, but no htm{level} "
                                 f"skypix dimension is configured for this registry.") from err
            self.task.useSkyPix(dimension)
            self._refCats[refCat] = dimension
    if self.task.isDatasetTypeIncluded("brightObjectMask") and self.task.config.rootSkyMapName:
        # NOTE(review): the body of this branch was lost in extraction;
        # registering the root skymap is the plausible action -- confirm.
        self.task.useSkyMap(self._rootSkyMap, self.task.config.rootSkyMapName)
    super().prep()
def iterDatasets(self) -> Iterator[FileDataset]:
    # Docstring inherited from RepoConverter.
    # Yield one FileDataset per reference-catalog shard file discovered by
    # prep(); each shard is identified by its HTM pixel index.
    for refCat, dimension in self._refCats.items():
        datasetType = DatasetType(refCat, dimensions=[dimension], universe=self.task.universe,
                                  storageClass="SimpleCatalog")
        if self.subset is None:
            # No spatial subset: ingest every "<htmId>.fits" shard on disk.
            regex = re.compile(r"(\d+)\.fits")
            for fileName in os.listdir(os.path.join(self.root, "ref_cats", refCat)):
                m = regex.match(fileName)
                # NOTE(review): the None-guard on the match was lost in
                # extraction; it is required for non-shard files -- confirm.
                if m is not None:
                    htmId = int(m.group(1))
                    dataId = self.task.registry.expandDataId({dimension: htmId})
                    yield FileDataset(path=os.path.join(self.root, "ref_cats", refCat, fileName),
                                      refs=DatasetRef(datasetType, dataId))
        else:
            # Subset given: only yield shards whose pixel index falls in one
            # of the subset's skypix ranges for this dimension.
            for begin, end in self.subset.skypix[dimension]:
                for htmId in range(begin, end):
                    dataId = self.task.registry.expandDataId({dimension: htmId})
                    yield FileDataset(path=os.path.join(self.root, "ref_cats", refCat,
                                                        f"{htmId}.fits"),
                                      refs=DatasetRef(datasetType, dataId))
def getRun(self, datasetTypeName: str, calibDate: Optional[str] = None) -> str:
    # Docstring inherited from RepoConverter.
    # All converted reference catalogs share the instrument's dedicated
    # Gen2 ref-cat run collection; everything else uses the default rule.
    if datasetTypeName in self._refCats:
        return self.instrument.makeRefCatCollectionName("gen2")
    return super().getRun(datasetTypeName, calibDate)
def _finish(self, datasets: Mapping[DatasetType, Mapping[Optional[str], List[FileDataset]]]):
    # Docstring inherited from RepoConverter.
    super()._finish(datasets)
    # Register the instrument's chained ref-cat collection and append the
    # Gen2 child run to its chain, so converted reference catalogs are
    # visible through the standard ref-cat collection name.
    chained = self.instrument.makeRefCatCollectionName()
    child = self.instrument.makeRefCatCollectionName("gen2")
    self.task.registry.registerCollection(chained, CollectionType.CHAINED)
    self.task.registry.registerCollection(child, CollectionType.RUN)
    children = list(self.task.registry.getCollectionChain(chained))
    children.append(child)
    self.task.registry.setCollectionChain(chained, children)
std::vector< SchemaItem< Flag > > * items
bool isDatasetTypeSpecial(self, str datasetTypeName)
List[str] getSpecialDirectories(self)
str getRun(self, str datasetTypeName, Optional[str] calibDate=None)
def runRawIngest(self, pool=None)
Iterator[FileDataset] iterDatasets(self)
def __init__(self, **kwds)
def runDefineVisits(self, pool=None)
Tuple[Optional[BaseSkyMap], Optional[str]] findMatchingSkyMap(self, str datasetTypeName)
std::shared_ptr< FrameSet > append(FrameSet const &first, FrameSet const &second)
Construct a FrameSet that performs two transformations in series.
Fit spatial kernel using approximate fluxes for candidates, and solving a linear system of equations.
def getDataPaths(dataRefs)
daf::base::PropertyList * list
daf::base::PropertySet * set