mapping.py
#
# LSST Data Management System
# Copyright 2008, 2009, 2010 LSST Corporation.
#
# This product includes software developed by the
# LSST Project (http://www.lsst.org/).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the LSST License Statement and
# the GNU General Public License along with this program. If not,
# see <http://www.lsstcorp.org/LegalNotices/>.
#

from collections import OrderedDict
import os
import re
from lsst.daf.base import PropertySet
from lsst.daf.persistence import ButlerLocation, NoResults

__all__ = ["Mapping", "ImageMapping", "ExposureMapping", "CalibrationMapping", "DatasetMapping"]


class Mapping(object):

    """Mapping is a base class for all mappings. Mappings are used by
    the Mapper to map (determine a path to some data given some
    identifiers) and standardize (convert data into some standard
    format or type) data, and to query the associated registry to see
    what data is available.

    Subclasses must specify self.storage or else override self.map().

    Public methods: lookup, have, need, getKeys, map

    Mappings are specified mainly by policy. A Mapping policy should
    consist of:

    template (string): a Python string providing the filename for that
    particular dataset type based on some data identifiers. In the
    case of redundancy in the path (e.g., a file uniquely specified by
    the exposure number, but with the filter also in the path), the
    redundant/dependent identifiers can be looked up in the registry.

    python (string): the Python type for the retrieved data (e.g.
    lsst.afw.image.ExposureF)

    persistable (string): the Persistable registration for the on-disk data
    (e.g. ImageU)

    storage (string, optional): Storage type for this dataset type (e.g.
    "FitsStorage")

    level (string, optional): the level in the camera hierarchy at which the
    data is stored (Amp, Ccd or skyTile), if relevant

    tables (string, optional): a whitespace-delimited list of tables in the
    registry that can be NATURAL JOIN-ed to look up additional
    information.

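    A hypothetical policy fragment combining these entries, written here as
    the equivalent Python dict purely for illustration (the dataset type and
    values are invented, not taken from any particular obs package)::

        calexpPolicy = {
            "template": "calexp/v%(visit)d-f%(filter)s/c%(ccd)d.fits",
            "python": "lsst.afw.image.ExposureF",
            "persistable": "ExposureF",
            "storage": "FitsStorage",
            "level": "Ccd",
            "tables": "raw raw_visit",
        }
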
    Parameters
    ----------
    datasetType : `str`
        Butler dataset type to be mapped.
    policy : `daf_persistence.Policy`
        Mapping Policy.
    registry : `lsst.obs.base.Registry`
        Registry for metadata lookups.
    rootStorage : Storage subclass instance
        Interface to persisted repository data.
    provided : `list` of `str`
        Keys provided by the mapper.
    """

    def __init__(self, datasetType, policy, registry, rootStorage, provided=None):

        if policy is None:
            raise RuntimeError("No policy provided for mapping")

        self.datasetType = datasetType
        self.registry = registry
        self.rootStorage = rootStorage

        self._template = policy['template']  # Template path
        # In most cases an empty template cannot be used, and it is accessed
        # via a property that raises if it is used while `not self._template`.
        # Here we *do* allow it to be empty, so that the key dict can still be
        # fetched and the mapping constructed; the error is then raised only
        # if the template is actually used. This is a little odd, but it lets
        # the template check be introduced without a major refactor.
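        # The findall below pulls the "%(name)format" substitutions out of the
        # template; e.g. a (hypothetical) template "raw/v%(visit)d/f%(filter)s.fits"
        # yields keyDict == {"visit": int, "filter": str}.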
        if self._template:
            self.keyDict = dict([
                (k, _formatMap(v, k, datasetType))
                for k, v in
                re.findall(r'\%\((\w+)\).*?([diouxXeEfFgGcrs])', self.template)
            ])
        else:
            self.keyDict = {}
        if provided is not None:
            for p in provided:
                if p in self.keyDict:
                    del self.keyDict[p]
        self.python = policy['python']  # Python type
        self.persistable = policy['persistable']  # Persistable type
        self.storage = policy['storage']
        if 'level' in policy:
            self.level = policy['level']  # Level in camera hierarchy
        if 'tables' in policy:
            self.tables = policy.asArray('tables')
        else:
            self.tables = None
        self.range = None
        self.columns = None
        self.obsTimeName = policy['obsTimeName'] if 'obsTimeName' in policy else None
        self.recipe = policy['recipe'] if 'recipe' in policy else 'default'

    @property
    def template(self):
        if self._template:  # template must not be an empty string or None
            return self._template
        else:
            raise RuntimeError("Template is not defined for the {} dataset type, ".format(self.datasetType) +
                               "it must be set before it can be used.")

    def keys(self):
        """Return the dict of keys and value types required for this mapping."""
        return self.keyDict

    def map(self, mapper, dataId, write=False):
        """Standard implementation of map function.

        Parameters
        ----------
        mapper : `lsst.daf.persistence.Mapper`
            Object to be mapped.
        dataId : `dict`
            Dataset identifier.

        Returns
        -------
        `lsst.daf.persistence.ButlerLocation`
            Location of object that was mapped.
        """
        actualId = self.need(iter(self.keyDict.keys()), dataId)
        usedDataId = {key: actualId[key] for key in self.keyDict.keys()}
        path = mapper._mapActualToPath(self.template, actualId)
        if os.path.isabs(path):
            raise RuntimeError("Mapped path should not be absolute.")
        if not write:
            # This allows mapped files to be compressed, ending in .gz or .fz,
            # without any indication from the policy that the file should be
            # compressed, easily allowing repositories to contain a mix of
            # compressed and uncompressed files.
            # If needed we can add a policy flag to allow compressed files or
            # not, and perhaps a list of allowed extensions that may exist at
            # the end of the template.
            for ext in (None, '.gz', '.fz'):
                if ext and path.endswith(ext):
                    continue  # the path already ends with the extension
                extPath = path + ext if ext else path
                newPath = self.rootStorage.instanceSearch(extPath)
                if newPath:
                    path = newPath
                    break
        assert path, "Fully-qualified filename is empty."

        addFunc = "add_" + self.datasetType  # Name of method for additionalData
        if hasattr(mapper, addFunc):
            addFunc = getattr(mapper, addFunc)
            additionalData = addFunc(self.datasetType, actualId)
            assert isinstance(additionalData, PropertySet), \
                "Bad type for returned data: %s" % (type(additionalData),)
        else:
            additionalData = None

        return ButlerLocation(pythonType=self.python, cppType=self.persistable, storageName=self.storage,
                              locationList=path, dataId=actualId.copy(), mapper=mapper,
                              storage=self.rootStorage, usedDataId=usedDataId, datasetType=self.datasetType,
                              additionalData=additionalData)

    def lookup(self, properties, dataId):
        """Look up properties in a metadata registry given a partial
        dataset identifier.

        Parameters
        ----------
        properties : `list` of `str`
            What to look up.
        dataId : `dict`
            Dataset identifier.

        Returns
        -------
        `list` of `tuple`
            Values of properties.
        """
        if self.registry is None:
            raise RuntimeError("No registry for lookup")

        skyMapKeys = ("tract", "patch")

        where = []
        values = []

        # Prepare to remove skymap entries from the properties list. These
        # must be in the data ID, so we store which ones we're removing and
        # create an OrderedDict that tells us where to re-insert them. That
        # maps the name of the property either to its index in the properties
        # list *after* the skymap ones have been removed (for entries that
        # aren't skymap ones) or to the value from the data ID (for those
        # that are).
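        # For example (hypothetical inputs), properties=["filter", "tract"]
        # with dataId={"tract": 8766} gives removed={"tract"} and
        # substitutions=OrderedDict([("filter", 0), ("tract", 8766)]).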
        removed = set()
        substitutions = OrderedDict()
        index = 0
        properties = list(properties)  # don't modify the original list
        for p in properties:
            if p in skyMapKeys:
                try:
                    substitutions[p] = dataId[p]
                    removed.add(p)
                except KeyError:
                    raise RuntimeError(
                        "Cannot look up skymap key '%s'; it must be explicitly included in the data ID" % p
                    )
            else:
                substitutions[p] = index
                index += 1
        # Can't actually remove while iterating above, so we do it here.
        for p in removed:
            properties.remove(p)

        fastPath = True
        for p in properties:
            if p not in ('filter', 'expTime', 'taiObs'):
                fastPath = False
                break
        if fastPath and 'visit' in dataId and "raw" in self.tables:
            lookupDataId = {'visit': dataId['visit']}
            result = self.registry.lookup(properties, 'raw_visit', lookupDataId, template=self.template)
        else:
            if dataId is not None:
                for k, v in dataId.items():
                    if self.columns and k not in self.columns:
                        continue
                    if k == self.obsTimeName:
                        continue
                    if k in skyMapKeys:
                        continue
                    where.append((k, '?'))
                    values.append(v)
            lookupDataId = {k[0]: v for k, v in zip(where, values)}
            if self.range:
                # format of self.range is ('?', isBetween-lowKey, isBetween-highKey)
                # here we transform that to {(lowKey, highKey): value}
                lookupDataId[(self.range[1], self.range[2])] = dataId[self.obsTimeName]
            result = self.registry.lookup(properties, self.tables, lookupDataId, template=self.template)
        if not removed:
            return result
        # Iterate over the query results, re-inserting the skymap entries.
        result = [tuple(v if k in removed else item[v] for k, v in substitutions.items())
                  for item in result]
        return result

    def have(self, properties, dataId):
        """Returns whether the provided data identifier has all
        the properties in the provided list.

        Parameters
        ----------
        properties : `list` of `str`
            Properties required.
        dataId : `dict`
            Dataset identifier.

        Returns
        -------
        `bool`
            True if all properties are present.
        """
        for prop in properties:
            if prop not in dataId:
                return False
        return True

    def need(self, properties, dataId):
        """Ensures all properties in the provided list are present in
        the data identifier, looking them up as needed. This is only
        possible for the case where the data identifies a single
        exposure.

        Parameters
        ----------
        properties : `list` of `str`
            Properties required.
        dataId : `dict`
            Partial dataset identifier.

        Returns
        -------
        `dict`
            Copy of dataset identifier with enhanced values.
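
        Examples
        --------
        A hypothetical call (keys and values invented for illustration)::

            fullId = mapping.need(["filter"], {"visit": 1234})
            # fullId == {"visit": 1234, "filter": "r"} if the registry holds
            # exactly one matching row; otherwise NoResults is raised.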
        """
        newId = dataId.copy()
        newProps = []  # Properties we don't already have
        for prop in properties:
            if prop not in newId:
                newProps.append(prop)
        if len(newProps) == 0:
            return newId

        lookups = self.lookup(newProps, newId)
        if len(lookups) != 1:
            raise NoResults("No unique lookup for %s from %s: %d matches" %
                            (newProps, newId, len(lookups)),
                            self.datasetType, dataId)
        for i, prop in enumerate(newProps):
            newId[prop] = lookups[0][i]
        return newId


def _formatMap(ch, k, datasetType):
    """Convert a format character into a Python type."""
    if ch in "diouxX":
        return int
    elif ch in "eEfFgG":
        return float
    elif ch in "crs":
        return str
    else:
        raise RuntimeError("Unexpected format specifier %s"
                           " for field %s in template for dataset %s" %
                           (ch, k, datasetType))


class ImageMapping(Mapping):
    """ImageMapping is a Mapping subclass for non-camera images.

    Parameters
    ----------
    datasetType : `str`
        Butler dataset type to be mapped.
    policy : `daf_persistence.Policy`
        Mapping Policy.
    registry : `lsst.obs.base.Registry`
        Registry for metadata lookups.
    root : `str`
        Path of root directory.
    """

    def __init__(self, datasetType, policy, registry, root, **kwargs):
        Mapping.__init__(self, datasetType, policy, registry, root, **kwargs)
        self.columns = policy.asArray('columns') if 'columns' in policy else None


class ExposureMapping(Mapping):
    """ExposureMapping is a Mapping subclass for normal exposures.

    Parameters
    ----------
    datasetType : `str`
        Butler dataset type to be mapped.
    policy : `daf_persistence.Policy`
        Mapping Policy.
    registry : `lsst.obs.base.Registry`
        Registry for metadata lookups.
    root : `str`
        Path of root directory.
    """

    def __init__(self, datasetType, policy, registry, root, **kwargs):
        Mapping.__init__(self, datasetType, policy, registry, root, **kwargs)
        self.columns = policy.asArray('columns') if 'columns' in policy else None

    def standardize(self, mapper, item, dataId):
        return mapper._standardizeExposure(self, item, dataId)


class CalibrationMapping(Mapping):
    """CalibrationMapping is a Mapping subclass for calibration-type products.

    The difference is that data properties in the query or template
    can be looked up using a reference Mapping in addition to this one.

    CalibrationMapping Policies can contain the following:

    reference (string, optional)
        a list of tables for finding missing dataset
        identifier components (including the observation time, if a validity
        range is required) in the exposure registry; note that the "tables"
        entry refers to the calibration registry

    refCols (string, optional)
        a list of dataset properties required from the
        reference tables for lookups in the calibration registry

    validRange (bool)
        true if the calibration dataset has a validity range
        specified by a column in the tables of the reference dataset in the
        exposure registry and two columns in the tables of this calibration
        dataset in the calibration registry

    obsTimeName (string, optional)
        the name of the column in the reference
        dataset tables containing the observation time (default "taiObs")

    validStartName (string, optional)
        the name of the column in the
        calibration dataset tables containing the start of the validity range
        (default "validStart")

    validEndName (string, optional)
        the name of the column in the
        calibration dataset tables containing the end of the validity range
        (default "validEnd")

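    A hypothetical calibration policy fragment, written as the equivalent
    Python dict purely for illustration (the dataset type, tables and column
    names below are invented, not taken from any particular obs package)::

        biasPolicy = {
            "template": "bias/%(calibDate)s/bias-%(ccd)03d.fits",
            "python": "lsst.afw.image.ExposureF",
            "persistable": "ExposureF",
            "storage": "FitsStorage",
            "tables": "bias",
            "reference": "raw_visit",
            "refCols": "visit",
            "validRange": True,
            "obsTimeName": "taiObs",
            "validStartName": "validStart",
            "validEndName": "validEnd",
        }
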
    Parameters
    ----------
    datasetType : `str`
        Butler dataset type to be mapped.
    policy : `daf_persistence.Policy`
        Mapping Policy.
    registry : `lsst.obs.base.Registry`
        Registry for metadata lookups.
    calibRegistry : `lsst.obs.base.Registry`
        Registry for calibration metadata lookups.
    calibRoot : `str`
        Path of calibration root directory.
    dataRoot : `str`
        Path of data root directory; used for outputs only.
    """

    def __init__(self, datasetType, policy, registry, calibRegistry, calibRoot, dataRoot=None, **kwargs):
        Mapping.__init__(self, datasetType, policy, calibRegistry, calibRoot, **kwargs)
        self.reference = policy.asArray("reference") if "reference" in policy else None
        self.refCols = policy.asArray("refCols") if "refCols" in policy else None
        self.refRegistry = registry
        self.dataRoot = dataRoot
        if "validRange" in policy and policy["validRange"]:
            self.range = ("?", policy["validStartName"], policy["validEndName"])
        if "columns" in policy:
            self.columns = policy.asArray("columns")
        if "filter" in policy:
            self.setFilter = policy["filter"]
        self.metadataKeys = None
        if "metadataKey" in policy:
            self.metadataKeys = policy.asArray("metadataKey")

    def map(self, mapper, dataId, write=False):
        location = Mapping.map(self, mapper, dataId, write=write)
        # Want outputs to be in the output directory
        if write and self.dataRoot:
            location.storage = self.dataRoot
        return location

    def lookup(self, properties, dataId):
        """Look up properties in a metadata registry given a partial
        dataset identifier.

        Parameters
        ----------
        properties : `list` of `str`
            Properties to look up.
        dataId : `dict`
            Dataset identifier.

        Returns
        -------
        `list` of `tuple`
            Values of properties.
        """

        # Either look up taiObs in the reference tables and then everything
        # else in the calibration registry, or look everything up in the
        # calibration registry directly.

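        # For example (hypothetical policy values), with reference="raw_visit"
        # and refCols=["visit"], a dataId of {"visit": 1234} first resolves the
        # missing columns (such as taiObs) from raw_visit via refRegistry, and
        # the augmented ID is then passed to Mapping.lookup against the
        # calibration registry.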
        newId = dataId.copy()
        if self.reference is not None:
            where = []
            values = []
            for k, v in dataId.items():
                if self.refCols and k not in self.refCols:
                    continue
                where.append(k)
                values.append(v)

            # Columns we need from the regular registry
            if self.columns is not None:
                columns = set(self.columns)
                for k in dataId.keys():
                    columns.discard(k)
            else:
                columns = set(properties)

            if not columns:
                # Nothing to look up in the reference registry; continue with the calib registry
                return Mapping.lookup(self, properties, newId)

            lookupDataId = dict(zip(where, values))
            lookups = self.refRegistry.lookup(columns, self.reference, lookupDataId)
            if len(lookups) != 1:
                raise RuntimeError("No unique lookup for %s from %s: %d matches" %
                                   (columns, dataId, len(lookups)))
            if columns == set(properties):
                # Have everything we need
                return lookups
            for i, prop in enumerate(columns):
                newId[prop] = lookups[0][i]
        return Mapping.lookup(self, properties, newId)

    def standardize(self, mapper, item, dataId):
        return mapper._standardizeExposure(self, item, dataId, filter=self.setFilter)


class DatasetMapping(Mapping):
    """DatasetMapping is a Mapping subclass for non-Exposure datasets that can
    be retrieved by the standard daf_persistence mechanism.

    The differences are that the Storage type must be specified and no
    Exposure standardization is performed.

    The "storage" entry in the Policy is mandatory; the "tables" entry is
    optional; no "level" entry is allowed.

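    A hypothetical policy fragment for such a dataset, written as the
    equivalent Python dict purely for illustration (the names below are
    invented)::

        srcSchemaPolicy = {
            "template": "schema/src.fits",
            "python": "lsst.afw.table.SourceCatalog",
            "persistable": "ignored",
            "storage": "FitsCatalogStorage",
        }
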
    Parameters
    ----------
    datasetType : `str`
        Butler dataset type to be mapped.
    policy : `daf_persistence.Policy`
        Mapping Policy.
    registry : `lsst.obs.base.Registry`
        Registry for metadata lookups.
    root : `str`
        Path of root directory.
    """

    def __init__(self, datasetType, policy, registry, root, **kwargs):
        Mapping.__init__(self, datasetType, policy, registry, root, **kwargs)
        self.storage = policy["storage"]  # Storage type