27 """This module defines the Butler class."""
from future import standard_library
standard_library.install_aliases()
from builtins import str
from past.builtins import basestring
from builtins import object

import collections
import copy
import inspect
import os

from lsst.log import Log
from . import LogicalLocation, ReadProxy, ButlerSubset, ButlerDataRef, Persistence, \
    Storage, Policy, NoResults, Repository, DataId, RepositoryCfg, \
    RepositoryArgs, listify, setify, sequencify, doImport
49 """Represents a Butler configuration.
53 cfg is 'wet paint' and very likely to change. Use of it in production
54 code other than via the 'old butler' API is strongly discouraged.
56 yaml_tag =
u"!ButlerCfg"
        super(ButlerCfg, self).__init__({'repoCfg': repoCfg, 'cls': cls})
63 """Container object for repository data used by Butler"""
66 """Initializer for RepoData
68 @param args (RepositoryArgs) Arguments used to initialize self.repo
69 @param cfg (RepositoryCfg) Configuration of repository (this is persisted)
70 @param repo (Repository) The repository class instance
71 @param tags (set) The tags that apply to this repository, if any
        if not isinstance(tags, set):
            raise RuntimeError("tags passed into RepoData must be in a set.")
87 return "RepoData(args=%s cfg=%s repo=%s tags=%s" % (self.
args, self.
cfg, self.
repo, self.
tags)
91 """Container object for RepoData instances owned by a Butler instance."""
101 """Add a RepoData to the container
103 @param (RepoData) RepoData instance to add
109 self.
byCfgRoot[repoData.args.cfgRoot] = repoData
112 """Get a list of RepoData that are used to as inputs to the Butler.
114 The list is created lazily as needed, and cached.
115 @return a list of RepoData with readable repositories. List is in the order to be use when searching.
118 self.
_inputs = [rd
for rd
in self.byRepoRoot.values()
if 'r' in rd.mode]
122 """Get a list of RepoData that are used to as outputs to the Butler.
124 The list is created lazily as needed, and cached.
125 @return a list of RepoData with writable repositories. List is in the order to be use when searching.
128 self.
_outputs = [rd
for rd
in self.byRepoRoot.values()
if 'w' in rd.mode]
132 """Get a list of all RepoData that are used to as by the Butler.
134 The list is created lazily as needed, and cached.
135 @return a list of RepoData with writable repositories. List is in the order to be use when searching.
137 if self.
_all is None:
138 self.
_all = [rd
for rd
in self.byRepoRoot.values()]
142 return "%s(\nbyRepoRoot=%r, \nbyCfgRoot=%r, \n_inputs=%r, \n_outputs=%s, \n_all=%s)" % (
143 self.__class__.__name__,
152 """Butler provides a generic mechanism for persisting and retrieving data using mappers.
154 A Butler manages a collection of datasets known as a repository. Each
155 dataset has a type representing its intended usage and a location. Note
156 that the dataset type is not the same as the C++ or Python type of the
157 object containing the data. For example, an ExposureF object might be
158 used to hold the data for a raw image, a post-ISR image, a calibrated
159 science image, or a difference image. These would all be different
162 A Butler can produce a collection of possible values for a key (or tuples
163 of values for multiple keys) if given a partial data identifier. It can
164 check for the existence of a file containing a dataset given its type and
165 data identifier. The Butler can then retrieve the dataset. Similarly, it
166 can persist an object to an appropriate location when given its associated
169 Note that the Butler has two more advanced features when retrieving a data
170 set. First, the retrieval is lazy. Input does not occur until the data
171 set is actually accessed. This allows datasets to be retrieved and
172 placed on a clipboard prospectively with little cost, even if the
173 algorithm of a stage ends up not using them. Second, the Butler will call
174 a standardization hook upon retrieval of the dataset. This function,
175 contained in the input mapper object, must perform any necessary
176 manipulations to force the retrieved object to conform to standards,
177 including translating metadata.

    Public methods:

    __init__(self, root, mapper=None, **mapperArgs)
    defineAlias(self, alias, datasetType)
    getKeys(self, datasetType=None, level=None)
    queryMetadata(self, datasetType, keys, format=None, dataId={}, **rest)
    datasetExists(self, datasetType, dataId={}, **rest)
    get(self, datasetType, dataId={}, immediate=False, **rest)
    put(self, obj, datasetType, dataId={}, **rest)
    subset(self, datasetType, level=None, dataId={}, **rest)
    dataRef(self, datasetType, level=None, dataId={}, **rest)
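
    A minimal usage sketch (the repository paths, the 'calexp' dataset type, and the
    'visit'/'ccd' dataId keys below are hypothetical; real dataset types and keys are
    defined by the repository's mapper):

        butler = Butler(inputs='/path/to/inputRepo', outputs='/path/to/outputRepo')
        exposure = butler.get('calexp', dataId={'visit': 1234, 'ccd': 5})
        butler.put(exposure, 'calexp', dataId={'visit': 1234, 'ccd': 5})
    """
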
    def __init__(self, root=None, mapper=None, inputs=None, outputs=None, **mapperArgs):
201 """Initializer for the Class.
203 The prefered initialization argument is to pass a single arg; cfg created by Butler.cfg();
204 butler = Butler(Butler.cfg(repoCfg))
205 For backward compatibility: this initialization method signature can take a posix root path, and
206 optionally a mapper class instance or class type that will be instantiated using the mapperArgs input
208 However, for this to work in a backward compatible way it creates a single repository that is used as
209 both an input and an output repository. This is NOT preferred, and will likely break any provenance
210 system we have in place.
212 @param root (str) Best practice is to pass in a cfg created by Butler.cfg(). But for backward
213 compatibility this can also be a fileysystem path. Will only work with a
215 @param mapper Deprecated. Provides a mapper to be used with Butler.
216 @param mapperArgs Deprecated. Provides arguments to be passed to the mapper if the mapper input arg
217 is a class type to be instantiated by Butler.
218 @param inputs (RepositoryArg or string) Can be a single item or a list. Provides arguments to load an
219 existing repository (or repositories). String is assumed to be
220 a URI and is used as the cfgRoot (URI to the location of the
221 cfg file). (Local file system URI does not have to start with
222 'file://' and in this way can be a relative path).
223 @param outputs (RepositoryArg) Can be a single item or a list. Provides arguments to load one or more
224 existing repositories or create new ones. String is assumed to be a
225 URI and as used as the repository root.
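
        For example (a sketch; the repository paths are hypothetical), a Butler that reads from one
        existing repository and writes to a new one could be constructed as:

            butler = Butler(inputs='/datasets/inputRepo', outputs='/scratch/outputRepo')
        """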
        self._initArgs = {'root': root, 'mapper': mapper, 'inputs': inputs, 'outputs': outputs,
                          'mapperArgs': mapperArgs}
        isLegacyRepository = inputs is None and outputs is None
        if root is not None and not isLegacyRepository:
            raise RuntimeError(
                'root is a deprecated parameter and may not be used with the parameters inputs and outputs')
        if isLegacyRepository:
            if hasattr(mapper, 'root'):
                # a mapper instance may already carry the repository root; prefer it
                root = mapper.root
            outputs = RepositoryArgs(mode='rw',
                                     root=root,
                                     mapper=mapper,
                                     mapperArgs=mapperArgs)
            outputs.isLegacyRepository = True
        self.log = Log.getLogger("daf.persistence.butler")
        inputs = [RepositoryArgs(cfgRoot=i) if isinstance(i, basestring) else i for i in inputs]
        outputs = [RepositoryArgs(root=o) if isinstance(o, basestring) else o for o in outputs]
        butlerIOParents = collections.OrderedDict()
        for args in outputs + inputs:
            butlerIOParents[args.cfgRoot] = args
        for args in outputs:
            self._addRepo(args, inout='out', defaultMapper=defaultMapper, butlerIOParents=butlerIOParents)
        for args in inputs:
            self._addRepo(args, inout='in', butlerIOParents=butlerIOParents)
    def _addRepo(self, args, inout, defaultMapper=None, butlerIOParents=None, tags=None):
291 """Create a Repository and related infrastructure and add it to the list of repositories.
293 @param args (RepositoryArgs) settings used to create the repository.
294 @param inout (string) must be 'in' our 'out', indicates how the repository is to be used. Input repos
295 are only read from, and output repositories may be read from and/or written to
296 (w/rw of output repos depends on args.mode)
297 @param defaultMapper (mapper class or None ) if a default mapper could be inferred from inputs then
298 this will be a class object that can be used for any
299 outputs that do not explicitly define their mapper. If
300 None then a mapper class could not be inferred and a
301 mapper must be defined by each output.
302 @param butlerIOParents (ordered dict) The keys are cfgRoot of repoArgs, val is the repoArgs.
303 This is all the explicit input and output repositories to the
304 butler __init__ function, it is used when determining what the
305 parents of writable repositories are.
306 @param tags (any or list of any) Any object that can be tested to be the same as the tag in a dataId
307 passed into butler input functions. Applies only to input
308 repositories: If tag is specified by the dataId then the repo will
309 only be read from used if the tag in the dataId matches a tag used
        if butlerIOParents is None:
            butlerIOParents = {}

        tags = copy.copy(setify(tags))
        tags.update(args.tags)
        if args.cfgRoot in self._repos.byCfgRoot:
            repoData = self._repos.byCfgRoot[args.cfgRoot]
            if not repoData.cfg.matchesArgs(args):
                raise RuntimeError("Mismatched repository configurations passed in or loaded:" +
                                   "\n\texisting repoData:%s" % repoData)
            repoData.tags.update(setify(tags))
        if inout == 'out':
            if 'w' not in args.mode:
                raise RuntimeError('Output repositories must be writable.')
        elif inout == 'in':
            if 'r' not in args.mode:
                raise RuntimeError('Input repositories must be readable.')
        else:
            raise RuntimeError('Unrecognized value for inout: %s' % inout)
        cfg = Storage.getRepositoryCfg(args.cfgRoot)
        if cfg is not None:
            for cfgRoot in butlerIOParents:
                if cfgRoot not in cfg.parents and cfgRoot != args.cfgRoot:
                    raise RuntimeError(
                        "Existing output repository parents do not match butler's inputs.")
            if not cfg.matchesArgs(args):
                raise RuntimeError(
                    "Persisted RepositoryCfg and passed-in RepositoryArgs have"
                    " conflicting parameters:\n\t%s\n\t%s" % (cfg, args))
            if args.mapperArgs is not None:
                if cfg.mapperArgs is None:
                    cfg.mapperArgs = args.mapperArgs
                else:
                    cfg.mapperArgs.update(args.mapperArgs)
            parentsToAdd = copy.copy(cfg.parents)
        else:
            if args.mapper is None:
                if defaultMapper is None:
                    raise RuntimeError(
                        "Could not infer mapper and one not specified in repositoryArgs:%s" % args)
                args.mapper = defaultMapper
            parents = [cfgRoot for cfgRoot in butlerIOParents if cfgRoot != args.cfgRoot]
            cfg = RepositoryCfg.makeFromArgs(args, parents)
            Storage.putRepositoryCfg(cfg, args.cfgRoot)
        repo = Repository(cfg)
        self._repos.add(RepoData(args, cfg, repo, tags))
        for parent in parentsToAdd:
            if parent in butlerIOParents:
                args = butlerIOParents[parent]
            else:
                args = RepositoryArgs(cfgRoot=parent, mode='r')
            self._addRepo(args=args, inout='in', tags=tags)
        return 'Butler(datasetTypeAliasDict=%s, repos=%s, persistence=%s)' % (
            if args.mapper is not None:
                mapper = args.mapper
                # the mapper may be given as an importable string, an instance, or a class
                if isinstance(mapper, basestring):
                    mapper = doImport(mapper)
                elif not inspect.isclass(mapper):
                    mapper = mapper.__class__
            else:
                cfgRoot = args.cfgRoot
                mapper = Butler.getMapperClass(cfgRoot)
        if len(mappers) == 1:
420 """posix-only; gets the mapper class at the path specifed by root (if a file _mapper can be found at
421 that location or in a parent location.
423 As we abstract the storage and support different types of storage locaitons this method will be
424 moved entirely into Butler Access, or made more dynamic, and the API will very likely change."""
425 return Storage.getMapperClass(root)
428 """Register an alias that will be substituted in datasetTypes.
430 @param alias (str) the alias keyword. it may start with @ or not. It may not contain @ except as the
432 @param datasetType (str) the string that will be substituted when @alias is passed into datasetType.
433 It may not contain '@'
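
        For example (a sketch; the dataset type names are hypothetical):

            butler.defineAlias('calexp', 'deepCoadd_calexp')
            exposure = butler.get('@calexp', dataId)

        Any subsequent use of '@calexp' as a datasetType is then replaced with 'deepCoadd_calexp'.
        """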
        atLoc = alias.rfind('@')
        if atLoc == -1:
            alias = "@" + str(alias)
        elif atLoc > 0:
            raise RuntimeError("Badly formatted alias string: %s" % (alias,))

        # verify that datasetType does not contain '@'
        if datasetType.count('@') != 0:
            raise RuntimeError("Badly formatted type string: %s" % (datasetType))

        # verify that the alias keyword does not start with another alias, and vice versa
        for key in self.datasetTypeAliasDict:
            if key.startswith(alias) or alias.startswith(key):
                raise RuntimeError("Alias: %s overlaps with existing alias: %s" % (alias, key))
    def getKeys(self, datasetType=None, level=None, tag=None):
        """Returns a dict. The dict keys are the valid data id keys at or above the given level of
        hierarchy for the dataset type or the entire collection if None. The dict values are the basic
        Python types corresponding to the keys (int, float, str).

        @param datasetType (str) the type of dataset to get keys for, entire collection if None.
        @param level (str) the hierarchy level to descend to. None if it should not be restricted. Use an
                           empty string if the mapper should look up the default level.
        @param tag (any or list of any) Any object that can be tested to be the same as the tag in a dataId
                           passed into butler input functions. Applies only to input repositories: if a tag
                           is specified by the dataId then the repo will only be read from if the tag in
                           the dataId matches a tag used for that repository.
        @returns (dict) valid data id keys; values are corresponding types.
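
        For example (a sketch; the dataset type and the returned keys are hypothetical):

            keys = butler.getKeys('raw')
            # e.g. {'visit': int, 'ccd': int, 'filter': str}
        """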
        for repoData in self._repos.inputs():
            if not tag or len(tag.intersection(repoData.tags)) > 0:
                keys = repoData.repo.getKeys(datasetType, level)
485 """Returns the valid values for one or more keys when given a partial
486 input collection data id.
488 @param datasetType (str) the type of dataset to inquire about.
489 @param key (str) a key giving the level of granularity of the inquiry.
490 @param format (str, tuple) an optional key or tuple of keys to be returned.
491 @param dataId (DataId, dict) the partial data id.
492 @param **rest keyword arguments for the partial data id.
493 @returns (list) a list of valid values or tuples of valid values as
494 specified by the format (defaulting to the same as the key) at the
495 key's level of granularity.
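
        For example (a sketch; the dataset type, key, and returned values are hypothetical):

            visits = butler.queryMetadata('raw', 'visit', dataId={'filter': 'g'})
            # e.g. [1234, 1235, 1236]
        """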
        dataId = DataId(dataId)
        dataId.update(**rest)

        for repoData in self._repos.inputs():
            if not dataId.tag or len(dataId.tag.intersection(repoData.tags)) > 0:
                tuples = repoData.repo.queryMetadata(datasetType, format, dataId)
529 """Determines if a dataset file exists.
531 @param datasetType (str) the type of dataset to inquire about.
532 @param dataId (DataId, dict) the data id of the dataset.
533 @param **rest keyword arguments for the data id.
534 @returns (bool) True if the dataset exists or is non-file-based.
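
        For example (a sketch; the dataset type and dataId keys are hypothetical):

            if butler.datasetExists('raw', dataId={'visit': 1234, 'ccd': 5}):
                exposure = butler.get('raw', dataId={'visit': 1234, 'ccd': 5})
        """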
        dataId = DataId(dataId)
        dataId.update(**rest)

        location = None
        for repoData in self._repos.inputs():
            if not dataId.tag or len(dataId.tag.intersection(repoData.tags)) > 0:
                location = repoData.repo.map(datasetType, dataId)

        additionalData = location.getAdditionalData()
        storageName = location.getStorageName()
        if storageName in ('BoostStorage', 'FitsStorage', 'PafStorage',
                           'PickleStorage', 'ConfigStorage', 'FitsCatalogStorage'):
            locations = location.getLocations()
            for locationString in locations:
                logLoc = LogicalLocation(locationString, additionalData).locString()
                if storageName == 'FitsStorage':
                    # strip any cfitsio bracket directive, e.g. "file.fits[1]"
                    bracket = logLoc.find('[')
                    if bracket != -1:
                        logLoc = logLoc[:bracket]
                if not os.path.exists(logLoc):
                    return False
        else:
            self.log.warn("datasetExists() for non-file storage %s, dataset type=%s, keys=%s",
                          storageName, datasetType, str(dataId))
        return True
    def get(self, datasetType, dataId=None, immediate=False, **rest):
        """Retrieves a dataset given an input collection data id.

        @param datasetType (str) the type of dataset to retrieve.
        @param dataId (dict) the data id.
        @param immediate (bool) don't use a proxy for delayed loading.
        @param **rest keyword arguments for the data id.
        @returns an object retrieved from the dataset (or a proxy for one).
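
        For example (a sketch; the dataset type and dataId keys are hypothetical):

            # returns a lazy-loading proxy unless immediate=True
            exposure = butler.get('calexp', dataId={'visit': 1234, 'ccd': 5}, immediate=True)
        """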
        dataId = DataId(dataId)
        dataId.update(**rest)

        location = None
        for repoData in self._repos.inputs():
            if not dataId.tag or len(dataId.tag.intersection(repoData.tags)) > 0:
                location = repoData.repo.map(datasetType, dataId)
                if location:
                    break
        if location is None:
            raise NoResults("No locations for get:", datasetType, dataId)
        self.log.debug("Get type=%s keys=%s from %s", datasetType, dataId, str(location))

        if hasattr(location.mapper, "bypass_" + datasetType):
            pythonType = location.getPythonType()
            if pythonType is not None:
                if isinstance(pythonType, basestring):
                    # resolve a "package.module.Class" string into the class object
                    pythonTypeTokenList = location.getPythonType().split('.')
                    importClassString = pythonTypeTokenList.pop()
                    importClassString = importClassString.strip()
                    importPackage = ".".join(pythonTypeTokenList)
                    importType = __import__(importPackage, globals(), locals(), [importClassString], 0)
                    pythonType = getattr(importType, importClassString)
            bypassFunc = getattr(location.mapper, "bypass_" + datasetType)
            callback = lambda: bypassFunc(datasetType, pythonType, location, dataId)
        else:
            callback = lambda: self._read(location)
        if location.mapper.canStandardize(datasetType):
            innerCallback = callback
            callback = lambda: location.mapper.standardize(datasetType, innerCallback(), dataId)
        if immediate:
            return callback()
        return ReadProxy(callback)
    def put(self, obj, datasetType, dataId={}, doBackup=False, **rest):
        """Persists a dataset given an output collection data id.

        @param obj the object to persist.
        @param datasetType (str) the type of dataset to persist.
        @param dataId (dict) the data id.
        @param doBackup (bool) if True, rename existing instead of overwriting.
        @param **rest keyword arguments for the data id.

        WARNING: Setting doBackup=True is not safe for parallel processing, as it
        may be subject to race conditions.
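
        For example (a sketch; the dataset type and dataId keys are hypothetical):

            butler.put(exposure, 'calexp', dataId={'visit': 1234, 'ccd': 5}, doBackup=True)
        """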
        dataId = DataId(dataId)
        dataId.update(**rest)

        for repoData in self._repos.outputs():
            location = repoData.repo.map(datasetType, dataId, write=True)
            if doBackup:
                repoData.repo.backup(datasetType, dataId)
            repoData.repo.write(location, obj)
    def subset(self, datasetType, level=None, dataId={}, **rest):
        """Extracts a subset of a dataset collection.

        Given a partial dataId specified in dataId and **rest, find all
        datasets at a given level specified by a dataId key (e.g. visit or
        sensor or amp for a camera) and return a collection of their dataIds
        as dataRefs.

        @param datasetType (str) the type of dataset collection to subset
        @param level (str) the level of dataId at which to subset. Use an empty string if the mapper
                           should look up the default level.
        @param dataId (dict) the data id.
        @param **rest keyword arguments for the data id.
        @returns (ButlerSubset) collection of ButlerDataRefs for datasets
                 matching the data id.
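
        For example (a sketch; the dataset type, level, and dataId keys are hypothetical):

            for dataRef in butler.subset('raw', level='ccd', dataId={'visit': 1234}):
                exposure = dataRef.get('raw')
        """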
        dataId = DataId(dataId)
        dataId.update(**rest)
        return ButlerSubset(self, datasetType, level, dataId)
    def dataRef(self, datasetType, level=None, dataId={}, **rest):
        """Returns a single ButlerDataRef.

        Given a complete dataId specified in dataId and **rest, find the
        unique dataset at the given level specified by a dataId key (e.g.
        visit or sensor or amp for a camera) and return a ButlerDataRef.

        @param datasetType (str) the type of dataset collection to reference
        @param level (str) the level of dataId at which to reference
        @param dataId (dict) the data id.
        @param **rest keyword arguments for the data id.
        @returns (ButlerDataRef) ButlerDataRef for dataset matching the data id
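
        For example (a sketch; the dataset type and dataId keys are hypothetical):

            ref = butler.dataRef('raw', dataId={'visit': 1234, 'ccd': 5})
            exposure = ref.get('raw')
        """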
        dataId = DataId(dataId)
        subset = self.subset(datasetType, level, dataId, **rest)
        if len(subset) != 1:
            raise RuntimeError("No unique dataset for: Dataset type:%s Level:%s Data ID:%s Keywords:%s" %
                               (str(datasetType), str(level), str(dataId), str(rest)))
        return ButlerDataRef(subset, subset.cache[0])
    def _read(self, location):
        self.log.debug("Starting read from %s", location)
        results = location.repository.read(location)
        if len(results) == 1:
            results = results[0]
        self.log.debug("Ending read from %s", location)
        return results
705 """ Replaces all the known alias keywords in the given string with the alias value.
706 @param (str)datasetType
707 @return (str) the de-aliased string
712 if datasetType.find(
'@') == -1:
717 if datasetType.find(
'@') != -1:
718 raise RuntimeError(
"Unresolvable alias specifier in datasetType: %s" % (datasetType))
    mapperArgs = initArgs.pop('mapperArgs')
    initArgs.update(mapperArgs)
    butler = Butler(**initArgs)
    butler.datasetTypeAliasDict = datasetTypeAliasDict