LSST Applications  21.0.0+75b29a8a7f,21.0.0+e70536a077,21.0.0-1-ga51b5d4+62c747d40b,21.0.0-10-gbfb87ad6+3307648ee3,21.0.0-15-gedb9d5423+47cba9fc36,21.0.0-2-g103fe59+fdf0863a2a,21.0.0-2-g1367e85+d38a93257c,21.0.0-2-g45278ab+e70536a077,21.0.0-2-g5242d73+d38a93257c,21.0.0-2-g7f82c8f+e682ffb718,21.0.0-2-g8dde007+d179fbfa6a,21.0.0-2-g8f08a60+9402881886,21.0.0-2-ga326454+e682ffb718,21.0.0-2-ga63a54e+08647d4b1b,21.0.0-2-gde069b7+26c92b3210,21.0.0-2-gecfae73+0445ed2f95,21.0.0-2-gfc62afb+d38a93257c,21.0.0-27-gbbd0d29+ae871e0f33,21.0.0-28-g5fc5e037+feb0e9397b,21.0.0-3-g21c7a62+f4b9c0ff5c,21.0.0-3-g357aad2+57b0bddf0b,21.0.0-3-g4be5c26+d38a93257c,21.0.0-3-g65f322c+3f454acf5d,21.0.0-3-g7d9da8d+75b29a8a7f,21.0.0-3-gaa929c8+9e4ef6332c,21.0.0-3-ge02ed75+4b120a55c4,21.0.0-4-g3300ddd+e70536a077,21.0.0-4-g591bb35+4b120a55c4,21.0.0-4-gc004bbf+4911b9cd27,21.0.0-4-gccdca77+f94adcd104,21.0.0-4-ge8fba5a+2b3a696ff9,21.0.0-5-gb155db7+2c5429117a,21.0.0-5-gdf36809+637e4641ee,21.0.0-6-g00874e7+c9fd7f7160,21.0.0-6-g4e60332+4b120a55c4,21.0.0-7-gc8ca178+40eb9cf840,21.0.0-8-gfbe0b4b+9e4ef6332c,21.0.0-9-g2fd488a+d83b7cd606,w.2021.05
LSST Data Management Base Package
convertRepo.py
Go to the documentation of this file.
1 # This file is part of obs_base.
2 #
3 # Developed for the LSST Data Management System.
4 # This product includes software developed by the LSST Project
5 # (https://www.lsst.org).
6 # See the COPYRIGHT file at the top-level directory of this distribution
7 # for details of code ownership.
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation, either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the GNU General Public License
20 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 from __future__ import annotations
22 
23 __all__ = ["CalibRepo", "ConvertRepoConfig", "ConvertRepoTask", "ConvertRepoSkyMapConfig", "Rerun"]
24 
25 import os
26 import fnmatch
27 from dataclasses import dataclass
28 from multiprocessing import Pool
29 from typing import Iterable, Optional, List, Tuple
30 
31 from lsst.daf.butler import (
32  Butler as Butler3,
33  ButlerURI,
34  CollectionType,
35  SkyPixDimension
36 )
37 from lsst.pex.config import Config, ConfigurableField, ConfigDictField, DictField, ListField, Field
38 from lsst.pipe.base import Task
39 from lsst.skymap import skyMapRegistry, BaseSkyMap
40 
41 from ..ingest import RawIngestTask
42 from ..defineVisits import DefineVisitsTask
43 from .repoConverter import ConversionSubset
44 from .rootRepoConverter import RootRepoConverter
45 from .calibRepoConverter import CalibRepoConverter
46 from .standardRepoConverter import StandardRepoConverter
47 from .._instrument import Instrument
48 
49 
50 @dataclass
52  """Struct containing information about a skymap that may appear in a Gen2
53  repository.
54  """
55 
56  name: str
57  """Name of the skymap used in Gen3 data IDs.
58  """
59 
60  sha1: bytes
61  """Hash computed by `BaseSkyMap.getSha1`.
62  """
63 
64  instance: BaseSkyMap
65  """Name of the skymap used in Gen3 data IDs.
66  """
67 
68  used: bool = False
69  """Whether this skymap has been found in at least one repository being
70  converted.
71  """
72 
73 
74 def _dropPrefix(s: str, prefix: str) -> Tuple[str, bool]:
75  """If ``s`` starts with ``prefix``, return the rest of ``s`` and `True`.
76  Otherwise return ``s`` and `False`.
77  """
78  if s.startswith(prefix):
79  return s[len(prefix):], True
80  return s, False
81 
82 
@dataclass
class Rerun:
    """Description of a Gen2 processing-output (rerun) repository that should
    be converted.
    """

    path: str
    """Location of the Gen2 repository (`str`), either absolute or relative
    to the root repository.
    """

    runName: Optional[str]
    """Name of the `~lsst.daf.butler.CollectionType.RUN` collection that
    converted datasets are written to (`str` or `None`).

    When `None`, `guessCollectionNames` fills it in.
    """

    chainName: Optional[str]
    """Name of the `~lsst.daf.butler.CollectionType.CHAINED` collection that
    groups this repository's datasets with those of its parent repositories
    (`str` or `None`).

    When `None`, `guessCollectionNames` fills it in.
    """

    parents: List[str]
    """Collections belonging to parent repositories, appended to the chained
    collection (`list` [ `str` ]).

    Has no effect when `chainName` is `None`. The runs of the root repository
    are always added to the chain automatically.
    """

    def guessCollectionNames(self, instrument: Instrument, root: str) -> None:
        """Fill in `runName` and `chainName` with guesses that follow Gen3
        collection naming conventions.

        If `runName` is already set, nothing changes. In that case a `None`
        `chainName` stays `None`, and no chained collection will be created.
        Otherwise `chainName` is guessed from ``self.path`` (unless it is
        already set), and `runName` becomes ``"<chainName>/direct"``.

        Parameters
        ----------
        instrument : `Instrument`
            Instrument object for the repository being converted.
        root : `str`
            Path to the root repository. When ``self.path`` is absolute it is
            first expressed relative to this path.

        Raises
        ------
        ValueError
            Raised if ``self.path`` is absolute and has no clear relationship
            to ``root``.
        """
        if self.runName is None:
            if self.chainName is None:
                self.chainName = self._guessChainName(instrument, root)
            self.runName = f"{self.chainName}/direct"

    def _guessChainName(self, instrument: Instrument, root: str) -> str:
        """Derive a chained-collection name from ``self.path``.

        A leading ``rerun/`` is dropped. A following ``private/`` yields
        ``u/<rest>``. Otherwise a leading ``shared/`` is dropped, and the
        remainder is passed to ``instrument.makeCollectionName("runs", ...)``.
        """
        if os.path.isabs(self.path):
            relative = ButlerURI(self.path).relative_to(ButlerURI(root))
            if relative is None:
                raise ValueError(
                    f"Cannot guess run name collection for rerun at '{self.path}': "
                    f"no clear relationship to root '{root}'."
                )
        else:
            relative = self.path
        relative, _ = _dropPrefix(relative, "rerun/")
        relative, isPersonal = _dropPrefix(relative, "private/")
        if isPersonal:
            return f"u/{relative}"
        relative, _ = _dropPrefix(relative, "shared/")
        return instrument.makeCollectionName("runs", relative)
