repoConverter.py
# This file is part of obs_base.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import annotations

__all__ = ["RepoConverter"]

import os
import fnmatch
from dataclasses import dataclass
from collections import defaultdict
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Generic, TypeVar, List, Tuple, Optional, Iterator, Set, Any, Callable, Dict

from lsst.daf.butler import DatasetRef, Butler as Butler3, DataCoordinate, FileDataset
from lsst.sphgeom import RangeSet, Region

from .filePathParser import FilePathParser

if TYPE_CHECKING:
    from ..mapping import Mapping as CameraMapperMapping  # disambiguate from collections.abc.Mapping
    from .dataIdExtractor import DataIdExtractor
    from .convertRepo import ConvertRepoTask
    from lsst.daf.butler import StorageClass, Registry, SkyPixDimension


REPO_ROOT_FILES = ("registry.sqlite3", "_mapper", "repositoryCfg.yaml", "calibRegistry.sqlite3", "_parent")


T = TypeVar("T")


class MostRecentlyUsedStack(Generic[T]):
    """A simple container that maintains a most-recently-used ordering.
    """

    def __init__(self):
        self._elements = []

    def __iter__(self):
        # Iterate in reverse order so we can keep the most recently used
        # element at the end of the internal list. We use the end rather than
        # the beginning because appending to a list is much more efficient
        # than inserting at the beginning.
        yield from reversed(self._elements)

    def apply(self, func: Callable[[T], Any]) -> Any:
        """Apply a function to elements until it returns a value that coerces
        to `True`, and move the corresponding element to the front of the
        stack.

        Parameters
        ----------
        func : callable
            Callable object.

        Returns
        -------
        value : `object`
            The first value returned by ``func`` that coerces to `True`.
        """
        for n, element in enumerate(self):
            result = func(element)
            if result:
                break
        else:
            return None
        # Move the element that matched to the back of the internal list
        # (note that n indexes from the back of the internal list).
        if n != 0:
            # i indexes from the front of the internal list.
            i = len(self._elements) - 1 - n
            assert self._elements[i] is element
            del self._elements[i]
            self._elements.append(element)
        return result

    def push(self, element):
        """Add a new element to the front of the stack.
        """
        self._elements.append(element)
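
# Illustrative usage (hypothetical values): ``apply`` returns the first
# truthy result and remembers which element produced it, so later calls that
# tend to match the same element check it first.
#
#     stack = MostRecentlyUsedStack()
#     stack.push("a")
#     stack.push("b")
#     # "b" (most recently pushed) is tried first, then "a"; "a" matches and
#     # is moved to the front for subsequent calls.
#     result = stack.apply(lambda s: s if s == "a" else None)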


@dataclass
class ConversionSubset:
    """A helper class for `ConvertRepoTask` and `RepoConverter` that maintains
    lists of related data ID values that should be included in the conversion.

    Parameters
    ----------
    instrument : `str`
        Instrument name used in Gen3 data IDs.
    visits : `set` of `int`
        Visit IDs that define the filter.
    """

    def __init__(self, instrument: str, visits: Set[int]):
        self.instrument = instrument
        self.visits = visits
        self.regions = None
        self.tracts = {}
        self.skypix = {}

    def addSkyMap(self, registry: Registry, name: str):
        """Populate the included tract IDs for the given skymap from those
        that overlap the visits the `ConversionSubset` was initialized with.

        Parameters
        ----------
        registry : `lsst.daf.butler.Registry`
            Registry that can be queried for visit/tract overlaps.
        name : `str`
            SkyMap name used in Gen3 data IDs.
        """
        tracts = set()
        self.tracts[name] = tracts
        for visit in self.visits:
            for dataId in registry.queryDimensions(["tract"], expand=False,
                                                   dataId={"skymap": name, "visit": visit}):
                tracts.add(dataId["tract"])
        self.task.log.info("Limiting datasets defined on skymap %s to %s tracts.", name, len(tracts))

    def addSkyPix(self, registry: Registry, dimension: SkyPixDimension):
        """Populate the included skypix IDs for the given dimension from those
        that overlap the visits the `ConversionSubset` was initialized with.

        Parameters
        ----------
        registry : `lsst.daf.butler.Registry`
            Registry that can be queried for visit regions.
        dimension : `lsst.daf.butler.SkyPixDimension`
            SkyPix dimension used in Gen3 data IDs.
        """
        if self.regions is None:
            self.regions = []
            for visit in self.visits:
                dataId = registry.expandDataId(instrument=self.instrument, visit=visit)
                self.regions.append(dataId.region)
        ranges = RangeSet()
        for region in self.regions:
            ranges = ranges.join(dimension.pixelization.envelope(region))
        self.skypix[dimension] = ranges

    def isRelated(self, dataId: DataCoordinate) -> bool:
        """Test whether the given data ID is related to this subset and hence
        should be included in a repository conversion.

        Parameters
        ----------
        dataId : `lsst.daf.butler.DataCoordinate`
            Data ID to test.

        Returns
        -------
        related : `bool`
            `True` if this data ID should be included in a repository
            conversion.

        Notes
        -----
        More formally, this tests that the given data ID is not unrelated;
        if a data ID does not involve tracts, visits, or skypix dimensions,
        we always include it.
        """
        if self.visits is None:
            # We're not filtering at all.
            return True
        if "visit" in dataId.graph and dataId["visit"] not in self.visits:
            return False
        if "tract" in dataId.graph and dataId["tract"] not in self.tracts[dataId["skymap"]]:
            return False
        for dimension, ranges in self.skypix.items():
            if dimension in dataId.graph and not ranges.intersects(dataId[dimension]):
                return False
        return True

    # Class attributes that will be shadowed by public instance attributes;
    # defined here only for documentation purposes.

    instrument: str
    """The name of the instrument, as used in Gen3 data IDs (`str`).
    """

    visits: Set[int]
    """The set of visit IDs that should be included in the conversion (`set`
    of `int`).
    """

    regions: Optional[List[Region]]
    """Regions for all visits (`list` of `lsst.sphgeom.Region`).

    Set to `None` before it has been initialized. Any code that attempts to
    use it when it is `None` has a logic bug.
    """

    tracts: Dict[str, Set[int]]
    """Tracts that should be included in the conversion, grouped by skymap
    name (`dict` mapping `str` to `set` of `int`).
    """

    skypix: Dict[SkyPixDimension, RangeSet]
    """SkyPix ranges that should be included in the conversion, grouped by
    dimension (`dict` mapping `SkyPixDimension` to `lsst.sphgeom.RangeSet`).
    """
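
# Illustrative usage (instrument, visit IDs, and skymap name are
# hypothetical):
#
#     subset = ConversionSubset(instrument="HSC", visits={903334, 903336})
#     subset.addSkyMap(registry, "deepCoadd_skyMap")
#     # Data IDs with other visits, or with tracts that do not overlap the
#     # listed visits, are excluded; data IDs that involve none of the
#     # filtered dimensions are always accepted.
#     keep = subset.isRelated(dataId)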


class RepoConverter(ABC):
    """An abstract base class for objects that help `ConvertRepoTask` convert
    datasets from a single Gen2 repository.

    Parameters
    ----------
    task : `ConvertRepoTask`
        Task instance that is using this helper object.
    root : `str`
        Root of the Gen2 repo being converted.
    collections : `list` of `str`
        Gen3 collections with which all converted datasets should be
        associated.
    subset : `ConversionSubset`, optional
        Helper object that implements a filter that restricts the data IDs
        that are converted.

    Notes
    -----
    `RepoConverter` defines the only public API users of its subclasses should
    use (`prep`, `insertDimensionData`, and `ingest`). These delegate to
    several abstract methods that subclasses must implement. In some cases,
    subclasses may reimplement the public methods as well, but are expected to
    delegate to ``super()`` either at the beginning or end of their own
    implementation.
    """
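
    # Typical driver sequence (illustrative sketch; a concrete subclass and
    # an orchestrating ConvertRepoTask are assumed):
    #
    #     converter = SomeRepoConverterSubclass(task=task, root=root,
    #                                           collections=["my/collection"])
    #     converter.prep()                 # no writes to the Gen3 repo yet
    #     converter.insertDimensionData()  # dimension records only
    #     converter.ingest()               # register dataset types, ingest files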

    def __init__(self, *, task: ConvertRepoTask, root: str, collections: List[str],
                 subset: Optional[ConversionSubset] = None):
        self.task = task
        self.root = root
        self.subset = subset
        self._collections = list(collections)
        self._extractors: MostRecentlyUsedStack[DataIdExtractor] = MostRecentlyUsedStack()
        self._skipParsers: MostRecentlyUsedStack[Tuple[FilePathParser, str, str]] = MostRecentlyUsedStack()

    @abstractmethod
    def isDatasetTypeSpecial(self, datasetTypeName: str) -> bool:
        """Test whether the given dataset is handled specially by this
        converter and hence should be ignored by generic base-class logic that
        searches for dataset types to convert.

        Parameters
        ----------
        datasetTypeName : `str`
            Name of the dataset type to test.

        Returns
        -------
        special : `bool`
            `True` if the dataset type is special.
        """
        raise NotImplementedError()

    @abstractmethod
    def isDirectorySpecial(self, subdirectory: str) -> bool:
        """Test whether the given directory is handled specially by this
        converter and hence should be ignored by generic base-class logic that
        searches for datasets to convert.

        Parameters
        ----------
        subdirectory : `str`
            Subdirectory. This is only ever a single subdirectory, and it
            could appear anywhere within a repo root. (A full path relative
            to the repo root might be more useful, but it is harder to
            implement, and we don't currently need it to identify any special
            directories.)

        Returns
        -------
        special : `bool`
            `True` if the directory is special.
        """
        raise NotImplementedError()

    @abstractmethod
    def iterMappings(self) -> Iterator[Tuple[str, CameraMapperMapping]]:
        """Iterate over all `CameraMapper` `Mapping` objects that should be
        considered for conversion in this repository.

        This should include any datasets that may appear in the
        repository, including those that are special (see
        `isDatasetTypeSpecial`) and those that are being ignored (see
        `ConvertRepoTask.isDatasetTypeIncluded`); this allows the converter
        to identify and hence skip these datasets quietly instead of warning
        about them as unrecognized.

        Yields
        ------
        datasetTypeName : `str`
            Name of the dataset type.
        mapping : `lsst.obs.base.mapping.Mapping`
            Mapping object used by the Gen2 `CameraMapper` to describe the
            dataset type.
        """
        raise NotImplementedError()

    @abstractmethod
    def makeDataIdExtractor(self, datasetTypeName: str, parser: FilePathParser,
                            storageClass: StorageClass) -> DataIdExtractor:
        """Construct a `DataIdExtractor` instance appropriate for a particular
        dataset type.

        Parameters
        ----------
        datasetTypeName : `str`
            Name of the dataset type; typically forwarded directly to
            the `DataIdExtractor` constructor.
        parser : `FilePathParser`
            Object that parses filenames into Gen2 data IDs; typically
            forwarded directly to the `DataIdExtractor` constructor.
        storageClass : `lsst.daf.butler.StorageClass`
            Storage class for this dataset type in the Gen3 butler; typically
            forwarded directly to the `DataIdExtractor` constructor.

        Returns
        -------
        extractor : `DataIdExtractor`
            A new `DataIdExtractor` instance.
        """
        raise NotImplementedError()

    def iterDatasets(self) -> Iterator[FileDataset]:
        """Iterate over all datasets in the repository that should be
        ingested into the Gen3 repository.

        Subclasses may override this method, but must delegate to the base
        class implementation at some point in their own logic.

        Yields
        ------
        dataset : `FileDataset`
            Structure representing a dataset to be ingested, including a
            complete `lsst.daf.butler.DatasetRef` with `DatasetType` and
            data ID. Paths should be absolute.
        """
        for dirPath, subdirNamesInDir, fileNamesInDir in os.walk(self.root, followlinks=True):
            # Remove subdirectories that appear to be repositories themselves
            # from the walk.
            def isRepoRoot(dirName):
                return any(os.path.exists(os.path.join(dirPath, dirName, f))
                           for f in REPO_ROOT_FILES)
            subdirNamesInDir[:] = [d for d in subdirNamesInDir
                                   if not isRepoRoot(d) and not self.isDirectorySpecial(d)]
            # Loop over files in this directory, and ask per-DatasetType
            # extractors if they recognize them and can extract a data ID;
            # if so, ingest.
            dirPathInRoot = dirPath[len(self.root) + len(os.path.sep):]
            for fileNameInDir in fileNamesInDir:
                if any(fnmatch.fnmatchcase(fileNameInDir, pattern)
                       for pattern in self.task.config.fileIgnorePatterns):
                    continue
                fileNameInRoot = os.path.join(dirPathInRoot, fileNameInDir)
                if fileNameInRoot in REPO_ROOT_FILES:
                    continue
                ref = self._extractDatasetRef(fileNameInRoot)
                if ref is not None:
                    if self.subset is None or self.subset.isRelated(ref.dataId):
                        yield FileDataset(path=os.path.join(self.root, fileNameInRoot), ref=ref)
                else:
                    self._handleUnrecognizedFile(fileNameInRoot)
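
    # Hypothetical subclass override (illustrative only); overrides are
    # expected to delegate back to this base-class implementation:
    #
    #     def iterDatasets(self):
    #         yield from self._mySpecialDatasets()  # made-up helper
    #         yield from super().iterDatasets()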

    def prep(self):
        """Prepare the repository by identifying the dataset types to be
        converted and building `DataIdExtractor` instances for them.

        Subclasses may override this method, but must delegate to the base
        class implementation at some point in their own logic. More often,
        subclasses will specialize the behavior of `prep` simply by overriding
        `iterMappings`, `isDatasetTypeSpecial`, and `makeDataIdExtractor`, to
        which the base implementation delegates.

        This should not perform any write operations to the Gen3 repository.
        It is guaranteed to be called before `insertDimensionData` and
        `ingest`.
        """
        self.task.log.info(f"Preparing other datasets from root {self.root}.")
        for datasetTypeName, mapping in self.iterMappings():
            try:
                parser = FilePathParser.fromMapping(mapping)
            except RuntimeError:
                # No template, so there should be no way we'd get one of these
                # in the Gen2 repo anyway (and if we do, we'll still produce a
                # warning - just a less informative one than we might be able
                # to produce if we had a template).
                continue
            if (not self.task.isDatasetTypeIncluded(datasetTypeName) or
                    self.isDatasetTypeSpecial(datasetTypeName)):
                # User indicated not to include this data, but we still want
                # to recognize files of that type to avoid warning about them.
                self._skipParsers.push((parser, datasetTypeName, None))
                continue
            storageClass = self._guessStorageClass(datasetTypeName, mapping)
            if storageClass is None:
                # This may be a problem, but only if we actually encounter any
                # files corresponding to this dataset. Of course, we need
                # to be able to parse those files in order to recognize that
                # situation.
                self._skipParsers.push((parser, datasetTypeName, "no storage class found."))
                continue
            self._extractors.push(self.makeDataIdExtractor(datasetTypeName, parser, storageClass))

    def insertDimensionData(self):
        """Insert any dimension records uniquely derived from this repository
        into the registry.

        Subclasses may override this method, but may not need to; the default
        implementation does nothing.

        SkyMap and SkyPix dimensions should instead be handled by calling
        `ConvertRepoTask.useSkyMap` or `ConvertRepoTask.useSkyPix`, because
        these dimensions are in general shared by multiple Gen2 repositories.

        This method is guaranteed to be called between `prep` and `ingest`.
        """
        pass

    def ingest(self):
        """Insert converted datasets into the Gen3 repository.

        Subclasses may override this method, but must delegate to the base
        class implementation at some point in their own logic. More often,
        subclasses will specialize the behavior of `ingest` simply by
        overriding `iterDatasets` and `isDirectorySpecial`, to which the base
        implementation delegates.

        This method is guaranteed to be called after both `prep` and
        `insertDimensionData`.
        """
        self.task.log.info("Finding datasets in repo %s.", self.root)
        datasetsByType = defaultdict(list)
        for dataset in self.iterDatasets():
            datasetsByType[dataset.ref.datasetType].append(dataset)
        for datasetType, datasetsForType in datasetsByType.items():
            self.task.registry.registerDatasetType(datasetType)
            self.task.log.info("Ingesting %s %s datasets.", len(datasetsForType), datasetType.name)
            try:
                butler3, collections = self.getButler(datasetType.name)
            except LookupError as err:
                self.task.log.warn(str(err))
                continue
            try:
                butler3.ingest(*datasetsForType, transfer=self.task.config.transfer)
            except LookupError as err:
                raise LookupError(f"Error expanding data ID for dataset type {datasetType.name}.") from err
            for collection in collections:
                self.task.registry.associate(collection, [dataset.ref for dataset in datasetsForType])

    def getButler(self, datasetTypeName: str) -> Tuple[Butler3, List[str]]:
        """Create a new Gen3 Butler appropriate for a particular dataset type.

        This should be used exclusively by subclasses when obtaining a butler
        to use for dataset ingest (`ConvertRepoTask.butler3` should never be
        used directly).

        Parameters
        ----------
        datasetTypeName : `str`
            Name of the dataset type.

        Returns
        -------
        butler : `lsst.daf.butler.Butler`
            Gen3 Butler instance appropriate for ingesting the given dataset
            type.
        collections : `list` of `str`
            Collections the dataset should be associated with, in addition to
            the one used to define the `lsst.daf.butler.Run` used in
            ``butler``.
        """
        if datasetTypeName in self.task.config.collections:
            return (
                Butler3(butler=self.task.butler3, run=self.task.config.collections[datasetTypeName]),
                self._collections,
            )
        elif self._collections:
            return (
                Butler3(butler=self.task.butler3, run=self._collections[0]),
                self._collections[1:],
            )
        else:
            raise LookupError(f"No collection configured for dataset type {datasetTypeName}.")
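
    # Illustrative routing (collection names are hypothetical): with
    # ``config.collections == {"raw": "raw/all"}`` and a converter constructed
    # with ``collections=["shared/a", "shared/b"]``:
    #
    #     getButler("raw")     # run "raw/all"; associate ["shared/a", "shared/b"]
    #     getButler("calexp")  # run "shared/a"; associate ["shared/b"]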

    def _extractDatasetRef(self, fileNameInRoot: str) -> Optional[DatasetRef]:
        """Extract a `DatasetRef` from a file name.

        This method is for internal use by `RepoConverter` itself (not its
        subclasses).

        Parameters
        ----------
        fileNameInRoot : `str`
            Name of the file to be ingested, relative to the repository root.

        Returns
        -------
        ref : `lsst.daf.butler.DatasetRef` or `None`
            Reference for the Gen3 dataset, including a complete `DatasetType`
            and data ID. `None` if the converter does not recognize the
            file as one to be converted.
        """
        def closure(extractor):
            try:
                dataId = extractor.apply(fileNameInRoot)
            except LookupError as err:
                raise RuntimeError(f"Error extracting data ID for {extractor.datasetType.name} "
                                   f"on file {fileNameInRoot}.") from err
            if dataId is None:
                return None
            else:
                return DatasetRef(extractor.datasetType, dataId=dataId)
        return self._extractors.apply(closure)

    def _handleUnrecognizedFile(self, fileNameInRoot: str):
        """Generate appropriate warnings (or not) for files not matched by
        `_extractDatasetRef`.

        This method is for internal use by `RepoConverter` itself (not its
        subclasses).

        Parameters
        ----------
        fileNameInRoot : `str`
            Name of the file, relative to the repository root.
        """
        def closure(skipTuple):
            parser, datasetTypeName, message = skipTuple
            if parser(fileNameInRoot) is not None:
                if message is not None:
                    self.task.log.warn("Skipping dataset %s file %s: %s", datasetTypeName,
                                       fileNameInRoot, message)
                return True
            return False
        if not self._skipParsers.apply(closure):
            self.task.log.warn("Skipping unrecognized file %s.", fileNameInRoot)

    def _guessStorageClass(self, datasetTypeName: str, mapping: CameraMapperMapping
                           ) -> Optional[StorageClass]:
        """Infer the Gen3 `StorageClass` for a dataset from a combination of
        configuration and Gen2 dataset type information.

        Parameters
        ----------
        datasetTypeName : `str`
            Name of the dataset type.
        mapping : `lsst.obs.base.mapping.Mapping`
            Mapping object used by the Gen2 `CameraMapper` to describe the
            dataset type.

        Returns
        -------
        storageClass : `lsst.daf.butler.StorageClass` or `None`
            Inferred storage class, or `None` if one could not be found.
        """
        storageClassName = self.task.config.storageClasses.get(datasetTypeName)
        if storageClassName is None and mapping.python is not None:
            storageClassName = self.task.config.storageClasses.get(mapping.python, None)
        if storageClassName is None and mapping.persistable is not None:
            storageClassName = self.task.config.storageClasses.get(mapping.persistable, None)
        if storageClassName is None and mapping.python is not None:
            unqualified = mapping.python.split(".")[-1]
            storageClassName = self.task.config.storageClasses.get(unqualified, None)
        if storageClassName is not None:
            storageClass = self.task.butler3.storageClasses.getStorageClass(storageClassName)
        else:
            try:
                storageClass = self.task.butler3.storageClasses.getStorageClass(mapping.persistable)
            except KeyError:
                storageClass = None
        if storageClass is None and mapping.python is not None:
            try:
                storageClass = self.task.butler3.storageClasses.getStorageClass(unqualified)
            except KeyError:
                pass
        if storageClass is None:
            self.task.log.debug("No StorageClass found for %s; skipping.", datasetTypeName)
        else:
            self.task.log.debug("Using StorageClass %s for %s.", storageClass.name, datasetTypeName)
        return storageClass
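
    # Illustrative configuration (entries are hypothetical): the config
    # mapping is consulted by Gen2 dataset type name, then by the mapping's
    # python and persistable types, before falling back to the Gen3 butler's
    # own StorageClass registry.
    #
    #     config.storageClasses = {"calexp": "ExposureF",
    #                              "lsst.afw.image.ExposureF": "ExposureF"}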

    # Class attributes that will be shadowed by public instance attributes;
    # defined here only for documentation purposes.

    task: ConvertRepoTask
    """The parent task that constructed and uses this converter
    (`ConvertRepoTask`).
    """

    root: str
    """Root path to the Gen2 repository this converter manages (`str`).

    This is a complete path, not relative to some other repository root.
    """

    subset: Optional[ConversionSubset]
    """An object that represents a filter to be applied to the datasets that
    are converted (`ConversionSubset` or `None`).
    """