repoConverter.py
# This file is part of obs_base.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

__all__ = ("RepoConverter", "DataIdExtractor")

import os
import pickle
from collections import OrderedDict  # for move_to_end

import yaml

# register YAML loader for repositoryCfg.yaml files.
import lsst.daf.persistence.repositoryCfg  # noqa: F401

from lsst.daf.butler import DataId, DatasetType, DatasetRef
from lsst.daf.butler.gen2convert import FilePathParser, Translator
from lsst.log import Log
from lsst.log.utils import temporaryLogLevel
from lsst.utils import doImport

def findMapperClass(root):
    """Find the mapper class associated with a Gen2 data repository root.

    Parameters
    ----------
    root : `str`
        Path to a Gen2 repository root directory.

    Returns
    -------
    cls : `type`
        A subclass of `lsst.obs.base.CameraMapper`.

    Raises
    ------
    ValueError
        Raised if the directory does not appear to be the root of a
        Gen2 data repository.
    """
    cfgPath = os.path.join(root, "repositoryCfg.yaml")
    if os.path.exists(cfgPath):
        with open(cfgPath, "r") as f:
            repoCfg = yaml.load(f, Loader=yaml.UnsafeLoader)
            return repoCfg.mapper
    parentLinkPath = os.path.join(root, "_parent")
    if os.path.exists(parentLinkPath):
        return findMapperClass(os.readlink(parentLinkPath))
    mapperFilePath = os.path.join(root, "_mapper")
    if os.path.exists(mapperFilePath):
        with open(mapperFilePath, "r") as f:
            mapperClassPath = f.read().strip()
        return doImport(mapperClassPath)
    calibRegistryPath = os.path.join(root, "calibRegistry.sqlite3")
    if os.path.exists(calibRegistryPath):
        return findMapperClass(os.path.normpath(os.path.join(root, os.path.pardir)))
    raise ValueError(f"Could not determine (Gen2) mapper class for repo at '{root}'.")

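# Usage sketch, assuming a Gen2 repository at a hypothetical path:
#
#     mapperClass = findMapperClass("/path/to/gen2repo")
#     mapper = mapperClass(root="/path/to/gen2repo")
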

class DataIdExtractor:
    """A class that extracts Gen3 data IDs from Gen2 filenames for a
    particular dataset type.

    Parameters
    ----------
    datasetTypeName : `str`
        Name of the dataset type the object will process.
    storageClass : `str` or `lsst.daf.butler.StorageClass`
        Gen3 storage class of the dataset type.
    universe : `lsst.daf.butler.DimensionUniverse`
        Object containing all dimension definitions.
    baseDataId : `dict`
        Key-value pairs that may need to appear in the Gen3 data ID, but can
        never be inferred from a Gen2 filename.  This should always include
        the instrument name (even Gen3 data IDs that don't involve the
        instrument dimension have instrument-dependent Gen2 filenames) and
        should also include the skymap name for any data ID that involves
        tracts or patches.
    filePathParser : `lsst.daf.butler.gen2convert.FilePathParser`, optional
        Object responsible for reading a Gen2 data ID from a filename.  Will
        be created from ``mapper`` if not provided.
    translator : `lsst.daf.butler.gen2convert.Translator`, optional
        Object responsible for converting a Gen2 data ID into a Gen3 data ID.
        Will be created if not provided.
    mapper : `lsst.obs.base.CameraMapper`, optional
        Object that defines Gen2 filename templates.  Must be provided if
        ``filePathParser`` is not.
    skyMap : `lsst.skymap.BaseSkyMap`, optional
        SkyMap that defines tracts and patches.  Must be provided for datasets
        with a ``patch`` key in their data IDs.
    """
110 
111  def __init__(self, datasetTypeName, storageClass, *, universe, baseDataId,
112  filePathParser=None, translator=None, mapper=None, skyMap=None):
113  if filePathParser is None:
114  filePathParser = FilePathParser.fromMapping(mapper.mappings[datasetTypeName])
115  self.filePathParser = filePathParser
116  if translator is None:
117  translator = Translator.makeMatching(filePathParser.datasetType, baseDataId, skyMap=skyMap)
118  self.translator = translator
119  self.datasetType = DatasetType(datasetTypeName, dimensions=self.translator.dimensions,
120  storageClass=storageClass)
121  self.datasetType.normalize(universe=universe)
122 
    def apply(self, fileNameInRoot):
        """Extract a Gen3 data ID from the given filename.

        Parameters
        ----------
        fileNameInRoot : `str`
            Filename relative to a Gen2 data repository root.

        Returns
        -------
        dataId : `lsst.daf.butler.DataId` or `None`
            The Gen3 data ID, or `None` if the file was not recognized as an
            instance of the extractor's dataset type.
        """
        gen2id = self.filePathParser(fileNameInRoot)
        if gen2id is None:
            return None
        return DataId(self.translator(gen2id), dimensions=self.datasetType.dimensions)

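# Usage sketch, assuming ``mapper`` and ``universe`` objects are already in
# hand; the "calexp" dataset type, "HSC" instrument name, and filename are
# hypothetical examples:
#
#     extractor = DataIdExtractor("calexp", "ExposureF",
#                                 universe=universe,
#                                 baseDataId={"instrument": "HSC"},
#                                 mapper=mapper)
#     dataId = extractor.apply("01234/HSC-R/corr/CORR-0123456-049.fits")
#     # ``dataId`` is `None` if the filename does not match the Gen2
#     # template for "calexp".
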

class RepoConverter:
    """A helper class that ingests (some of) the contents of a Gen2 data
    repository into a Gen3 data repository.

    Parameters
    ----------
    root : `str`
        Root of the Gen2 data repository.
    universe : `lsst.daf.butler.DimensionUniverse`
        Object containing all dimension definitions.
    baseDataId : `dict`
        Key-value pairs that may need to appear in the Gen3 data ID, but can
        never be inferred from a Gen2 filename.  This should always include
        the instrument name (even Gen3 data IDs that don't involve the
        instrument dimension have instrument-dependent Gen2 filenames) and
        should also include the skymap name in order to process any data IDs
        that involve tracts or patches.
    mapper : `lsst.obs.base.CameraMapper`, optional
        Object that defines Gen2 filename templates.  Will be identified,
        imported, and constructed from ``root`` if not provided.
    skyMap : `lsst.skymap.BaseSkyMap`, optional
        SkyMap that defines tracts and patches.  Must be provided in order to
        process datasets with a ``patch`` key in their data IDs.
    """
    COADD_NAMES = ("deep", "goodSeeing", "dcr")
    REPO_ROOT_FILES = ("registry.sqlite3", "_mapper", "repositoryCfg.yaml",
                       "calibRegistry.sqlite3", "_parent")

    def __init__(self, root, *, universe, baseDataId, mapper=None, skyMap=None):
        self.root = root
        if mapper is None:
            # Shush spurious log messages from Gen2 Mapper classes.
            # These aren't spurious in other contexts - we're just playing fast
            # and loose with mapper initialization, because we don't care about
            # things like parent lookups (we just want the set of templates).
            with temporaryLogLevel("CameraMapper", Log.ERROR):
                with temporaryLogLevel("HscMapper", Log.ERROR):
                    cls = findMapperClass(root)
                    mapper = cls(root=root)
        self.mapper = mapper
        self.universe = universe
        self.baseDataId = baseDataId
        self.extractors = OrderedDict()  # for move_to_end
        if "skymap" in baseDataId:
            if skyMap is None:
                for name in self.COADD_NAMES:
                    mapping = self.mapper.mappings.get(f"{name}Coadd_skyMap", None)
                    if mapping is None:
                        continue
                    filename = os.path.join(self.root, mapping.template)
                    if os.path.exists(filename):
                        if skyMap is not None:
                            raise ValueError("Multiple SkyMaps found in repository; please use multiple "
                                             "RepoConverters with an explicit skyMap argument for each.")
                        with open(filename, "rb") as f:
                            skyMap = pickle.load(f, encoding="latin1")
        self.skyMap = skyMap

    def addDatasetType(self, datasetTypeName, storageClass):
        """Add a dataset type to those recognized by the converter.

        Parameters
        ----------
        datasetTypeName : `str`
            String name of the dataset type.
        storageClass : `str` or `lsst.daf.butler.StorageClass`
            Gen3 storage class of the dataset type.

        Returns
        -------
        extractor : `DataIdExtractor`
            The object that will be used to extract data IDs for instances of
            this dataset type (also held internally, so the return value can
            usually be ignored).
        """
        r = DataIdExtractor(datasetTypeName, storageClass, mapper=self.mapper,
                            universe=self.universe, baseDataId=self.baseDataId, skyMap=self.skyMap)
        self.extractors[datasetTypeName] = r
        return r

    def extractDatasetRef(self, fileNameInRoot):
        """Extract a Gen3 `~lsst.daf.butler.DatasetRef` from a filename in a
        Gen2 data repository.

        Parameters
        ----------
        fileNameInRoot : `str`
            Name of the file, relative to the root of its Gen2 repository.

        Returns
        -------
        ref : `lsst.daf.butler.DatasetRef` or `None`
            Reference to the Gen3 dataset that would be created by converting
            this file, or `None` if the file is not recognized as an instance
            of a dataset type known to this converter.
        """
        for datasetTypeName, extractor in self.extractors.items():
            dataId = extractor.apply(fileNameInRoot)
            if dataId is not None:
                # Move the extractor that matched to the front of the
                # dictionary, as we're likely to see instances of the
                # same DatasetType together.
                self.extractors.move_to_end(datasetTypeName, last=False)
                return DatasetRef(extractor.datasetType, dataId=dataId)
        return None

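    # Usage sketch, assuming ``converter`` is a constructed RepoConverter;
    # the dataset type and filename are hypothetical examples:
    #
    #     converter.addDatasetType("calexp", "ExposureF")
    #     ref = converter.extractDatasetRef("01234/HSC-R/corr/CORR-0123456-049.fits")
    #     if ref is not None:
    #         print(ref.datasetType.name, ref.dataId)
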
    def walkRepo(self, directory=None, skipDirs=()):
        """Recursively walk (a subset of) a Gen2 data repository, yielding
        files that may be convertible.

        Parameters
        ----------
        directory : `str`, optional
            A subdirectory of the repository root to process, instead of
            processing the entire repository.
        skipDirs : sequence of `str`
            Subdirectories that should be skipped.

        Yields
        ------
        fileNameInRoot : `str`
            Name of a file in the repository, relative to the root of the
            repository.
        """
        if directory is None:
            directory = self.root
        for dirPath, subdirNamesInDir, fileNamesInDir in os.walk(directory, followlinks=True):
            # Remove subdirectories that appear to be repositories themselves
            # from the walk.
            def isRepoRoot(dirName):
                return any(os.path.exists(os.path.join(dirPath, dirName, f))
                           for f in self.REPO_ROOT_FILES)
            subdirNamesInDir[:] = [d for d in subdirNamesInDir if not isRepoRoot(d) and d not in skipDirs]
            # Yield the files in this directory relative to the repository
            # root, skipping repository administrative files; callers (e.g.
            # convertRepo) decide whether each file is actually convertible.
            dirPathInRoot = dirPath[len(self.root) + len(os.path.sep):]
            for fileNameInDir in fileNamesInDir:
                fileNameInRoot = os.path.join(dirPathInRoot, fileNameInDir)
                if fileNameInRoot in self.REPO_ROOT_FILES:
                    continue
                yield fileNameInRoot

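    # Usage sketch: count the files a converter recognizes without ingesting
    # anything.  The "CALIB" subdirectory is a hypothetical example of a
    # directory to skip:
    #
    #     nRecognized = sum(1 for f in converter.walkRepo(skipDirs=("CALIB",))
    #                       if converter.extractDatasetRef(f) is not None)
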
    def convertRepo(self, butler, *, directory=None, transfer=None, formatter=None, skipDirs=()):
        """Ingest all recognized files into a Gen3 repository.

        Parameters
        ----------
        butler : `lsst.daf.butler.Butler`
            Gen3 butler that files should be ingested into.
        directory : `str`, optional
            A subdirectory of the repository root to process, instead of
            processing the entire repository.
        transfer : `str`, optional
            If not `None`, must be one of 'move', 'copy', 'hardlink', or
            'symlink', indicating how to transfer the file.
        formatter : `lsst.daf.butler.Formatter`, optional
            Formatter that should be used to retrieve the Dataset.  If not
            provided, the formatter will be constructed according to
            Datastore configuration.  This should only be used when converting
            only a single dataset type or multiple dataset types of the same
            storage class.
        skipDirs : sequence of `str`
            Subdirectories that should be skipped.
        """
        log = Log.getLogger("RepoConverter")
        for extractor in self.extractors.values():
            butler.registry.registerDatasetType(extractor.datasetType)
        skipped = {}
        for file in self.walkRepo(directory=directory, skipDirs=skipDirs):
            ref = self.extractDatasetRef(file)
            if ref is not None:
                try:
                    butler.ingest(os.path.join(self.root, file), ref, transfer=transfer,
                                  formatter=formatter)
                except Exception as err:
                    skipped.setdefault(type(err), []).append(str(err))
        if skipped:
            for cls, messages in skipped.items():
                log.warn("Skipped %s files due to exceptions of type %s.", len(messages), cls.__name__)
                if log.isDebugEnabled():
                    for message in messages:
                        log.debug(message)
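

# End-to-end usage sketch; the paths, instrument name, run name, dataset
# types, and exact `lsst.daf.butler.Butler` constructor arguments are
# assumptions for illustration:
#
#     from lsst.daf.butler import Butler
#
#     butler = Butler("/path/to/gen3repo", run="gen2-conversion")
#     converter = RepoConverter("/path/to/gen2repo",
#                               universe=butler.registry.dimensions,
#                               baseDataId={"instrument": "HSC"})
#     converter.addDatasetType("calexp", "ExposureF")
#     converter.addDatasetType("src", "SourceCatalog")
#     converter.convertRepo(butler, transfer="symlink")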