mapping.py
#
# LSST Data Management System
# Copyright 2008, 2009, 2010 LSST Corporation.
#
# This product includes software developed by the
# LSST Project (http://www.lsst.org/).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the LSST License Statement and
# the GNU General Public License along with this program.  If not,
# see <http://www.lsstcorp.org/LegalNotices/>.
#
from builtins import zip
from builtins import object
from collections import OrderedDict
import os
import re
from lsst.daf.persistence import ButlerLocation
from lsst.daf.persistence.policy import Policy
import lsst.pex.policy as pexPolicy

"""This module defines the Mapping base class."""

class Mapping(object):

    """Mapping is a base class for all mappings.  Mappings are used by
    the Mapper to map (determine a path to some data given some
    identifiers) and standardize (convert data into some standard
    format or type) data, and to query the associated registry to see
    what data is available.

    Subclasses must specify self.storage or else override self.map().

    Public methods: lookup, have, need, keys, map

    Mappings are specified mainly by policy.  A Mapping policy should
    consist of:

    template (string): a Python string providing the filename for that
    particular dataset type based on some data identifiers.  In the
    case of redundancy in the path (e.g., a file uniquely specified by
    the exposure number, but with the filter also appearing in the
    path), the redundant/dependent identifiers can be looked up in the
    registry.

    python (string): the Python type for the retrieved data (e.g.
    lsst.afw.image.ExposureF)

    persistable (string): the Persistable registration for the on-disk data
    (e.g. ImageU)

    storage (string, optional): Storage type for this dataset type (e.g.
    "BoostStorage")

    level (string, optional): the level in the camera hierarchy at which the
    data is stored (Amp, Ccd or skyTile), if relevant

    tables (string, optional): a whitespace-delimited list of tables in the
    registry that can be NATURAL JOIN-ed to look up additional
    information.
    """
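
    # For illustration only: a hypothetical Mapping policy entry for a
    # dataset type named "calexp", written in the YAML form read by
    # daf_persistence Policy files.  The dataset type name, template path,
    # and data ID keys below are invented for this sketch.
    #
    #     calexp:
    #         template: "calexp/v%(visit)d-f%(filter)s.fits"
    #         python: lsst.afw.image.ExposureF
    #         persistable: ExposureF
    #         storage: FitsStorage
    #         level: Ccd
    #         tables: raw raw_visit
    #
    # The "%(visit)d" and "%(filter)s" substitutions in the template
    # determine the keys (and their Python types) that a data ID must
    # supply or that must be resolvable through the registry.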

    def __init__(self, datasetType, policy, registry, root, provided=None):
        """Constructor for Mapping class.
        @param datasetType (string)
        @param policy (daf_persistence.Policy, or pexPolicy.Policy (only for backward compatibility))
               Mapping Policy
        @param registry (lsst.obs.base.Registry) Registry for metadata lookups
        @param root (string) Path of root directory
        @param provided (list of strings) Keys provided by the mapper
        """

        if policy is None:
            raise RuntimeError("No policy provided for mapping")

        if isinstance(policy, pexPolicy.Policy):
            policy = Policy(policy)

        self.datasetType = datasetType
        self.registry = registry
        self.root = root

        self.template = policy['template']  # Template path
        self.keyDict = dict([
            (k, _formatMap(v, k, datasetType))
            for k, v in
            re.findall(r'\%\((\w+)\).*?([diouxXeEfFgGcrs])', self.template)
        ])
        if provided is not None:
            for p in provided:
                if p in self.keyDict:
                    del self.keyDict[p]
        self.python = policy['python']  # Python type
        self.persistable = policy['persistable']  # Persistable type
        self.storage = policy['storage']
        if 'level' in policy:
            self.level = policy['level']  # Level in camera hierarchy
        if 'tables' in policy:
            self.tables = policy.asArray('tables')
        else:
            self.tables = None
        self.range = None
        self.columns = None
        self.obsTimeName = policy['obsTimeName'] if 'obsTimeName' in policy else None

    def keys(self):
        """Return the dict of keys and value types required for this mapping."""
        return self.keyDict

    def map(self, mapper, dataId, write=False):
        """Standard implementation of map function.
        @param mapper (lsst.daf.persistence.Mapper)
        @param dataId (dict) Dataset identifier
        @return (lsst.daf.persistence.ButlerLocation)"""
        actualId = self.need(iter(self.keyDict.keys()), dataId)
        usedDataId = {key: actualId[key] for key in self.keyDict.keys()}
        path = mapper._mapActualToPath(self.template, actualId)
        if not os.path.isabs(path):
            path = os.path.join(self.root, path)
        if not write:
            # This allows mapped files to be compressed, ending in .gz or .fz, without any indication
            # from the policy that the file should be compressed, easily allowing repositories to
            # contain a combination of compressed and not-compressed files.
            # If needed we can add a policy flag to allow compressed files or not, and perhaps a list of
            # allowed extensions that may exist at the end of the template.
            for ext in (None, '.gz', '.fz'):
                if ext and path.endswith(ext):
                    continue  # the path already ends with the extension
                extPath = path + ext if ext else path
                newPath = mapper._parentSearch(extPath)
                if newPath:
                    path = newPath
                    break
        assert path, "Fully-qualified filename is empty."

        addFunc = "add_" + self.datasetType  # Name of method for additionalData
        if hasattr(mapper, addFunc):
            addFunc = getattr(mapper, addFunc)
            additionalData = addFunc(actualId)
            assert isinstance(additionalData, dict), "Bad type for returned data"
        else:
            additionalData = actualId.copy()

        return ButlerLocation(self.python, self.persistable, self.storage, path, additionalData, mapper,
                              usedDataId=usedDataId, datasetType=self.datasetType)

    def lookup(self, properties, dataId):
        """Look up properties in a metadata registry given a partial
        dataset identifier.
        @param properties (list of strings)
        @param dataId (dict) Dataset identifier
        @return (list of tuples) values of properties"""

        if self.registry is None:
            raise RuntimeError("No registry for lookup")

        skyMapKeys = ("tract", "patch")

        where = []
        values = []

        # Prepare to remove skymap entries from the properties list.  These
        # must be in the data ID, so we store which ones we're removing and
        # create an OrderedDict that tells us where to re-insert them.  That
        # maps the name of the property to either its index in the properties
        # list *after* the skymap ones have been removed (for entries that
        # aren't skymap ones) or the value from the data ID (for those that
        # are).
        removed = set()
        substitutions = OrderedDict()
        index = 0
        properties = list(properties)  # don't modify the original list
        for p in properties:
            if p in skyMapKeys:
                try:
                    substitutions[p] = dataId[p]
                    removed.add(p)
                except KeyError:
                    raise RuntimeError(
                        "Cannot look up skymap key '%s'; it must be explicitly included in the data ID" % p
                    )
            else:
                substitutions[p] = index
                index += 1
        # Can't actually remove while iterating above, so we do it here.
        for p in removed:
            properties.remove(p)

        fastPath = True
        for p in properties:
            if p not in ('filter', 'expTime', 'taiObs'):
                fastPath = False
                break
        if fastPath and 'visit' in dataId and "raw" in self.tables:
            lookupDataId = {'visit': dataId['visit']}
            result = self.registry.lookup(properties, 'raw_visit', lookupDataId, template=self.template)
        else:
            if dataId is not None:
                for k, v in dataId.items():
                    if self.columns and k not in self.columns:
                        continue
                    if k == self.obsTimeName:
                        continue
                    if k in skyMapKeys:
                        continue
                    where.append((k, '?'))
                    values.append(v)
            lookupDataId = {k[0]: v for k, v in zip(where, values)}
            if self.range:
                # format of self.range is ('?', isBetween-lowKey, isBetween-highKey)
                # here we transform that to {(lowKey, highKey): value}
                lookupDataId[(self.range[1], self.range[2])] = dataId[self.obsTimeName]
            result = self.registry.lookup(properties, self.tables, lookupDataId, template=self.template)
        if not removed:
            return result
        # Iterate over the query results, re-inserting the skymap entries.
        result = [tuple(v if k in removed else item[v] for k, v in substitutions.items())
                  for item in result]
        return result

    def have(self, properties, dataId):
        """Returns whether the provided data identifier has all
        the properties in the provided list.
        @param properties (list of strings) Properties required
        @param dataId (dict) Dataset identifier
        @return (bool) True if all properties are present"""
        for prop in properties:
            if prop not in dataId:
                return False
        return True

    def need(self, properties, dataId):
        """Ensures all properties in the provided list are present in
        the data identifier, looking them up as needed.  This is only
        possible for the case where the data identifies a single
        exposure.
        @param properties (list of strings) Properties required
        @param dataId (dict) Partial dataset identifier
        @return (dict) copy of dataset identifier with enhanced values
        """
        newId = dataId.copy()
        newProps = []  # Properties we don't already have
        for prop in properties:
            if prop not in newId:
                newProps.append(prop)
        if len(newProps) == 0:
            return newId

        lookups = self.lookup(newProps, newId)
        if len(lookups) != 1:
            raise RuntimeError("No unique lookup for %s from %s: %d matches" %
                               (newProps, newId, len(lookups)))
        for i, prop in enumerate(newProps):
            newId[prop] = lookups[0][i]
        return newId
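
    # For illustration only: a sketch of how need() completes a partial data
    # ID, assuming a hypothetical template "raw/v%(visit)d-f%(filter)s.fits"
    # and a registry holding exactly one matching row:
    #
    #     mapping.need(['filter'], {'visit': 85470982})
    #     # -> {'visit': 85470982, 'filter': 'r'}
    #
    # If the registry returns zero or multiple rows, need() raises
    # RuntimeError rather than guessing.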


def _formatMap(ch, k, datasetType):
    """Convert a format character into a Python type."""
    if ch in "diouxX":
        return int
    elif ch in "eEfFgG":
        return float
    elif ch in "crs":
        return str
    else:
        raise RuntimeError("Unexpected format specifier %s"
                           " for field %s in template for dataset %s" %
                           (ch, k, datasetType))
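
# For illustration only: _formatMap works together with the regular
# expression in Mapping.__init__, which pairs each "%(name)X" substitution
# in a template with its trailing format character.  A hypothetical template
# such as
#
#     "raw/v%(visit)d-f%(filter)s.fits"
#
# would yield keyDict == {'visit': int, 'filter': str}, since 'd' maps to
# int and 's' maps to str.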


class ImageMapping(Mapping):
    """ImageMapping is a Mapping subclass for non-camera images."""

    def __init__(self, datasetType, policy, registry, root, **kwargs):
        """Constructor for ImageMapping class.
        @param datasetType (string)
        @param policy (daf_persistence.Policy, or pexPolicy.Policy (only for backward compatibility))
               Mapping Policy
        @param registry (lsst.obs.base.Registry) Registry for metadata lookups
        @param root (string) Path of root directory"""
        if isinstance(policy, pexPolicy.Policy):
            policy = Policy(policy)
        Mapping.__init__(self, datasetType, policy, registry, root, **kwargs)
        self.columns = policy.asArray('columns') if 'columns' in policy else None


class ExposureMapping(Mapping):
    """ExposureMapping is a Mapping subclass for normal exposures."""

    def __init__(self, datasetType, policy, registry, root, **kwargs):
        """Constructor for ExposureMapping class.
        @param datasetType (string)
        @param policy (daf_persistence.Policy, or pexPolicy.Policy (only for backward compatibility))
               Mapping Policy
        @param registry (lsst.obs.base.Registry) Registry for metadata lookups
        @param root (string) Path of root directory"""
        if isinstance(policy, pexPolicy.Policy):
            policy = Policy(policy)
        Mapping.__init__(self, datasetType, policy, registry, root, **kwargs)
        self.columns = policy.asArray('columns') if 'columns' in policy else None

    def standardize(self, mapper, item, dataId):
        return mapper._standardizeExposure(self, item, dataId)


class CalibrationMapping(Mapping):
    """CalibrationMapping is a Mapping subclass for calibration-type products.

    The difference is that data properties in the query or template
    can be looked up using a reference Mapping in addition to this one.

    CalibrationMapping Policies can contain the following:

    reference (string, optional): a list of tables for finding missing dataset
    identifier components (including the observation time, if a validity range
    is required) in the exposure registry; note that the "tables" entry refers
    to the calibration registry

    refCols (string, optional): a list of dataset properties required from the
    reference tables for lookups in the calibration registry

    validRange (bool): true if the calibration dataset has a validity range
    specified by a column in the tables of the reference dataset in the
    exposure registry and two columns in the tables of this calibration
    dataset in the calibration registry

    obsTimeName (string, optional): the name of the column in the reference
    dataset tables containing the observation time (default "taiObs")

    validStartName (string, optional): the name of the column in the
    calibration dataset tables containing the start of the validity range
    (default "validStart")

    validEndName (string, optional): the name of the column in the
    calibration dataset tables containing the end of the validity range
    (default "validEnd")
    """
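
    # For illustration only: a hypothetical CalibrationMapping policy entry
    # for a "bias" dataset type, in the YAML form read by daf_persistence
    # Policy files; every name and path here is invented for this sketch.
    #
    #     bias:
    #         template: "bias/bias-c%(ccd)02d.fits"
    #         python: lsst.afw.image.ExposureF
    #         persistable: ExposureF
    #         storage: FitsStorage
    #         tables: bias
    #         reference: raw_visit
    #         validRange: true
    #         obsTimeName: taiObs
    #         validStartName: validStart
    #         validEndName: validEnd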

    def __init__(self, datasetType, policy, registry, calibRegistry, calibRoot, **kwargs):
        """Constructor for CalibrationMapping class.
        @param datasetType (string)
        @param policy (daf_persistence.Policy, or pexPolicy.Policy (only for backward compatibility))
               Mapping Policy
        @param registry (lsst.obs.base.Registry) Registry for metadata lookups
        @param calibRegistry (lsst.obs.base.Registry) Registry for calibration metadata lookups
        @param calibRoot (string) Path of calibration root directory"""
        if isinstance(policy, pexPolicy.Policy):
            policy = Policy(policy)
        Mapping.__init__(self, datasetType, policy, calibRegistry, calibRoot, **kwargs)
        self.reference = policy.asArray("reference") if "reference" in policy else None
        self.refCols = policy.asArray("refCols") if "refCols" in policy else None
        self.refRegistry = registry
        if "validRange" in policy and policy["validRange"]:
            self.range = ("?", policy["validStartName"], policy["validEndName"])
        if "columns" in policy:
            self.columns = policy.asArray("columns")
        # Default to None so standardize() can always pass setFilter through
        self.setFilter = policy["filter"] if "filter" in policy else None
        self.metadataKeys = None
        if "metadataKey" in policy:
            self.metadataKeys = policy.asArray("metadataKey")

    def lookup(self, properties, dataId):
        """Look up properties in a metadata registry given a partial
        dataset identifier.
        @param properties (list of strings)
        @param dataId (dict) Dataset identifier
        @return (list of tuples) values of properties"""

        # Either look up taiObs in the reference registry and then everything
        # in the calibration registry, or look up everything in the
        # calibration registry directly.
        newId = dataId.copy()
        if self.reference is not None:
            where = []
            values = []
            for k, v in dataId.items():
                if self.refCols and k not in self.refCols:
                    continue
                where.append(k)
                values.append(v)

            # Columns we need from the regular registry
            if self.columns is not None:
                columns = set(self.columns)
                for k in dataId.keys():
                    columns.discard(k)
            else:
                columns = set(properties)

            if not columns:
                # Nothing to look up in the reference registry; continue with the calib registry
                return Mapping.lookup(self, properties, newId)

            lookupDataId = dict(zip(where, values))
            lookups = self.refRegistry.lookup(columns, self.reference, lookupDataId)
            if len(lookups) != 1:
                raise RuntimeError("No unique lookup for %s from %s: %d matches" %
                                   (columns, dataId, len(lookups)))
            if columns == set(properties):
                # Have everything we need
                return lookups
            for i, prop in enumerate(columns):
                newId[prop] = lookups[0][i]
        return Mapping.lookup(self, properties, newId)

    def standardize(self, mapper, item, dataId):
        return mapper._standardizeExposure(self, item, dataId, filter=self.setFilter)


class DatasetMapping(Mapping):
    """DatasetMapping is a Mapping subclass for non-Exposure datasets that can
    be retrieved by the standard daf_persistence mechanism.

    The differences are that the Storage type must be specified and no
    Exposure standardization is performed.

    The "storage" entry in the Policy is mandatory; the "tables" entry is
    optional; no "level" entry is allowed.
    """
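
    # For illustration only: a hypothetical DatasetMapping policy entry for a
    # persisted configuration object, in the YAML form read by
    # daf_persistence Policy files; the dataset type name, template, and
    # storage/persistable choices below are invented for this sketch.
    #
    #     processCcd_config:
    #         template: "config/processCcd.py"
    #         python: lsst.pipe.tasks.processCcd.ProcessCcdConfig
    #         persistable: Config
    #         storage: ConfigStorage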

    def __init__(self, datasetType, policy, registry, root, **kwargs):
        """Constructor for DatasetMapping class.
        @param datasetType (string)
        @param policy (daf_persistence.Policy, or pexPolicy.Policy (only for backward compatibility))
               Mapping Policy
        @param registry (lsst.obs.base.Registry) Registry for metadata lookups
        @param root (string) Path of root directory"""
        if isinstance(policy, pexPolicy.Policy):
            policy = Policy(policy)
        Mapping.__init__(self, datasetType, policy, registry, root, **kwargs)
        self.storage = policy["storage"]  # Storage type