bootstrapRepo.py
# This file is part of obs_base.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.


__all__ = ("BootstrapRepoConfig", "BootstrapRepoTask", "BootstrapRepoInputs",
           "BootstrapRepoSkyMapConfig", "BootstrapRepoRefCatConfig")

import os.path
from dataclasses import dataclass
from typing import List
import glob

from lsst import sphgeom
from lsst.daf.butler import Butler, DatasetType
from lsst.daf.butler.instrument import Instrument
from lsst.pex.config import Config, Field, ConfigurableField, ConfigDictField, ConfigField
from lsst.pipe.base import Task
from lsst.obs.base.gen3 import RawIngestTask, makeTransferChoiceField
from lsst.skymap import skyMapRegistry
from lsst.meas.algorithms import DatasetConfig

from .repoConverter import RepoConverter
from .calibRepoConverter import CalibRepoConverter

class BootstrapRepoSkyMapConfig(Config):
    datasetTypeName = Field(("DatasetType used to write the SkyMap instance. If None, the instance will "
                             "not be written, and only the Registry will be modified."),
                            dtype=str, default="deepCoadd_skyMap", optional=True)
    collection = Field(("Butler collection the SkyMap instance should be written to. If None, the "
                        "collection used to initialize the butler will be used."),
                       dtype=str, default="skymaps", optional=True)
    skyMap = skyMapRegistry.makeField(
        doc="Type and parameters for the SkyMap itself.",
        default="dodeca",
    )

class BootstrapRepoRefCatConfig(Config):
    datasetTypeName = Field(("DatasetType used to write the catalog shards."),
                            dtype=str, default="ref_cat")
    filterByRawRegions = Field(("If True, do not ingest shards that do not overlap visits. "
                                "Does not guarantee that all ingested shards will overlap a visit."),
                               dtype=bool, default=True)
    collection = Field(("Butler collection the reference catalog should be written to. If None, the "
                        "collection used to initialize the butler will be used. May also be a string with "
                        "the format placeholder '{name}', which will be replaced with the reference "
                        "catalog name (i.e. the key of the configuration dictionary)."),
                       dtype=str, default="refcats/{name}", optional=True)
    transfer = makeTransferChoiceField(default="symlink")

class BootstrapRepoGenericIngestConfig(Config):
    collection = Field(("Butler collection that datasets should be ingested into. "
                        "If None, the collection used to initialize the butler will be used."),
                       dtype=str, default=None, optional=True)
    transfer = makeTransferChoiceField(default="symlink")

class BootstrapRepoBrightObjectMasksConfig(BootstrapRepoGenericIngestConfig):
    skymap = Field("SkyMap dimension name used to define the tracts and patches for bright object masks.",
                   dtype=str, default=None, optional=False)
    filterByRawRegions = Field(("If True, do not ingest files that do not overlap visits. "
                                "Does not guarantee that all ingested files will overlap a visit."),
                               dtype=bool, default=True)

class BootstrapRepoConfig(Config):
    raws = ConfigurableField(target=RawIngestTask,
                             doc=("Configuration for subtask responsible for ingesting raws and adding "
                                  "visit and exposure dimension entries."))
    skymaps = ConfigDictField(doc=("SkyMap definitions to register and ingest into the repo, keyed by "
                                   "skymap dimension name."),
                              keytype=str,
                              itemtype=BootstrapRepoSkyMapConfig,
                              default={})
    refCats = ConfigDictField(doc=("Reference catalogs to ingest into the repo, keyed by their subdirectory "
                                   "within the overall reference catalog root."),
                              keytype=str,
                              itemtype=BootstrapRepoRefCatConfig,
                              default={})
    brightObjectMasks = ConfigField(doc="Configuration for ingesting brightObjectMask files.",
                                    dtype=BootstrapRepoBrightObjectMasksConfig)
    calibrations = ConfigField(doc="Configuration for ingesting and creating master calibration products.",
                               dtype=BootstrapRepoGenericIngestConfig)

    def setDefaults(self):
        self.raws.transfer = "symlink"

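# Illustrative note (not part of the original source): driver code is expected to fill
# in these nested configs itself, e.g. by adding a ``skymaps`` entry keyed by the skymap
# dimension name and pointing ``brightObjectMasks.skymap`` at the same key, since
# ``bootstrapBrightObjectMasks`` looks the registered skymap up by that name. A hedged
# sketch, using a hypothetical skymap name:
#
#     config = BootstrapRepoConfig()
#     config.skymaps["exampleSkyMap"] = BootstrapRepoSkyMapConfig()
#     config.brightObjectMasks.skymap = "exampleSkyMap"
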
@dataclass
class BootstrapRepoInputs:
    """Simple struct that aggregates all non-config inputs to
    `BootstrapRepoTask`.

    Generally, this struct contains inputs that depend on the organization
    of the input files on a particular system, while the config includes
    everything else. The exception is the ``instrument`` attribute, which
    cannot be included in the config because it's expected that driver code
    will actually use it (via
    `~lsst.daf.butler.instrument.Instrument.applyConfigOverrides`) to define
    the config.
    """

    instrument: Instrument
    """Instrument subclass instance for the raws and calibrations to be
    included in the initial repo.
    """

    raws: List[str]
    """List of filenames for raw files to ingest (complete paths).
    """

    refCatRoot: str
    """Root of the directory containing the reference catalogs, with immediate
    subdirectories that correspond to different reference catalogs.
    """

    brightObjectMaskRoot: str
    """Root of the Gen2 repository containing bright object masks.
    """

    calibRoot: str
    """Root of the Gen2 calibration repository containing flats, biases,
    darks, and fringes.
    """

class BootstrapRepoTask(Task):
    """A Task that populates a Gen3 repo with the minimum content needed to
    run the DRP pipelines.

    BootstrapRepoTask currently relies on Gen2 data repository information
    for both bright object masks and master calibrations, but nothing else;
    unlike dedicated Gen2->Gen3 conversion code, it will be updated in the
    future as more pure-Gen3 approaches become available.

    Like other Gen3 Tasks that are not PipelineTasks, BootstrapRepoTask does
    not yet have a dedicated, general-purpose command-line driver. At least
    for now, it is instead expected that custom driver scripts will be written
    for different contexts and predefined datasets.

    Parameters
    ----------
    config : `BootstrapRepoConfig`
        Configuration for the task.
    butler : `lsst.daf.butler.Butler`
        Gen3 Butler defining the repository to populate. New butlers with
        different output collections will be created as necessary from this
        butler to match the output collections defined in the configuration.
    kwds
        Additional keyword arguments are forwarded to the
        `lsst.pipe.base.Task` constructor.
    """

    ConfigClass = BootstrapRepoConfig

    _DefaultName = "bootstrapRepo"

    def __init__(self, config=None, *, butler, **kwds):
        super().__init__(config, **kwds)
        self.butler = butler
        self.makeSubtask("raws", butler=self.butler)
        self.skyMaps = {}

    def getButler(self, collection=None):
        """Create a new butler that writes into the given collection.

        Parameters
        ----------
        collection : `str`, optional
            The new output collection. If `None`, ``self.butler`` is returned
            directly.

        Returns
        -------
        butler : `lsst.daf.butler.Butler`
            Butler instance pointing at the same repository as
            ``self.butler``, but possibly a different collection.
        """
        if collection is not None:
            return Butler(butler=self.butler, run=collection)
        return self.butler

    def run(self, inputs):
        """Run all steps involved in populating the new repository.

        Parameters
        ----------
        inputs : `BootstrapRepoInputs`
            Filenames and paths for the data to be ingested.
        """
        self.bootstrapInstrument(inputs.instrument)
        self.bootstrapCalibrations(inputs.instrument, inputs.calibRoot)
        self.bootstrapRaws(inputs.raws)
        self.bootstrapRefCats(inputs.refCatRoot)
        self.bootstrapSkyMaps()
        self.bootstrapBrightObjectMasks(inputs.instrument, inputs.brightObjectMaskRoot)

    def bootstrapInstrument(self, instrument):
        """Add an instrument, associated metadata, and human-curated
        calibrations to the repository.

        Parameters
        ----------
        instrument : `lsst.daf.butler.instrument.Instrument`
            Instrument class that defines detectors, physical filters, and
            curated calibrations to ingest.
        """
        self.log.info("Registering instrument '%s' and adding curated calibrations.", instrument.getName())
        with self.butler.transaction():
            instrument.register(self.butler.registry)
            instrument.writeCuratedCalibrations(self.getButler(self.config.calibrations.collection))

    def bootstrapSkyMaps(self):
        """Add configured SkyMaps to the repository.

        This both registers skymap dimension entries (the skymap, tract, and
        patch tables, and their associated join tables) and adds a
        ``<something>Coadd_skyMap`` dataset.
        """
        for name, config in self.config.skymaps.items():
            self.log.info("Registering skymap '%s'.", name)
            with self.butler.transaction():
                skyMap = config.skyMap.apply()
                skyMap.register(name, self.butler.registry)
                if config.datasetTypeName is not None:
                    datasetType = DatasetType(config.datasetTypeName, dimensions=["skymap"],
                                              storageClass="SkyMap")
                    self.butler.registry.registerDatasetType(datasetType)
                    self.getButler(config.collection).put(skyMap, datasetType, skymap=name)
                self.skyMaps[name] = skyMap

    def bootstrapRaws(self, files):
        """Ingest raw images.

        This step must be run after `bootstrapInstrument`, but may be run
        multiple times with different arguments (which may be overlapping if
        the nested `RawIngestTask` is configured to ignore duplicates).

        Parameters
        ----------
        files : sequence of `str`
            The complete path names of the files to be ingested.
        """
        self.log.info("Ingesting raw images.")
        return self.raws.run(files)  # transaction handled internally, according to config.

    def computeRawSkyPixels(self):
        """Compute and return the skypix dimension entries that overlap
        already-ingested visits.
        """
        # TODO: provide a non-SQL way to efficiently perform this query?
        return list(
            row["skypix"] for row in self.butler.registry.query(
                "SELECT DISTINCT skypix FROM visit_skypix_join"
            )
        )

    def bootstrapRefCats(self, root):
        """Ingest reference catalogs.

        This step must be run after `bootstrapRaws` if the
        ``filterByRawRegions`` config option is `True` for any reference
        catalog.

        Parameters
        ----------
        root : `str`
            Root of the directory containing the reference catalogs, with
            immediate subdirectories that correspond to different reference
            catalogs.
        """
        if not self.config.refCats:
            return
        if any(config.filterByRawRegions for config in self.config.refCats.values()):
            rawSkyPixels = self.computeRawSkyPixels()
        datasetType = DatasetType("ref_cat", dimensions=["skypix"], storageClass="SimpleCatalog")
        self.butler.registry.registerDatasetType(datasetType)
        for name, config in self.config.refCats.items():
            self.log.info("Ingesting reference catalog '%s'.", name)
            with self.butler.transaction():
                onDiskConfig = DatasetConfig()
                onDiskConfig.load(os.path.join(root, name, "config.py"))
                if onDiskConfig.indexer.name != "HTM":
                    raise ValueError(f"Reference catalog '{name}' uses unsupported "
                                     f"pixelization '{onDiskConfig.indexer.name}'.")
                if not isinstance(self.butler.registry.pixelization, sphgeom.HtmPixelization):
                    raise ValueError(f"Registry uses unsupported pixelization class "
                                     f"{self.butler.registry.pixelization.__class__}.")
                if onDiskConfig.indexer["HTM"].depth != self.butler.registry.pixelization.getLevel():
                    raise ValueError(f"Registry HTM level {self.butler.registry.pixelization.getLevel()} "
                                     f"does not match reference catalog level "
                                     f"{onDiskConfig.indexer['HTM'].depth}.")
                butler = self.getButler(config.collection.format(name=name))
                if config.filterByRawRegions:
                    missing = []
                    for index in rawSkyPixels:
                        path = os.path.join(root, name, f"{index}.fits")
                        if os.path.exists(path):
                            butler.ingest(path, datasetType, transfer=config.transfer, skypix=index)
                        else:
                            missing.append(index)
                    if missing:
                        self.log.warn("Some overlapping reference catalog shards missing: %s", missing)
                else:
                    for path in glob.glob(os.path.join(root, name, "*.fits")):
                        if path.endswith("master_schema.fits"):
                            continue
                        _, filename = os.path.split(path)
                        basename, _ = os.path.splitext(filename)
                        try:
                            index = int(basename)
                        except ValueError:
                            self.log.warn("Unrecognized file in reference catalog root: '%s'.", path)
                            continue
                        butler.ingest(path, datasetType, transfer=config.transfer, skypix=index)

    def computeRawTracts(self, skymap):
        """Compute and return the tract dimension entries that overlap
        already-ingested visits.
        """
        # TODO: provide a non-SQL way to efficiently perform this query?
        return list(
            row["tract"] for row in self.butler.registry.query(
                "SELECT DISTINCT tract FROM visit_tract_join WHERE skymap=:skymap",
                skymap=skymap
            )
        )

    def bootstrapBrightObjectMasks(self, instrument, root):
        """Ingest bright object masks from a Gen2 data repository.

        This step must be run after `bootstrapRaws` if the
        ``filterByRawRegions`` config option is `True` for any reference
        catalog, and must always be run after `bootstrapSkyMaps`.

        Parameters
        ----------
        root : `str`
            Root of the Gen2 repository containing bright object masks.
        instrument : `lsst.daf.butler.instrument.Instrument`
            Instrument subclass instance; used to relate Gen2 filter
            strings to Gen3 physical_filters and abstract_filters.
        """
        self.log.info("Ingesting bright object masks.")
        butler = self.getButler(self.config.brightObjectMasks.collection)
        baseDataId = {
            "skymap": self.config.brightObjectMasks.skymap,
            "instrument": instrument.getName()
        }
        converter = RepoConverter(root, universe=butler.registry.dimensions, baseDataId=baseDataId,
                                  skyMap=self.skyMaps[self.config.brightObjectMasks.skymap])
        converter.addDatasetType("brightObjectMask", "ObjectMaskCatalog")
        if self.config.brightObjectMasks.filterByRawRegions:
            for tract in self.computeRawTracts(self.config.brightObjectMasks.skymap):
                with self.butler.transaction():
                    converter.convertRepo(butler, directory=f"{root}/deepCoadd/BrightObjectMasks/{tract:d}",
                                          transfer=self.config.brightObjectMasks.transfer)
        else:
            with self.butler.transaction():
                converter.convertRepo(butler, transfer=self.config.brightObjectMasks.transfer)

    def bootstrapCalibrations(self, instrument, root):
        """Ingest master calibrations from a Gen2 calibration data repository.

        At present, all master calibrations in the Gen2 repository are
        transferred, even those unrelated to the ingested raws.

        This step must be run after `bootstrapInstrument`.

        Parameters
        ----------
        instrument : `lsst.daf.butler.instrument.Instrument`
            Instrument subclass instance for the raws and calibrations to be
            included in the initial repo.
        root : `str`
            Root of the Gen2 calibration data repository.
        """
        self.log.info("Ingesting calibrations.")
        baseDataId = {"instrument": instrument.getName()}
        butler = self.getButler(self.config.calibrations.collection)
        converter = CalibRepoConverter(root, universe=butler.registry.dimensions, baseDataId=baseDataId)
        converter.addDatasetType("flat", "MaskedImageF")
        converter.addDatasetType("bias", "ImageF")
        converter.addDatasetType("dark", "ImageF")
        converter.addDatasetType("sky", "ExposureF")
        converter.addDatasetType("fringe", "ExposureF")
        # TODO, DM-16805: No StorageClass/Formatter for yBackground in Gen3.
        with self.butler.transaction():
            converter.convertRepo(butler, transfer=self.config.calibrations.transfer)
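As the class docstring notes, BootstrapRepoTask has no general-purpose command-line driver yet; custom driver scripts are expected. The sketch below shows one plausible shape for such a driver. It is illustrative only: the import path for the bootstrap classes, the repository root, the run collection, and all input paths are placeholders, MyInstrument stands in for a concrete Instrument subclass from an obs package, and the Butler construction and applyConfigOverrides signature are assumptions about this release's Gen3 API rather than guarantees.

import glob

from lsst.daf.butler import Butler
from lsst.obs.base.gen3 import (BootstrapRepoConfig, BootstrapRepoInputs,
                                BootstrapRepoTask)

instrument = MyInstrument()  # hypothetical Instrument subclass instance
config = BootstrapRepoConfig()
# Let the instrument apply its own overrides, as described in the
# BootstrapRepoInputs docstring.
instrument.applyConfigOverrides(BootstrapRepoTask._DefaultName, config)

butler = Butler("/path/to/gen3/repo", run="shared/bootstrap")  # placeholder root and collection
inputs = BootstrapRepoInputs(
    instrument=instrument,
    raws=glob.glob("/path/to/raws/*.fits"),        # placeholder raw file pattern
    refCatRoot="/path/to/refcats",                 # placeholder reference catalog root
    brightObjectMaskRoot="/path/to/gen2/masks",    # placeholder Gen2 repo with masks
    calibRoot="/path/to/gen2/CALIB",               # placeholder Gen2 calibration repo
)
task = BootstrapRepoTask(config=config, butler=butler)
task.run(inputs)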