27 """This module defines the Butler class."""
34 from .
import ReadProxy, ButlerSubset, ButlerDataRef, \
35 Storage, Policy, NoResults, Repository, DataId, RepositoryCfg, \
36 RepositoryArgs, listify, setify, sequencify, doImport, ButlerComposite, genericAssembler, \
37 genericDisassembler, PosixStorage, ParentsMismatch

preinitedMapperWarning = ("Passing an instantiated mapper into "
                          "Butler.__init__ will prevent Butler from passing "
                          "parentRegistry or repositoryCfg information to "
                          "the mapper, which is done only at init time. "
                          "It is better to pass an importable string or "
                          "class object.")
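
# For illustration, two preferred ways to specify a mapper when constructing a
# Butler; the mapper module path and class name below are hypothetical:
#
#   Butler(inputs=RepositoryArgs(cfgRoot='/data/repo', mapper='lsst.obs.example.ExampleMapper'))
#   Butler(inputs=RepositoryArgs(cfgRoot='/data/repo', mapper=ExampleMapper))
#
# Passing an already-instantiated mapper (mapper=ExampleMapper(...)) triggers the
# warning above.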
48 """Represents a Butler configuration.
52 cfg is 'wet paint' and very likely to change. Use of it in production
53 code other than via the 'old butler' API is strongly discouraged.
55 yaml_tag =
u"!ButlerCfg"
58 super().
__init__({
'repoCfg': repoCfg,
'cls': cls})
62 """Container object for repository data used by Butler
67 The arguments that are used to find or create the RepositoryCfg.
69 "input", "output", or "parent", indicating why Butler loaded this repository.
70 * input: the Repository was passed as a Butler input.
71 * output: the Repository was passed as a Butler output.
72 * parent: the Repository was specified in the RepositoryCfg parents list of a readable repository.
77 The configuration for the Repository.
80 "new", "existing", or "nested". Indicates the origin of the repository and its RepositoryCfg:
81 * new: it was created by this instance of Butler, and this instance of Butler will generate the
83 * existing: it was found (via the root or cfgRoot argument)
84 * nested: the full RepositoryCfg was nested in another RepositoryCfg's parents list (this can happen
85 if parameters of an input specified by RepositoryArgs or dict does not entirely match an existing
89 Path or URI to the location of the RepositoryCfg file.
91 repo : lsst.daf.persistence.Repository
92 The Repository class instance.
94 parentRepoDatas : list of RepoData
95 The parents of this Repository, as indicated this Repository's RepositoryCfg. If this is a new
96 Repository then these are the inputs to this Butler (and will be saved in the RepositoryCfg). These
97 RepoData objects are not owned by this RepoData, these are references to peer RepoData objects in the
98 Butler's RepoDataContainer.
100 isV1Repository : bool
101 True if this is an Old Butler repository. In this case the repository does not have a RepositoryCfg
102 file. It may have a _mapper file and may have a _parent symlink. It will never be treated as a "new"
103 repository, i.e. even though there is not a RepositoryCfg file, one will not be generated.
104 If False, this is a New Butler repository and is specified by RepositoryCfg file.
107 These are values that may be used to restrict the search of input repositories. Details are available
108 in the RepositoryArgs and DataId classes.
111 "input", "output", or "parent", indicating why Butler loaded this repository.
112 * input: the Repository was passed as a Butler input.
113 * output: the Repository was passed as a Butler output.
114 * parent: the Repository was specified in the RepositoryCfg parents list of a readable repository.
116 _repoArgs : RepositoryArgs
117 Contains the arguments that were used to specify this Repository.
147 "parentRepoDatas={},"
150 "parentRegistry={})").
format(
151 self.__class__.__name__,

    def setCfg(self, cfg, origin, root, isV1Repository):
        """Set information about the cfg into the RepoData

        Parameters
        ----------
        cfg : RepositoryCfg
            The RepositoryCfg for the repo.
        origin : string
            'new', 'existing', or 'nested'.
        root : string
            URI or absolute path to the location of the RepositoryCfg.yaml file.

        Returns
        -------
        None
        """
        if origin not in ('new', 'existing', 'nested'):
            raise RuntimeError("Invalid value for origin: {}".format(origin))
        self._cfg = cfg
        self._cfgOrigin = origin
        self.cfgRoot = root
        self.isV1Repository = isV1Repository

    @property
    def role(self):
        return self._role

    @role.setter
    def role(self, val):
        if val not in ('input', 'output', 'parent'):
            raise RuntimeError("Invalid value for role: {}".format(val))
        self._role = val
205 """Get the parents & grandparents etc of this repo data, in depth-first search order.
207 Duplicate entries will be removed in cases where the same parent appears more than once in the parent
212 context : set, optional
213 Users should typically omit context and accept the default argument. Context is used to keep a set
214 of known RepoDatas when calling this function recursively, for duplicate elimination.
219 A list of the parents & grandparents etc of a given repo data, in depth-first search order.
224 if id(self)
in context:
226 context.add(
id(self))
228 parents.append(parent)
229 parents += parent.getParentRepoDatas(context)
240 """Container object for RepoData instances owned by a Butler instance.
244 repoDataList : list of RepoData
245 repoData - RepoData instance to add
251 self.
_all = repoDataList
255 """Get a list of RepoData that are used to as inputs to the Butler.
256 The list is created lazily as needed, and cached.
260 A list of RepoData with readable repositories, in the order to be used when searching.
263 raise RuntimeError(
"Inputs not yet initialized.")
267 """Get a list of RepoData that are used to as outputs to the Butler.
268 The list is created lazily as needed, and cached.
272 A list of RepoData with writable repositories, in the order to be use when searching.
275 raise RuntimeError(
"Outputs not yet initialized.")
279 """Get a list of all RepoData that are used to as by the Butler.
280 The list is created lazily as needed, and cached.
284 A list of RepoData with writable repositories, in the order to be use when searching.
289 return "%s(_inputs=%r, \n_outputs=%s, \n_all=%s)" % (
290 self.__class__.__name__,

    def _buildLookupLists(self):
        """Build the inputs and outputs lists based on the order of self.all()."""

        def addToList(repoData, lst):
            """Add a repoData and each of its parents (depth first) to a list."""
            if id(repoData) in alreadyAdded:
                return
            lst.append(repoData)
            alreadyAdded.add(id(repoData))
            for parent in repoData.parentRepoDatas:
                addToList(parent, lst)

        if self._inputs is not None or self._outputs is not None:
            raise RuntimeError("Lookup lists are already built.")
        inputs = [repoData for repoData in self.all() if repoData.role == 'input']
        outputs = [repoData for repoData in self.all() if repoData.role == 'output']
        self._inputs = []
        alreadyAdded = set()
        for repoData in outputs:
            if 'r' in repoData.repoArgs.mode:
                addToList(repoData, self._inputs)
        for repoData in inputs:
            addToList(repoData, self._inputs)
        self._outputs = [repoData for repoData in outputs]
322 """Butler provides a generic mechanism for persisting and retrieving data using mappers.
324 A Butler manages a collection of datasets known as a repository. Each dataset has a type representing its
325 intended usage and a location. Note that the dataset type is not the same as the C++ or Python type of the
326 object containing the data. For example, an ExposureF object might be used to hold the data for a raw
327 image, a post-ISR image, a calibrated science image, or a difference image. These would all be different
330 A Butler can produce a collection of possible values for a key (or tuples of values for multiple keys) if
331 given a partial data identifier. It can check for the existence of a file containing a dataset given its
332 type and data identifier. The Butler can then retrieve the dataset. Similarly, it can persist an object to
333 an appropriate location when given its associated data identifier.
335 Note that the Butler has two more advanced features when retrieving a data set. First, the retrieval is
336 lazy. Input does not occur until the data set is actually accessed. This allows datasets to be retrieved
337 and placed on a clipboard prospectively with little cost, even if the algorithm of a stage ends up not
338 using them. Second, the Butler will call a standardization hook upon retrieval of the dataset. This
339 function, contained in the input mapper object, must perform any necessary manipulations to force the
340 retrieved object to conform to standards, including translating metadata.
344 __init__(self, root, mapper=None, **mapperArgs)
346 defineAlias(self, alias, datasetType)
348 getKeys(self, datasetType=None, level=None)
350 getDatasetTypes(self)
352 queryMetadata(self, datasetType, format=None, dataId={}, **rest)
354 datasetExists(self, datasetType, dataId={}, **rest)
356 get(self, datasetType, dataId={}, immediate=False, **rest)
358 put(self, obj, datasetType, dataId={}, **rest)
360 subset(self, datasetType, level=None, dataId={}, **rest)
362 dataRef(self, datasetType, level=None, dataId={}, **rest)
366 The preferred method of initialization is to use the `inputs` and `outputs` __init__ parameters. These
367 are described in the parameters section, below.
369 For backward compatibility: this initialization method signature can take a posix root path, and
370 optionally a mapper class instance or class type that will be instantiated using the mapperArgs input
371 argument. However, for this to work in a backward compatible way it creates a single repository that is
372 used as both an input and an output repository. This is NOT preferred, and will likely break any
373 provenance system we have in place.
378 .. note:: Deprecated in 12_0
379 `root` will be removed in TBD, it is replaced by `inputs` and `outputs` for
380 multiple-repository support.
381 A file system path. Will only work with a PosixRepository.
382 mapper : string or instance
383 .. note:: Deprecated in 12_0
384 `mapper` will be removed in TBD, it is replaced by `inputs` and `outputs` for
385 multiple-repository support.
386 Provides a mapper to be used with Butler.
388 .. note:: Deprecated in 12_0
389 `mapperArgs` will be removed in TBD, it is replaced by `inputs` and `outputs` for
390 multiple-repository support.
391 Provides arguments to be passed to the mapper if the mapper input argument is a class type to be
392 instantiated by Butler.
393 inputs : RepositoryArgs, dict, or string
394 Can be a single item or a list. Provides arguments to load an existing repository (or repositories).
395 String is assumed to be a URI and is used as the cfgRoot (URI to the location of the cfg file). (Local
396 file system URI does not have to start with 'file://' and in this way can be a relative path). The
397 `RepositoryArgs` class can be used to provide more parameters with which to initialize a repository
398 (such as `mapper`, `mapperArgs`, `tags`, etc. See the `RepositoryArgs` documentation for more
399 details). A dict may be used as shorthand for a `RepositoryArgs` class instance. The dict keys must
400 match parameters to the `RepositoryArgs.__init__` function.
401 outputs : RepositoryArgs, dict, or string
402 Provides arguments to load one or more existing repositories or create new ones. The different types
403 are handled the same as for `inputs`.
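
    For example, the following are interchangeable ways to describe the same single input repository (the
    path is illustrative):

        Butler(inputs='/data/repo')
        Butler(inputs={'cfgRoot': '/data/repo'})
        Butler(inputs=RepositoryArgs(cfgRoot='/data/repo'))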

    The Butler init sequence loads all of the input and output repositories.
    This creates the object hierarchy to read from and write to them. Each
    repository can have 0 or more parents, which also get loaded as inputs.
    This becomes a DAG of repositories. Ultimately, Butler creates a list of
    these Repositories in the order that they are used.

    Initialization Sequence
    =======================

    During initialization Butler creates a Repository class instance & support structure for each object
    passed to `inputs` and `outputs` as well as the parent repositories recorded in the `RepositoryCfg` of
    each existing readable repository.

    This process is complex. It is explained below to shed some light on the intent of each step.

    1. Input Argument Standardization
    ---------------------------------

    In `Butler._processInputArguments` the input arguments are verified to be legal (and a RuntimeError is
    raised if not), and they are converted into an expected format that is used for the rest of the Butler
    init sequence. See the docstring for `_processInputArguments`.

    2. Create RepoData Objects
    --------------------------

    Butler uses an object, called `RepoData`, to keep track of information about each repository; each
    repository is contained in a single `RepoData`. The attributes are explained in its docstring.

    After `_processInputArguments`, a RepoData is instantiated and put in a list for each repository in
    `outputs` and `inputs`. This list of RepoData, the `repoDataList`, now represents all the output and input
    repositories (but not parent repositories) that this Butler instance will use.

    3. Get `RepositoryCfg`s
    -----------------------

    `Butler._getCfgs` gets the `RepositoryCfg` for each repository in the `repoDataList`. The behavior is
    described in the docstring.

    4. Add Parents
    --------------

    `Butler._addParents` then considers the parents list in the `RepositoryCfg` of each `RepoData` in the
    `repoDataList` and inserts new `RepoData` objects for each parent not represented in the proper location
    in the `repoDataList`. Ultimately a flat list is built to represent the DAG of readable repositories
    represented in depth-first order.

    5. Set and Verify Parents of Outputs
    ------------------------------------

    To be able to load parent repositories when output repositories are used as inputs, the input repositories
    are recorded as parents in the `RepositoryCfg` file of new output repositories. When an output repository
    already exists, for consistency the Butler's inputs must match the list of parents specified in the
    already-existing output repository's `RepositoryCfg` file.

    In `Butler._setAndVerifyParentsLists`, the list of parents is recorded in the `RepositoryCfg` of new
    repositories. For existing repositories the list of parents is compared with the `RepositoryCfg`'s parents
    list, and if they do not match a `RuntimeError` is raised.

    6. Set the Default Mapper
    -------------------------

    If all the input repositories use the same mapper then we can assume that mapper to be the
    "default mapper". If there are new output repositories whose `RepositoryArgs` do not specify a mapper and
    there is a default mapper then the new output repository will be set to use that default mapper.

    This is handled in `Butler._setDefaultMapper`.

    7. Cache References to Parent RepoDatas
    ---------------------------------------

    In `Butler._connectParentRepoDatas`, in each `RepoData` in `repoDataList`, a list of `RepoData` object
    references is built that matches the parents specified in that `RepoData`'s `RepositoryCfg`.

    This list is used later to find things in that repository's parents, without considering peer
    repositories' parents. (e.g. finding the registry of a parent)

    8. Set Tags
    -----------

    Tags are described at https://ldm-463.lsst.io/v/draft/#tagging

    In `Butler._setRepoDataTags`, for each `RepoData`, the tags specified by its `RepositoryArgs` are recorded
    in a set, and added to the tags set in each of its parents, for ease of lookup when mapping.

    9. Find Parent Registry and Instantiate RepoData
    ------------------------------------------------

    At this point there is enough information to instantiate the `Repository` instances. There is one final
    step before instantiating the Repository, which is to try to get a parent registry that can be used by the
    child repository. The criteria for "can be used" are spelled out in `Butler._setParentRegistry`. However,
    to get the registry from the parent, the parent must be instantiated. The `repoDataList`, in depth-first
    search order, is built so that the most-dependent repositories are first, and the least-dependent
    repositories are last. So the `repoDataList` is reversed and the Repositories are instantiated in that
    order; for each RepoData a parent registry is searched for, and then the Repository is instantiated with
    whatever registry could be found.
    """
502 """This is a Generation 2 Butler.

    def __init__(self, root=None, mapper=None, inputs=None, outputs=None, **mapperArgs):
        self._initArgs = {'root': root, 'mapper': mapper, 'inputs': inputs, 'outputs': outputs,
                          'mapperArgs': mapperArgs}

        self.log = Log.getLogger("daf.persistence.butler")

        self.datasetTypeAliasDict = {}

        self.storage = Storage()  # used to fetch RepositoryCfg files

        inputs, outputs = self._processInputArguments(
            root=root, mapper=mapper, inputs=inputs, outputs=outputs, **mapperArgs)

        # convert the RepoArgs into RepoData
        inputs = [RepoData(args, 'input') for args in inputs]
        outputs = [RepoData(args, 'output') for args in outputs]
        repoDataList = outputs + inputs

        self._getCfgs(repoDataList)
        self._addParents(repoDataList)
        self._setAndVerifyParentsLists(repoDataList)
        self._setDefaultMapper(repoDataList)
        self._connectParentRepoDatas(repoDataList)
        self._repos = RepoDataContainer(repoDataList)
        self._setRepoDataTags()

        for repoData in repoDataList:
            self._initRepo(repoData)

    def _initRepo(self, repoData):
        if repoData.repo is not None:
            # this repository has already been initialized.
            return
        # If this repository's parents use the same mapper, a parent registry may be usable here; find one,
        # initializing parents as needed.
        for parentRepoData in repoData.parentRepoDatas:
            if parentRepoData.cfg.mapper != repoData.cfg.mapper:
                continue
            if parentRepoData.repo is None:
                self._initRepo(parentRepoData)
            parentRegistry = parentRepoData.repo.getRegistry()
            repoData.parentRegistry = parentRegistry if parentRegistry else parentRepoData.parentRegistry
            if repoData.parentRegistry:
                break
        repoData.repo = Repository(repoData)

    def _processInputArguments(self, root=None, mapper=None, inputs=None, outputs=None, **mapperArgs):
        """Process, verify, and standardize the input arguments.

        * Inputs can not be for Old Butler (root, mapper, mapperArgs) AND New Butler (inputs, outputs).
          `root`, `mapper`, and `mapperArgs` are Old Butler init API.
          `inputs` and `outputs` are New Butler init API.
          Old Butler and New Butler init API may not be mixed; Butler may be initialized with only the Old
          arguments or the New arguments.
        * Verify that if there is a readable output, there is exactly one output. (This restriction is in
          place because all readable repositories must be parents of writable repositories, and for
          consistency the DAG of readable repositories must always be the same. Keeping the list of parents
          becomes very complicated in the presence of multiple readable output repositories. It is better to
          only write to output repositories, and then create a new Butler instance and use the outputs as
          inputs, and write to new output repositories.)
        * Make a copy of inputs & outputs so they may be modified without changing the passed-in arguments.
        * Convert any input/output values that are URI strings to RepositoryArgs.
        * Listify inputs & outputs.
        * Set default RW mode on inputs & outputs as needed.

        Parameters
        ----------
        Same as Butler.__init__

        Returns
        -------
        (list of RepositoryArgs, list of RepositoryArgs)
            First item is a list to use as inputs.
            Second item is a list to use as outputs.

        Raises
        ------
        RuntimeError
            If Old Butler and New Butler arguments are both used this will raise.
            If an output is readable and there is more than one output this will raise.
        """
        # inputs and outputs may be modified; do not change the external value.
        inputs = copy.deepcopy(inputs)
        outputs = copy.deepcopy(outputs)

        isV1Args = inputs is None and outputs is None
        if isV1Args:
            inputs, outputs = self._convertV1Args(root=root,
                                                  mapper=mapper,
                                                  mapperArgs=mapperArgs or None)
        elif root or mapper or mapperArgs:
            raise RuntimeError(
                'Butler version 1 API (root, mapper, **mapperArgs) may '
                'not be used with version 2 API (inputs, outputs)')

        # make sure inputs and outputs are lists, and convert any string item to RepositoryArgs.
        inputs = listify(inputs)
        outputs = listify(outputs)
        inputs = [RepositoryArgs(cfgRoot=args)
                  if not isinstance(args, RepositoryArgs) else args for args in inputs]
        outputs = [RepositoryArgs(cfgRoot=args)
                   if not isinstance(args, RepositoryArgs) else args for args in outputs]

        # Set the default mode of inputs & outputs and verify the required values ('r' for inputs, 'w' for
        # outputs).
        for args in inputs:
            if args.mode is None:
                args.mode = 'r'
            elif 'rw' == args.mode:
                args.mode = 'r'
            elif 'r' != args.mode:
                raise RuntimeError("The mode of an input should be readable.")
        for args in outputs:
            if args.mode is None:
                args.mode = 'w'
            elif 'w' not in args.mode:
                raise RuntimeError("The mode of an output should be writable.")

        # check for class instances in args.mapper (not allowed)
        for args in inputs + outputs:
            if (args.mapper and not isinstance(args.mapper, str)
                    and not inspect.isclass(args.mapper)):
                self.log.warn(preinitedMapperWarning)

        # if an output is readable, there must be only one output:
        for o in outputs:
            if 'r' in o.mode and len(outputs) > 1:
                raise RuntimeError("Butler does not support multiple output repositories if any of the "
                                   "outputs are readable.")

        # Handle the case where a readable output is also passed in as one of the inputs by removing the
        # input.
        def inputIsInOutputs(inputArgs, outputArgsList):
            for o in outputArgsList:
                if ('r' in o.mode
                        and o.root == inputArgs.root
                        and o.mapper == inputArgs.mapper
                        and o.mapperArgs == inputArgs.mapperArgs
                        and o.tags == inputArgs.tags
                        and o.policy == inputArgs.policy):
                    self.log.debug(("Input repositoryArgs {} is also listed in outputs as readable; "
                                    "throwing away the input.").format(inputArgs))
                    return True
            return False

        inputs = [args for args in inputs if not inputIsInOutputs(args, outputs)]

        return inputs, outputs
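
    # For illustration, given the standardization rules above (paths hypothetical),
    #
    #   Butler(inputs='/data/in', outputs='/data/out')
    #
    # would be normalized to roughly:
    #
    #   inputs  == [RepositoryArgs(cfgRoot='/data/in', mode='r')]
    #   outputs == [RepositoryArgs(cfgRoot='/data/out', mode='w')]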

    @staticmethod
    def _getParentVal(repoData):
        """Get the value of this repoData as it should appear in the parents
        list of other repositories."""
        if repoData.isV1Repository:
            return repoData.cfg
        if repoData.cfgOrigin == 'nested':
            return repoData.cfg
        else:
            return repoData.cfg.root

    @staticmethod
    def _getParents(ofRepoData, repoInfo):
        """Create a parents list of repoData from inputs and (readable) outputs."""
        parents = []
        for repoData in repoInfo:
            if repoData is ofRepoData:
                continue
            if 'r' not in repoData.repoArgs.mode:
                continue
            parents.append(Butler._getParentVal(repoData))
        return parents

    @staticmethod
    def _getOldButlerRepositoryCfg(repositoryArgs):
        if not Storage.isPosix(repositoryArgs.cfgRoot):
            return None
        if not PosixStorage.v1RepoExists(repositoryArgs.cfgRoot):
            return None
        if not repositoryArgs.mapper:
            repositoryArgs.mapper = PosixStorage.getMapperClass(repositoryArgs.cfgRoot)
        cfg = RepositoryCfg.makeFromArgs(repositoryArgs)
        parent = PosixStorage.getParentSymlinkPath(repositoryArgs.cfgRoot)
        if parent:
            parent = Butler._getOldButlerRepositoryCfg(RepositoryArgs(cfgRoot=parent, mode='r'))
            if parent is not None:
                cfg.addParents([parent])
        return cfg

    def _getRepositoryCfg(self, repositoryArgs):
        """Try to get a repository from the location described by cfgRoot.

        Parameters
        ----------
        repositoryArgs : RepositoryArgs or string
            Provides arguments to load an existing repository (or repositories). String is assumed to be a
            URI and is used as the cfgRoot (URI to the location of the cfg file).

        Returns
        -------
        (RepositoryCfg or None, bool)
            The RepositoryCfg, or None if one cannot be found, and True if the RepositoryCfg was created by
            reading an Old Butler repository, or False if it is a New Butler Repository.
        """
        if not isinstance(repositoryArgs, RepositoryArgs):
            repositoryArgs = RepositoryArgs(cfgRoot=repositoryArgs, mode='r')

        cfg = self.storage.getRepositoryCfg(repositoryArgs.cfgRoot)
        isOldButlerRepository = False
        if cfg is None:
            cfg = Butler._getOldButlerRepositoryCfg(repositoryArgs)
            if cfg is not None:
                isOldButlerRepository = True
        return cfg, isOldButlerRepository

    def _getCfgs(self, repoDataList):
        """Get or make a RepositoryCfg for each RepoData, and add the cfg to the RepoData.

        If the cfg exists, compare values. If the values match then use the cfg as an "existing" cfg. If the
        values do not match, use the cfg as a "nested" cfg.
        If the cfg does not exist, the RepositoryArgs must be for a writable repository.

        Parameters
        ----------
        repoDataList : list of RepoData
            The RepoData that are the outputs and inputs of this Butler.

        Raises
        ------
        RuntimeError
            If the passed-in RepositoryArgs indicate an existing repository but other cfg parameters in those
            RepositoryArgs do not match the existing repository's cfg a RuntimeError will be raised.
        """
        def cfgMatchesArgs(args, cfg):
            """Test if there are any values in the RepositoryArgs that conflict with the values in the cfg."""
            if args.mapper is not None and cfg.mapper != args.mapper:
                return False
            if args.mapperArgs is not None and cfg.mapperArgs != args.mapperArgs:
                return False
            if args.policy is not None and cfg.policy != args.policy:
                return False
            return True

        for repoData in repoDataList:
            cfg, isOldButlerRepository = self._getRepositoryCfg(repoData.repoArgs)
            if cfg is None:
                if 'w' not in repoData.repoArgs.mode:
                    raise RuntimeError(
                        "No cfg found for read-only input repository at {}".format(repoData.repoArgs.cfgRoot))
                repoData.setCfg(cfg=RepositoryCfg.makeFromArgs(repoData.repoArgs),
                                origin='new',
                                root=repoData.repoArgs.cfgRoot,
                                isV1Repository=isOldButlerRepository)
            else:
                # The cfg may contain Old Butler repositories in its parents list; if so, replace each Old
                # Butler parent path with a RepositoryCfg that includes the child repository's mapperArgs.
                for i, parent in enumerate(cfg.parents):
                    if isinstance(parent, RepositoryCfg):
                        continue
                    parentCfg, parentIsOldButlerRepository = self._getRepositoryCfg(parent)
                    if parentIsOldButlerRepository:
                        parentCfg.mapperArgs = cfg.mapperArgs
                        self.log.info(("Butler is replacing an Old Butler parent repository path '{}' "
                                       "found in the parents list of a New Butler repositoryCfg: {} "
                                       "with a repositoryCfg that includes the child repository's "
                                       "mapperArgs: {}. This affects the instantiated RepositoryCfg "
                                       "but does not change the persisted child repositoryCfg.yaml file."
                                       ).format(parent, cfg, parentCfg))
                        cfg._parents[i] = cfg._normalizeParents(cfg.root, [parentCfg])[0]

                if 'w' in repoData.repoArgs.mode:
                    # if it is an output repository, the RepositoryArgs must match the existing cfg.
                    if not cfgMatchesArgs(repoData.repoArgs, cfg):
                        raise RuntimeError(("The RepositoryArgs and RepositoryCfg must match for writable "
                                            "repositories, RepositoryCfg:{}, RepositoryArgs:{}").format(
                            cfg, repoData.repoArgs))
                    repoData.setCfg(cfg=cfg, origin='existing', root=repoData.repoArgs.cfgRoot,
                                    isV1Repository=isOldButlerRepository)
                else:
                    # if it is an input repository, use the cfg as "existing" when it matches the args,
                    # otherwise as "nested".
                    if cfgMatchesArgs(repoData.repoArgs, cfg):
                        repoData.setCfg(cfg=cfg, origin='existing', root=repoData.repoArgs.cfgRoot,
                                        isV1Repository=isOldButlerRepository)
                    else:
                        repoData.setCfg(cfg=cfg, origin='nested', root=None,
                                        isV1Repository=isOldButlerRepository)

    def _addParents(self, repoDataList):
        """For each repoData in the input list, see if its parents are the next items in the list, and if
        not, add the parent, so that the repoDataList includes parents and is in order to operate depth-first
        0..n.

        Parameters
        ----------
        repoDataList : list of RepoData
            The RepoData for the Butler outputs + inputs.

        Raises
        ------
        RuntimeError
            Raised if a RepositoryCfg can not be found at a location where a parent repository should be.
        """
        repoDataIdx = 0
        while True:
            if repoDataIdx == len(repoDataList):
                break
            repoData = repoDataList[repoDataIdx]
            if 'r' not in repoData.repoArgs.mode:
                repoDataIdx += 1
                continue  # the repoData only needs parents if it is readable.
            if repoData.isNewRepository:
                repoDataIdx += 1
                continue  # if the repository is new, it can't have parents.
            if repoData.cfg.parents is None:
                repoDataIdx += 1
                continue  # if the cfg has no parents, there is nothing to do.
            for repoParentIdx, repoParent in enumerate(repoData.cfg.parents):
                parentIdxInRepoDataList = repoDataIdx + repoParentIdx + 1
                if not isinstance(repoParent, RepositoryCfg):
                    repoParentCfg, isOldButlerRepository = self._getRepositoryCfg(repoParent)
                    if repoParentCfg is not None:
                        cfgOrigin = 'existing'
                    else:
                        raise RuntimeError(
                            "Could not find a RepositoryCfg at location {}".format(repoParent))
                else:
                    isOldButlerRepository = False
                    repoParentCfg = repoParent
                    cfgOrigin = 'nested'
                if (parentIdxInRepoDataList < len(repoDataList)
                        and repoDataList[parentIdxInRepoDataList].cfg == repoParentCfg):
                    continue
                args = RepositoryArgs(cfgRoot=repoParentCfg.root, mode='r')
                role = 'input' if repoData.role == 'output' else 'parent'
                newRepoInfo = RepoData(args, role)
                newRepoInfo.setCfg(cfg=repoParentCfg, origin=cfgOrigin, root=args.cfgRoot,
                                   isV1Repository=isOldButlerRepository)
                repoDataList.insert(parentIdxInRepoDataList, newRepoInfo)
            repoDataIdx += 1

    def _setAndVerifyParentsLists(self, repoDataList):
        """Make a list of all the input repositories of this Butler; these are the parents of the outputs.
        For new output repositories, set the parents in the RepositoryCfg. For existing output repositories
        verify that the RepositoryCfg's parents match the parents list.

        Parameters
        ----------
        repoDataList : list of RepoData
            All the RepoDatas loaded by this butler, in search order.

        Raises
        ------
        RuntimeError
            If an existing output repository is loaded and its parents do not match the parents of this
            Butler an error will be raised.
        """
        def getIOParents(ofRepoData, repoDataList):
            """Make a parents list for the repo in `ofRepoData` that is comprised of inputs and readable
            outputs (not parents-of-parents) of this butler."""
            parents = []
            for repoData in repoDataList:
                if repoData.role == 'parent':
                    continue
                if repoData is ofRepoData:
                    continue
                if repoData.role == 'output':
                    if 'r' in repoData.repoArgs.mode:
                        raise RuntimeError("If an output is readable it must be the only output.")
                    # a write-only output does not get listed in parents lists.
                    continue
                parents.append(self._getParentVal(repoData))
            return parents

        for repoData in repoDataList:
            if repoData.role != 'output':
                continue
            parents = getIOParents(repoData, repoDataList)
            # if repoData is new, add the parent RepositoryCfgs to it.
            if repoData.cfgOrigin == 'new':
                repoData.cfg.addParents(parents)
            elif repoData.cfgOrigin in ('existing', 'nested'):
                if repoData.cfg.parents != parents:
                    try:
                        repoData.cfg.extendParents(parents)
                    except ParentsMismatch as e:
                        raise RuntimeError(("Inputs of this Butler:{} do not match parents of existing "
                                            "writable cfg:{} (ParentsMismatch exception: {})").format(
                            parents, repoData.cfg.parents, e))

    def _setDefaultMapper(self, repoDataList):
        """Establish a default mapper if there is one, and assign it to outputs that do not have a mapper
        assigned.

        If all inputs have the same mapper it will be used as the default mapper.

        Parameters
        ----------
        repoDataList : list of RepoData
            All the RepoDatas loaded by this butler, in search order.

        Raises
        ------
        RuntimeError
            If a default mapper can not be established and there is an output that does not have a mapper.
        """
        needyOutputs = [rd for rd in repoDataList if rd.role == 'output' and rd.cfg.mapper is None]
        if len(needyOutputs) == 0:
            return
        mappers = set([rd.cfg.mapper for rd in repoDataList if rd.role == 'input'])
        if len(mappers) != 1:
            inputs = [rd for rd in repoDataList if rd.role == 'input']
            raise RuntimeError(
                ("No default mapper could be established from inputs:{} and no mapper specified "
                 "for outputs:{}").format(inputs, needyOutputs))
        defaultMapper = mappers.pop()
        for repoData in needyOutputs:
            repoData.cfg.mapper = defaultMapper

    def _connectParentRepoDatas(self, repoDataList):
        """For each RepoData in repoDataList, find its parent in the repoDataList and cache a reference to it.

        Parameters
        ----------
        repoDataList : list of RepoData
            All the RepoDatas loaded by this butler, in search order.

        Raises
        ------
        RuntimeError
            When a parent is listed in the parents list but not found in the repoDataList. This is not
            expected to ever happen and would indicate an internal Butler error.
        """
        for repoData in repoDataList:
            for parent in repoData.cfg.parents:
                parentToAdd = None
                for otherRepoData in repoDataList:
                    if isinstance(parent, RepositoryCfg):
                        if otherRepoData.cfg == parent:
                            parentToAdd = otherRepoData
                    elif otherRepoData.cfg.root == parent:
                        parentToAdd = otherRepoData
                if parentToAdd is None:
                    raise RuntimeError(
                        "Could not find a parent matching {} to add to {}".format(parent, repoData))
                repoData.addParentRepoData(parentToAdd)

    @staticmethod
    def _getParentRepoData(parent, repoDataList):
        """Get a parent RepoData from a cfg from a list of RepoData

        Parameters
        ----------
        parent : string or RepositoryCfg
            cfgRoot of a repo or a cfg that describes the repo
        repoDataList : list of RepoData
            The list to search in.

        Returns
        -------
        RepoData or None
            A RepoData if one can be found, else None
        """
        repoData = None
        for otherRepoData in repoDataList:
            if isinstance(parent, RepositoryCfg):
                if otherRepoData.cfg == parent:
                    repoData = otherRepoData
                    break
            elif otherRepoData.cfg.root == parent:
                repoData = otherRepoData
                break
        return repoData

    def _setRepoDataTags(self):
        """Set the tags from each repoArgs into all its parent repoArgs so that they can be included in
        tagged searches."""
        def setTags(repoData, tags, context):
            if id(repoData) in context:
                return
            repoData.addTags(tags)
            context.add(id(repoData))
            for parentRepoData in repoData.parentRepoDatas:
                setTags(parentRepoData, tags, context)
        for repoData in self._repos.outputs() + self._repos.inputs():
            setTags(repoData, repoData.repoArgs.tags, set())

    def _convertV1Args(self, root, mapper, mapperArgs):
        """Convert Old Butler RepositoryArgs (root, mapper, mapperArgs) to New Butler RepositoryArgs
        (inputs, outputs).

        Parameters
        ----------
        root : string
            Posix path to repository root
        mapper : class, class instance, or string
            Instantiated class, a class object to be instantiated, or a string that refers to a class that
            can be imported & used as the mapper.
        mapperArgs : dict
            RepositoryArgs & their values used when instantiating the mapper.

        Returns
        -------
        tuple
            (inputs, outputs) - values to be used for inputs and outputs in Butler.__init__
        """
        if (mapper and not isinstance(mapper, str)
                and not inspect.isclass(mapper)):
            self.log.warn(preinitedMapperWarning)
        inputs = None
        if root is None:
            if hasattr(mapper, 'root'):
                # in legacy repositories, the mapper may be given the root directly.
                root = mapper.root
            else:
                # in the past root="None" could be used to mean root='.'
                root = '.'
        outputs = RepositoryArgs(mode='rw',
                                 root=root,
                                 mapper=mapper,
                                 mapperArgs=mapperArgs)
        return inputs, outputs

    def __repr__(self):
        return 'Butler(datasetTypeAliasDict=%s, repos=%s)' % (
            self.datasetTypeAliasDict, self._repos)

    def _getDefaultMapper(self):
        """Get the default mapper. Currently this means if all the repositories use exactly the same mapper,
        that mapper may be considered the default.

        This definition may be changing; mappers may be able to exclude themselves as candidates for default,
        and they may nominate a different mapper instead. Also, we may not want to look at *all* the
        repositories, but only a depth-first search on each of the input & output repositories, and use the
        first-found mapper for each of those. TBD.

        Parameters
        ----------
        None

        Returns
        -------
        Mapper class or None
            Returns the class type of the default mapper, or None if a default
            mapper can not be determined.
        """
        defaultMapper = None

        for inputRepoData in self._repos.inputs():
            mapper = None
            if inputRepoData.cfg.mapper is not None:
                mapper = inputRepoData.cfg.mapper
                # if the mapper is:
                # * a string, import it.
                # * a class instance, get its class type.
                # * a class, use it as-is.
                if isinstance(mapper, str):
                    mapper = doImport(mapper)
                elif not inspect.isclass(mapper):
                    mapper = mapper.__class__
            # If no mapper has been found yet, note this one. If a mapper has been found and this one
            # matches it, continue. If a non-matching mapper is found there is no default; return None.
            if defaultMapper is None:
                defaultMapper = mapper
            elif mapper == defaultMapper:
                continue
            elif mapper is not None:
                return None
        return defaultMapper

    def _assignDefaultMapper(self, defaultMapper):
        for repoData in self._repos.all():
            if repoData.cfg.mapper is None and (repoData.isNewRepository or repoData.isV1Repository):
                if defaultMapper is None:
                    raise RuntimeError(
                        "No mapper specified for %s and no default mapper could be determined." %
                        repoData)
                repoData.cfg.mapper = defaultMapper
1096 """posix-only; gets the mapper class at the path specified by root (if a file _mapper can be found at
1097 that location or in a parent location.
1099 As we abstract the storage and support different types of storage locations this method will be
1100 moved entirely into Butler Access, or made more dynamic, and the API will very likely change."""
1101 return Storage.getMapperClass(root)
1104 """Register an alias that will be substituted in datasetTypes.
1109 The alias keyword. It may start with @ or not. It may not contain @ except as the first character.
1110 datasetType - string
1111 The string that will be substituted when @alias is passed into datasetType. It may not contain '@'
1115 atLoc = alias.rfind(
'@')
1117 alias =
"@" + str(alias)
1119 raise RuntimeError(
"Badly formatted alias string: %s" % (alias,))
1122 if datasetType.count(
'@') != 0:
1123 raise RuntimeError(
"Badly formatted type string: %s" % (datasetType))
1128 if key.startswith(alias)
or alias.startswith(key):
1129 raise RuntimeError(
"Alias: %s overlaps with existing alias: %s" % (alias, key))

    def getKeys(self, datasetType=None, level=None, tag=None):
        """Get the valid data id keys at or above the given level of hierarchy for the dataset type or the
        entire collection if None. The dict values are the basic Python types corresponding to the keys (int,
        float, string).

        Parameters
        ----------
        datasetType - string
            The type of dataset to get keys for, entire collection if None.
        level - string
            The hierarchy level to descend to. None if it should not be restricted. Use an empty string if the
            mapper should lookup the default level.
        tag - any, or list of any
            If tag is specified then the repo will only be used if the tag
            or a tag in the list matches a tag used for that repository.

        Returns
        -------
        Returns a dict. The dict keys are the valid data id keys at or above the given level of hierarchy for
        the dataset type or the entire collection if None. The dict values are the basic Python types
        corresponding to the keys (int, float, string).
        """
        datasetType = self._resolveDatasetTypeAlias(datasetType)

        keys = None
        tag = setify(tag)
        for repoData in self._repos.inputs():
            if not tag or len(tag.intersection(repoData.tags)) > 0:
                keys = repoData.repo.getKeys(datasetType, level)
                # An empty dict is a valid "found" condition for keys. keys==None signifies no found keys.
                if keys is not None:
                    break
        return keys
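
    # For example (keys and their types are mapper-dependent; illustrative only):
    #
    #   butler.getKeys('raw')  # might return {'visit': int, 'ccd': int, 'filter': str}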
1169 """Get the valid dataset types for all known repos or those matching
1174 tag - any, or list of any
1175 If tag is specified then the repo will only be used if the tag
1176 or a tag in the list matches a tag used for that repository.
1180 Returns the dataset types as a set of strings.
1182 datasetTypes =
set()
1184 for repoData
in self.
_repos.outputs() + self.
_repos.inputs():
1185 if not tag
or len(tag.intersection(repoData.tags)) > 0:
1186 datasetTypes = datasetTypes.union(
1191 """Returns the valid values for one or more keys when given a partial
1192 input collection data id.
1196 datasetType - string
1197 The type of dataset to inquire about.
1199 Key or tuple of keys to be returned.
1200 dataId - DataId, dict
1201 The partial data id.
1203 Keyword arguments for the partial data id.
1207 A list of valid values or tuples of valid values as specified by the
1212 dataId = DataId(dataId)
1213 dataId.update(**rest)
1217 for repoData
in self.
_repos.inputs():
1218 if not dataId.tag
or len(dataId.tag.intersection(repoData.tags)) > 0:
1219 tuples = repoData.repo.queryMetadata(datasetType, format, dataId)
1226 if len(format) == 1:
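
    # Illustrative queries (dataset type and key names depend on the mapper):
    #
    #   visits = butler.queryMetadata('raw', 'visit', dataId={'filter': 'r'})   # list of visit values
    #   pairs = butler.queryMetadata('raw', ('visit', 'ccd'))                   # list of (visit, ccd) tuples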
1238 """Determines if a dataset file exists.
1242 datasetType - string
1243 The type of dataset to inquire about.
1244 dataId - DataId, dict
1245 The data id of the dataset.
1247 If True, look only in locations where the dataset could be written,
1248 and return True only if it is present in all of them.
1249 **rest keyword arguments for the data id.
1254 True if the dataset exists or is non-file-based.
1257 dataId = DataId(dataId)
1258 dataId.update(**rest)
1259 locations = self.
_locate(datasetType, dataId, write=write)
1261 if locations
is None:
1263 locations = [locations]
1268 for location
in locations:
1271 if isinstance(location, ButlerComposite):
1272 for name, componentInfo
in location.componentInfo.items():
1273 if componentInfo.subset:
1274 subset = self.
subset(datasetType=componentInfo.datasetType, dataId=location.dataId)
1275 exists =
all([obj.datasetExists()
for obj
in subset])
1277 exists = self.
datasetExists(componentInfo.datasetType, location.dataId)
1281 if not location.repository.exists(location):
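
    # Illustrative existence check (data id keys depend on the mapper):
    #
    #   if butler.datasetExists('calexp', visit=1, ccd=0):
    #       exposure = butler.get('calexp', visit=1, ccd=0)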

    def _locate(self, datasetType, dataId, write):
        """Get one or more ButlerLocations and/or ButlerComposites.

        Parameters
        ----------
        datasetType : string
            The datasetType that is being searched for. The datasetType may be followed by a dot and
            a component name (component names are specified in the policy). IE datasetType.componentName

        dataId : dict or DataId class instance
            The dataId

        write : bool
            True if this is a search to write an object. False if it is a search to read an object. This
            affects what type (an object or a container) is returned.

        Returns
        -------
        If write is False, will return either a single object or None. If write is True, will return a list
        (which may be empty).
        """
        repos = self._repos.outputs() if write else self._repos.inputs()
        locations = []
        for repoData in repos:
            # enforce dataId & repository tags when reading:
            if not write and dataId.tag and len(dataId.tag.intersection(repoData.tags)) == 0:
                continue
            components = datasetType.split('.')
            datasetType = components[0]
            components = components[1:]
            location = repoData.repo.map(datasetType, dataId, write=write)
            if location is None:
                continue
            location.datasetType = datasetType
            if len(components) > 0:
                if not isinstance(location, ButlerComposite):
                    raise RuntimeError("The location for a dotted datasetType must be a composite.")
                # replace the first component name with its datasetType
                components[0] = location.componentInfo[components[0]].datasetType
                # join the components back into a dot-delimited string
                datasetType = '.'.join(components)
                location = self._locate(datasetType, dataId, write)
                # if a component location is not found we can not continue with this repo; move to the next.
                if location is None:
                    break
            if location:
                if not write:
                    # If there is a bypass function for this dataset type, execute it and include its result
                    # in the bypass attribute of the location. The bypass function may fail for any reason,
                    # the most common case being that a file does not exist. If it fails we do not include
                    # the bypass in the returned location.
                    if hasattr(location.mapper, "bypass_" + location.datasetType):
                        bypass = self._getBypassFunc(location, dataId)
                        try:
                            bypass = bypass()
                            location.bypass = bypass
                        except (NoResults, IOError):
                            self.log.debug("Continuing dataset search while evaluating "
                                           "bypass function for Dataset type:{} Data ID:{} at "
                                           "location {}".format(datasetType, dataId, location))
                    # If a location was found but the location does not exist, keep looking in input
                    # repositories (the registry may have had enough data for a lookup even though the
                    # object exists in a different repository).
                    if (isinstance(location, ButlerComposite) or hasattr(location, 'bypass')
                            or location.repository.exists(location)):
                        return location
                else:
                    try:
                        locations.extend(location)
                    except TypeError:
                        locations.append(location)
        if not write:
            return None
        return locations

    @staticmethod
    def _getBypassFunc(location, dataId):
        pythonType = location.getPythonType()
        if pythonType is not None:
            if isinstance(pythonType, str):
                pythonType = doImport(pythonType)
        bypassFunc = getattr(location.mapper, "bypass_" + location.datasetType)
        return lambda: bypassFunc(location.datasetType, pythonType, location, dataId)
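
    # A mapper may provide a ``bypass_<datasetType>`` method to produce the object directly instead of
    # reading it from the mapped location. A minimal sketch (class, dataset type, and helper names are
    # hypothetical):
    #
    #   class ExampleMapper(CameraMapper):
    #       def bypass_someType(self, datasetType, pythonType, location, dataId):
    #           return buildObjectDirectly(location, dataId)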

    def get(self, datasetType, dataId=None, immediate=True, **rest):
        """Retrieves a dataset given an input collection data id.

        Parameters
        ----------
        datasetType - string
            The type of dataset to retrieve.
        dataId - dict
            The data id.
        immediate - bool
            If False use a proxy for delayed loading.
        **rest
            keyword arguments for the data id.

        Returns
        -------
        An object retrieved from the dataset (or a proxy for one).
        """
        datasetType = self._resolveDatasetTypeAlias(datasetType)
        dataId = DataId(dataId)
        dataId.update(**rest)

        location = self._locate(datasetType, dataId, write=False)
        if location is None:
            raise NoResults("No locations for get:", datasetType, dataId)
        self.log.debug("Get type=%s keys=%s from %s", datasetType, dataId, str(location))

        if hasattr(location, 'bypass'):
            # this type loader block should get moved into a helper someday.
            def callback():
                return location.bypass
        else:
            def callback():
                return self._read(location)
        if location.mapper.canStandardize(location.datasetType):
            innerCallback = callback

            def callback():
                return location.mapper.standardize(location.datasetType, innerCallback(), dataId)
        if immediate:
            return callback()
        return ReadProxy(callback)
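
    # Illustrative reads (dataset type and data id keys depend on the mapper):
    #
    #   exposure = butler.get('calexp', visit=1, ccd=0)                 # immediate read
    #   proxy = butler.get('calexp', visit=1, ccd=0, immediate=False)   # lazy ReadProxy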

    def put(self, obj, datasetType, dataId={}, doBackup=False, **rest):
        """Persists a dataset given an output collection data id.

        Parameters
        ----------
        obj -
            The object to persist.
        datasetType - string
            The type of dataset to persist.
        dataId - dict
            The data id.
        doBackup - bool
            If True, rename existing instead of overwriting.
            WARNING: Setting doBackup=True is not safe for parallel processing, as it may be subject to race
            conditions.
        **rest
            Keyword arguments for the data id.
        """
        datasetType = self._resolveDatasetTypeAlias(datasetType)
        dataId = DataId(dataId)
        dataId.update(**rest)

        locations = self._locate(datasetType, dataId, write=True)
        if not locations:
            raise NoResults("No locations for put:", datasetType, dataId)
        for location in locations:
            if isinstance(location, ButlerComposite):
                disassembler = location.disassembler if location.disassembler else genericDisassembler
                disassembler(obj=obj, dataId=location.dataId, componentInfo=location.componentInfo)
                for name, info in location.componentInfo.items():
                    if not info.inputOnly:
                        self.put(info.obj, info.datasetType, location.dataId, doBackup=doBackup)
            else:
                if doBackup:
                    location.getRepository().backup(location.datasetType, dataId)
                location.getRepository().write(location, obj)
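
    # Illustrative write (dataset type and data id keys depend on the mapper):
    #
    #   butler.put(exposure, 'calexp', visit=1, ccd=0)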

    def subset(self, datasetType, level=None, dataId={}, **rest):
        """Return complete dataIds for a dataset type that match a partial (or empty) dataId.

        Given a partial (or empty) dataId specified in dataId and **rest, find all datasets that match the
        dataId. Optionally restrict the results to a given level specified by a dataId key (e.g. visit or
        sensor or amp for a camera). Return an iterable collection of complete dataIds as ButlerDataRefs.
        Datasets with the resulting dataIds may not exist; that needs to be tested with datasetExists().

        Parameters
        ----------
        datasetType - string
            The type of dataset collection to subset
        level - string
            The level of dataId at which to subset. Use an empty string if the mapper should look up the
            default level.
        dataId - dict
            The data id.
        **rest
            Keyword arguments for the data id.

        Returns
        -------
        subset - ButlerSubset
            Collection of ButlerDataRefs for datasets matching the data id.

        Examples
        --------
        To print the full dataIds for all r-band measurements in a source catalog
        (note that the subset call is equivalent to: `butler.subset('src', dataId={'filter':'r'})`):

        >>> subset = butler.subset('src', filter='r')
        >>> for data_ref in subset: print(data_ref.dataId)
        """
        datasetType = self._resolveDatasetTypeAlias(datasetType)

        # Currently expected behavior of subset is that if the specified level is None, the mapper's default
        # level should be used. Convert the given level=None to an empty string.
        if level is None:
            level = ''

        dataId = DataId(dataId)
        dataId.update(**rest)
        return ButlerSubset(self, datasetType, level, dataId)

    def dataRef(self, datasetType, level=None, dataId={}, **rest):
        """Returns a single ButlerDataRef.

        Given a complete dataId specified in dataId and **rest, find the unique dataset at the given level
        specified by a dataId key (e.g. visit or sensor or amp for a camera) and return a ButlerDataRef.

        Parameters
        ----------
        datasetType - string
            The type of dataset collection to reference
        level - string
            The level of dataId at which to reference
        dataId - dict
            The data id.
        **rest
            Keyword arguments for the data id.

        Returns
        -------
        dataRef - ButlerDataRef
            ButlerDataRef for dataset matching the data id
        """
        datasetType = self._resolveDatasetTypeAlias(datasetType)
        dataId = DataId(dataId)
        subset = self.subset(datasetType, level, dataId, **rest)
        if len(subset) != 1:
            raise RuntimeError("No unique dataset for: Dataset type:%s Level:%s Data ID:%s Keywords:%s" %
                               (str(datasetType), str(level), str(dataId), str(rest)))
        return ButlerDataRef(subset, subset.cache[0])
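
    # Illustrative use (data id keys depend on the mapper):
    #
    #   ref = butler.dataRef('raw', visit=1, ccd=0)
    #   raw = ref.get('raw')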

    def getUri(self, datasetType, dataId=None, write=False, **rest):
        """Return the URI for a dataset

        .. warning:: This is intended only for debugging. The URI should
            never be used for anything other than printing.

        .. note:: In the event there are multiple URIs for read, we return only
            the first.

        .. note:: getUri() does not currently support composite datasets.

        Parameters
        ----------
        datasetType : `str`
            The dataset type of interest.
        dataId : `dict`, optional
            The data identifier.
        write : `bool`, optional
            Return the URI for writing?
        rest : `dict`, optional
            Keyword arguments for the data id.

        Returns
        -------
        uri : `str`
            URI for the dataset.
        """
        datasetType = self._resolveDatasetTypeAlias(datasetType)
        dataId = DataId(dataId)
        dataId.update(**rest)
        locations = self._locate(datasetType, dataId, write=write)
        if locations is None:
            raise NoResults("No locations for getUri: ", datasetType, dataId)

        if write:
            # Follow the write path; return the first valid write location.
            for location in locations:
                if isinstance(location, ButlerComposite):
                    for name, info in location.componentInfo.items():
                        if not info.inputOnly:
                            return self.getUri(info.datasetType, location.dataId, write=True)
                else:
                    return location.getLocationsWithRoot()[0]
            # fall back to raise
            raise NoResults("No locations for getUri(write=True): ", datasetType, dataId)
        else:
            # Follow the read path; only return the first valid read location.
            return locations.getLocationsWithRoot()[0]

    def _read(self, location):
        """Unpersist an object using data inside a ButlerLocation or ButlerComposite object.

        Parameters
        ----------
        location : ButlerLocation or ButlerComposite
            A ButlerLocation or ButlerComposite instance populated with data needed to read the object.

        Returns
        -------
        object
            An instance of the object specified by the location.
        """
        self.log.debug("Starting read from %s", location)

        if isinstance(location, ButlerComposite):
            for name, componentInfo in location.componentInfo.items():
                if componentInfo.subset:
                    subset = self.subset(datasetType=componentInfo.datasetType, dataId=location.dataId)
                    componentInfo.obj = [obj.get() for obj in subset]
                else:
                    obj = self.get(componentInfo.datasetType, location.dataId, immediate=True)
                    componentInfo.obj = obj
            assembler = location.assembler or genericAssembler
            results = assembler(dataId=location.dataId, componentInfo=location.componentInfo,
                                cls=location.python)
        else:
            results = location.repository.read(location)
            if len(results) == 1:
                results = results[0]
        self.log.debug("Ending read from %s", location)
        return results

    def _resolveDatasetTypeAlias(self, datasetType):
        """Replaces all the known alias keywords in the given string with the alias value.

        Parameters
        ----------
        datasetType - string
            A datasetType string to search & replace on

        Returns
        -------
        datasetType - string
            The de-aliased string
        """
        for key in self.datasetTypeAliasDict:
            # if all aliases have been replaced, bail out
            if datasetType.find('@') == -1:
                break
            datasetType = datasetType.replace(key, self.datasetTypeAliasDict[key])

        # If an alias specifier can not be resolved then throw.
        if datasetType.find('@') != -1:
            raise RuntimeError("Unresolvable alias specifier in datasetType: %s" % (datasetType))

        return datasetType

    @staticmethod
    def _unreduce(initArgs, datasetTypeAliasDict):
        mapperArgs = initArgs.pop('mapperArgs')
        initArgs.update(mapperArgs)
        butler = Butler(**initArgs)
        butler.datasetTypeAliasDict = datasetTypeAliasDict
        return butler
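
    # _unreduce is the unpickling counterpart of a __reduce__ implementation that is not part of this
    # excerpt; it presumably looks something like:
    #
    #   def __reduce__(self):
    #       return (Butler._unreduce, (self._initArgs, self.datasetTypeAliasDict))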