LSSTApplications  18.0.0+106,18.0.0+50,19.0.0,19.0.0+1,19.0.0+10,19.0.0+11,19.0.0+13,19.0.0+17,19.0.0+2,19.0.0-1-g20d9b18+6,19.0.0-1-g425ff20,19.0.0-1-g5549ca4,19.0.0-1-g580fafe+6,19.0.0-1-g6fe20d0+1,19.0.0-1-g7011481+9,19.0.0-1-g8c57eb9+6,19.0.0-1-gb5175dc+11,19.0.0-1-gdc0e4a7+9,19.0.0-1-ge272bc4+6,19.0.0-1-ge3aa853,19.0.0-10-g448f008b,19.0.0-12-g6990b2c,19.0.0-2-g0d9f9cd+11,19.0.0-2-g3d9e4fb2+11,19.0.0-2-g5037de4,19.0.0-2-gb96a1c4+3,19.0.0-2-gd955cfd+15,19.0.0-3-g2d13df8,19.0.0-3-g6f3c7dc,19.0.0-4-g725f80e+11,19.0.0-4-ga671dab3b+1,19.0.0-4-gad373c5+3,19.0.0-5-ga2acb9c+2,19.0.0-5-gfe96e6c+2,w.2020.01
LSSTDataManagementBasePackage
convertRepo.py
Go to the documentation of this file.
1 # This file is part of obs_base.
2 #
3 # Developed for the LSST Data Management System.
4 # This product includes software developed by the LSST Project
5 # (https://www.lsst.org).
6 # See the COPYRIGHT file at the top-level directory of this distribution
7 # for details of code ownership.
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation, either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the GNU General Public License
20 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 from __future__ import annotations
22 
23 __all__ = ["ConvertRepoConfig", "ConvertRepoTask", "ConvertRepoSkyMapConfig"]
24 
25 import os
26 import fnmatch
27 from dataclasses import dataclass
28 from typing import Iterable, Optional, List, Dict
29 
30 from lsst.utils import doImport
31 from lsst.daf.butler import (
32  Butler as Butler3,
33  SkyPixDimension
34 )
35 from lsst.pex.config import Config, ConfigurableField, ConfigDictField, DictField, ListField, Field
36 from lsst.pipe.base import Task
37 from lsst.skymap import skyMapRegistry, BaseSkyMap
38 
39 from ..ingest import RawIngestTask
40 from .repoConverter import ConversionSubset
41 from .rootRepoConverter import RootRepoConverter
42 from .calibRepoConverter import CalibRepoConverter
43 from .standardRepoConverter import StandardRepoConverter
44 
45 
@dataclass
class ConfiguredSkyMap:
    """Struct containing information about a skymap that may appear in a Gen2
    repository.
    """

    name: str
    """Name of the skymap used in Gen3 data IDs.
    """

    sha1: bytes
    """Hash computed by `BaseSkyMap.getSha1`.
    """

    instance: BaseSkyMap
    """The `BaseSkyMap` instance itself, constructed from configuration.
    """

    used: bool = False
    """Whether this skymap has been found in at least one repository being
    converted.
    """
68 
69 
class ConvertRepoSkyMapConfig(Config):
    """Config wrapper around a single SkyMap's type and parameters.

    Notes
    -----
    This class exists only because a `~lsst.pex.config.RegistryField`
    cannot be placed directly inside a `~lsst.pex.config.ConfigDictField`;
    wrapping it in its own config works around that limitation.

    Its single field is called "skyMap" so that it matches the configuration
    layout of `lsst.pipe.tasks.MakeSkyMapTask`, letting one config file in an
    obs package configure both tasks. The price is a slightly awkward
    repeated path when nested under the "skyMaps" dictionary field, e.g.
    "skyMaps[name].skyMap"; this repetition is accepted as unavoidable.
    """

    skyMap = skyMapRegistry.makeField(
        default="dodeca",
        doc="Type and parameters for the SkyMap itself.",
    )
