27 """This module defines the Butler class."""
29 from __future__
import with_statement
35 from lsst.daf.persistence import StorageList, LogicalLocation, ReadProxy, ButlerSubset, ButlerDataRef, \
39 """Butler provides a generic mechanism for persisting and retrieving data using mappers.
41 A Butler manages a collection of datasets known as a repository. Each
42 dataset has a type representing its intended usage and a location. Note
43 that the dataset type is not the same as the C++ or Python type of the
44 object containing the data. For example, an ExposureF object might be
45 used to hold the data for a raw image, a post-ISR image, a calibrated
46 science image, or a difference image. These would all be different
49 A Butler can produce a collection of possible values for a key (or tuples
50 of values for multiple keys) if given a partial data identifier. It can
51 check for the existence of a file containing a dataset given its type and
52 data identifier. The Butler can then retrieve the dataset. Similarly, it
53 can persist an object to an appropriate location when given its associated
56 Note that the Butler has two more advanced features when retrieving a data
57 set. First, the retrieval is lazy. Input does not occur until the data
58 set is actually accessed. This allows datasets to be retrieved and
59 placed on a clipboard prospectively with little cost, even if the
60 algorithm of a stage ends up not using them. Second, the Butler will call
61 a standardization hook upon retrieval of the dataset. This function,
62 contained in the input mapper object, must perform any necessary
63 manipulations to force the retrieved object to conform to standards,
64 including translating metadata.
68 __init__(self, root, mapper=None, **mapperArgs)
70 getKeys(self, datasetType=None, level=None)
72 queryMetadata(self, datasetType, keys, format=None, dataId={}, **rest)
74 datasetExists(self, datasetType, dataId={}, **rest)
76 get(self, datasetType, dataId={}, immediate=False, **rest)
78 put(self, obj, datasetType, dataId={}, **rest)
80 subset(self, datasetType, level=None, dataId={}, **rest))
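
    # Illustrative usage (a sketch, not part of this module; the repository
    # path, dataset type, and data id keys below are hypothetical and depend
    # entirely on the mapper):
    #
    #     butler = Butler("/data/repo")
    #     exposure = butler.get("calexp", visit=1234, ccd=5)   # lazy ReadProxy
    #     butler.put(exposure, "calexp", visit=1234, ccd=6)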
86 """Return the mapper class associated with a repository root."""
90 mapperFile =
"_mapper"
92 while not os.path.exists(os.path.join(basePath, mapperFile)):
94 if os.path.exists(os.path.join(basePath,
"_parent")):
95 basePath = os.path.join(basePath,
"_parent")
98 "No mapper provided and no %s available" %
100 mapperFile = os.path.join(basePath, mapperFile)
103 with open(mapperFile,
"r") as f:
104 mapperName = f.readline().strip()
105 components = mapperName.split(".")
106 if len(components) <= 1:
107 raise RuntimeError(
"Unqualified mapper name %s in %s" %
108 (mapperName, mapperFile))
109 pkg = importlib.import_module(
".".join(components[:-1]))
110 return getattr(pkg, components[-1])

    def __init__(self, root, mapper=None, **mapperArgs):
        """Construct the Butler. If no mapper class is provided, then a file
        named "_mapper" is expected to be found in the repository, which
        must be a filesystem path. The first line in that file is read and
        must contain the fully-qualified name of a Mapper subclass, which is
        then imported and instantiated using the root and the mapperArgs.

        @param root (str)       the repository to be managed (at least
                                initially). May be None if a mapper is
                                provided.
        @param mapper (Mapper)  if present, the Mapper subclass instance
                                to be used as the butler's mapper.
        @param **mapperArgs     arguments to be passed to the mapper's
                                __init__ method, in addition to the root."""
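
        # Example (a sketch; the mapper class name is hypothetical): a
        # repository whose "_mapper" file contains the single line
        #     lsst.obs.example.ExampleMapper
        # can be opened with either of:
        #     butler = Butler("/data/repo")
        #     butler = Butler(root=None, mapper=ExampleMapper(root="/data/repo"))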
        self.datasetTypeAliasDict = {}

        if mapper is not None:
            self.mapper = mapper
        else:
            cls = Butler.getMapperClass(root)
            self.mapper = cls(root=root, **mapperArgs)

        # Always use an empty Persistence policy until we can get rid of it
        persistencePolicy = pexPolicy.Policy()
        self.persistence = Persistence.getPersistence(persistencePolicy)
        self.log = pexLog.Log(pexLog.Log.getDefaultLog(),
                "daf.persistence.butler")
143 """Register an alias that will be substituted in datasetTypes.
145 @param alias (str) the alias keyword. it may start with @ or not. It may not contain @ except as the
147 @param datasetType (str) the string that will be substituted when @alias is passed into datasetType. It may
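
        # Example (dataset type names are hypothetical):
        #     butler.defineAlias("cal", "calexp")
        #     butler.get("@cal", visit=1234)   # same as butler.get("calexp", visit=1234)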
        # The alias may be given with or without a leading '@'; normalize it
        # to the '@'-prefixed form and reject any other use of '@'
        atLoc = alias.rfind('@')
        if atLoc == -1:
            alias = "@" + str(alias)
        elif atLoc > 0:
            raise RuntimeError("Badly formatted alias string: %s" % (alias,))

        # The substitution value may not itself contain '@'
        if datasetType.count('@') != 0:
            raise RuntimeError("Badly formatted type string: %s" % (datasetType,))

        # Verify that the new alias does not overlap an existing alias
        # (neither may be a prefix of the other)
        for key in self.datasetTypeAliasDict:
            if key.startswith(alias) or alias.startswith(key):
                raise RuntimeError("Alias: %s overlaps with existing alias: %s" % (alias, key))

        self.datasetTypeAliasDict[alias] = datasetType

    def getKeys(self, datasetType=None, level=None):
        """Returns a dict. The dict keys are the valid data id keys at or
        above the given level of hierarchy for the dataset type or the entire
        collection if None. The dict values are the basic Python types
        corresponding to the keys (int, float, str).

        @param datasetType (str)  the type of dataset to get keys for, entire
                                  collection if None.
        @param level (str)        the hierarchy level to descend to or None.
        @returns (dict) valid data id keys; values are corresponding types."""
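
        # Example (key names and types are hypothetical; they depend on the mapper):
        #     butler.getKeys("raw", "sensor")  ->  {'visit': int, 'ccd': int}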
        return self.mapper.getKeys(datasetType, level)
187 """Returns the valid values for one or more keys when given a partial
188 input collection data id.
190 @param datasetType (str) the type of dataset to inquire about.
191 @param key (str) a key giving the level of granularity of the inquiry.
192 @param format (str, tuple) an optional key or tuple of keys to be returned.
193 @param dataId (dict) the partial data id.
194 @param **rest keyword arguments for the partial data id.
195 @returns (list) a list of valid values or tuples of valid values as
196 specified by the format (defaulting to the same as the key) at the
197 key's level of granularity.
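
        # Example (dataset type and key names are hypothetical): list the
        # (visit, filter) pairs available for one field:
        #     butler.queryMetadata("raw", "visit", ("visit", "filter"), field="A")
        #     ->  [(1234, 'g'), (1235, 'r'), ...]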
        dataId = self._combineDicts(dataId, **rest)
        if format is None:
            format = (key,)
        elif not hasattr(format, '__iter__'):
            format = (format,)
        tuples = self.mapper.queryMetadata(datasetType, key, format, dataId)
        if len(format) == 1:
            return [x[0] for x in tuples]
        return tuples
212 """Determines if a dataset file exists.
214 @param datasetType (str) the type of dataset to inquire about.
215 @param dataId (dict) the data id of the dataset.
216 @param **rest keyword arguments for the data id.
217 @returns (bool) True if the dataset exists or is non-file-based.
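
        # Example (dataset type and keys are hypothetical):
        #     if butler.datasetExists("calexp", visit=1234, ccd=5):
        #         exposure = butler.get("calexp", visit=1234, ccd=5)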
        dataId = self._combineDicts(dataId, **rest)
        location = self.mapper.map(datasetType, dataId)
        additionalData = location.getAdditionalData()
        storageName = location.getStorageName()
        if storageName in ('BoostStorage', 'FitsStorage', 'PafStorage',
                'PickleStorage', 'ConfigStorage', 'FitsCatalogStorage'):
            locations = location.getLocations()
            for locationString in locations:
                logLoc = LogicalLocation(locationString, additionalData).locString()
                if storageName == 'FitsStorage':
                    # Strip off cfitsio directives (e.g. extension names in brackets)
                    bracket = logLoc.find('[')
                    if bracket > 0:
                        logLoc = logLoc[:bracket]
                if not os.path.exists(logLoc):
                    return False
            return True
        self.log.log(pexLog.Log.WARN,
                "datasetExists() for non-file storage %s, dataset type=%s, keys=%s" %
                (storageName, datasetType, str(dataId)))
        return True

    def get(self, datasetType, dataId={}, immediate=False, **rest):
        """Retrieves a dataset given an input collection data id.

        @param datasetType (str)  the type of dataset to retrieve.
        @param dataId (dict)      the data id.
        @param immediate (bool)   don't use a proxy for delayed loading.
        @param **rest             keyword arguments for the data id.
        @returns an object retrieved from the dataset (or a proxy for one).
        """
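
        # Example (dataset type and keys are hypothetical):
        #     calexp = butler.get("calexp", visit=1234, ccd=5)                  # lazy proxy
        #     calexp = butler.get("calexp", visit=1234, ccd=5, immediate=True)  # read now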
        dataId = self._combineDicts(dataId, **rest)
        location = self.mapper.map(datasetType, dataId)
        self.log.log(pexLog.Log.DEBUG, "Get type=%s keys=%s from %s" %
                (datasetType, dataId, str(location)))

        pythonType = None
        if location.getPythonType() is not None:
            # Import the Python type named by the mapping dynamically
            pythonTypeTokenList = location.getPythonType().split('.')
            importClassString = pythonTypeTokenList.pop()
            importClassString = importClassString.strip()
            importPackage = ".".join(pythonTypeTokenList)
            importType = __import__(importPackage, globals(), locals(),
                    [importClassString], -1)
            pythonType = getattr(importType, importClassString)

        # A "bypass_<datasetType>" method on the mapper takes over the read entirely
        if hasattr(self.mapper, "bypass_" + datasetType):
            bypassFunc = getattr(self.mapper, "bypass_" + datasetType)
            callback = lambda: bypassFunc(datasetType, pythonType,
                    location, dataId)
        else:
            callback = lambda: self._read(pythonType, location)
        if self.mapper.canStandardize(datasetType):
            innerCallback = callback
            callback = lambda: self.mapper.standardize(datasetType,
                    innerCallback(), dataId)
        if immediate:
            return callback()
        return ReadProxy(callback)

    def put(self, obj, datasetType, dataId={}, doBackup=False, **rest):
        """Persists a dataset given an output collection data id.

        @param obj                the object to persist.
        @param datasetType (str)  the type of dataset to persist.
        @param dataId (dict)      the data id.
        @param doBackup           if True, rename existing instead of overwriting
        @param **rest             keyword arguments for the data id.

        WARNING: Setting doBackup=True is not safe for parallel processing, as it
        may be subject to race conditions.
        """
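
        # Example (dataset types and keys are hypothetical):
        #     butler.put(sourceCatalog, "src", visit=1234, ccd=5)
        #     butler.put(config, "processCcd_config", doBackup=True)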
        if doBackup:
            self.mapper.backup(datasetType, dataId)
        dataId = self._combineDicts(dataId, **rest)
        location = self.mapper.map(datasetType, dataId, write=True)
        self.log.log(pexLog.Log.DEBUG, "Put type=%s keys=%s to %s" %
                (datasetType, dataId, str(location)))
        additionalData = location.getAdditionalData()
        storageName = location.getStorageName()
        locations = location.getLocations()
        # Only the first location is written
        locationString = locations[0]
        logLoc = LogicalLocation(locationString, additionalData)
        trace = pexLog.BlockTimingLog(self.log, "put",
                pexLog.BlockTimingLog.INSTRUM+1)
        trace.setUsageFlags(trace.ALLUDATA)

        if storageName == "PickleStorage":
            trace.start("write to %s(%s)" % (storageName, logLoc.locString()))
            outDir = os.path.dirname(logLoc.locString())
            if outDir != "" and not os.path.exists(outDir):
                os.makedirs(outDir)
            with open(logLoc.locString(), "wb") as outfile:
                cPickle.dump(obj, outfile, cPickle.HIGHEST_PROTOCOL)
            trace.done()
            return

        if storageName == "ConfigStorage":
            trace.start("write to %s(%s)" % (storageName, logLoc.locString()))
            outDir = os.path.dirname(logLoc.locString())
            if outDir != "" and not os.path.exists(outDir):
                os.makedirs(outDir)
            obj.save(logLoc.locString())
            trace.done()
            return

        if storageName == "FitsCatalogStorage":
            trace.start("write to %s(%s)" % (storageName, logLoc.locString()))
            outDir = os.path.dirname(logLoc.locString())
            if outDir != "" and not os.path.exists(outDir):
                os.makedirs(outDir)
            flags = additionalData.getInt("flags", 0)
            obj.writeFits(logLoc.locString(), flags=flags)
            trace.done()
            return

        # Otherwise persist via the generic Persistence framework:
        # create a list of Storages for the item and persist it
        storageList = StorageList()
        storage = self.persistence.getPersistStorage(storageName, logLoc)
        storageList.append(storage)
        trace.start("write to %s(%s)" % (storageName, logLoc.locString()))

        if hasattr(obj, '__deref__'):
            # Dereference a smart-pointer (SWIG) object before persisting
            self.persistence.persist(
                    obj.__deref__(), storageList, additionalData)
        else:
            self.persistence.persist(obj, storageList, additionalData)
        trace.done()

    def subset(self, datasetType, level=None, dataId={}, **rest):
        """Extracts a subset of a dataset collection.

        Given a partial dataId specified in dataId and **rest, find all
        datasets at a given level specified by a dataId key (e.g. visit or
        sensor or amp for a camera) and return a collection of their dataIds
        as ButlerDataRefs.

        @param datasetType (str)  the type of dataset collection to subset
        @param level (str)        the level of dataId at which to subset
        @param dataId (dict)      the data id.
        @param **rest             keyword arguments for the data id.
        @returns (ButlerSubset) collection of ButlerDataRefs for datasets
        matching the data id.
        """
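
        # Example (dataset type, level, and keys are hypothetical): iterate over
        # all sensor-level datasets of one visit:
        #     for ref in butler.subset("calexp", "sensor", visit=1234):
        #         exposure = ref.get("calexp")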
        if level is None:
            level = self.mapper.getDefaultLevel()
        dataId = self._combineDicts(dataId, **rest)
        return ButlerSubset(self, datasetType, level, dataId)

    def dataRef(self, datasetType, level=None, dataId={}, **rest):
        """Returns a single ButlerDataRef.

        Given a complete dataId specified in dataId and **rest, find the
        unique dataset at the given level specified by a dataId key (e.g.
        visit or sensor or amp for a camera) and return a ButlerDataRef.

        @param datasetType (str)  the type of dataset collection to reference
        @param level (str)        the level of dataId at which to reference
        @param dataId (dict)      the data id.
        @param **rest             keyword arguments for the data id.
        @returns (ButlerDataRef) ButlerDataRef for dataset matching the data id
        """
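
        # Example (dataset type and keys are hypothetical):
        #     ref = butler.dataRef("raw", visit=1234, ccd=5)
        #     raw = ref.get("raw")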
        subset = self.subset(datasetType, level, dataId, **rest)
        if len(subset) != 1:
            raise RuntimeError("""No unique dataset for:
    Dataset type = %s
    Level = %s
    Data ID = %s
    Keywords = %s""" % (str(datasetType), str(level), str(dataId), str(rest)))
        return ButlerDataRef(subset, subset.cache[0])

    def _combineDicts(self, dataId, **rest):
        """Combine a data id dict with keyword-argument overrides."""
        finalId = {}
        finalId.update(dataId)
        finalId.update(rest)
        return finalId

    def _read(self, pythonType, location):
        trace = pexLog.BlockTimingLog(self.log, "read",
                pexLog.BlockTimingLog.INSTRUM+1)

        additionalData = location.getAdditionalData()
        storageName = location.getStorageName()
        locations = location.getLocations()

        # Read each location; return a bare item if there is only one
        results = []
        returnList = True
        if len(locations) == 1:
            returnList = False

        for locationString in locations:
            logLoc = LogicalLocation(locationString, additionalData)
            trace.start("read from %s(%s)" % (storageName, logLoc.locString()))

            if storageName == "PafStorage":
                finalItem = pexPolicy.Policy.createPolicy(logLoc.locString())
            elif storageName == "PickleStorage":
                if not os.path.exists(logLoc.locString()):
                    raise RuntimeError(
                            "No such pickle file: " + logLoc.locString())
                with open(logLoc.locString(), "rb") as infile:
                    finalItem = cPickle.load(infile)
            elif storageName == "FitsCatalogStorage":
                if not os.path.exists(logLoc.locString()):
                    raise RuntimeError(
                            "No such FITS catalog file: " + logLoc.locString())
                hdu = additionalData.getInt("hdu", 0)
                flags = additionalData.getInt("flags", 0)
                finalItem = pythonType.readFits(logLoc.locString(), hdu, flags)
            elif storageName == "ConfigStorage":
                if not os.path.exists(logLoc.locString()):
                    raise RuntimeError(
                            "No such config file: " + logLoc.locString())
                finalItem = pythonType()
                finalItem.load(logLoc.locString())
            else:
                # Fall back to the generic Persistence framework
                storageList = StorageList()
                storage = self.persistence.getRetrieveStorage(storageName, logLoc)
                storageList.append(storage)
                itemData = self.persistence.unsafeRetrieve(
                        location.getCppType(), storageList, additionalData)
                finalItem = pythonType.swigConvert(itemData)
            trace.done()
            results.append(finalItem)

        if not returnList:
            results = results[0]
        return results
481 """ Replaces all the known alias keywords in the given string with the alias value.
482 @param (str)datasetType
483 @return (str) the de-aliased string
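
        # Example (alias and dataset type names are hypothetical): after
        # defineAlias("cal", "calexp"), "@cal_bg" resolves to "calexp_bg".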
        for key in self.datasetTypeAliasDict:
            # If all aliases have been replaced, bail out
            if datasetType.find('@') == -1:
                break
            datasetType = datasetType.replace(key, self.datasetTypeAliasDict[key])

        # If an alias specifier can not be resolved then throw
        if datasetType.find('@') != -1:
            raise RuntimeError("Unresolvable alias specifier in datasetType: %s" % (datasetType,))

        return datasetType


def _unreduce(mapper, datasetTypeAliasDict):
    # Module-level helper used when unpickling a Butler
    butler = Butler(root=None, mapper=mapper)
    butler.datasetTypeAliasDict = datasetTypeAliasDict
    return butler