butler.py
#!/usr/bin/env python

#
# LSST Data Management System
# Copyright 2008-2015 LSST Corporation.
#
# This product includes software developed by the
# LSST Project (http://www.lsst.org/).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the LSST License Statement and
# the GNU General Public License along with this program. If not,
# see <http://www.lsstcorp.org/LegalNotices/>.
#

# -*- python -*-

"""This module defines the Butler class."""

from __future__ import with_statement
import cPickle
import importlib
import os
import lsst.pex.logging as pexLog
import lsst.pex.policy as pexPolicy
from lsst.daf.persistence import StorageList, LogicalLocation, ReadProxy, ButlerSubset, ButlerDataRef, \
    Persistence

class Butler(object):
    """Butler provides a generic mechanism for persisting and retrieving data using mappers.

    A Butler manages a collection of datasets known as a repository. Each
    dataset has a type representing its intended usage and a location. Note
    that the dataset type is not the same as the C++ or Python type of the
    object containing the data. For example, an ExposureF object might be
    used to hold the data for a raw image, a post-ISR image, a calibrated
    science image, or a difference image. These would all be different
    dataset types.

    A Butler can produce a collection of possible values for a key (or tuples
    of values for multiple keys) if given a partial data identifier. It can
    check for the existence of a file containing a dataset given its type and
    data identifier. The Butler can then retrieve the dataset. Similarly, it
    can persist an object to an appropriate location when given its associated
    data identifier.

    Note that the Butler has two more advanced features when retrieving a
    dataset. First, the retrieval is lazy. Input does not occur until the
    dataset is actually accessed. This allows datasets to be retrieved and
    placed on a clipboard prospectively with little cost, even if the
    algorithm of a stage ends up not using them. Second, the Butler will call
    a standardization hook upon retrieval of the dataset. This function,
    contained in the input mapper object, must perform any necessary
    manipulations to force the retrieved object to conform to standards,
    including translating metadata.

    Public methods:

    __init__(self, root, mapper=None, **mapperArgs)

    defineAlias(self, alias, datasetType)

    getKeys(self, datasetType=None, level=None)

    queryMetadata(self, datasetType, key, format=None, dataId={}, **rest)

    datasetExists(self, datasetType, dataId={}, **rest)

    get(self, datasetType, dataId={}, immediate=False, **rest)

    put(self, obj, datasetType, dataId={}, doBackup=False, **rest)

    subset(self, datasetType, level=None, dataId={}, **rest)

    dataRef(self, datasetType, level=None, dataId={}, **rest)
    """
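    # A minimal usage sketch, added for illustration only. The repository path,
    # dataset type name, and data id keys below are hypothetical, not defined by
    # this module:
    #
    #     butler = Butler(root="/path/to/repo")
    #     exposure = butler.get("calexp", visit=1234, ccd=5)   # lazy ReadProxy
    #     butler.put(exposure, "calexp", visit=1234, ccd=5)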

    @staticmethod
    def getMapperClass(root):
        """Return the mapper class associated with a repository root."""

        # Find a "_mapper" file containing the mapper class name
        basePath = root
        mapperFile = "_mapper"
        while not os.path.exists(os.path.join(basePath, mapperFile)):
            # Break abstraction by following _parent links from CameraMapper
            if os.path.exists(os.path.join(basePath, "_parent")):
                basePath = os.path.join(basePath, "_parent")
            else:
                raise RuntimeError(
                    "No mapper provided and no %s available" %
                    (mapperFile,))
        mapperFile = os.path.join(basePath, mapperFile)

        # Read the fully-qualified name of the mapper class and return the class
        with open(mapperFile, "r") as f:
            mapperName = f.readline().strip()
        components = mapperName.split(".")
        if len(components) <= 1:
            raise RuntimeError("Unqualified mapper name %s in %s" %
                               (mapperName, mapperFile))
        pkg = importlib.import_module(".".join(components[:-1]))
        return getattr(pkg, components[-1])

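    # For illustration (hypothetical repository layout and mapper name): if
    # "<root>/_mapper" contains the single line "lsst.obs.example.ExampleMapper",
    # getMapperClass(root) imports lsst.obs.example and returns its ExampleMapper
    # class without instantiating it.
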
    def __init__(self, root, mapper=None, **mapperArgs):
        """Construct the Butler. If no mapper class is provided, then a file
        named "_mapper" is expected to be found in the repository, which
        must be a filesystem path. The first line in that file is read and
        must contain the fully-qualified name of a Mapper subclass, which is
        then imported and instantiated using the root and the mapperArgs.

        @param root (str)       the repository to be managed (at least
                                initially). May be None if a mapper is
                                provided.
        @param mapper (Mapper)  if present, the Mapper subclass instance
                                to be used as the butler's mapper.
        @param **mapperArgs     arguments to be passed to the mapper's
                                __init__ method, in addition to the root.
        """

        # Aliases registered via defineAlias(); starts out empty.
        self.datasetTypeAliasDict = {}

        if mapper is not None:
            self.mapper = mapper
        else:
            cls = Butler.getMapperClass(root)
            self.mapper = cls(root=root, **mapperArgs)

        # Always use an empty Persistence policy until we can get rid of it
        persistencePolicy = pexPolicy.Policy()
        self.persistence = Persistence.getPersistence(persistencePolicy)
        self.log = pexLog.Log(pexLog.Log.getDefaultLog(),
                              "daf.persistence.butler")

    def defineAlias(self, alias, datasetType):
        """Register an alias that will be substituted in datasetTypes.

        @param alias (str)       the alias keyword. It may start with @ or not. It may not contain @
                                 except as the first character.
        @param datasetType (str) the string that will be substituted when @alias is passed into
                                 datasetType. It may not contain '@'.
        """

        # Verify formatting of alias:
        # it can have '@' as the first character (if not, we will add it) but nowhere else.
        atLoc = alias.rfind('@')
        if atLoc == -1:
            alias = "@" + str(alias)
        elif atLoc > 0:
            raise RuntimeError("Badly formatted alias string: %s" % (alias,))

        # Verify that datasetType does not contain '@'
        if datasetType.count('@') != 0:
            raise RuntimeError("Badly formatted type string: %s" % (datasetType,))

        # Verify that the alias keyword does not start with another alias keyword,
        # and vice versa
        for key in self.datasetTypeAliasDict:
            if key.startswith(alias) or alias.startswith(key):
                raise RuntimeError("Alias: %s overlaps with existing alias: %s" % (alias, key))

        self.datasetTypeAliasDict[alias] = datasetType

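    # For illustration (hypothetical alias and dataset type): after
    #
    #     butler.defineAlias("cal", "calexp")
    #
    # a subsequent butler.get("@cal", visit=1234) resolves "@cal" to "calexp".
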
    def getKeys(self, datasetType=None, level=None):
        """Returns a dict. The dict keys are the valid data id keys at or
        above the given level of hierarchy for the dataset type or the entire
        collection if None. The dict values are the basic Python types
        corresponding to the keys (int, float, str).

        @param datasetType (str) the type of dataset to get keys for, entire
                                 collection if None.
        @param level (str)       the hierarchy level to descend to or None.
        @returns (dict) valid data id keys; values are corresponding types.
        """

        datasetType = self._resolveDatasetTypeAlias(datasetType)
        return self.mapper.getKeys(datasetType, level)

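    # For illustration (hypothetical keys): with a camera-style mapper,
    # butler.getKeys("raw", "sensor") might return {"visit": int, "ccd": int}.
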
    def queryMetadata(self, datasetType, key, format=None, dataId={}, **rest):
        """Returns the valid values for one or more keys when given a partial
        input collection data id.

        @param datasetType (str)   the type of dataset to inquire about.
        @param key (str)           a key giving the level of granularity of the inquiry.
        @param format (str, tuple) an optional key or tuple of keys to be returned.
        @param dataId (dict)       the partial data id.
        @param **rest              keyword arguments for the partial data id.
        @returns (list) a list of valid values or tuples of valid values as
        specified by the format (defaulting to the same as the key) at the
        key's level of granularity.
        """

        datasetType = self._resolveDatasetTypeAlias(datasetType)
        dataId = self._combineDicts(dataId, **rest)
        if format is None:
            format = (key,)
        elif not hasattr(format, '__iter__'):
            format = (format,)
        tuples = self.mapper.queryMetadata(datasetType, key, format, dataId)
        if len(format) == 1:
            return [x[0] for x in tuples]
        return tuples

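    # For illustration (hypothetical keys): butler.queryMetadata("raw", "visit",
    # dataId={"filter": "r"}) might return a list of visit numbers, while passing
    # format=("visit", "ccd") would return (visit, ccd) tuples instead.
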
    def datasetExists(self, datasetType, dataId={}, **rest):
        """Determines if a dataset file exists.

        @param datasetType (str) the type of dataset to inquire about.
        @param dataId (dict)     the data id of the dataset.
        @param **rest            keyword arguments for the data id.
        @returns (bool) True if the dataset exists or is non-file-based.
        """

        datasetType = self._resolveDatasetTypeAlias(datasetType)
        dataId = self._combineDicts(dataId, **rest)
        location = self.mapper.map(datasetType, dataId)
        additionalData = location.getAdditionalData()
        storageName = location.getStorageName()
        if storageName in ('BoostStorage', 'FitsStorage', 'PafStorage',
                           'PickleStorage', 'ConfigStorage', 'FitsCatalogStorage'):
            locations = location.getLocations()
            for locationString in locations:
                logLoc = LogicalLocation(locationString, additionalData).locString()
                if storageName == 'FitsStorage':
                    # Strip off directives for cfitsio (in square brackets, e.g., extension name)
                    bracket = logLoc.find('[')
                    if bracket > 0:
                        logLoc = logLoc[:bracket]
                if not os.path.exists(logLoc):
                    return False
            return True
        self.log.log(pexLog.Log.WARN,
                     "datasetExists() for non-file storage %s, dataset type=%s, keys=%s" %
                     (storageName, datasetType, str(dataId)))
        return True

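    # For illustration (hypothetical data id): butler.datasetExists("calexp",
    # visit=1234, ccd=5) returns False as soon as any file mapped to that data id
    # is missing, and True otherwise (or for non-file-based storages).
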
    def get(self, datasetType, dataId={}, immediate=False, **rest):
        """Retrieves a dataset given an input collection data id.

        @param datasetType (str) the type of dataset to retrieve.
        @param dataId (dict)     the data id.
        @param immediate (bool)  don't use a proxy for delayed loading.
        @param **rest            keyword arguments for the data id.
        @returns an object retrieved from the dataset (or a proxy for one).
        """

        datasetType = self._resolveDatasetTypeAlias(datasetType)
        dataId = self._combineDicts(dataId, **rest)
        location = self.mapper.map(datasetType, dataId)
        self.log.log(pexLog.Log.DEBUG, "Get type=%s keys=%s from %s" %
                     (datasetType, dataId, str(location)))

        if location.getPythonType() is not None:
            # Import this pythonType dynamically
            pythonTypeTokenList = location.getPythonType().split('.')
            importClassString = pythonTypeTokenList.pop()
            importClassString = importClassString.strip()
            importPackage = ".".join(pythonTypeTokenList)
            importType = __import__(importPackage, globals(), locals(),
                                    [importClassString], -1)
            pythonType = getattr(importType, importClassString)
        else:
            pythonType = None
        if hasattr(self.mapper, "bypass_" + datasetType):
            bypassFunc = getattr(self.mapper, "bypass_" + datasetType)
            callback = lambda: bypassFunc(datasetType, pythonType,
                                          location, dataId)
        else:
            callback = lambda: self._read(pythonType, location)
        if self.mapper.canStandardize(datasetType):
            innerCallback = callback
            callback = lambda: self.mapper.standardize(datasetType,
                                                       innerCallback(), dataId)
        if immediate:
            return callback()
        return ReadProxy(callback)

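    # For illustration (hypothetical dataset type and keys):
    #
    #     proxy = butler.get("calexp", visit=1234, ccd=5)                  # deferred read
    #     image = butler.get("calexp", visit=1234, ccd=5, immediate=True)  # reads now
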
    def put(self, obj, datasetType, dataId={}, doBackup=False, **rest):
        """Persists a dataset given an output collection data id.

        @param obj               the object to persist.
        @param datasetType (str) the type of dataset to persist.
        @param dataId (dict)     the data id.
        @param doBackup          if True, rename existing instead of overwriting.
        @param **rest            keyword arguments for the data id.

        WARNING: Setting doBackup=True is not safe for parallel processing, as it
        may be subject to race conditions.
        """

        datasetType = self._resolveDatasetTypeAlias(datasetType)
        if doBackup:
            self.mapper.backup(datasetType, dataId)
        dataId = self._combineDicts(dataId, **rest)
        location = self.mapper.map(datasetType, dataId, write=True)
        self.log.log(pexLog.Log.DEBUG, "Put type=%s keys=%s to %s" %
                     (datasetType, dataId, str(location)))
        additionalData = location.getAdditionalData()
        storageName = location.getStorageName()
        locations = location.getLocations()
        # TODO support multiple output locations
        locationString = locations[0]
        logLoc = LogicalLocation(locationString, additionalData)
        trace = pexLog.BlockTimingLog(self.log, "put",
                                      pexLog.BlockTimingLog.INSTRUM+1)
        trace.setUsageFlags(trace.ALLUDATA)

        if storageName == "PickleStorage":
            trace.start("write to %s(%s)" % (storageName, logLoc.locString()))
            outDir = os.path.dirname(logLoc.locString())
            if outDir != "" and not os.path.exists(outDir):
                try:
                    os.makedirs(outDir)
                except OSError, e:
                    # Don't fail if directory exists due to race (errno 17 is EEXIST)
                    if e.errno != 17:
                        raise e
            with open(logLoc.locString(), "wb") as outfile:
                cPickle.dump(obj, outfile, cPickle.HIGHEST_PROTOCOL)
            trace.done()
            return

        if storageName == "ConfigStorage":
            trace.start("write to %s(%s)" % (storageName, logLoc.locString()))
            outDir = os.path.dirname(logLoc.locString())
            if outDir != "" and not os.path.exists(outDir):
                try:
                    os.makedirs(outDir)
                except OSError, e:
                    # Don't fail if directory exists due to race (errno 17 is EEXIST)
                    if e.errno != 17:
                        raise e
            obj.save(logLoc.locString())
            trace.done()
            return

        if storageName == "FitsCatalogStorage":
            trace.start("write to %s(%s)" % (storageName, logLoc.locString()))
            outDir = os.path.dirname(logLoc.locString())
            if outDir != "" and not os.path.exists(outDir):
                try:
                    os.makedirs(outDir)
                except OSError, e:
                    # Don't fail if directory exists due to race (errno 17 is EEXIST)
                    if e.errno != 17:
                        raise e
            flags = additionalData.getInt("flags", 0)
            obj.writeFits(logLoc.locString(), flags=flags)
            trace.done()
            return

        # Create a list of Storages for the item.
        storageList = StorageList()
        storage = self.persistence.getPersistStorage(storageName, logLoc)
        storageList.append(storage)
        trace.start("write to %s(%s)" % (storageName, logLoc.locString()))

        # Persist the item.
        if hasattr(obj, '__deref__'):
            # We have a smart pointer, so dereference it.
            self.persistence.persist(
                obj.__deref__(), storageList, additionalData)
        else:
            self.persistence.persist(obj, storageList, additionalData)
        trace.done()

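    # For illustration (hypothetical object, dataset type, and keys):
    #
    #     butler.put(catalog, "src", visit=1234, ccd=5)
    #     butler.put(catalog, "src", visit=1234, ccd=5, doBackup=True)  # back up any existing file first
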
    def subset(self, datasetType, level=None, dataId={}, **rest):
        """Extracts a subset of a dataset collection.

        Given a partial dataId specified in dataId and **rest, find all
        datasets at a given level specified by a dataId key (e.g. visit or
        sensor or amp for a camera) and return a collection of their dataIds
        as ButlerDataRefs.

        @param datasetType (str) the type of dataset collection to subset
        @param level (str)       the level of dataId at which to subset
        @param dataId (dict)     the data id.
        @param **rest            keyword arguments for the data id.
        @returns (ButlerSubset) collection of ButlerDataRefs for datasets
        matching the data id.
        """

        datasetType = self._resolveDatasetTypeAlias(datasetType)
        if level is None:
            level = self.mapper.getDefaultLevel()
        dataId = self._combineDicts(dataId, **rest)
        return ButlerSubset(self, datasetType, level, dataId)

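    # For illustration (hypothetical level and keys): iterate over all sensors of
    # a visit and fetch each dataset through its ButlerDataRef:
    #
    #     for ref in butler.subset("raw", level="sensor", visit=1234):
    #         exposure = ref.get("raw")
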
    def dataRef(self, datasetType, level=None, dataId={}, **rest):
        """Returns a single ButlerDataRef.

        Given a complete dataId specified in dataId and **rest, find the
        unique dataset at the given level specified by a dataId key (e.g.
        visit or sensor or amp for a camera) and return a ButlerDataRef.

        @param datasetType (str) the type of dataset collection to reference
        @param level (str)       the level of dataId at which to reference
        @param dataId (dict)     the data id.
        @param **rest            keyword arguments for the data id.
        @returns (ButlerDataRef) ButlerDataRef for dataset matching the data id
        """

        datasetType = self._resolveDatasetTypeAlias(datasetType)
        subset = self.subset(datasetType, level, dataId, **rest)
        if len(subset) != 1:
            raise RuntimeError, """No unique dataset for:
    Dataset type = %s
    Level = %s
    Data ID = %s
    Keywords = %s""" % (str(datasetType), str(level), str(dataId), str(rest))
        return ButlerDataRef(subset, subset.cache[0])

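    # For illustration (hypothetical keys): butler.dataRef("calexp", visit=1234,
    # ccd=5) returns the single matching ButlerDataRef, or raises RuntimeError if
    # the data id does not identify exactly one dataset.
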
    def _combineDicts(self, dataId, **rest):
        """Merge the dataId dict with the keyword arguments into a new data id dict."""
        finalId = {}
        finalId.update(dataId)
        finalId.update(rest)
        return finalId

    def _read(self, pythonType, location):
        """Read an object (or list of objects) from the mapped location(s)."""
        trace = pexLog.BlockTimingLog(self.log, "read",
                                      pexLog.BlockTimingLog.INSTRUM+1)

        additionalData = location.getAdditionalData()
        storageName = location.getStorageName()
        results = []
        locations = location.getLocations()
        returnList = True
        if len(locations) == 1:
            returnList = False

        for locationString in locations:
            logLoc = LogicalLocation(locationString, additionalData)
            trace.start("read from %s(%s)" % (storageName, logLoc.locString()))

            if storageName == "PafStorage":
                finalItem = pexPolicy.Policy.createPolicy(logLoc.locString())
            elif storageName == "PickleStorage":
                if not os.path.exists(logLoc.locString()):
                    raise RuntimeError, \
                        "No such pickle file: " + logLoc.locString()
                with open(logLoc.locString(), "rb") as infile:
                    finalItem = cPickle.load(infile)
            elif storageName == "FitsCatalogStorage":
                if not os.path.exists(logLoc.locString()):
                    raise RuntimeError, \
                        "No such FITS catalog file: " + logLoc.locString()
                hdu = additionalData.getInt("hdu", 0)
                flags = additionalData.getInt("flags", 0)
                finalItem = pythonType.readFits(logLoc.locString(), hdu, flags)
            elif storageName == "ConfigStorage":
                if not os.path.exists(logLoc.locString()):
                    raise RuntimeError, \
                        "No such config file: " + logLoc.locString()
                finalItem = pythonType()
                finalItem.load(logLoc.locString())
            else:
                # Create a list of Storages for the item.
                storageList = StorageList()
                storage = self.persistence.getRetrieveStorage(storageName, logLoc)
                storageList.append(storage)
                itemData = self.persistence.unsafeRetrieve(
                    location.getCppType(), storageList, additionalData)
                finalItem = pythonType.swigConvert(itemData)
            trace.done()
            results.append(finalItem)

        if not returnList:
            results = results[0]
        return results

    def __reduce__(self):
        # Pickle support: re-create the Butler from its mapper and alias dict.
        return (_unreduce, (self.mapper, self.datasetTypeAliasDict))

    def _resolveDatasetTypeAlias(self, datasetType):
        """Replaces all the known alias keywords in the given string with the alias value.

        @param datasetType (str) the dataset type, possibly containing '@' alias keywords.
        @return (str) the de-aliased string
        """

        # A None datasetType (allowed by e.g. getKeys) has no aliases to resolve.
        if datasetType is None:
            return None

        for key in self.datasetTypeAliasDict:
            # If all aliases have been replaced, bail out
            if datasetType.find('@') == -1:
                break
            datasetType = datasetType.replace(key, self.datasetTypeAliasDict[key])

        # If an alias specifier cannot be resolved then throw.
        if datasetType.find('@') != -1:
            raise RuntimeError("Unresolvable alias specifier in datasetType: %s" % (datasetType,))

        return datasetType

def _unreduce(mapper, datasetTypeAliasDict):
    butler = Butler(root=None, mapper=mapper)
    butler.datasetTypeAliasDict = datasetTypeAliasDict
    return butler