91 
92 
class ConvertRepoConfig(Config):
    """Configuration for `ConvertRepoTask`."""

    raws = ConfigurableField(
        "Configuration for subtask responsible for ingesting raws and adding "
        "visit and exposure dimension entries.",
        target=RawIngestTask,
    )
    skyMaps = ConfigDictField(
        "Mapping from Gen3 skymap name to the parameters used to construct a "
        "BaseSkyMap instance. This will be used to associate names with "
        "existing skymaps found in the Gen2 repo.",
        keytype=str,
        itemtype=ConvertRepoSkyMapConfig,
        default={}
    )
    collections = DictField(
        "Special collections (values) for certain dataset types (keys). "
        "These are used in addition to rerun collections for datasets in "
        "reruns. The 'raw' dataset must have an entry here if it is to be "
        "converted.",
        keytype=str,
        itemtype=str,
        default={
            "deepCoadd_skyMap": "skymaps",
            "brightObjectMask": "masks",
        }
    )
    storageClasses = DictField(
        "Mapping from dataset type name or Gen2 policy entry (e.g. 'python' "
        "or 'persistable') to the Gen3 StorageClass name.",
        keytype=str,
        itemtype=str,
        default={
            "BaseSkyMap": "SkyMap",
            "BaseCatalog": "Catalog",
            "BackgroundList": "Background",
            "raw": "Exposure",
            "MultilevelParquetTable": "DataFrame",
        }
    )
    doRegisterInstrument = Field(
        "If True (default), add dimension records for the Instrument and its "
        "filters and detectors to the registry instead of assuming they are "
        "already present.",
        dtype=bool,
        default=True,
    )
    doWriteCuratedCalibrations = Field(
        "If True (default), ingest human-curated calibrations directly via "
        "the Instrument interface. Note that these calibrations are never "
        "converted from Gen2 repositories.",
        dtype=bool,
        default=True,
    )
    refCats = ListField(
        "The names of reference catalogs (subdirectories under ref_cats) to "
        "be converted.",
        dtype=str,
        default=[]
    )
    fileIgnorePatterns = ListField(
        "Filename globs that should be ignored instead of being treated as "
        "datasets.",
        dtype=str,
        default=["README.txt", "*~?", "butler.yaml", "gen3.sqlite3"]
    )
    datasetIncludePatterns = ListField(
        "Glob-style patterns for dataset type names that should be converted.",
        dtype=str,
        default=["*"]
    )
    datasetIgnorePatterns = ListField(
        "Glob-style patterns for dataset type names that should not be "
        "converted despite matching a pattern in datasetIncludePatterns.",
        dtype=str,
        default=[]
    )
    ccdKey = Field(
        "Key used for the Gen2 equivalent of 'detector' in data IDs.",
        dtype=str,
        default="ccd",
    )
    # The help text previously claimed "(default)" for True, contradicting
    # default=False below.
    relatedOnly = Field(
        "If True, only convert datasets that are related to the "
        "ingested visits. Ignored unless a list of visits is passed to "
        "run().",
        dtype=bool,
        default=False,
    )

    @property
    def transfer(self):
        """Transfer mode for raw ingest; proxies ``self.raws.transfer``."""
        return self.raws.transfer

    @transfer.setter
    def transfer(self, value):
        self.raws.transfer = value

    @property
    def instrument(self):
        """Instrument setting for raw ingest; proxies
        ``self.raws.instrument``.
        """
        return self.raws.instrument

    @instrument.setter
    def instrument(self, value):
        self.raws.instrument = value

    def setDefaults(self):
        # Chain to the base class so any defaults it establishes still apply.
        super().setDefaults()
        # Through the ``transfer`` property, this sets ``raws.transfer``.
        self.transfer = None

    # TODO: check that there are no collection overrides for curated
    # calibrations, since we don't have a good way to utilize them.
203 
204 
class ConvertRepoTask(Task):
    """A task that converts one or more related Gen2 data repositories to a
    single Gen3 data repository (with multiple collections).

    Parameters
    ----------
    config : `ConvertRepoConfig`, optional
        Configuration for this task.
    butler3 : `lsst.daf.butler.Butler`
        Gen3 Butler instance that represents the data repository datasets will
        be ingested into. The collection and/or run associated with this
        Butler will be ignored in favor of collections/runs passed via config
        or to `run`. Must be passed as a keyword argument.
    **kwds
        Other keyword arguments are forwarded to the `Task` constructor.

    Notes
    -----
    Most of the work of converting repositories is delegated to instances of
    the `RepoConverter` hierarchy. The `ConvertRepoTask` instance itself holds
    only state that is relevant for all Gen2 repositories being ingested, while
    each `RepoConverter` instance holds only state relevant for the conversion
    of a single Gen2 repository. Both the task and the `RepoConverter`
    instances are single use; `ConvertRepoTask.run` and most `RepoConverter`
    methods may only be called once on a particular instance.
    """

    ConfigClass = ConvertRepoConfig

    _DefaultName = "convertRepo"
