butler.py
#!/usr/bin/env python

#
# LSST Data Management System
# Copyright 2008, 2009, 2010 LSST Corporation.
#
# This product includes software developed by the
# LSST Project (http://www.lsst.org/).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the LSST License Statement and
# the GNU General Public License along with this program. If not,
# see <http://www.lsstcorp.org/LegalNotices/>.
#

# -*- python -*-

"""This module defines the Butler class."""

from __future__ import with_statement
import cPickle
import importlib
import os
import lsst.pex.logging as pexLog
import lsst.pex.policy as pexPolicy
from lsst.daf.persistence import StorageList, LogicalLocation, ReadProxy, ButlerSubset, ButlerDataRef, \
    Persistence

class Butler(object):
    """Butler provides a generic mechanism for persisting and retrieving data using mappers.

    A Butler manages a collection of datasets known as a repository. Each
    dataset has a type representing its intended usage and a location. Note
    that the dataset type is not the same as the C++ or Python type of the
    object containing the data. For example, an ExposureF object might be
    used to hold the data for a raw image, a post-ISR image, a calibrated
    science image, or a difference image. These would all be different
    dataset types.

    A Butler can produce a collection of possible values for a key (or tuples
    of values for multiple keys) if given a partial data identifier. It can
    check for the existence of a file containing a dataset given its type and
    data identifier. The Butler can then retrieve the dataset. Similarly, it
    can persist an object to an appropriate location when given its associated
    data identifier.

    Note that the Butler has two more advanced features when retrieving a
    dataset. First, the retrieval is lazy. Input does not occur until the
    dataset is actually accessed. This allows datasets to be retrieved and
    placed on a clipboard prospectively with little cost, even if the
    algorithm of a stage ends up not using them. Second, the Butler will call
    a standardization hook upon retrieval of the dataset. This function,
    contained in the input mapper object, must perform any necessary
    manipulations to force the retrieved object to conform to standards,
    including translating metadata.

    Public methods:

    __init__(self, root, mapper=None, **mapperArgs)

    getKeys(self, datasetType=None, level=None)

    queryMetadata(self, datasetType, key, format=None, dataId={}, **rest)

    datasetExists(self, datasetType, dataId={}, **rest)

    get(self, datasetType, dataId={}, immediate=False, **rest)

    put(self, obj, datasetType, dataId={}, doBackup=False, **rest)

    subset(self, datasetType, level=None, dataId={}, **rest)

    dataRef(self, datasetType, level=None, dataId={}, **rest)

    """
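    # A minimal usage sketch (illustrative only; the repository path, the
    # "calexp" dataset type, and the visit/ccd data-id keys are assumptions
    # that depend on the mapper in use, e.g. an obs_* CameraMapper):
    #
    #     from lsst.daf.persistence import Butler
    #     butler = Butler("/path/to/repo")
    #     exposure = butler.get("calexp", visit=123, ccd=45)   # lazy ReadProxy
    #     butler.put(exposure, "calexp", visit=123, ccd=45)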

    @staticmethod
    def getMapperClass(root):
        """Return the mapper class associated with a repository root."""

        # Find a "_mapper" file containing the mapper class name
        basePath = root
        mapperFile = "_mapper"
        globals = {}
        while not os.path.exists(os.path.join(basePath, mapperFile)):
            # Break abstraction by following _parent links from CameraMapper
            if os.path.exists(os.path.join(basePath, "_parent")):
                basePath = os.path.join(basePath, "_parent")
            else:
                raise RuntimeError(
                    "No mapper provided and no %s available" %
                    (mapperFile,))
        mapperFile = os.path.join(basePath, mapperFile)

        # Read the name of the mapper class and instantiate it
        with open(mapperFile, "r") as f:
            mapperName = f.readline().strip()
        components = mapperName.split(".")
        if len(components) <= 1:
            raise RuntimeError("Unqualified mapper name %s in %s" %
                               (mapperName, mapperFile))
        pkg = importlib.import_module(".".join(components[:-1]))
        return getattr(pkg, components[-1])
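    # For reference, the "_mapper" file that getMapperClass() looks for is a
    # one-line text file whose first line is the fully-qualified name of a
    # Mapper subclass, e.g. (hypothetical class name):
    #
    #     lsst.obs.example.ExampleMapper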

    def __init__(self, root, mapper=None, **mapperArgs):
        """Construct the Butler. If no mapper class is provided, then a file
        named "_mapper" is expected to be found in the repository, which
        must be a filesystem path. The first line in that file is read and
        must contain the fully-qualified name of a Mapper subclass, which is
        then imported and instantiated using the root and the mapperArgs.

        @param root (str)       the repository to be managed (at least
                                initially). May be None if a mapper is
                                provided.
        @param mapper (Mapper)  if present, the Mapper subclass instance
                                to be used as the butler's mapper.
        @param **mapperArgs     arguments to be passed to the mapper's
                                __init__ method, in addition to the root."""

        if mapper is not None:
            self.mapper = mapper
        else:
            cls = Butler.getMapperClass(root)
            self.mapper = cls(root=root, **mapperArgs)

        # Always use an empty Persistence policy until we can get rid of it
        persistencePolicy = pexPolicy.Policy()
        self.persistence = Persistence.getPersistence(persistencePolicy)
        self.log = pexLog.Log(pexLog.Log.getDefaultLog(),
                              "daf.persistence.butler")

    def getKeys(self, datasetType=None, level=None):
        """Returns a dict of the valid data id keys at or above the given
        level of hierarchy for the given dataset type (or for the entire
        collection if datasetType is None). The dict values are the basic
        Python types corresponding to the keys (int, float, str).

        @param datasetType (str)  the type of dataset to get keys for; the
                                  entire collection if None.
        @param level (str)        the hierarchy level to descend to, or None.
        @returns (dict) valid data id keys; values are corresponding types."""

        return self.mapper.getKeys(datasetType, level)
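    # Illustrative sketch (the key names and types returned depend entirely on
    # the mapper; those shown here are assumptions):
    #
    #     butler.getKeys("raw")
    #     # -> {'visit': int, 'ccd': int, 'filter': str}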

    def queryMetadata(self, datasetType, key, format=None, dataId={}, **rest):
        """Returns the valid values for one or more keys when given a partial
        input collection data id.

        @param datasetType (str)    the type of dataset to inquire about.
        @param key (str)            a key giving the level of granularity of the inquiry.
        @param format (str, tuple)  an optional key or tuple of keys to be returned.
        @param dataId (dict)        the partial data id.
        @param **rest               keyword arguments for the partial data id.
        @returns (list) a list of valid values or tuples of valid values as
        specified by the format (defaulting to the same as the key) at the
        key's level of granularity.
        """

        dataId = self._combineDicts(dataId, **rest)
        if format is None:
            format = (key,)
        elif not hasattr(format, '__iter__'):
            format = (format,)
        tuples = self.mapper.queryMetadata(datasetType, key, format, dataId)
        if len(format) == 1:
            return [x[0] for x in tuples]
        return tuples
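    # Illustrative sketch (the "raw" dataset type and the key names are
    # mapper-dependent assumptions):
    #
    #     visits = butler.queryMetadata("raw", "visit", dataId={'filter': 'r'})
    #     pairs = butler.queryMetadata("raw", "ccd", format=("visit", "ccd"),
    #                                  filter='r')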

    def datasetExists(self, datasetType, dataId={}, **rest):
        """Determines if a dataset file exists.

        @param datasetType (str)  the type of dataset to inquire about.
        @param dataId (dict)      the data id of the dataset.
        @param **rest             keyword arguments for the data id.
        @returns (bool) True if the dataset exists or is non-file-based.
        """

        dataId = self._combineDicts(dataId, **rest)
        location = self.mapper.map(datasetType, dataId)
        additionalData = location.getAdditionalData()
        storageName = location.getStorageName()
        if storageName in ('BoostStorage', 'FitsStorage', 'PafStorage',
                           'PickleStorage', 'ConfigStorage', 'FitsCatalogStorage'):
            locations = location.getLocations()
            for locationString in locations:
                logLoc = LogicalLocation(locationString, additionalData).locString()
                if storageName == 'FitsStorage':
                    # Strip off directives for cfitsio (in square brackets, e.g., extension name)
                    bracket = logLoc.find('[')
                    if bracket > 0:
                        logLoc = logLoc[:bracket]
                if not os.path.exists(logLoc):
                    return False
            return True
        self.log.log(pexLog.Log.WARN,
                     "datasetExists() for non-file storage %s, dataset type=%s, keys=%s" %
                     (storageName, datasetType, str(dataId)))
        return True
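    # Illustrative sketch (the "raw" dataset type and data-id keys are
    # mapper-dependent assumptions):
    #
    #     if butler.datasetExists("raw", visit=123, ccd=45):
    #         raw = butler.get("raw", visit=123, ccd=45)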

    def get(self, datasetType, dataId={}, immediate=False, **rest):
        """Retrieves a dataset given an input collection data id.

        @param datasetType (str)  the type of dataset to retrieve.
        @param dataId (dict)      the data id.
        @param immediate (bool)   don't use a proxy for delayed loading.
        @param **rest             keyword arguments for the data id.
        @returns an object retrieved from the dataset (or a proxy for one).
        """
        dataId = self._combineDicts(dataId, **rest)
        location = self.mapper.map(datasetType, dataId)
        self.log.log(pexLog.Log.DEBUG, "Get type=%s keys=%s from %s" %
                     (datasetType, dataId, str(location)))

        if location.getPythonType() is not None:
            # import this pythonType dynamically
            pythonTypeTokenList = location.getPythonType().split('.')
            importClassString = pythonTypeTokenList.pop()
            importClassString = importClassString.strip()
            importPackage = ".".join(pythonTypeTokenList)
            importType = __import__(importPackage, globals(), locals(),
                                    [importClassString], -1)
            pythonType = getattr(importType, importClassString)
        else:
            pythonType = None
        if hasattr(self.mapper, "bypass_" + datasetType):
            bypassFunc = getattr(self.mapper, "bypass_" + datasetType)
            callback = lambda: bypassFunc(datasetType, pythonType,
                                          location, dataId)
        else:
            callback = lambda: self._read(pythonType, location)
        if self.mapper.canStandardize(datasetType):
            innerCallback = callback
            callback = lambda: self.mapper.standardize(datasetType,
                                                       innerCallback(), dataId)
        if immediate:
            return callback()
        return ReadProxy(callback)
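    # Illustrative sketch: by default get() returns a ReadProxy that defers the
    # actual read until the object is first used; pass immediate=True to force
    # the read up front (dataset type and keys are assumptions):
    #
    #     proxy = butler.get("calexp", visit=123, ccd=45)                 # no I/O yet
    #     image = butler.get("calexp", visit=123, ccd=45, immediate=True)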

    def put(self, obj, datasetType, dataId={}, doBackup=False, **rest):
        """Persists a dataset given an output collection data id.

        @param obj                the object to persist.
        @param datasetType (str)  the type of dataset to persist.
        @param dataId (dict)      the data id.
        @param doBackup (bool)    if True, rename any existing dataset instead
                                  of overwriting it.
        @param **rest             keyword arguments for the data id.

        WARNING: Setting doBackup=True is not safe for parallel processing, as it
        may be subject to race conditions.
        """
        if doBackup:
            self.mapper.backup(datasetType, dataId)
        dataId = self._combineDicts(dataId, **rest)
        location = self.mapper.map(datasetType, dataId, write=True)
        self.log.log(pexLog.Log.DEBUG, "Put type=%s keys=%s to %s" %
                     (datasetType, dataId, str(location)))
        additionalData = location.getAdditionalData()
        storageName = location.getStorageName()
        locations = location.getLocations()
        # TODO support multiple output locations
        locationString = locations[0]
        logLoc = LogicalLocation(locationString, additionalData)
        trace = pexLog.BlockTimingLog(self.log, "put",
                                      pexLog.BlockTimingLog.INSTRUM + 1)
        trace.setUsageFlags(trace.ALLUDATA)

        if storageName == "PickleStorage":
            trace.start("write to %s(%s)" % (storageName, logLoc.locString()))
            outDir = os.path.dirname(logLoc.locString())
            if outDir != "" and not os.path.exists(outDir):
                try:
                    os.makedirs(outDir)
                except OSError, e:
                    # Don't fail if directory exists due to race
                    if e.errno != 17:
                        raise e
            with open(logLoc.locString(), "wb") as outfile:
                cPickle.dump(obj, outfile, cPickle.HIGHEST_PROTOCOL)
            trace.done()
            return

        if storageName == "ConfigStorage":
            trace.start("write to %s(%s)" % (storageName, logLoc.locString()))
            outDir = os.path.dirname(logLoc.locString())
            if outDir != "" and not os.path.exists(outDir):
                try:
                    os.makedirs(outDir)
                except OSError, e:
                    # Don't fail if directory exists due to race
                    if e.errno != 17:
                        raise e
            obj.save(logLoc.locString())
            trace.done()
            return

        if storageName == "FitsCatalogStorage":
            trace.start("write to %s(%s)" % (storageName, logLoc.locString()))
            outDir = os.path.dirname(logLoc.locString())
            if outDir != "" and not os.path.exists(outDir):
                try:
                    os.makedirs(outDir)
                except OSError, e:
                    # Don't fail if directory exists due to race
                    if e.errno != 17:
                        raise e
            flags = additionalData.getInt("flags", 0)
            obj.writeFits(logLoc.locString(), flags=flags)
            trace.done()
            return

        # Create a list of Storages for the item.
        storageList = StorageList()
        storage = self.persistence.getPersistStorage(storageName, logLoc)
        storageList.append(storage)
        trace.start("write to %s(%s)" % (storageName, logLoc.locString()))

        # Persist the item.
        if hasattr(obj, '__deref__'):
            # We have a smart pointer, so dereference it.
            self.persistence.persist(
                obj.__deref__(), storageList, additionalData)
        else:
            self.persistence.persist(obj, storageList, additionalData)
        trace.done()
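    # Illustrative sketch (the "src" dataset type and keys are assumptions; how
    # obj is written depends on the storage the mapper assigns to that type):
    #
    #     butler.put(sourceCatalog, "src", visit=123, ccd=45)
    #     butler.put(sourceCatalog, "src", visit=123, ccd=45, doBackup=True)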

    def subset(self, datasetType, level=None, dataId={}, **rest):
        """Extracts a subset of a dataset collection.

        Given a partial dataId specified in dataId and **rest, find all
        datasets at a given level specified by a dataId key (e.g. visit or
        sensor or amp for a camera) and return a collection of their dataIds
        as ButlerDataRefs.

        @param datasetType (str)  the type of dataset collection to subset
        @param level (str)        the level of dataId at which to subset
        @param dataId (dict)      the data id.
        @param **rest             keyword arguments for the data id.
        @returns (ButlerSubset) collection of ButlerDataRefs for datasets
        matching the data id."""

        if level is None:
            level = self.mapper.getDefaultLevel()
        dataId = self._combineDicts(dataId, **rest)
        return ButlerSubset(self, datasetType, level, dataId)
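    # Illustrative sketch: iterate over all sensor-level datasets of one visit
    # (dataset type, level name, and data-id keys are mapper assumptions):
    #
    #     for ref in butler.subset("calexp", level="sensor", visit=123):
    #         if ref.datasetExists("calexp"):
    #             calexp = ref.get("calexp")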

    def dataRef(self, datasetType, level=None, dataId={}, **rest):
        """Returns a single ButlerDataRef.

        Given a complete dataId specified in dataId and **rest, find the
        unique dataset at the given level specified by a dataId key (e.g.
        visit or sensor or amp for a camera) and return a ButlerDataRef.

        @param datasetType (str)  the type of dataset collection to reference
        @param level (str)        the level of dataId at which to reference
        @param dataId (dict)      the data id.
        @param **rest             keyword arguments for the data id.
        @returns (ButlerDataRef) ButlerDataRef for dataset matching the data id
        """

        subset = self.subset(datasetType, level, dataId, **rest)
        if len(subset) != 1:
            raise RuntimeError, """No unique dataset for:
    Dataset type = %s
    Level = %s
    Data ID = %s
    Keywords = %s""" % (str(datasetType), str(level), str(dataId), str(rest))
        return ButlerDataRef(subset, subset.cache[0])
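    # Illustrative sketch: a fully-qualified data id yields exactly one
    # reference; anything else raises RuntimeError (keys are assumptions):
    #
    #     ref = butler.dataRef("calexp", visit=123, ccd=45)
    #     calexp = ref.get("calexp")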

    def _combineDicts(self, dataId, **rest):
        finalId = {}
        finalId.update(dataId)
        finalId.update(rest)
        return finalId

    def _read(self, pythonType, location):
        trace = pexLog.BlockTimingLog(self.log, "read",
                                      pexLog.BlockTimingLog.INSTRUM + 1)

        additionalData = location.getAdditionalData()
        # Create a list of Storages for the item.
        storageName = location.getStorageName()
        results = []
        locations = location.getLocations()
        returnList = True
        if len(locations) == 1:
            returnList = False

        for locationString in locations:
            logLoc = LogicalLocation(locationString, additionalData)
            trace.start("read from %s(%s)" % (storageName, logLoc.locString()))

            if storageName == "PafStorage":
                finalItem = pexPolicy.Policy.createPolicy(logLoc.locString())
            elif storageName == "PickleStorage":
                if not os.path.exists(logLoc.locString()):
                    raise RuntimeError, \
                        "No such pickle file: " + logLoc.locString()
                with open(logLoc.locString(), "rb") as infile:
                    finalItem = cPickle.load(infile)
            elif storageName == "FitsCatalogStorage":
                if not os.path.exists(logLoc.locString()):
                    raise RuntimeError, \
                        "No such FITS catalog file: " + logLoc.locString()
                hdu = additionalData.getInt("hdu", 0)
                flags = additionalData.getInt("flags", 0)
                finalItem = pythonType.readFits(logLoc.locString(), hdu, flags)
            elif storageName == "ConfigStorage":
                if not os.path.exists(logLoc.locString()):
                    raise RuntimeError, \
                        "No such config file: " + logLoc.locString()
                finalItem = pythonType()
                finalItem.load(logLoc.locString())
            else:
                storageList = StorageList()
                storage = self.persistence.getRetrieveStorage(storageName, logLoc)
                storageList.append(storage)
                itemData = self.persistence.unsafeRetrieve(
                    location.getCppType(), storageList, additionalData)
                finalItem = pythonType.swigConvert(itemData)
            trace.done()
            results.append(finalItem)

        if not returnList:
            results = results[0]
        return results

    def __reduce__(self):
        return (_unreduce, (self.mapper,))


def _unreduce(mapper):
    return Butler(root=None, mapper=mapper)
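# Illustrative sketch: __reduce__/_unreduce let a Butler survive pickling (e.g.
# when handed to worker processes) by pickling only its mapper and rebuilding
# the Butler around the unpickled mapper:
#
#     import cPickle
#     restored = cPickle.loads(cPickle.dumps(butler))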