161 
162 
@dataclass
class CalibRepo:
    """Description of a Gen2 calibration repository to convert.
    """

    path: Optional[str]
    """Location of the Gen2 calibration repository (`str` or `None`), either
    absolute or relative to the root repository.

    `None` means no calibration datasets are converted from Gen2, although
    curated calibrations may still be written.
    """

    curated: bool = True
    """Whether curated calibrations are written into the associated
    ``CALIBRATION`` collection (`bool`).
    """

    labels: Tuple[str, ...] = ()
    """Extra strings inserted into collection names. They apply both to the
    ``RUN`` collections that datasets are ingested into directly and to the
    ``CALIBRATION`` collection that gives them validity ranges.

    With an empty tuple, the converted datasets go straight into the
    instrument's default calibration collection, which requires
    ``default=True``. That suits test data repositories where only one
    ``CALIBRATION`` collection will ever exist. Otherwise use a non-empty
    tuple, so the default calibration collection can instead be a ``CHAINED``
    collection that points at the currently recommended ``CALIBRATION``
    collection.
    """

    default: bool = True
    """Whether the resulting ``CALIBRATION`` collection becomes the default
    for this instrument.

    At most one calibration collection passed to `ConvertRepoTask.run` may
    set this. It is `True` by default because usually only one calibration
    collection is converted. With non-empty ``labels``, ``default=True``
    defines a ``CHAINED`` collection that points at the converted
    ``CALIBRATION`` collection. With empty ``labels``, ``default`` must be
    `True` and no ``CHAINED`` pointer is needed.
    """

    def __post_init__(self) -> None:
        # An unlabeled CALIBRATION collection *is* the instrument's default
        # calibration collection, so it cannot be marked as non-default.
        if self.labels or self.default:
            return
        raise ValueError("labels=() requires default=True")