235 
236  def __init__(self, config=None, *, butler3: Butler3, **kwds):
237  super().__init__(config, **kwds)
238  self.butler3 = butler3
239  self.registry = self.butler3.registry
240  self.universe = self.registry.dimensions
241  if self.isDatasetTypeIncluded("raw"):
242  self.makeSubtask("raws", butler=butler3)
243  self.instrument = self.raws.instrument
244  else:
245  self.raws = None
246  self.instrument = doImport(self.config.instrument)()
247  self._configuredSkyMapsBySha1 = {}
248  self._configuredSkyMapsByName = {}
249  for name, config in self.config.skyMaps.items():
250  instance = config.skyMap.apply()
251  struct = ConfiguredSkyMap(name=name, sha1=instance.getSha1(), instance=instance)
252  self._configuredSkyMapsBySha1[struct.sha1] = struct
253  self._configuredSkyMapsByName[struct.name] = struct
254  self._usedSkyPix = set()
255 
256  def isDatasetTypeIncluded(self, datasetTypeName: str):
257  """Return `True` if configuration indicates that the given dataset type
258  should be converted.
259 
260  This method is intended to be called primarily by the
261  `RepoConverter` instances used interally by the task.
262 
263  Parameters
264  ----------
265  datasetTypeName: str
266  Name of the dataset type.
267 
268  Returns
269  -------
270  included : `bool`
271  Whether the dataset should be included in the conversion.
272  """
273  return (
274  any(fnmatch.fnmatchcase(datasetTypeName, pattern)
275  for pattern in self.config.datasetIncludePatterns) and
276  not any(fnmatch.fnmatchcase(datasetTypeName, pattern)
277  for pattern in self.config.datasetIgnorePatterns)
278  )
279 
280  def useSkyMap(self, skyMap: BaseSkyMap) -> str:
281  """Indicate that a repository uses the given SkyMap.
282 
283  This method is intended to be called primarily by the
284  `RepoConverter` instances used interally by the task.
285 
286  Parameters
287  ----------
288  skyMap : `lsst.skymap.BaseSkyMap`
289  SkyMap instance being used, typically retrieved from a Gen2
290  data repository.
291 
292  Returns
293  -------
294  name : `str`
295  The name of the skymap in Gen3 data IDs.
296  """
297  sha1 = skyMap.getSha1()
298  try:
299  struct = self._configuredSkyMapsBySha1[sha1]
300  except KeyError as err:
301  raise LookupError(f"SkyMap with sha1={sha1} not included in configuration.") from err
302  struct.used = True
303  return struct.name
304 
305  def registerUsedSkyMaps(self, subset: Optional[ConversionSubset]):
306  """Register all skymaps that have been marked as used.
307 
308  This method is intended to be called primarily by the
309  `RepoConverter` instances used interally by the task.
310 
311  Parameters
312  ----------
313  subset : `ConversionSubset`, optional
314  Object that will be used to filter converted datasets by data ID.
315  If given, it will be updated with the tracts of this skymap that
316  overlap the visits in the subset.
317  """
318  for struct in self._configuredSkyMapsBySha1.values():
319  if struct.used:
320  struct.instance.register(struct.name, self.registry)
321  if subset is not None and self.config.relatedOnly:
322  subset.addSkyMap(self.registry, struct.name)
323 
324  def useSkyPix(self, dimension: SkyPixDimension):
325  """Indicate that a repository uses the given SkyPix dimension.
326 
327  This method is intended to be called primarily by the
328  `RepoConverter` instances used interally by the task.
329 
330  Parameters
331  ----------
332  dimension : `lsst.daf.butler.SkyPixDimension`
333  Dimension represening a pixelization of the sky.
334  """
335  self._usedSkyPix.add(dimension)
336 
337  def registerUsedSkyPix(self, subset: Optional[ConversionSubset]):
338  """Register all skymaps that have been marked as used.
339 
340  This method is intended to be called primarily by the
341  `RepoConverter` instances used interally by the task.
342 
343  Parameters
344  ----------
345  subset : `ConversionSubset`, optional
346  Object that will be used to filter converted datasets by data ID.
347  If given, it will be updated with the pixelization IDs that
348  overlap the visits in the subset.
349  """
350  if subset is not None and self.config.relatedOnly:
351  for dimension in self._usedSkyPix:
352  subset.addSkyPix(self.registry, dimension)
353 
354  def run(self, root: str, collections: List[str], *,
355  calibs: Dict[str, List[str]] = None,
356  reruns: Dict[str, List[str]] = None,
357  visits: Optional[Iterable[int]] = None):
358  """Convert a group of related data repositories.
359 
360  Parameters
361  ----------
362  root : `str`
363  Complete path to the root Gen2 data repository. This should be
364  a data repository that includes a Gen2 registry and any raw files
365  and/or reference catalogs.
366  collections : `list` of `str`
367  Gen3 collections that datasets from the root repository should be
368  associated with. This should include any rerun collection that
369  these datasets should also be considered to be part of; because of
370  structural difference between Gen2 parent/child relationships and
371  Gen3 collections, these cannot be reliably inferred.
372  calibs : `dict`
373  Dictionary mapping calibration repository path to the collections
374  that the repository's datasets should be associated with. The path
375  may be relative to ``root`` or absolute. Collections should
376  include child repository collections as appropriate (see
377  documentation for ``collections``).
378  reruns : `dict`
379  Dictionary mapping rerun repository path to the collections that
380  the repository's datasets should be associated with. The path may
381  be relative to ``root`` or absolute. Collections should include
382  child repository collections as appropriate (see documentation for
383  ``collections``).
384  visits : iterable of `int`, optional
385  The integer IDs of visits to convert. If not provided, all visits
386  in the Gen2 root repository will be converted.
387  """
388 
389  if calibs is None:
390  calibs = {}
391  if reruns is None:
392  reruns = {}
393  if visits is not None:
394  subset = ConversionSubset(instrument=self.instrument.getName(), visits=frozenset(visits))
395  else:
396  if self.config.relatedOnly:
397  self.log.warn("config.relatedOnly is True but all visits are being ingested; "
398  "no filtering will be done.")
399  subset = None
400 
401  if self.config.doRegisterInstrument:
402  self.instrument.register(self.registry)
403 
404  # Make and prep converters for all Gen2 repos. This should not modify
405  # the Registry database or filesystem at all, though it may query it.
406  converters = []
407  rootConverter = RootRepoConverter(task=self, root=root, collections=collections, subset=subset)
408  rootConverter.prep()
409  converters.append(rootConverter)
410 
411  for root, collections in calibs.items():
412  if not os.path.isabs(root):
413  root = os.path.join(rootConverter.root, root)
414  converter = CalibRepoConverter(task=self, root=root, collections=collections,
415  mapper=rootConverter.mapper,
416  subset=rootConverter.subset)
417  converter.prep()
418  converters.append(converter)
419 
420  for root, collections in reruns.items():
421  if not os.path.isabs(root):
422  root = os.path.join(rootConverter.root, root)
423  converter = StandardRepoConverter(task=self, root=root, collections=collections,
424  subset=rootConverter.subset)
425  converter.prep()
426  converters.append(converter)
427 
428  # Actual database writes start here. We can't wrap these sanely in
429  # transactions (yet) because we keep initializing new Butler instances
430  # just so we can write into new runs/collections, and transactions
431  # are managed at the Butler level (DM-21246 should let us fix this).
432 
433  # Insert dimensions needed by any converters. These are only the
434  # dimensions that a converter expects to be uniquely derived from the
435  # Gen2 repository it is reponsible for - e.g. visits, exposures, and
436  # calibration_labels.
437  #
438  # Note that we do not try to filter dimensions down to just those
439  # related to the given visits, even if config.relatedOnly is True; we
440  # need them in the Gen3 repo in order to be able to know which datasets
441  # to convert, because Gen2 alone doesn't know enough about the
442  # relationships between data IDs.
443  for converter in converters:
444  converter.insertDimensionData()
445 
446  # Insert dimensions that are potentially shared by all Gen2
447  # repositories (and are hence managed directly by the Task, rather
448  # than a converter instance).
449  # This also finishes setting up the (shared) converter.subsets object
450  # that is used to filter data IDs for config.relatedOnly.
451  self.registerUsedSkyMaps(rootConverter.subset)
452  self.registerUsedSkyPix(rootConverter.subset)
453 
454  # Actually ingest datasets.
455  for converter in converters:
456  converter.ingest()
def makeSubtask(self, name, keyArgs)
Definition: task.py:275
daf::base::PropertySet * set
Definition: fits.cc:902
bool any(CoordinateExpr< N > const &expr) noexcept
Return true if any elements are true.
def getName(self)
Definition: task.py:250
def doImport(pythonType)
Definition: utils.py:104