LSST Data Management Base Package
rootRepoConverter.py
# This file is part of obs_base.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import annotations

__all__ = ["RootRepoConverter"]

import os
import re
import itertools
from typing import TYPE_CHECKING, Dict, Iterator, Mapping, Optional, Tuple, List

from lsst.skymap import BaseSkyMap
from lsst.daf.butler import CollectionType, DatasetType, DatasetRef, DimensionGraph, FileDataset
from .standardRepoConverter import StandardRepoConverter

SKYMAP_DATASET_TYPES = {
    coaddName: f"{coaddName}Coadd_skyMap" for coaddName in ("deep", "goodSeeing", "dcr")
}
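# The comprehension above evaluates to a fixed mapping of coadd names to
# skymap dataset type names:
#     {"deep": "deepCoadd_skyMap",
#      "goodSeeing": "goodSeeingCoadd_skyMap",
#      "dcr": "dcrCoadd_skyMap"}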

if TYPE_CHECKING:
    from lsst.daf.butler import SkyPixDimension


def getDataPaths(dataRefs):
    """Strip HDU identifiers from paths and return a unique set of paths.

    Parameters
    ----------
    dataRefs : `lsst.daf.persistence.ButlerDataRef`
        The gen2 datarefs to strip "[HDU]" values from.

    Returns
    -------
    paths : `set` [`str`]
        The unique file paths without appended "[HDU]".
    """
    paths = set()
    for dataRef in dataRefs:
        path = dataRef.getUri()
        # handle FITS files with multiple HDUs (e.g. decam raw)
        paths.add(path.split('[')[0])
    return paths

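# Usage sketch for getDataPaths (the fake dataRef class and the path below are
# hypothetical, shown only to illustrate the "[HDU]" stripping):
#
#     >>> class _FakeDataRef:
#     ...     """Minimal stand-in for a gen2 ButlerDataRef with getUri()."""
#     ...     def getUri(self):
#     ...         return "/repo/raw/c4d_150218_000001_ori.fits.fz[1]"
#     >>> getDataPaths([_FakeDataRef(), _FakeDataRef()])
#     {'/repo/raw/c4d_150218_000001_ori.fits.fz'}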

class RootRepoConverter(StandardRepoConverter):
    """A specialization of `RepoConverter` for root data repositories.

    `RootRepoConverter` adds support for raw images (mostly delegated to the
    parent task's `RawIngestTask` subtask) and reference catalogs.

    Parameters
    ----------
    kwds
        Keyword arguments are forwarded to (and required by) `RepoConverter`.
    """

    def __init__(self, **kwds):
        super().__init__(run=None, **kwds)
        self._refCats: Dict[str, SkyPixDimension] = {}
        if self.task.config.rootSkyMapName is not None:
            self._rootSkyMap = self.task.config.skyMaps[self.task.config.rootSkyMapName].skyMap.apply()
        else:
            self._rootSkyMap = None  # All access to _rootSkyMap is guarded
        self._rawRefs = []

    def isDatasetTypeSpecial(self, datasetTypeName: str) -> bool:
        # Docstring inherited from RepoConverter.
        return (
            super().isDatasetTypeSpecial(datasetTypeName)
            or datasetTypeName in ("raw", "ref_cat", "ref_cat_config")
            # in Gen2, some of these are in the root repo, not a calib repo
            or datasetTypeName in self.instrument.getCuratedCalibrationNames()
        )

    def getSpecialDirectories(self) -> List[str]:
        # Docstring inherited from RepoConverter.
        return super().getSpecialDirectories() + ["CALIB", "ref_cats", "rerun"]

    def findMatchingSkyMap(self, datasetTypeName: str) -> Tuple[Optional[BaseSkyMap], Optional[str]]:
        # Docstring inherited from StandardRepoConverter.findMatchingSkyMap.
        skyMap, name = super().findMatchingSkyMap(datasetTypeName)
        if skyMap is None and self.task.config.rootSkyMapName is not None:
            self.task.log.debug(
                "Assuming configured root skymap with name '%s' for dataset %s.",
                self.task.config.rootSkyMapName, datasetTypeName
            )
            skyMap = self._rootSkyMap
            name = self.task.config.rootSkyMapName
        return skyMap, name

    def runRawIngest(self, pool=None):
        if self.task.raws is None:
            return
        self.task.log.info(f"Finding raws in root {self.root}.")
        if self.subset is not None:
            dataRefs = itertools.chain.from_iterable(
                self.butler2.subset(self.task.config.rawDatasetType,
                                    visit=visit) for visit in self.subset.visits
            )
        else:
            dataRefs = self.butler2.subset(self.task.config.rawDatasetType)
        dataPaths = getDataPaths(dataRefs)
        self.task.log.info("Ingesting raws from root %s into run %s.", self.root, self.task.raws.butler.run)
        self._rawRefs.extend(self.task.raws.run(dataPaths, pool=pool))
        self._chain = [self.task.raws.butler.run]

    def runDefineVisits(self, pool=None):
        if self.task.defineVisits is None:
            return
        dimensions = DimensionGraph(self.task.universe, names=["exposure"])
        exposureDataIds = set(ref.dataId.subset(dimensions) for ref in self._rawRefs)
        self.task.log.info("Defining visits from exposures.")
        self.task.defineVisits.run(exposureDataIds, pool=pool)

    def prep(self):
        # Docstring inherited from RepoConverter.
        # Gather information about reference catalogs.
        if self.task.isDatasetTypeIncluded("ref_cat") and len(self.task.config.refCats) != 0:
            from lsst.meas.algorithms import DatasetConfig as RefCatDatasetConfig
            for refCat in os.listdir(os.path.join(self.root, "ref_cats")):
                path = os.path.join(self.root, "ref_cats", refCat)
                configFile = os.path.join(path, "config.py")
                if not os.path.exists(configFile):
                    continue
                if refCat not in self.task.config.refCats:
                    continue
                self.task.log.info(f"Preparing ref_cat {refCat} from root {self.root}.")
                onDiskConfig = RefCatDatasetConfig()
                onDiskConfig.load(configFile)
                if onDiskConfig.indexer.name != "HTM":
                    raise ValueError(f"Reference catalog '{refCat}' uses unsupported "
                                     f"pixelization '{onDiskConfig.indexer.name}'.")
                level = onDiskConfig.indexer["HTM"].depth
                try:
                    dimension = self.task.universe[f"htm{level}"]
                except KeyError as err:
                    raise ValueError(f"Reference catalog {refCat} uses HTM level {level}, but no htm{level} "
                                     f"skypix dimension is configured for this registry.") from err
                self.task.useSkyPix(dimension)
                self._refCats[refCat] = dimension
        if self.task.isDatasetTypeIncluded("brightObjectMask") and self.task.config.rootSkyMapName:
            self.task.useSkyMap(self._rootSkyMap, self.task.config.rootSkyMapName)
        super().prep()

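# Illustration for prep() (the catalog name and HTM depth are hypothetical):
# a gen2 reference catalog at ref_cats/ps1_pv3_3pi_20170110/ whose config.py
# declares an HTM indexer with depth=7 is associated with the "htm7" skypix
# dimension, so its shard files can be registered with htm7 data IDs in
# iterDatasets() below.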
    def iterDatasets(self) -> Iterator[FileDataset]:
        # Docstring inherited from RepoConverter.
        # Iterate over reference catalog files.
        for refCat, dimension in self._refCats.items():
            datasetType = DatasetType(refCat, dimensions=[dimension], universe=self.task.universe,
                                      storageClass="SimpleCatalog")
            if self.subset is None:
                regex = re.compile(r"(\d+)\.fits")
                for fileName in os.listdir(os.path.join(self.root, "ref_cats", refCat)):
                    m = regex.match(fileName)
                    if m is not None:
                        htmId = int(m.group(1))
                        dataId = self.task.registry.expandDataId({dimension: htmId})
                        yield FileDataset(path=os.path.join(self.root, "ref_cats", refCat, fileName),
                                          refs=DatasetRef(datasetType, dataId))
            else:
                for begin, end in self.subset.skypix[dimension]:
                    for htmId in range(begin, end):
                        dataId = self.task.registry.expandDataId({dimension: htmId})
                        yield FileDataset(path=os.path.join(self.root, "ref_cats", refCat, f"{htmId}.fits"),
                                          refs=DatasetRef(datasetType, dataId))
        yield from super().iterDatasets()

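# Illustration for iterDatasets() (the file name is hypothetical): with the
# htm7 dimension and no subset, a shard file such as ref_cats/<refCat>/189584.fits
# matches the (\d+)\.fits pattern above and is yielded as a FileDataset whose
# expanded data ID is {htm7: 189584}.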
    def getRun(self, datasetTypeName: str, calibDate: Optional[str] = None) -> str:
        # Docstring inherited from RepoConverter.
        if datasetTypeName in self._refCats:
            return self.instrument.makeRefCatCollectionName("gen2")
        return super().getRun(datasetTypeName, calibDate)

    def _finish(self, datasets: Mapping[DatasetType, Mapping[Optional[str], List[FileDataset]]]):
        # Docstring inherited from RepoConverter.
        super()._finish(datasets)
        if self._refCats:
            # Set up a CHAINED collection named something like "refcats" to
            # also point to "refcats/gen2". It's conceivable (but unlikely)
            # that "refcats/gen2" might not exist, if the scanner saw reference
            # catalog datasets on disk but none overlapped the area of
            # interest, so we register that here, too (multiple registrations
            # of collections are fine).
            chained = self.instrument.makeRefCatCollectionName()
            child = self.instrument.makeRefCatCollectionName("gen2")
            self.task.registry.registerCollection(chained, CollectionType.CHAINED)
            self.task.registry.registerCollection(child, CollectionType.RUN)
            children = list(self.task.registry.getCollectionChain(chained))
            children.append(child)
            self.task.registry.setCollectionChain(chained, children)
            # Also add "refcats" to the list of collections that contains
            # everything found in the root repo. Normally this is done in
            # getRun, but here we want to add the (possibly new) CHAINED
            # collection instead of the RUN collection.
            self._chain.append(chained)
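# Sketch of the collection layout that _finish() produces (the names come from
# Instrument.makeRefCatCollectionName; "refcats" and "refcats/gen2" are the
# typical names and are assumed here for illustration):
#
#     refcats          CHAINED  ->  [refcats/gen2]
#     refcats/gen2     RUN      (converted reference catalog shards)
#
# The CHAINED collection "refcats" (rather than the RUN collection) is then
# appended to self._chain, the list of collections covering everything found
# in the root repo.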