211 
212 
214  """Sub-config used to hold the parameters of a SkyMap.
215 
216  Notes
217  -----
218  This config only needs to exist because we can't put a
219  `~lsst.pex.config.RegistryField` directly inside a
220  `~lsst.pex.config.ConfigDictField`.
221 
222  It needs to have its only field named "skyMap" for compatibility with the
223  configuration of `lsst.pipe.tasks.MakeSkyMapTask`, which we want so we can
224  use one config file in an obs package to configure both.
225 
226  This name leads to unfortunate repetition with the field named
227  "skymap" that holds it - "skyMap[name].skyMap" - but that seems
228  unavoidable.
229  """
230  skyMap = skyMapRegistry.makeField(
231  doc="Type and parameters for the SkyMap itself.",
232  default="dodeca",
233  )
234 
235 
238  "Configuration for subtask responsible for ingesting raws and adding "
239  "exposure dimension entries.",
240  target=RawIngestTask,
241  )
242  defineVisits = ConfigurableField(
243  "Configuration for the subtask responsible for defining visits from "
244  "exposures.",
245  target=DefineVisitsTask,
246  )
247  skyMaps = ConfigDictField(
248  "Mapping from Gen3 skymap name to the parameters used to construct a "
249  "BaseSkyMap instance. This will be used to associate names with "
250  "existing skymaps found in the Gen2 repo.",
251  keytype=str,
252  itemtype=ConvertRepoSkyMapConfig,
253  default={}
254  )
255  rootSkyMapName = Field(
256  "Name of a Gen3 skymap (an entry in ``self.skyMaps``) to assume for "
257  "datasets in the root repository when no SkyMap is found there. ",
258  dtype=str,
259  optional=True,
260  default=None,
261  )
262  runs = DictField(
263  "A mapping from dataset type name to the RUN collection they should "
264  "be inserted into. This must include all datasets that can be found "
265  "in the root repository; other repositories will use per-repository "
266  "runs.",
267  keytype=str,
268  itemtype=str,
269  default={},
270  )
271  runsForced = DictField(
272  "Like ``runs``, but is used even when the dataset is present in a "
273  "non-root repository (i.e. rerun), overriding the non-root "
274  "repository's main collection.",
275  keytype=str,
276  itemtype=str,
277  default={
278  "brightObjectMask": "masks",
279  }
280  )
281  storageClasses = DictField(
282  "Mapping from dataset type name or Gen2 policy entry (e.g. 'python' "
283  "or 'persistable') to the Gen3 StorageClass name.",
284  keytype=str,
285  itemtype=str,
286  default={
287  "bias": "ExposureF",
288  "dark": "ExposureF",
289  "flat": "ExposureF",
290  "defects": "Defects",
291  "crosstalk": "CrosstalkCalib",
292  "BaseSkyMap": "SkyMap",
293  "BaseCatalog": "Catalog",
294  "BackgroundList": "Background",
295  "raw": "Exposure",
296  "MultilevelParquetTable": "DataFrame",
297  "ParquetTable": "DataFrame",
298  "SkyWcs": "Wcs",
299  }
300  )
301  formatterClasses = DictField(
302  "Mapping from dataset type name to formatter class. "
303  "By default these are derived from the formatters listed in the"
304  " Gen3 datastore configuration.",
305  keytype=str,
306  itemtype=str,
307  default={}
308  )
309  targetHandlerClasses = DictField(
310  "Mapping from dataset type name to target handler class.",
311  keytype=str,
312  itemtype=str,
313  default={}
314  )
315  doRegisterInstrument = Field(
316  "If True (default), add dimension records for the Instrument and its "
317  "filters and detectors to the registry instead of assuming they are "
318  "already present.",
319  dtype=bool,
320  default=True,
321  )
322  refCats = ListField(
323  "The names of reference catalogs (subdirectories under ref_cats) to "
324  "be converted",
325  dtype=str,
326  default=[]
327  )
328  fileIgnorePatterns = ListField(
329  "Filename globs that should be ignored instead of being treated as "
330  "datasets.",
331  dtype=str,
332  default=["README.txt", "*~?", "butler.yaml", "gen3.sqlite3",
333  "registry.sqlite3", "calibRegistry.sqlite3", "_mapper",
334  "_parent", "repositoryCfg.yaml"]
335  )
336  rawDatasetType = Field(
337  "Gen2 dataset type to use for raw data.",
338  dtype=str,
339  default="raw",
340  )
341  datasetIncludePatterns = ListField(
342  "Glob-style patterns for dataset type names that should be converted.",
343  dtype=str,
344  default=["*"]
345  )
346  datasetIgnorePatterns = ListField(
347  "Glob-style patterns for dataset type names that should not be "
348  "converted despite matching a pattern in datasetIncludePatterns.",
349  dtype=str,
350  default=[]
351  )
352  ccdKey = Field(
353  "Key used for the Gen2 equivalent of 'detector' in data IDs.",
354  dtype=str,
355  default="ccd",
356  )
357  relatedOnly = Field(
358  "If True (default), only convert datasets that are related to the "
359  "ingested visits. Ignored unless a list of visits is passed to "
360  "run().",
361  dtype=bool,
362  default=False,
363  )
364  doMakeUmbrellaCollection = Field(
365  "If True (default), define an '<instrument>/defaults' CHAINED "
366  "collection that includes everything found in the root repo as well "
367  "as the default calibration collection.",
368  dtype=bool,
369  default=True,
370  )
371  extraUmbrellaChildren = ListField(
372  "Additional child collections to include in the umbrella collection. "
373  "Ignored if doMakeUmbrellaCollection=False.",
374  dtype=str,
375  default=[]
376  )
377 
378  @property
379  def transfer(self):
380  return self.rawsraws.transfer
381 
382  @transfer.setter
383  def transfer(self, value):
384  self.rawsraws.transfer = value
385 
386  def setDefaults(self):
387  self.transfertransfertransfertransfer = None
388 
389 
    """A task that converts one or more related Gen2 data repositories to a
    single Gen3 data repository (with multiple collections).

    Parameters
    ----------
    config : `ConvertRepoConfig`
        Configuration for this task.
    butler3 : `lsst.daf.butler.Butler`
        A writeable Gen3 Butler instance that represents the data repository
        that datasets will be ingested into. If the 'raw' dataset is
        configured to be included in the conversion, ``butler3.run`` should be
        set to the name of the collection raws should be ingested into.
        ``butler3.collections`` should also include a calibration collection
        from which the ``camera`` dataset can be loaded, unless curated
        calibrations are written as part of the conversion (see
        `CalibRepo.curated`; this is the case by default when ``calibs`` is
        not passed to `run`).
    instrument : `lsst.obs.base.Instrument`
        The Gen3 instrument that should be used for this conversion.
    **kwargs
        Other keyword arguments are forwarded to the `Task` constructor.

    Notes
    -----
    Most of the work of converting repositories is delegated to instances of
    the `RepoConverter` hierarchy. The `ConvertRepoTask` instance itself holds
    only state that is relevant for all Gen2 repositories being ingested,
    while each `RepoConverter` instance holds only state relevant for the
    conversion of a single Gen2 repository. Both the task and the
    `RepoConverter` instances are single use: `ConvertRepoTask.run` and most
    `RepoConverter` methods may only be called once on a particular instance.
    """

    # Configuration class for this task.
    ConfigClass = ConvertRepoConfig

    # Default task name.
    _DefaultName = "convertRepo"
