27 """This module defines the Butler class."""
29 from __future__
import with_statement
35 from lsst.daf.persistence import StorageList, LogicalLocation, ReadProxy, ButlerSubset, ButlerDataRef, \
39 """Butler provides a generic mechanism for persisting and retrieving data using mappers.
41 A Butler manages a collection of datasets known as a repository. Each
42 dataset has a type representing its intended usage and a location. Note
43 that the dataset type is not the same as the C++ or Python type of the
44 object containing the data. For example, an ExposureF object might be
45 used to hold the data for a raw image, a post-ISR image, a calibrated
46 science image, or a difference image. These would all be different
49 A Butler can produce a collection of possible values for a key (or tuples
50 of values for multiple keys) if given a partial data identifier. It can
51 check for the existence of a file containing a dataset given its type and
52 data identifier. The Butler can then retrieve the dataset. Similarly, it
53 can persist an object to an appropriate location when given its associated
56 Note that the Butler has two more advanced features when retrieving a data
57 set. First, the retrieval is lazy. Input does not occur until the data
58 set is actually accessed. This allows datasets to be retrieved and
59 placed on a clipboard prospectively with little cost, even if the
60 algorithm of a stage ends up not using them. Second, the Butler will call
61 a standardization hook upon retrieval of the dataset. This function,
62 contained in the input mapper object, must perform any necessary
63 manipulations to force the retrieved object to conform to standards,
64 including translating metadata.
68 __init__(self, root, mapper=None, **mapperArgs)
70 getKeys(self, datasetType=None, level=None)
72 queryMetadata(self, datasetType, keys, format=None, dataId={}, **rest)
74 datasetExists(self, datasetType, dataId={}, **rest)
76 get(self, datasetType, dataId={}, immediate=False, **rest)
78 put(self, obj, datasetType, dataId={}, **rest)
80 subset(self, datasetType, level=None, dataId={}, **rest))
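
    # A minimal usage sketch, kept as a comment for illustration.  The dataset
    # types ("raw", "calexp") and data id keys ("visit", "ccd") below are
    # hypothetical; the valid values depend entirely on the mapper in use.
    #
    #   butler = Butler("/path/to/repository")
    #   for visit, ccd in butler.queryMetadata("raw", "ccd", ("visit", "ccd")):
    #       if butler.datasetExists("raw", visit=visit, ccd=ccd):
    #           exposure = butler.get("raw", visit=visit, ccd=ccd)   # lazy proxy
    #           butler.put(exposure, "calexp", visit=visit, ccd=ccd)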
86 """Return the mapper class associated with a repository root."""
90 mapperFile =
"_mapper"
92 while not os.path.exists(os.path.join(basePath, mapperFile)):
94 if os.path.exists(os.path.join(basePath,
"_parent")):
95 basePath = os.path.join(basePath,
"_parent")
98 "No mapper provided and no %s available" %
100 mapperFile = os.path.join(basePath, mapperFile)
103 with open(mapperFile,
"r") as f:
104 mapperName = f.readline().strip()
105 components = mapperName.split(".")
106 if len(components) <= 1:
107 raise RuntimeError(
"Unqualified mapper name %s in %s" %
108 (mapperName, mapperFile))
109 pkg = importlib.import_module(
".".join(components[:-1]))
110 return getattr(pkg, components[-1])
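
    # The "_mapper" file is plain text whose first line is the fully-qualified
    # name of a Mapper subclass, for example (hypothetical package and class):
    #
    #   lsst.obs.example.ExampleMapper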

    def __init__(self, root, mapper=None, **mapperArgs):
        """Construct the Butler.  If no mapper class is provided, then a file
        named "_mapper" is expected to be found in the repository, which
        must be a filesystem path.  The first line in that file is read and
        must contain the fully-qualified name of a Mapper subclass, which is
        then imported and instantiated using the root and the mapperArgs.

        @param root (str)       the repository to be managed (at least
                                initially).  May be None if a mapper is
                                provided.
        @param mapper (Mapper)  if present, the Mapper subclass instance
                                to be used as the butler's mapper.
        @param **mapperArgs     arguments to be passed to the mapper's
                                __init__ method, in addition to the root."""

        if mapper is not None:
            self.mapper = mapper
        else:
            cls = Butler.getMapperClass(root)
            self.mapper = cls(root=root, **mapperArgs)
        self.persistence = Persistence.getPersistence(pexPolicy.Policy())
        self.log = pexLog.Log(pexLog.Log.getDefaultLog(),
                "daf.persistence.butler")

    def getKeys(self, datasetType=None, level=None):
        """Returns a dict.  The dict keys are the valid data id keys at or
        above the given level of hierarchy for the dataset type or the entire
        collection if None.  The dict values are the basic Python types
        corresponding to the keys (int, float, str).

        @param datasetType (str)  the type of dataset to get keys for, entire
                                  collection if None.
        @param level (str)        the hierarchy level to descend to or None.
        @returns (dict) valid data id keys; values are corresponding types."""

        return self.mapper.getKeys(datasetType, level)
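
    # With a hypothetical camera mapper, getKeys("raw", "ccd") might return
    # something like {"visit": int, "ccd": int, "filter": str}; the actual
    # keys and levels are defined entirely by the mapper.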
154 """Returns the valid values for one or more keys when given a partial
155 input collection data id.
157 @param datasetType (str) the type of dataset to inquire about.
158 @param key (str) a key giving the level of granularity of the inquiry.
159 @param format (str, tuple) an optional key or tuple of keys to be returned.
160 @param dataId (dict) the partial data id.
161 @param **rest keyword arguments for the partial data id.
162 @returns (list) a list of valid values or tuples of valid values as
163 specified by the format (defaulting to the same as the key) at the
164 key's level of granularity.
170 elif not hasattr(format,
'__iter__'):
172 tuples = self.mapper.queryMetadata(datasetType, key, format, dataId)
174 return [x[0]
for x
in tuples]
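
    # For example (hypothetical keys), querying at "ccd" granularity:
    #
    #   butler.queryMetadata("raw", "ccd", "ccd", visit=12345)
    #       # -> [1, 2, ...]
    #   butler.queryMetadata("raw", "ccd", ("ccd", "filter"), visit=12345)
    #       # -> [(1, "r"), (2, "r"), ...]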
178 """Determines if a dataset file exists.
180 @param datasetType (str) the type of dataset to inquire about.
181 @param dataId (dict) the data id of the dataset.
182 @param **rest keyword arguments for the data id.
183 @returns (bool) True if the dataset exists or is non-file-based.
187 location = self.mapper.map(datasetType, dataId)
188 additionalData = location.getAdditionalData()
189 storageName = location.getStorageName()
190 if storageName
in (
'BoostStorage',
'FitsStorage',
'PafStorage',
191 'PickleStorage',
'ConfigStorage',
'FitsCatalogStorage'):
192 locations = location.getLocations()
193 for locationString
in locations:
195 if storageName ==
'FitsStorage':
197 bracket = logLoc.find(
'[')
199 logLoc = logLoc[:bracket]
200 if not os.path.exists(logLoc):
203 self.log.log(pexLog.Log.WARN,
204 "datasetExists() for non-file storage %s, dataset type=%s, keys=%s" %
205 (storageName, datasetType, str(dataId)))

    def get(self, datasetType, dataId={}, immediate=False, **rest):
        """Retrieves a dataset given an input collection data id.

        @param datasetType (str)  the type of dataset to retrieve.
        @param dataId (dict)      the data id.
        @param immediate (bool)   don't use a proxy for delayed loading.
        @param **rest             keyword arguments for the data id.
        @returns an object retrieved from the dataset (or a proxy for one)."""

        dataId = self._combineDicts(dataId, **rest)
        location = self.mapper.map(datasetType, dataId)
        self.log.log(pexLog.Log.DEBUG, "Get type=%s keys=%s from %s" %
                (datasetType, dataId, str(location)))

        if location.getPythonType() is not None:
            # Import the Python type dynamically from its fully-qualified name.
            pythonTypeTokenList = location.getPythonType().split('.')
            importClassString = pythonTypeTokenList.pop()
            importClassString = importClassString.strip()
            importPackage = ".".join(pythonTypeTokenList)
            importType = __import__(importPackage, globals(), locals(),
                    [importClassString], -1)
            pythonType = getattr(importType, importClassString)
        else:
            pythonType = None

        if hasattr(self.mapper, "bypass_" + datasetType):
            # The mapper provides a direct-read shortcut for this dataset type.
            bypassFunc = getattr(self.mapper, "bypass_" + datasetType)
            callback = lambda: bypassFunc(datasetType, pythonType,
                    location, dataId)
        else:
            callback = lambda: self._read(pythonType, location)
        if self.mapper.canStandardize(datasetType):
            innerCallback = callback
            callback = lambda: self.mapper.standardize(datasetType,
                    innerCallback(), dataId)
        if immediate:
            return callback()
        return ReadProxy(callback)
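
    # Because the result may be a ReadProxy, the underlying read is deferred
    # until the object is first used.  A hypothetical example:
    #
    #   exposure = butler.get("raw", visit=12345, ccd=3)                  # no I/O yet
    #   exposure = butler.get("raw", visit=12345, ccd=3, immediate=True)  # reads now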

    def put(self, obj, datasetType, dataId={}, doBackup=False, **rest):
        """Persists a dataset given an output collection data id.

        @param obj                the object to persist.
        @param datasetType (str)  the type of dataset to persist.
        @param dataId (dict)      the data id.
        @param doBackup           if True, rename existing instead of overwriting.
        @param **rest             keyword arguments for the data id.

        WARNING: Setting doBackup=True is not safe for parallel processing, as it
        may be subject to race conditions."""

        if doBackup:
            self.mapper.backup(datasetType, dataId)
        dataId = self._combineDicts(dataId, **rest)
        location = self.mapper.map(datasetType, dataId, write=True)
        self.log.log(pexLog.Log.DEBUG, "Put type=%s keys=%s to %s" %
                (datasetType, dataId, str(location)))
        additionalData = location.getAdditionalData()
        storageName = location.getStorageName()
        locations = location.getLocations()
        # Only the first location is used when writing.
        locationString = locations[0]
        logLoc = LogicalLocation(locationString, additionalData)
        trace = pexLog.BlockTimingLog(self.log, "put",
                pexLog.BlockTimingLog.INSTRUM+1)
        trace.setUsageFlags(trace.ALLUDATA)

        if storageName == "PickleStorage":
            trace.start("write to %s(%s)" % (storageName, logLoc.locString()))
            outDir = os.path.dirname(logLoc.locString())
            if outDir != "" and not os.path.exists(outDir):
                try:
                    os.makedirs(outDir)
                except OSError as e:
                    # Don't fail if the directory was created by another process.
                    if e.errno != 17:
                        raise e
            with open(logLoc.locString(), "wb") as outfile:
                cPickle.dump(obj, outfile, cPickle.HIGHEST_PROTOCOL)
            trace.done()
            return

        if storageName == "ConfigStorage":
            trace.start("write to %s(%s)" % (storageName, logLoc.locString()))
            outDir = os.path.dirname(logLoc.locString())
            if outDir != "" and not os.path.exists(outDir):
                try:
                    os.makedirs(outDir)
                except OSError as e:
                    if e.errno != 17:
                        raise e
            obj.save(logLoc.locString())
            trace.done()
            return

        if storageName == "FitsCatalogStorage":
            trace.start("write to %s(%s)" % (storageName, logLoc.locString()))
            outDir = os.path.dirname(logLoc.locString())
            if outDir != "" and not os.path.exists(outDir):
                try:
                    os.makedirs(outDir)
                except OSError as e:
                    if e.errno != 17:
                        raise e
            flags = additionalData.getInt("flags", 0)
            obj.writeFits(logLoc.locString(), flags=flags)
            trace.done()
            return

        # Fall back to the C++ Persistence framework for other storages.
        storageList = StorageList()
        storage = self.persistence.getPersistStorage(storageName, logLoc)
        storageList.append(storage)
        trace.start("write to %s(%s)" % (storageName, logLoc.locString()))
        if hasattr(obj, '__deref__'):
            # obj is a smart pointer; dereference it before persisting.
            self.persistence.persist(
                    obj.__deref__(), storageList, additionalData)
        else:
            self.persistence.persist(obj, storageList, additionalData)
        trace.done()
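
    # Hypothetical usage, mirroring get():
    #
    #   butler.put(exposure, "calexp", visit=12345, ccd=3)
    #   butler.put(exposure, "calexp", visit=12345, ccd=3, doBackup=True)  # keep old file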

    def subset(self, datasetType, level=None, dataId={}, **rest):
        """Extracts a subset of a dataset collection.

        Given a partial dataId specified in dataId and **rest, find all
        datasets at a given level specified by a dataId key (e.g. visit or
        sensor or amp for a camera) and return a collection of their dataIds
        as ButlerDataRefs.

        @param datasetType (str)  the type of dataset collection to subset
        @param level (str)        the level of dataId at which to subset
        @param dataId (dict)      the data id.
        @param **rest             keyword arguments for the data id.
        @returns (ButlerSubset) collection of ButlerDataRefs for datasets
        matching the data id."""

        if level is None:
            level = self.mapper.getDefaultLevel()
        dataId = self._combineDicts(dataId, **rest)
        return ButlerSubset(self, datasetType, level, dataId)
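
    # Hypothetical usage: iterate over all sensors of one visit.  This assumes
    # ButlerDataRef exposes get()/put() keyed by its own dataId (see
    # ButlerDataRef in this package).
    #
    #   for ref in butler.subset("calexp", "sensor", visit=12345):
    #       exposure = ref.get("calexp")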

    def dataRef(self, datasetType, level=None, dataId={}, **rest):
        """Returns a single ButlerDataRef.

        Given a complete dataId specified in dataId and **rest, find the
        unique dataset at the given level specified by a dataId key (e.g.
        visit or sensor or amp for a camera) and return a ButlerDataRef.

        @param datasetType (str)  the type of dataset collection to reference
        @param level (str)        the level of dataId at which to reference
        @param dataId (dict)      the data id.
        @param **rest             keyword arguments for the data id.
        @returns (ButlerDataRef) ButlerDataRef for dataset matching the data id."""

        subset = self.subset(datasetType, level, dataId, **rest)
        if len(subset) != 1:
            raise RuntimeError("""No unique dataset for:
                    Dataset type = %s
                    Level = %s
                    Data ID = %s
                    Keywords = %s""" % (str(datasetType), str(level), str(dataId), str(rest)))
        return ButlerDataRef(subset, subset.cache[0])
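
    # Hypothetical usage: a fully-qualified data id yields exactly one reference.
    #
    #   ref = butler.dataRef("raw", visit=12345, ccd=3)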

    def _combineDicts(self, dataId, **rest):
        """Merge the positional data id dict with any keyword arguments."""
        finalId = {}
        finalId.update(dataId)
        finalId.update(rest)
        return finalId

    def _read(self, pythonType, location):
        trace = pexLog.BlockTimingLog(self.log, "read",
                pexLog.BlockTimingLog.INSTRUM+1)

        additionalData = location.getAdditionalData()
        storageName = location.getStorageName()
        results = []
        locations = location.getLocations()
        # Return a single item unless the mapping produced multiple locations.
        returnList = True
        if len(locations) == 1:
            returnList = False

        for locationString in locations:
            logLoc = LogicalLocation(locationString, additionalData)
            trace.start("read from %s(%s)" % (storageName, logLoc.locString()))

            if storageName == "PafStorage":
                finalItem = pexPolicy.Policy.createPolicy(logLoc.locString())
            elif storageName == "PickleStorage":
                if not os.path.exists(logLoc.locString()):
                    raise RuntimeError(
                            "No such pickle file: " + logLoc.locString())
                with open(logLoc.locString(), "rb") as infile:
                    finalItem = cPickle.load(infile)
            elif storageName == "FitsCatalogStorage":
                if not os.path.exists(logLoc.locString()):
                    raise RuntimeError(
                            "No such FITS catalog file: " + logLoc.locString())
                hdu = additionalData.getInt("hdu", 0)
                flags = additionalData.getInt("flags", 0)
                finalItem = pythonType.readFits(logLoc.locString(), hdu, flags)
            elif storageName == "ConfigStorage":
                if not os.path.exists(logLoc.locString()):
                    raise RuntimeError(
                            "No such config file: " + logLoc.locString())
                finalItem = pythonType()
                finalItem.load(logLoc.locString())
            else:
                # Fall back to the C++ Persistence framework for other storages.
                storageList = StorageList()
                storage = self.persistence.getRetrieveStorage(storageName, logLoc)
                storageList.append(storage)
                itemData = self.persistence.unsafeRetrieve(
                        location.getCppType(), storageList, additionalData)
                finalItem = pythonType.swigConvert(itemData)
            trace.done()
            results.append(finalItem)

        if not returnList:
            results = results[0]
        return results

    def __reduce__(self):
        # Support pickling by reconstructing the Butler from its mapper.
        return (_unreduce, (self.mapper,))


def _unreduce(mapper):
    return Butler(root=None, mapper=mapper)