mapping.py

# This file is part of obs_base.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

from collections import OrderedDict
import os
import re
from lsst.daf.base import PropertySet
from lsst.daf.persistence import ButlerLocation, NoResults
from lsst.utils import doImport
from lsst.afw.image import Exposure, MaskedImage, Image, DecoratedImage

__all__ = ["Mapping", "ImageMapping", "ExposureMapping", "CalibrationMapping", "DatasetMapping"]


class Mapping(object):

    """Mapping is a base class for all mappings. Mappings are used by
    the Mapper to map (determine a path to some data given some
    identifiers) and standardize (convert data into some standard
    format or type) data, and to query the associated registry to see
    what data is available.

    Subclasses must specify self.storage or else override self.map().

    Public methods: lookup, have, need, keys, map

    Mappings are specified mainly by policy. A Mapping policy should
    consist of:

    template (string): a Python string providing the filename for that
    particular dataset type based on some data identifiers. In the
    case of redundancy in the path (e.g., a file uniquely specified by
    the exposure number, but with the filter also in the path), the
    redundant/dependent identifiers can be looked up in the registry.

    python (string): the Python type for the retrieved data (e.g.
    lsst.afw.image.ExposureF)

    persistable (string): the Persistable registration for the on-disk data
    (e.g. ImageU)

    storage (string, optional): Storage type for this dataset type (e.g.
    "FitsStorage")

    level (string, optional): the level in the camera hierarchy at which the
    data is stored (Amp, Ccd or skyTile), if relevant

    tables (string, optional): a whitespace-delimited list of tables in the
    registry that can be NATURAL JOIN-ed to look up additional
    information.

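    As a hypothetical illustration (the field values here are made up, not
    taken from any real obs package), a policy for one dataset type might
    contain::

        template: "raw/v%(visit)d_f%(filter)s.fits"
        python: lsst.afw.image.ExposureU
        persistable: ExposureU
        storage: FitsStorage
        level: Ccd
        tables: raw raw_visit
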
    Parameters
    ----------
    datasetType : `str`
        Butler dataset type to be mapped.
    policy : `daf_persistence.Policy`
        Mapping Policy.
    registry : `lsst.obs.base.Registry`
        Registry for metadata lookups.
    rootStorage : Storage subclass instance
        Interface to persisted repository data.
    provided : `list` of `str`
        Keys provided by the mapper.
    """

    def __init__(self, datasetType, policy, registry, rootStorage, provided=None):

        if policy is None:
            raise RuntimeError("No policy provided for mapping")

        self.datasetType = datasetType
        self.registry = registry
        self.rootStorage = rootStorage

        self._template = policy['template']  # Template path
        # In most cases the template cannot be used if it is empty, and it is
        # accessed via a property that raises if used while `not self._template`.
        # Here we *do* allow it to be empty, so that the key dict can be
        # fetched and the mapping constructed; the mapping can then raise
        # later if the template is invalid. This is a little odd, but it
        # allows the template check to be introduced without a major refactor.
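        # As a hypothetical illustration of the extraction below: a template
        # such as "raw/v%(visit)d_f%(filter)s.fits" would produce
        # keyDict == {"visit": int, "filter": str}, because the regex pairs
        # each %(name) placeholder with its printf-style conversion character.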
        if self._template:
            self.keyDict = dict([
                (k, _formatMap(v, k, datasetType))
                for k, v in
                re.findall(r'\%\((\w+)\).*?([diouxXeEfFgGcrs])', self.template)
            ])
        else:
            self.keyDict = {}
        if provided is not None:
            for p in provided:
                if p in self.keyDict:
                    del self.keyDict[p]
        self.python = policy['python']  # Python type
        self.persistable = policy['persistable']  # Persistable type
        self.storage = policy['storage']
        if 'level' in policy:
            self.level = policy['level']  # Level in camera hierarchy
        if 'tables' in policy:
            self.tables = policy.asArray('tables')
        else:
            self.tables = None
        self.range = None
        self.columns = None
        self.obsTimeName = policy['obsTimeName'] if 'obsTimeName' in policy else None
        self.recipe = policy['recipe'] if 'recipe' in policy else 'default'

    @property
    def template(self):
        if self._template:  # template must not be an empty string or None
            return self._template
        else:
            raise RuntimeError("Template is not defined for the {} dataset type; ".format(self.datasetType) +
                               "it must be set before it can be used.")

    def keys(self):
        """Return the dict of keys and value types required for this mapping."""
        return self.keyDict

    def map(self, mapper, dataId, write=False):
        """Standard implementation of map function.

        Parameters
        ----------
        mapper : `lsst.daf.persistence.Mapper`
            Mapper object to use.
        dataId : `dict`
            Dataset identifier.

        Returns
        -------
        `lsst.daf.persistence.ButlerLocation`
            Location of the object that was mapped.
        """
        actualId = self.need(iter(self.keyDict.keys()), dataId)
        usedDataId = {key: actualId[key] for key in self.keyDict.keys()}
        path = mapper._mapActualToPath(self.template, actualId)
        if os.path.isabs(path):
            raise RuntimeError("Mapped path should not be absolute.")
        if not write:
            # This allows mapped files to be compressed, ending in .gz or .fz,
            # without any indication from the policy that the file should be
            # compressed, easily allowing repositories to contain a
            # combination of compressed and uncompressed files.
            # If needed we can add a policy flag to allow compressed files or
            # not, and perhaps a list of allowed extensions that may exist at
            # the end of the template.
            for ext in (None, '.gz', '.fz'):
                if ext and path.endswith(ext):
                    continue  # if the path already ends with the extension
                extPath = path + ext if ext else path
                newPath = self.rootStorage.instanceSearch(extPath)
                if newPath:
                    path = newPath
                    break
        assert path, "Fully-qualified filename is empty."

        addFunc = "add_" + self.datasetType  # Name of method for additionalData
        if hasattr(mapper, addFunc):
            addFunc = getattr(mapper, addFunc)
            additionalData = addFunc(self.datasetType, actualId)
            assert isinstance(additionalData, PropertySet), \
                "Bad type for returned data: %s" % (type(additionalData),)
        else:
            additionalData = None

        return ButlerLocation(pythonType=self.python, cppType=self.persistable, storageName=self.storage,
                              locationList=path, dataId=actualId.copy(), mapper=mapper,
                              storage=self.rootStorage, usedDataId=usedDataId, datasetType=self.datasetType,
                              additionalData=additionalData)

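    # A minimal usage sketch (hypothetical mapper and data ID values):
    #
    #     loc = mapping.map(mapper, {"visit": 903334, "ccd": 23})
    #     path = loc.getLocations()[0]  # path relative to the root storage
    #
    # getLocations() is the ButlerLocation accessor for the mapped
    # locationList.
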
    def lookup(self, properties, dataId):
        """Look up properties in a metadata registry given a partial
        dataset identifier.

        Parameters
        ----------
        properties : `list` of `str`
            What to look up.
        dataId : `dict`
            Dataset identifier.

        Returns
        -------
        `list` of `tuple`
            Values of properties.
        """
        if self.registry is None:
            raise RuntimeError("No registry for lookup")

        skyMapKeys = ("tract", "patch")

        where = []
        values = []

        # Prepare to remove skymap entries from properties list. These must
        # be in the data ID, so we store which ones we're removing and create
        # an OrderedDict that tells us where to re-insert them. That maps the
        # name of the property to either its index in the properties list
        # *after* the skymap ones have been removed (for entries that aren't
        # skymap ones) or the value from the data ID (for those that are).
        removed = set()
        substitutions = OrderedDict()
        index = 0
        properties = list(properties)  # don't modify the original list
        for p in properties:
            if p in skyMapKeys:
                try:
                    substitutions[p] = dataId[p]
                    removed.add(p)
                except KeyError:
                    raise RuntimeError(
                        "Cannot look up skymap key '%s'; it must be explicitly included in the data ID" % p
                    )
            else:
                substitutions[p] = index
                index += 1
        # Can't actually remove while iterating above, so we do it here.
        for p in removed:
            properties.remove(p)
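        # For example (hypothetical inputs): properties == ["filter", "tract"]
        # with dataId == {"tract": 8766} leaves properties == ["filter"],
        # removed == {"tract"}, and substitutions == {"filter": 0, "tract": 8766},
        # which is used below to re-insert the skymap values into the results.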

        fastPath = True
        for p in properties:
            if p not in ('filter', 'expTime', 'taiObs'):
                fastPath = False
                break
        if fastPath and 'visit' in dataId and "raw" in self.tables:
            lookupDataId = {'visit': dataId['visit']}
            result = self.registry.lookup(properties, 'raw_visit', lookupDataId, template=self.template)
        else:
            if dataId is not None:
                for k, v in dataId.items():
                    if self.columns and k not in self.columns:
                        continue
                    if k == self.obsTimeName:
                        continue
                    if k in skyMapKeys:
                        continue
                    where.append((k, '?'))
                    values.append(v)
            lookupDataId = {k[0]: v for k, v in zip(where, values)}
            if self.range:
                # format of self.range is ('?', isBetween-lowKey, isBetween-highKey);
                # here we transform that to {(lowKey, highKey): value}
                lookupDataId[(self.range[1], self.range[2])] = dataId[self.obsTimeName]
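                # For instance, with self.range == ('?', 'validStart', 'validEnd')
                # and obsTimeName == 'taiObs' (the defaults documented in
                # CalibrationMapping), this adds
                # lookupDataId[('validStart', 'validEnd')] = dataId['taiObs'].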
            result = self.registry.lookup(properties, self.tables, lookupDataId, template=self.template)
        if not removed:
            return result
        # Iterate over the query results, re-inserting the skymap entries.
        result = [tuple(v if k in removed else item[v] for k, v in substitutions.items())
                  for item in result]
        return result

    def have(self, properties, dataId):
        """Returns whether the provided data identifier has all
        the properties in the provided list.

        Parameters
        ----------
        properties : `list` of `str`
            Properties required.
        dataId : `dict`
            Dataset identifier.

        Returns
        -------
        `bool`
            True if all properties are present.
        """
        for prop in properties:
            if prop not in dataId:
                return False
        return True

    def need(self, properties, dataId):
        """Ensures all properties in the provided list are present in
        the data identifier, looking them up as needed. This is only
        possible for the case where the data identifies a single
        exposure.

        Parameters
        ----------
        properties : `list` of `str`
            Properties required.
        dataId : `dict`
            Partial dataset identifier.

        Returns
        -------
        `dict`
            Copy of dataset identifier with enhanced values.
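
        Examples
        --------
        A hypothetical call: ``need(["filter"], {"visit": 903334})`` would
        return ``{"visit": 903334, "filter": "r"}`` when the registry yields
        exactly one match for the lookup, and raise `NoResults` otherwise.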
307  """
308  newId = dataId.copy()
309  newProps = [] # Properties we don't already have
310  for prop in properties:
311  if prop not in newId:
312  newProps.append(prop)
313  if len(newProps) == 0:
314  return newId
315 
316  lookups = self.lookup(newProps, newId)
317  if len(lookups) != 1:
318  raise NoResults("No unique lookup for %s from %s: %d matches" %
319  (newProps, newId, len(lookups)),
320  self.datasetType, dataId)
321  for i, prop in enumerate(newProps):
322  newId[prop] = lookups[0][i]
323  return newId
324 
325 
def _formatMap(ch, k, datasetType):
    """Convert a format character into a Python type."""
    if ch in "diouxX":
        return int
    elif ch in "eEfFgG":
        return float
    elif ch in "crs":
        return str
    else:
        raise RuntimeError("Unexpected format specifier %s"
                           " for field %s in template for dataset %s" %
                           (ch, k, datasetType))
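
# For example, _formatMap('d', 'visit', 'raw') returns `int`, matching a
# %(visit)d field in a template.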


class ImageMapping(Mapping):
    """ImageMapping is a Mapping subclass for non-camera images.

    Parameters
    ----------
    datasetType : `str`
        Butler dataset type to be mapped.
    policy : `daf_persistence.Policy`
        Mapping Policy.
    registry : `lsst.obs.base.Registry`
        Registry for metadata lookups.
    root : `str`
        Path of root directory.
    """

    def __init__(self, datasetType, policy, registry, root, **kwargs):
        Mapping.__init__(self, datasetType, policy, registry, root, **kwargs)
        self.columns = policy.asArray('columns') if 'columns' in policy else None


361  """ExposureMapping is a Mapping subclass for normal exposures.
362 
363  Parameters
364  ----------
365  datasetType : `str`
366  Butler dataset type to be mapped.
367  policy : `daf_persistence.Policy`
368  Mapping Policy.
369  registry : `lsst.obs.base.Registry`
370  Registry for metadata lookups
371  root : `str`
372  Path of root directory
373  """

    def __init__(self, datasetType, policy, registry, root, **kwargs):
        Mapping.__init__(self, datasetType, policy, registry, root, **kwargs)
        self.columns = policy.asArray('columns') if 'columns' in policy else None

    def standardize(self, mapper, item, dataId):
        return mapper._standardizeExposure(self, item, dataId)


384  """CalibrationMapping is a Mapping subclass for calibration-type products.
385 
386  The difference is that data properties in the query or template
387  can be looked up using a reference Mapping in addition to this one.
388 
389  CalibrationMapping Policies can contain the following:
390 
391  reference (string, optional)
392  a list of tables for finding missing dataset
393  identifier components (including the observation time, if a validity range
394  is required) in the exposure registry; note that the "tables" entry refers
395  to the calibration registry
396 
397  refCols (string, optional)
398  a list of dataset properties required from the
399  reference tables for lookups in the calibration registry
400 
401  validRange (bool)
402  true if the calibration dataset has a validity range
403  specified by a column in the tables of the reference dataset in the
404  exposure registry) and two columns in the tables of this calibration
405  dataset in the calibration registry)
406 
407  obsTimeName (string, optional)
408  the name of the column in the reference
409  dataset tables containing the observation time (default "taiObs")
410 
411  validStartName (string, optional)
412  the name of the column in the
413  calibration dataset tables containing the start of the validity range
414  (default "validStart")
415 
416  validEndName (string, optional)
417  the name of the column in the
418  calibration dataset tables containing the end of the validity range
419  (default "validEnd")
420 
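    As a hypothetical illustration (values are made up), a policy for a
    calibration product with a validity range might contain::

        reference: raw_visit
        refCols: visit
        validRange: true
        obsTimeName: taiObs
        validStartName: validStart
        validEndName: validEnd
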
    Parameters
    ----------
    datasetType : `str`
        Butler dataset type to be mapped.
    policy : `daf_persistence.Policy`
        Mapping Policy.
    registry : `lsst.obs.base.Registry`
        Registry for metadata lookups.
    calibRegistry : `lsst.obs.base.Registry`
        Registry for calibration metadata lookups.
    calibRoot : `str`
        Path of calibration root directory.
    dataRoot : `str`
        Path of data root directory; used for outputs only.
    """

    def __init__(self, datasetType, policy, registry, calibRegistry, calibRoot, dataRoot=None, **kwargs):
        Mapping.__init__(self, datasetType, policy, calibRegistry, calibRoot, **kwargs)
        self.reference = policy.asArray("reference") if "reference" in policy else None
        self.refCols = policy.asArray("refCols") if "refCols" in policy else None
        self.refRegistry = registry
        self.dataRoot = dataRoot
        if "validRange" in policy and policy["validRange"]:
            self.range = ("?", policy["validStartName"], policy["validEndName"])
        if "columns" in policy:
            self.columns = policy.asArray("columns")
        if "filter" in policy:
            self.setFilter = policy["filter"]
        self.metadataKeys = None
        if "metadataKey" in policy:
            self.metadataKeys = policy.asArray("metadataKey")

    def map(self, mapper, dataId, write=False):
        location = Mapping.map(self, mapper, dataId, write=write)
        # Want outputs to be in the output directory
        if write and self.dataRoot:
            location.storage = self.dataRoot
        return location

    def lookup(self, properties, dataId):
        """Look up properties in a metadata registry given a partial
        dataset identifier.

        Parameters
        ----------
        properties : `list` of `str`
            Properties to look up.
        dataId : `dict`
            Dataset identifier.

        Returns
        -------
        `list` of `tuple`
            Values of properties.
        """

        # Either look up taiObs in the reference tables and then everything
        # in the calibration registry, or look up everything in the registry
        # directly.
        newId = dataId.copy()
        if self.reference is not None:
            where = []
            values = []
            for k, v in dataId.items():
                if self.refCols and k not in self.refCols:
                    continue
                where.append(k)
                values.append(v)

            # Columns we need from the regular registry
            if self.columns is not None:
                columns = set(self.columns)
                for k in dataId.keys():
                    columns.discard(k)
            else:
                columns = set(properties)

            if not columns:
                # Nothing to look up in the reference registry; continue with
                # the calibration registry.
                return Mapping.lookup(self, properties, newId)

            lookupDataId = dict(zip(where, values))
            lookups = self.refRegistry.lookup(columns, self.reference, lookupDataId)
            if len(lookups) != 1:
                raise RuntimeError("No unique lookup for %s from %s: %d matches" %
                                   (columns, dataId, len(lookups)))
            if columns == set(properties):
                # Have everything we need
                return lookups
            for i, prop in enumerate(columns):
                newId[prop] = lookups[0][i]
        return Mapping.lookup(self, properties, newId)

    def standardize(self, mapper, item, dataId):
        """Default standardization function for calibration datasets.

        If the item is of a type that should be standardized, the base class
        ``standardizeExposure`` method is called; otherwise the item is
        returned unmodified.

        Parameters
        ----------
        mapping : `lsst.obs.base.Mapping`
            Mapping object to pass through.
        item : object
            Will be standardized if of type lsst.afw.image.Exposure,
            lsst.afw.image.DecoratedImage, lsst.afw.image.Image
            or lsst.afw.image.MaskedImage.
        dataId : `dict`
            Dataset identifier.

        Returns
        -------
        `lsst.afw.image.Exposure` or item
            The standardized object.
        """
        if issubclass(doImport(self.python), (Exposure, MaskedImage, Image, DecoratedImage)):
            return mapper._standardizeExposure(self, item, dataId, filter=self.setFilter)
        return item


544  """DatasetMapping is a Mapping subclass for non-Exposure datasets that can
545  be retrieved by the standard daf_persistence mechanism.
546 
547  The differences are that the Storage type must be specified and no
548  Exposure standardization is performed.
549 
550  The "storage" entry in the Policy is mandatory; the "tables" entry is
551  optional; no "level" entry is allowed.
552 
553  Parameters
554  ----------
555  datasetType : `str`
556  Butler dataset type to be mapped.
557  policy : `daf_persistence.Policy`
558  Mapping Policy.
559  registry : `lsst.obs.base.Registry`
560  Registry for metadata lookups
561  root : `str`
562  Path of root directory
563  """

    def __init__(self, datasetType, policy, registry, root, **kwargs):
        Mapping.__init__(self, datasetType, policy, registry, root, **kwargs)
        self.storage = policy["storage"]  # Storage type