425 
426  def __init__(self, config=None, *, butler3: Butler3, instrument: Instrument, **kwargs):
427  config.validate() # Not a CmdlineTask nor PipelineTask, so have to validate the config here.
428  super().__init__(config, **kwargs)
429  # Make self.butler3 one that doesn't have any collections associated
430  # with it - those are needed by RawIngestTask and DefineVisitsTask, but
431  # we don't want them messing with converted datasets, because those
432  # have their own logic for figuring out which collections to write to.
433  self.butler3butler3 = Butler3(butler=butler3)
434  self.registryregistry = self.butler3butler3.registry
435  self.universeuniverse = self.registryregistry.dimensions
436  if self.isDatasetTypeIncludedisDatasetTypeIncluded("raw"):
437  self.makeSubtaskmakeSubtask("raws", butler=butler3)
438  self.makeSubtaskmakeSubtask("defineVisits", butler=butler3)
439  else:
440  self.rawsraws = None
441  self.defineVisitsdefineVisits = None
442  self.instrumentinstrument = instrument
443  self._configuredSkyMapsBySha1_configuredSkyMapsBySha1 = {}
444  self._configuredSkyMapsByName_configuredSkyMapsByName = {}
445  for name, config in self.configconfig.skyMaps.items():
446  instance = config.skyMap.apply()
447  self._populateSkyMapDicts_populateSkyMapDicts(name, instance)
448  self._usedSkyPix_usedSkyPix = set()
449  self.translatorFactorytranslatorFactory = self.instrumentinstrument.makeDataIdTranslatorFactory()
450  self.translatorFactorytranslatorFactory.log = self.loglog.getChild("translators")
451 
452  def _reduce_kwargs(self):
453  # Add extra parameters to pickle
454  return dict(**super()._reduce_kwargs(), butler3=self.butler3butler3, instrument=self.instrumentinstrument)
455 
456  def _populateSkyMapDicts(self, name, instance):
457  struct = ConfiguredSkyMap(name=name, sha1=instance.getSha1(), instance=instance)
458  self._configuredSkyMapsBySha1_configuredSkyMapsBySha1[struct.sha1] = struct
459  self._configuredSkyMapsByName_configuredSkyMapsByName[struct.name] = struct
460 
461  def isDatasetTypeIncluded(self, datasetTypeName: str):
462  """Return `True` if configuration indicates that the given dataset type
463  should be converted.
464 
465  This method is intended to be called primarily by the
466  `RepoConverter` instances used interally by the task.
467 
468  Parameters
469  ----------
470  datasetTypeName: str
471  Name of the dataset type.
472 
473  Returns
474  -------
475  included : `bool`
476  Whether the dataset should be included in the conversion.
477  """
478  return (
479  any(fnmatch.fnmatchcase(datasetTypeName, pattern)
480  for pattern in self.configconfig.datasetIncludePatterns)
481  and not any(fnmatch.fnmatchcase(datasetTypeName, pattern)
482  for pattern in self.configconfig.datasetIgnorePatterns)
483  )
484 
485  def useSkyMap(self, skyMap: BaseSkyMap, skyMapName: str) -> str:
486  """Indicate that a repository uses the given SkyMap.
487 
488  This method is intended to be called primarily by the
489  `RepoConverter` instances used interally by the task.
490 
491  Parameters
492  ----------
493  skyMap : `lsst.skymap.BaseSkyMap`
494  SkyMap instance being used, typically retrieved from a Gen2
495  data repository.
496  skyMapName : `str`
497  The name of the gen2 skymap, for error reporting.
498 
499  Returns
500  -------
501  name : `str`
502  The name of the skymap in Gen3 data IDs.
503 
504  Raises
505  ------
506  LookupError
507  Raised if the specified skymap cannot be found.
508  """
509  sha1 = skyMap.getSha1()
510  if sha1 not in self._configuredSkyMapsBySha1_configuredSkyMapsBySha1:
511  self._populateSkyMapDicts_populateSkyMapDicts(skyMapName, skyMap)
512  try:
513  struct = self._configuredSkyMapsBySha1_configuredSkyMapsBySha1[sha1]
514  except KeyError as err:
515  msg = f"SkyMap '{skyMapName}' with sha1={sha1} not included in configuration."
516  raise LookupError(msg) from err
517  struct.used = True
518  return struct.name
519 
520  def registerUsedSkyMaps(self, subset: Optional[ConversionSubset]):
521  """Register all skymaps that have been marked as used.
522 
523  This method is intended to be called primarily by the
524  `RepoConverter` instances used interally by the task.
525 
526  Parameters
527  ----------
528  subset : `ConversionSubset`, optional
529  Object that will be used to filter converted datasets by data ID.
530  If given, it will be updated with the tracts of this skymap that
531  overlap the visits in the subset.
532  """
533  for struct in self._configuredSkyMapsBySha1_configuredSkyMapsBySha1.values():
534  if struct.used:
535  struct.instance.register(struct.name, self.butler3butler3)
536  if subset is not None and self.configconfig.relatedOnly:
537  subset.addSkyMap(self.registryregistry, struct.name)
538 
539  def useSkyPix(self, dimension: SkyPixDimension):
540  """Indicate that a repository uses the given SkyPix dimension.
541 
542  This method is intended to be called primarily by the
543  `RepoConverter` instances used interally by the task.
544 
545  Parameters
546  ----------
547  dimension : `lsst.daf.butler.SkyPixDimension`
548  Dimension represening a pixelization of the sky.
549  """
550  self._usedSkyPix_usedSkyPix.add(dimension)
551 
552  def registerUsedSkyPix(self, subset: Optional[ConversionSubset]):
553  """Register all skymaps that have been marked as used.
554 
555  This method is intended to be called primarily by the
556  `RepoConverter` instances used interally by the task.
557 
558  Parameters
559  ----------
560  subset : `ConversionSubset`, optional
561  Object that will be used to filter converted datasets by data ID.
562  If given, it will be updated with the pixelization IDs that
563  overlap the visits in the subset.
564  """
565  if subset is not None and self.configconfig.relatedOnly:
566  for dimension in self._usedSkyPix_usedSkyPix:
567  subset.addSkyPix(self.registryregistry, dimension)
568 
569  def run(self, root: str, *,
570  calibs: Optional[List[CalibRepo]] = None,
571  reruns: Optional[List[Rerun]] = None,
572  visits: Optional[Iterable[int]] = None,
573  pool: Optional[Pool] = None,
574  processes: int = 1):
575  """Convert a group of related data repositories.
576 
577  Parameters
578  ----------
579  root : `str`
580  Complete path to the root Gen2 data repository. This should be
581  a data repository that includes a Gen2 registry and any raw files
582  and/or reference catalogs.
583  calibs : `list` of `CalibRepo`
584  Specifications for Gen2 calibration repos to convert. If `None`
585  (default), curated calibrations only will be written to the default
586  calibration collection for this instrument; set to ``()`` explictly
587  to disable this.
588  reruns : `list` of `Rerun`
589  Specifications for rerun (processing output) repos to convert. If
590  `None` (default), no reruns are converted.
591  visits : iterable of `int`, optional
592  The integer IDs of visits to convert. If not provided, all visits
593  in the Gen2 root repository will be converted.
594  pool : `multiprocessing.Pool`, optional
595  If not `None`, a process pool with which to parallelize some
596  operations.
597  processes : `int`, optional
598  The number of processes to use for conversion.
599  """
600  if pool is None and processes > 1:
601  pool = Pool(processes)
602  if calibs is None:
603  calibs = [CalibRepo(path=None)]
604  if visits is not None:
605  subset = ConversionSubset(instrument=self.instrumentinstrument.getName(), visits=frozenset(visits))
606  else:
607  if self.configconfig.relatedOnly:
608  self.loglog.warn("config.relatedOnly is True but all visits are being ingested; "
609  "no filtering will be done.")
610  subset = None
611 
612  # Check that at most one CalibRepo is marked as default, to fail before
613  # we actually write anything.
614  defaultCalibRepos = [c.path for c in calibs if c.default]
615  if len(defaultCalibRepos) > 1:
616  raise ValueError(f"Multiple calib repos marked as default: {defaultCalibRepos}.")
617 
618  # Make converters for all Gen2 repos.
619  converters = []
620  # Start with the root repo, which must always be given even if we are
621  # not configured to convert anything from it.
622  rootConverter = RootRepoConverter(task=self, root=root, subset=subset, instrument=self.instrumentinstrument)
623  converters.append(rootConverter)
624  # Calibration repos are next.
625  for spec in calibs:
626  calibRoot = spec.path
627  if calibRoot is not None:
628  if not os.path.isabs(calibRoot):
629  calibRoot = os.path.join(rootConverter.root, calibRoot)
630  converter = CalibRepoConverter(task=self, root=calibRoot,
631  labels=spec.labels,
632  instrument=self.instrumentinstrument,
633  mapper=rootConverter.mapper,
634  subset=rootConverter.subset)
635  converters.append(converter)
636  # CalibRepo entries that don't have a path are just there for
637  # curated calibs and maybe to set up a collection pointer; that's
638  # handled further down (after we've done everything we can that
639  # doesn't involve actually writing to the output Gen3 repo).
640  # And now reruns.
641  rerunConverters = {}
642  for spec in reruns:
643  runRoot = spec.path
644  if not os.path.isabs(runRoot):
645  runRoot = os.path.join(rootConverter.root, runRoot)
646  spec.guessCollectionNames(self.instrumentinstrument, rootConverter.root)
647  converter = StandardRepoConverter(task=self, root=runRoot, run=spec.runName,
648  instrument=self.instrumentinstrument, subset=rootConverter.subset)
649  converters.append(converter)
650  rerunConverters[spec.runName] = converter
651 
652  # Register the instrument if we're configured to do so.
653  if self.configconfig.doRegisterInstrument:
654  self.instrumentinstrument.register(self.registryregistry)
655 
656  # Run raw ingest (does nothing if we weren't configured to convert the
657  # 'raw' dataset type).
658  rootConverter.runRawIngest(pool=pool)
659 
660  # Write curated calibrations to all calibration collections where they
661  # were requested (which may be implicit, by passing calibs=None). Also
662  # set up a CHAINED collection that points to the default CALIBRATION
663  # collection if one is needed.
664  for spec in calibs:
665  if spec.curated:
666  self.instrumentinstrument.writeCuratedCalibrations(self.butler3butler3, labels=spec.labels)
667  if spec.default and spec.labels:
668  # This is guaranteed to be True at most once in the loop by
669  # logic at the top of this method.
670  defaultCalibName = self.instrumentinstrument.makeCalibrationCollectionName()
671  self.butler3butler3.registry.registerCollection(defaultCalibName, CollectionType.CHAINED)
672  recommendedCalibName = self.instrumentinstrument.makeCalibrationCollectionName(*spec.labels)
673  self.butler3butler3.registry.registerCollection(recommendedCalibName, CollectionType.CALIBRATION)
674  self.butler3butler3.registry.setCollectionChain(defaultCalibName, [recommendedCalibName])
675 
676  # Define visits (also does nothing if we weren't configurd to convert
677  # the 'raw' dataset type).
678  rootConverter.runDefineVisits(pool=pool)
679 
680  # Walk Gen2 repos to find datasets convert.
681  for converter in converters:
682  converter.prep()
683 
684  # Insert dimensions that are potentially shared by all Gen2
685  # repositories (and are hence managed directly by the Task, rather
686  # than a converter instance).
687  # This also finishes setting up the (shared) converter.subsets object
688  # that is used to filter data IDs for config.relatedOnly.
689  self.registerUsedSkyMapsregisterUsedSkyMaps(rootConverter.subset)
690  self.registerUsedSkyPixregisterUsedSkyPix(rootConverter.subset)
691 
692  # Look for datasets, generally by scanning the filesystem.
693  # This requires dimensions to have already been inserted so we can use
694  # dimension information to identify related datasets.
695  for converter in converters:
696  converter.findDatasets()
697 
698  # Expand data IDs.
699  for converter in converters:
700  converter.expandDataIds()
701 
702  # Actually ingest datasets.
703  for converter in converters:
704  converter.ingest()
705 
706  # Perform any post-ingest processing.
707  for converter in converters:
708  converter.finish()
709 
710  # Make the umbrella collection, if desired.
711  if self.configconfig.doMakeUmbrellaCollection:
712  umbrella = self.instrumentinstrument.makeUmbrellaCollectionName()
713  self.registryregistry.registerCollection(umbrella, CollectionType.CHAINED)
714  children = list(self.registryregistry.getCollectionChain(umbrella))
715  children.extend(rootConverter.getCollectionChain())
716  children.append(self.instrumentinstrument.makeCalibrationCollectionName())
717  if BaseSkyMap.SKYMAP_RUN_COLLECTION_NAME not in children:
718  # Ensure the umbrella collection includes the global skymap
719  # collection, even if it's currently empty.
720  self.registryregistry.registerRun(BaseSkyMap.SKYMAP_RUN_COLLECTION_NAME)
721  children.append(BaseSkyMap.SKYMAP_RUN_COLLECTION_NAME)
722  children.extend(self.configconfig.extraUmbrellaChildren)
723  self.loglog.info("Defining %s from chain %s.", umbrella, children)
724  self.registryregistry.setCollectionChain(umbrella, children)
725 
726  # Add chained collections for reruns.
727  for spec in reruns:
728  if spec.chainName is not None:
729  self.butler3butler3.registry.registerCollection(spec.chainName, type=CollectionType.CHAINED)
730  chain = [spec.runName]
731  chain.extend(rerunConverters[spec.runName].getCollectionChain())
732  for parent in spec.parents:
733  chain.append(parent)
734  parentConverter = rerunConverters.get(parent)
735  if parentConverter is not None:
736  chain.extend(parentConverter.getCollectionChain())
737  chain.extend(rootConverter.getCollectionChain())
738  if len(calibs) == 1:
739  # Exactly one calibration repo being converted, so it's
740  # safe-ish to assume that's the one the rerun used.
741  chain.append(self.instrumentinstrument.makeCalibrationCollectionName(*calibs[0].labels))
742  self.loglog.info("Defining %s from chain %s.", spec.chainName, chain)
743  self.butler3butler3.registry.setCollectionChain(spec.chainName, chain)
def __init__(self, config=None, *Butler3 butler3, Instrument instrument, **kwargs)
Definition: convertRepo.py:426
def isDatasetTypeIncluded(self, str datasetTypeName)
Definition: convertRepo.py:461
def registerUsedSkyPix(self, Optional[ConversionSubset] subset)
Definition: convertRepo.py:552
def run(self, str root, *Optional[List[CalibRepo]] calibs=None, Optional[List[Rerun]] reruns=None, Optional[Iterable[int]] visits=None, Optional[Pool] pool=None, int processes=1)
Definition: convertRepo.py:574
str useSkyMap(self, BaseSkyMap skyMap, str skyMapName)
Definition: convertRepo.py:485
def registerUsedSkyMaps(self, Optional[ConversionSubset] subset)
Definition: convertRepo.py:520
def useSkyPix(self, SkyPixDimension dimension)
Definition: convertRepo.py:539
None guessCollectionNames(self, Instrument instrument, str root)
Definition: convertRepo.py:116
def makeSubtask(self, name, **keyArgs)
Definition: task.py:299
def getName(self)
Definition: task.py:274
bool any(CoordinateExpr< N > const &expr) noexcept
Return true if any elements are true.
def writeCuratedCalibrations(repo, instrument, collection, labels)
daf::base::PropertyList * list
Definition: fits.cc:913
daf::base::PropertySet * set
Definition: fits.cc:912