rootRepoConverter.py
# This file is part of obs_base.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import annotations

__all__ = ["RootRepoConverter"]

import os
import re
import itertools
from typing import TYPE_CHECKING, Iterator, Optional, Tuple, List, Set

from lsst.skymap import BaseSkyMap
from lsst.daf.butler import DatasetType, DatasetRef, DimensionGraph, FileDataset
from .standardRepoConverter import StandardRepoConverter

SKYMAP_DATASET_TYPES = {
    coaddName: f"{coaddName}Coadd_skyMap" for coaddName in ("deep", "goodSeeing", "dcr")
}

if TYPE_CHECKING:
    from lsst.daf.butler import SkyPixDimension

def getDataPaths(dataRefs):
    """Strip HDU identifiers from paths and return a unique set of paths.

    Parameters
    ----------
    dataRefs : iterable of `lsst.daf.persistence.ButlerDataRef`
        The Gen2 data references to strip "[HDU]" values from.

    Returns
    -------
    paths : `set` [`str`]
        The unique file paths without appended "[HDU]".
    """
    paths = set()
    for dataRef in dataRefs:
        path = dataRef.getUri()
        # Handle FITS files with multiple HDUs (e.g. DECam raws).
        paths.add(path.split('[')[0])
    return paths

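# For example, a Gen2 data reference for a multi-extension FITS raw may report a
# URI such as "<root>/raw/c4d_150101_000001.fits.fz[1]" (hypothetical path); the
# split on "[" keeps only "<root>/raw/c4d_150101_000001.fits.fz", and the set
# collapses the per-HDU duplicates into a single ingestable path.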

class RootRepoConverter(StandardRepoConverter):
    """A specialization of `RepoConverter` for root data repositories.

    `RootRepoConverter` adds support for raw images (mostly delegated to the
    parent task's `RawIngestTask` subtask) and reference catalogs.

    Parameters
    ----------
    kwds
        Keyword arguments are forwarded to (and required by) `RepoConverter`.
    """

    def __init__(self, **kwds):
        super().__init__(run=None, **kwds)
        self._refCats: List[Tuple[str, SkyPixDimension]] = []
        if self.task.config.rootSkyMapName is not None:
            self._rootSkyMap = self.task.config.skyMaps[self.task.config.rootSkyMapName].skyMap.apply()
        else:
            self._rootSkyMap = None  # All access to _rootSkyMap is guarded
        self._chain = {}
        self._rawRefs = []

    def isDatasetTypeSpecial(self, datasetTypeName: str) -> bool:
        # Docstring inherited from RepoConverter.
        return (
            super().isDatasetTypeSpecial(datasetTypeName)
            or datasetTypeName in ("raw", "ref_cat", "ref_cat_config")
            # in Gen2, some of these are in the root repo, not a calib repo
            or datasetTypeName in self.task.config.curatedCalibrations
        )

    def getSpecialDirectories(self) -> List[str]:
        # Docstring inherited from RepoConverter.
        return super().getSpecialDirectories() + ["CALIB", "ref_cats", "rerun"]

    def findMatchingSkyMap(self, datasetTypeName: str) -> Tuple[Optional[BaseSkyMap], Optional[str]]:
        # Docstring inherited from StandardRepoConverter.findMatchingSkyMap.
        skyMap, name = super().findMatchingSkyMap(datasetTypeName)
        if skyMap is None and self.task.config.rootSkyMapName is not None:
            self.task.log.debug(
                ("Assuming configured root skymap with name '%s' for dataset %s."),
                self.task.config.rootSkyMapName, datasetTypeName
            )
            skyMap = self._rootSkyMap
            name = self.task.config.rootSkyMapName
        return skyMap, name

    def runRawIngest(self):
        if self.task.raws is None:
            return
        self.task.log.info(f"Finding raws in root {self.root}.")
        if self.subset is not None:
            dataRefs = itertools.chain.from_iterable(
                self.butler2.subset(self.task.config.rawDatasetType,
                                    visit=visit) for visit in self.subset.visits
            )
        else:
            dataRefs = self.butler2.subset(self.task.config.rawDatasetType)
        dataPaths = getDataPaths(dataRefs)
        self.task.log.info("Ingesting raws from root %s into run %s.", self.root, self.task.raws.butler.run)
        self._rawRefs.extend(self.task.raws.run(dataPaths))
        self._chain = {self.task.raws.butler.run: {self.task.raws.datasetType.name}}

    def runDefineVisits(self):
        if self.task.defineVisits is None:
            return
        dimensions = DimensionGraph(self.task.universe, names=["exposure"])
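        # Restricting each raw ref's data ID (typically instrument+exposure+detector)
        # to the exposure dimensions and collecting the results in a set leaves one
        # data ID per exposure for visit definition below.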
        exposureDataIds = set(ref.dataId.subset(dimensions) for ref in self._rawRefs)
        self.task.log.info("Defining visits from exposures.")
        self.task.defineVisits.run(exposureDataIds)

    def prep(self):
        # Docstring inherited from RepoConverter.
        # Gather information about reference catalogs.
        if self.task.isDatasetTypeIncluded("ref_cat") and len(self.task.config.refCats) != 0:
            from lsst.meas.algorithms import DatasetConfig as RefCatDatasetConfig
            for refCat in os.listdir(os.path.join(self.root, "ref_cats")):
                path = os.path.join(self.root, "ref_cats", refCat)
                configFile = os.path.join(path, "config.py")
                if not os.path.exists(configFile):
                    continue
                if refCat not in self.task.config.refCats:
                    continue
                self.task.log.info(f"Preparing ref_cat {refCat} from root {self.root}.")
                onDiskConfig = RefCatDatasetConfig()
                onDiskConfig.load(configFile)
                if onDiskConfig.indexer.name != "HTM":
                    raise ValueError(f"Reference catalog '{refCat}' uses unsupported "
                                     f"pixelization '{onDiskConfig.indexer.name}'.")
                level = onDiskConfig.indexer["HTM"].depth
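                # The Gen2 indexer depth selects the matching Gen3 skypix dimension;
                # for example, a reference catalog indexed at HTM depth 7 maps to
                # the "htm7" dimension looked up below.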
                try:
                    dimension = self.task.universe[f"htm{level}"]
                except KeyError as err:
                    raise ValueError(f"Reference catalog {refCat} uses HTM level {level}, but no htm{level} "
                                     f"skypix dimension is configured for this registry.") from err
                self.task.useSkyPix(dimension)
                self._refCats.append((refCat, dimension))
        if self.task.isDatasetTypeIncluded("brightObjectMask") and self.task.config.rootSkyMapName:
            self.task.useSkyMap(self._rootSkyMap, self.task.config.rootSkyMapName)
        super().prep()

    def iterDatasets(self) -> Iterator[FileDataset]:
        # Docstring inherited from RepoConverter.
        # Iterate over reference catalog files.
        for refCat, dimension in self._refCats:
            datasetType = DatasetType(refCat, dimensions=[dimension], universe=self.task.universe,
                                      storageClass="SimpleCatalog")
            if self.subset is None:
                regex = re.compile(r"(\d+)\.fits")
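                # Shard files are named after their HTM pixel index; e.g. a
                # hypothetical "189584.fits" in the refcat directory becomes a
                # DatasetRef with data ID {dimension: 189584} below.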
                for fileName in os.listdir(os.path.join(self.root, "ref_cats", refCat)):
                    m = regex.match(fileName)
                    if m is not None:
                        htmId = int(m.group(1))
                        dataId = self.task.registry.expandDataId({dimension: htmId})
                        yield FileDataset(path=os.path.join(self.root, "ref_cats", refCat, fileName),
                                          refs=DatasetRef(datasetType, dataId))
            else:
                for begin, end in self.subset.skypix[dimension]:
                    for htmId in range(begin, end):
                        dataId = self.task.registry.expandDataId({dimension: htmId})
                        yield FileDataset(path=os.path.join(self.root, "ref_cats", refCat, f"{htmId}.fits"),
                                          refs=DatasetRef(datasetType, dataId))
        yield from super().iterDatasets()

    def getRun(self, datasetTypeName: str) -> str:
        # Docstring inherited from RepoConverter.
        run = self.task.config.runs[datasetTypeName]
        self._chain.setdefault(run, set()).add(datasetTypeName)
        return run

    def getCollectionChain(self) -> List[Tuple[str, Set[str]]]:
        """Return tuples of run name and associated dataset type names that
        can be used to construct a chained collection that refers to the
        converted root repository (`list` [ `tuple` ]).
        """
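        # For illustration (hypothetical collection names): after conversion this
        # might look like [("DECam/raw/all", {"raw"}), ("refcats", {"gaia_dr2"})],
        # pairing each run with the dataset types written to it.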
        return list(self._chain.items())