27"""This module defines the Butler class."""
import copy
import inspect

import yaml

from lsst.log import Log
from .deprecation import deprecate_class
from . import ReadProxy, ButlerSubset, ButlerDataRef, \
    Storage, Policy, NoResults, Repository, DataId, RepositoryCfg, \
    RepositoryArgs, listify, setify, sequencify, doImport, ButlerComposite, genericAssembler, \
    genericDisassembler, PosixStorage, ParentsMismatch
preinitedMapperWarning = ("Passing an instantiated mapper into "
                          "Butler.__init__ will prevent Butler from passing "
                          "parentRegistry or repositoryCfg information to "
                          "the mapper, which is done only at init time. "
                          "It is better to pass an importable string or "
                          "class object.")
49 """Represents a Butler configuration.
53 cfg is 'wet paint' and very likely to change. Use of it
in production
54 code other than via the
'old butler' API
is strongly discouraged.
56 yaml_tag = u"!ButlerCfg"
59 super().
__init__({
'repoCfg': repoCfg,
'cls': cls})
63 """Container object for repository data used by Butler
68 The arguments that are used to find or create the RepositoryCfg.
70 "input",
"output",
or "parent", indicating why Butler loaded this repository.
71 * input: the Repository was passed
as a Butler input.
72 * output: the Repository was passed
as a Butler output.
73 * parent: the Repository was specified
in the RepositoryCfg parents list of a readable repository.
78 The configuration
for the Repository.
81 "new",
"existing",
or "nested". Indicates the origin of the repository
and its RepositoryCfg:
82 * new: it was created by this instance of Butler,
and this instance of Butler will generate the
84 * existing: it was found (via the root
or cfgRoot argument)
85 * nested: the full RepositoryCfg was nested
in another RepositoryCfg
's parents list (this can happen
86 if parameters of an input specified by RepositoryArgs
or dict does
not entirely match an existing
90 Path
or URI to the location of the RepositoryCfg file.
93 The Repository
class instance.
95 parentRepoDatas : list of RepoData
96 The parents of this Repository,
as indicated this Repository
's RepositoryCfg. If this is a new
97 Repository then these are the inputs to this Butler (and will be saved
in the RepositoryCfg). These
98 RepoData objects are
not owned by this RepoData, these are references to peer RepoData objects
in the
99 Butler
's RepoDataContainer.
101 isV1Repository : bool
102 True if this
is an Old Butler repository. In this case the repository does
not have a RepositoryCfg
103 file. It may have a _mapper file
and may have a _parent symlink. It will never be treated
as a
"new"
104 repository, i.e. even though there
is not a RepositoryCfg file, one will
not be generated.
105 If
False, this
is a New Butler repository
and is specified by RepositoryCfg file.
108 These are values that may be used to restrict the search of input repositories. Details are available
109 in the RepositoryArgs
and DataId classes.
112 "input",
"output",
or "parent", indicating why Butler loaded this repository.
113 * input: the Repository was passed
as a Butler input.
114 * output: the Repository was passed
as a Butler output.
115 * parent: the Repository was specified
in the RepositoryCfg parents list of a readable repository.
117 _repoArgs : RepositoryArgs
118 Contains the arguments that were used to specify this Repository.
148 "parentRepoDatas={},"
151 "parentRegistry={})").
format(
152 self.__class__.__name__,
    def setCfg(self, cfg, origin, root, isV1Repository):
        """Set information about the cfg into the RepoData

        Parameters
        ----------
        cfg : RepositoryCfg
            The RepositoryCfg for the repo.
        origin : string
            'new', 'existing' or 'nested'
        root : string
            URI or absolute path to the location of the RepositoryCfg.yaml file.
        """
        if origin not in ('new', 'existing', 'nested'):
            raise RuntimeError("Invalid value for origin:{}".format(origin))
        self.cfg = cfg
        self.cfgOrigin = origin
        self.cfgRoot = root
        self.isV1Repository = isV1Repository
    @property
    def role(self):
        return self._role

    @role.setter
    def role(self, val):
        if val not in ('input', 'output', 'parent'):
            raise RuntimeError("Invalid value for role: {}".format(val))
        self._role = val
206 """Get the parents & grandparents etc of this repo data, in depth-first search order.
208 Duplicate entries will be removed in cases where the same parent appears more than once
in the parent
213 context : set, optional
214 Users should typically omit context
and accept the default argument. Context
is used to keep a set
215 of known RepoDatas when calling this function recursively,
for duplicate elimination.
220 A list of the parents & grandparents etc of a given repo data,
in depth-first search order.
225 if id(self)
in context:
227 context.add(
id(self))
229 parents.append(parent)
230 parents += parent.getParentRepoDatas(context)
    def addTags(self, tags):
        self.tags = self.tags.union(tags)
241 """Container object for RepoData instances owned by a Butler instance.
245 repoDataList : list of RepoData
246 repoData - RepoData instance to add
252 self.
_all_all = repoDataList
256 """Get a list of RepoData that are used to as inputs to the Butler.
257 The list is created lazily
as needed,
and cached.
261 A list of RepoData
with readable repositories,
in the order to be used when searching.
263 if self.
_inputs_inputs
is None:
264 raise RuntimeError(
"Inputs not yet initialized.")
268 """Get a list of RepoData that are used to as outputs to the Butler.
269 The list is created lazily
as needed,
and cached.
273 A list of RepoData
with writable repositories,
in the order to be use when searching.
276 raise RuntimeError(
"Outputs not yet initialized.")
280 """Get a list of all RepoData that are used to as by the Butler.
281 The list is created lazily
as needed,
and cached.
285 A list of RepoData
with writable repositories,
in the order to be use when searching.
290 return "%s(_inputs=%r, \n_outputs=%s, \n_all=%s)" % (
291 self.__class__.__name__,
    def _buildLookupLists(self):
        """Build the inputs and outputs lists based on the order of self.all()."""

        def addToList(repoData, lst):
            """Add a repoData and each of its parents (depth first) to a list"""
            if id(repoData) in alreadyAdded:
                return
            lst.append(repoData)
            alreadyAdded.add(id(repoData))
            for parent in repoData.parentRepoDatas:
                addToList(parent, lst)

        if self._inputs is not None or self._outputs is not None:
            raise RuntimeError("Lookup lists are already built.")
        inputs = [repoData for repoData in self.all() if repoData.role == 'input']
        outputs = [repoData for repoData in self.all() if repoData.role == 'output']
        self._inputs = []
        alreadyAdded = set()
        # a readable output repository is searched before the inputs, together with its parents:
        for repoData in outputs:
            if 'r' in repoData.repoArgs.mode:
                addToList(repoData, self._inputs)
        for repoData in inputs:
            addToList(repoData, self._inputs)
        self._outputs = [repoData for repoData in outputs]
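    # A worked sketch of the ordering _buildLookupLists produces (the repository names
    # here are hypothetical). Suppose the Butler has a readable output O whose parents
    # are the inputs A and B, and A itself has a parent C. Then, depth-first:
    #
    #     _inputs  == [O, A, C, B]
    #     _outputs == [O]
    #
    # so a read searches O first, then A and its parents, and finally B.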
324 """Butler provides a generic mechanism for persisting and retrieving data using mappers.
326 A Butler manages a collection of datasets known as a repository. Each dataset has a type representing its
327 intended usage
and a location. Note that the dataset type
is not the same
as the C++
or Python type of the
328 object containing the data. For example, an ExposureF object might be used to hold the data
for a raw
329 image, a post-ISR image, a calibrated science image,
or a difference image. These would all be different
332 A Butler can produce a collection of possible values
for a key (
or tuples of values
for multiple keys)
if
333 given a partial data identifier. It can check
for the existence of a file containing a dataset given its
334 type
and data identifier. The Butler can then retrieve the dataset. Similarly, it can persist an object to
335 an appropriate location when given its associated data identifier.
337 Note that the Butler has two more advanced features when retrieving a data set. First, the retrieval
is
338 lazy. Input does
not occur until the data set
is actually accessed. This allows datasets to be retrieved
339 and placed on a clipboard prospectively
with little cost, even
if the algorithm of a stage ends up
not
340 using them. Second, the Butler will call a standardization hook upon retrieval of the dataset. This
341 function, contained
in the input mapper object, must perform any necessary manipulations to force the
342 retrieved object to conform to standards, including translating metadata.
346 __init__(self, root, mapper=
None, **mapperArgs)
350 getKeys(self, datasetType=
None, level=
None)
354 queryMetadata(self, datasetType, format=
None, dataId={}, **rest)
358 get(self, datasetType, dataId={}, immediate=
False, **rest)
360 put(self, obj, datasetType, dataId={}, **rest)
362 subset(self, datasetType, level=
None, dataId={}, **rest)
364 dataRef(self, datasetType, level=
None, dataId={}, **rest)
368 The preferred method of initialization
is to use the `inputs`
and `outputs` __init__ parameters. These
369 are described
in the parameters section, below.
371 For backward compatibility: this initialization method signature can take a posix root path,
and
372 optionally a mapper
class instance or class
type that will be instantiated using the mapperArgs input
373 argument. However,
for this to work
in a backward compatible way it creates a single repository that
is
374 used
as both an input
and an output repository. This
is NOT preferred,
and will likely
break any
375 provenance system we have
in place.
380 .. note:: Deprecated
in 12_0
381 `root` will be removed
in TBD, it
is replaced by `inputs`
and `outputs`
for
382 multiple-repository support.
383 A file system path. Will only work
with a PosixRepository.
384 mapper : string
or instance
385 .. note:: Deprecated
in 12_0
386 `mapper` will be removed
in TBD, it
is replaced by `inputs`
and `outputs`
for
387 multiple-repository support.
388 Provides a mapper to be used
with Butler.
390 .. note:: Deprecated
in 12_0
391 `mapperArgs` will be removed
in TBD, it
is replaced by `inputs`
and `outputs`
for
392 multiple-repository support.
393 Provides arguments to be passed to the mapper
if the mapper input argument
is a
class type to be
394 instantiated by Butler.
395 inputs : RepositoryArgs, dict,
or string
396 Can be a single item
or a list. Provides arguments to load an existing repository (
or repositories).
397 String
is assumed to be a URI
and is used
as the cfgRoot (URI to the location of the cfg file). (Local
398 file system URI does
not have to start
with 'file://' and in this way can be a relative path). The
399 `RepositoryArgs`
class can be used
to provide more parameters with which
to initialize
a repository
400 (such
as `mapper`, `mapperArgs`, `tags`, etc. See the `RepositoryArgs` documentation
for more
401 details). A dict may be used
as shorthand
for a `RepositoryArgs`
class instance. The dict keys must
402 match parameters to the `RepositoryArgs.__init__` function.
403 outputs : RepositoryArgs, dict,
or string
404 Provides arguments to load one
or more existing repositories
or create new ones. The different types
405 are handled the same
as for `inputs`.
407 The Butler init sequence loads all of the input
and output repositories.
408 This creates the object hierarchy to read
from and write to them. Each
409 repository can have 0
or more parents, which also get loaded
as inputs.
410 This becomes a DAG of repositories. Ultimately, Butler creates a list of
411 these Repositories
in the order that they are used.
413 Initialization Sequence
414 =======================
416 During initialization Butler creates a Repository
class instance & support structure
for each object
417 passed to `inputs`
and `outputs`
as well
as the parent repositories recorded
in the `RepositoryCfg` of
418 each existing readable repository.
420 This process
is complex. It
is explained below to shed some light on the intent of each step.
422 1. Input Argument Standardization
423 ---------------------------------
425 In `Butler._processInputArguments` the input arguments are verified to be legal (
and a RuntimeError
is
426 raised
if not),
and they are converted into an expected format that
is used
for the rest of the Butler
427 init sequence. See the docstring
for `_processInputArguments`.
429 2. Create RepoData Objects
430 --------------------------
432 Butler uses an object, called `RepoData`, to keep track of information about each repository; each
433 repository
is contained
in a single `RepoData`. The attributes are explained
in its docstring.
435 After `_processInputArguments`, a RepoData
is instantiated
and put
in a list
for each repository
in
436 `outputs`
and `inputs`. This list of RepoData, the `repoDataList`, now represents all the output
and input
437 repositories (but
not parent repositories) that this Butler instance will use.
439 3. Get `RepositoryCfg`s
440 -----------------------
442 `Butler._getCfgs` gets the `RepositoryCfg`
for each repository the `repoDataList`. The behavior
is
443 described
in the docstring.
448 `Butler._addParents` then considers the parents list
in the `RepositoryCfg` of each `RepoData`
in the
449 `repoDataList`
and inserts new `RepoData` objects
for each parent
not represented
in the proper location
450 in the `repoDataList`. Ultimately a flat list
is built to represent the DAG of readable repositories
451 represented
in depth-first order.
453 5. Set
and Verify Parents of Outputs
454 ------------------------------------
456 To be able to load parent repositories when output repositories are used
as inputs, the input repositories
457 are recorded
as parents
in the `RepositoryCfg` file of new output repositories. When an output repository
458 already exists,
for consistency the Butler
's inputs must match the list of parents specified the already-
459 existing output repository's `RepositoryCfg` file.
461 In `Butler._setAndVerifyParentsLists`, the list of parents is recorded
in the `RepositoryCfg` of new
462 repositories. For existing repositories the list of parents
is compared
with the `RepositoryCfg`
's parents
463 list, and if they do
not match a `RuntimeError`
is raised.
465 6. Set the Default Mapper
466 -------------------------
468 If all the input repositories use the same mapper then we can assume that mapper to be the
469 "default mapper". If there are new output repositories whose `RepositoryArgs` do
not specify a mapper
and
470 there
is a default mapper then the new output repository will be set to use that default mapper.
472 This
is handled
in `Butler._setDefaultMapper`.
474 7. Cache References to Parent RepoDatas
475 ---------------------------------------
477 In `Butler._connectParentRepoDatas`,
in each `RepoData`
in `repoDataList`, a list of `RepoData` object
478 references
is built that matches the parents specified
in that `RepoData`
's `RepositoryCfg`.
480 This list is used later to find things
in that repository
's parents, without considering peer repository's
481 parents. (e.g. finding the registry of a parent)
486 Tags are described at https://ldm-463.lsst.io/v/draft/
488 In `Butler._setRepoDataTags`,
for each `RepoData`, the tags specified by its `RepositoryArgs` are recorded
489 in a set,
and added to the tags set
in each of its parents,
for ease of lookup when mapping.
491 9. Find Parent Registry
and Instantiate RepoData
492 ------------------------------------------------
494 At this point there
is enough information to instantiate the `Repository` instances. There
is one final
495 step before instantiating the Repository, which
is to
try to get a parent registry that can be used by the
496 child repository. The criteria
for "can be used" is spelled out
in `Butler._setParentRegistry`. However,
497 to get the registry
from the parent, the parent must be instantiated. The `repoDataList`,
in depth-first
498 search order,
is built so that the most-dependent repositories are first,
and the least dependent
499 repositories are last. So the `repoDataList`
is reversed
and the Repositories are instantiated
in that
500 order;
for each RepoData a parent registry
is searched
for,
and then the Repository
is instantiated
with
501 whatever registry could be found.
"""
504 """This is a Generation 2 Butler.
    def __init__(self, root=None, mapper=None, inputs=None, outputs=None, **mapperArgs):
        self._initArgs = {'root': root, 'mapper': mapper, 'inputs': inputs, 'outputs': outputs,
                          'mapperArgs': mapperArgs}

        self.datasetTypeAliasDict = {}

        self.storage = Storage()

        self.log = Log.getLogger("daf.persistence.butler")

        inputs, outputs = self._processInputArguments(
            root=root, mapper=mapper, inputs=inputs, outputs=outputs, **mapperArgs)
        inputs = [RepoData(args, 'input') for args in inputs]
        outputs = [RepoData(args, 'output') for args in outputs]
        repoDataList = outputs + inputs

        # the init sequence described in the class docstring:
        self._getCfgs(repoDataList)
        self._addParents(repoDataList)
        self._setAndVerifyParentsLists(repoDataList)
        self._setDefaultMapper(repoDataList)
        self._connectParentRepoDatas(repoDataList)
        self._repos = RepoDataContainer(repoDataList)
        self._setRepoDataTags()

        for repoData in repoDataList:
            self._initRepo(repoData)
    def _initRepo(self, repoData):
        if repoData.repo is not None:
            # this repository may have already been initialized by its children, in which case there is
            # nothing more to do.
            return
        for parentRepoData in repoData.parentRepoDatas:
            if parentRepoData.cfg.mapper != repoData.cfg.mapper:
                continue
            if parentRepoData.repo is None:
                self._initRepo(parentRepoData)
            parentRegistry = parentRepoData.repo.getRegistry()
            repoData.parentRegistry = parentRegistry if parentRegistry else parentRepoData.parentRegistry
            if repoData.parentRegistry:
                break
        repoData.repo = Repository(repoData)
    def _processInputArguments(self, root=None, mapper=None, inputs=None, outputs=None, **mapperArgs):
        """Process, verify, and standardize the input arguments.

        * Inputs can not be for Old Butler (root, mapper, mapperArgs) AND New Butler (inputs, outputs)
          `root`, `mapper`, and `mapperArgs` are Old Butler init API.
          `inputs` and `outputs` are New Butler init API.
          Old Butler and New Butler init API may not be mixed, Butler may be initialized with only the Old
          arguments or the New arguments.
        * Verify that if there is a readable output that there is exactly one output. (This restriction is
          in place because all readable repositories must be parents of writable repositories, and for
          consistency the DAG of readable repositories must always be the same. Keeping the list of parents
          becomes very complicated in the presence of multiple readable output repositories. It is better to
          only write to output repositories, and then create a new Butler instance and use the outputs as
          inputs, and write to new output repositories.)
        * Make a copy of inputs & outputs so they may be modified without changing the passed-in arguments.
        * Convert any input/output values that are URI strings to RepositoryArgs.
        * Listify inputs & outputs.
        * Set default RW mode on inputs & outputs as needed.

        Parameters
        ----------
        Same as Butler.__init__

        Returns
        -------
        (list of RepositoryArgs, list of RepositoryArgs)
            First item is a list to use as inputs.
            Second item is a list to use as outputs.

        Raises
        ------
        RuntimeError
            If Old Butler and New Butler arguments are both used this will raise.
            If an output is readable and there is more than one output this will raise.
        """
        # inputs and outputs may be modified; do not change the external value.
        inputs = copy.deepcopy(inputs)
        outputs = copy.deepcopy(outputs)

        isV1Args = inputs is None and outputs is None
        if isV1Args:
            inputs, outputs = self._convertV1Args(root=root, mapper=mapper,
                                                  mapperArgs=mapperArgs or None)
        elif root or mapper or mapperArgs:
            raise RuntimeError(
                'Butler version 1 API (root, mapper, **mapperArgs) may '
                'not be used with version 2 API (inputs, outputs)')

        inputs = listify(inputs)
        outputs = listify(outputs)
        inputs = [RepositoryArgs(cfgRoot=args)
                  if not isinstance(args, RepositoryArgs) else args for args in inputs]
        outputs = [RepositoryArgs(cfgRoot=args)
                   if not isinstance(args, RepositoryArgs) else args for args in outputs]

        # set the default mode on inputs and outputs, and verify that inputs are readable and outputs are
        # writable:
        for args in inputs:
            if args.mode is None:
                args.mode = 'r'
            elif 'rw' == args.mode:
                args.mode = 'r'
            elif 'r' != args.mode:
                raise RuntimeError("The mode of an input should be readable.")
        for args in outputs:
            if args.mode is None:
                args.mode = 'w'
            elif 'w' not in args.mode:
                raise RuntimeError("The mode of an output should be writable.")

        for args in inputs + outputs:
            if (args.mapper and not isinstance(args.mapper, str)
                    and not inspect.isclass(args.mapper)):
                self.log.warn(preinitedMapperWarning)

        # if an output is readable, there must be only one output:
        if any('r' in args.mode for args in outputs) and len(outputs) > 1:
            raise RuntimeError("Butler does not support multiple output repositories if any of the "
                               "outputs are readable.")

        # Handle the case where a readable output is also passed in as one of the inputs by removing the
        # input.
        def inputIsInOutputs(inputArgs, outputArgsList):
            for o in outputArgsList:
                if ('r' in o.mode
                        and o.root == inputArgs.root
                        and o.mapper == inputArgs.mapper
                        and o.mapperArgs == inputArgs.mapperArgs
                        and o.tags == inputArgs.tags
                        and o.policy == inputArgs.policy):
                    self.log.debug(("Input repositoryArgs {} is also listed in outputs as readable; "
                                    "throwing away the input.").format(inputArgs))
                    return True
            return False

        inputs = [args for args in inputs if not inputIsInOutputs(args, outputs)]
        return inputs, outputs
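    # A sketch of the standardization performed above (paths hypothetical): URI strings
    # become RepositoryArgs, and modes default to 'r' for inputs and 'w' for outputs.
    #
    #     >>> inputs, outputs = butler._processInputArguments(
    #     ...     inputs='/data/repoA', outputs='/data/repoB')
    #     >>> inputs[0].mode, outputs[0].mode
    #     ('r', 'w')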
    @staticmethod
    def _getParentVal(repoData):
        """Get the value of this repoData as it should appear in the parents
        list of other repositories"""
        if repoData.isV1Repository:
            return repoData.cfg
        if repoData.cfgOrigin == 'nested':
            return repoData.cfg
        else:
            return repoData.cfg.root

    @staticmethod
    def _getParents(ofRepoData, repoInfo):
        """Create a parents list of repoData from inputs and (readable) outputs."""
        parents = []
        for repoData in repoInfo:
            if repoData is ofRepoData:
                break
            if 'r' not in repoData.repoArgs.mode:
                continue
            parents.append(Butler._getParentVal(repoData))
        return parents
    @staticmethod
    def _getOldButlerRepositoryCfg(repositoryArgs):
        if not Storage.isPosix(repositoryArgs.cfgRoot):
            return None
        if not PosixStorage.v1RepoExists(repositoryArgs.cfgRoot):
            return None
        if not repositoryArgs.mapper:
            repositoryArgs.mapper = PosixStorage.getMapperClass(repositoryArgs.cfgRoot)
        cfg = RepositoryCfg.makeFromArgs(repositoryArgs)
        parent = PosixStorage.getParentSymlinkPath(repositoryArgs.cfgRoot)
        if parent:
            parent = Butler._getOldButlerRepositoryCfg(RepositoryArgs(cfgRoot=parent, mode='r'))
            if parent is not None:
                cfg.addParents([parent])
        return cfg
    def _getRepositoryCfg(self, repositoryArgs):
        """Try to get a repository from the location described by cfgRoot.

        Parameters
        ----------
        repositoryArgs : RepositoryArgs or string
            Provides arguments to load an existing repository (or repositories). String is assumed to be a
            URI and is used as the cfgRoot (URI to the location of the cfg file).

        Returns
        -------
        (RepositoryCfg or None, bool)
            The RepositoryCfg, or None if one cannot be found, and True if the RepositoryCfg was created by
            reading an Old Butler repository, or False if it is a New Butler Repository.
        """
        if not isinstance(repositoryArgs, RepositoryArgs):
            repositoryArgs = RepositoryArgs(cfgRoot=repositoryArgs, mode='r')

        cfg = self.storage.getRepositoryCfg(repositoryArgs.cfgRoot)
        isOldButlerRepository = False
        if cfg is None:
            cfg = Butler._getOldButlerRepositoryCfg(repositoryArgs)
            if cfg is not None:
                isOldButlerRepository = True
        return cfg, isOldButlerRepository
    def _getCfgs(self, repoDataList):
        """Get or make a RepositoryCfg for each RepoData, and add the cfg to the RepoData.

        If the cfg exists, compare values. If values match then use the cfg as an "existing" cfg. If the
        values do not match, use the cfg as a "nested" cfg.
        If the cfg does not exist, the RepositoryArgs must be for a writable repository.

        Parameters
        ----------
        repoDataList : list of RepoData
            The RepoData that are output and inputs of this Butler

        Raises
        ------
        RuntimeError
            If the passed-in RepositoryArgs indicate an existing repository but other cfg parameters in those
            RepositoryArgs do not match the existing repository's cfg a RuntimeError will be raised.
        """
        def cfgMatchesArgs(args, cfg):
            """Test if there are any values in an RepositoryArgs that conflict with the values in a cfg"""
            if args.mapper is not None and cfg.mapper != args.mapper:
                return False
            if args.mapperArgs is not None and cfg.mapperArgs != args.mapperArgs:
                return False
            if args.policy is not None and cfg.policy != args.policy:
                return False
            return True

        for repoData in repoDataList:
            cfg, isOldButlerRepository = self._getRepositoryCfg(repoData.repoArgs)
            if cfg is None:
                if 'w' not in repoData.repoArgs.mode:
                    raise RuntimeError(
                        "No cfg found for read-only input repository at {}".format(repoData.repoArgs.cfgRoot))
                repoData.setCfg(cfg=RepositoryCfg.makeFromArgs(repoData.repoArgs),
                                origin='new',
                                root=repoData.repoArgs.cfgRoot,
                                isV1Repository=isOldButlerRepository)
            else:
                # If a parent is an Old Butler repository path, replace it with a RepositoryCfg that includes
                # the child repository's mapperArgs.
                for i, parent in enumerate(cfg.parents):
                    if isinstance(parent, RepositoryCfg):
                        continue
                    parentCfg, parentIsOldButlerRepository = self._getRepositoryCfg(parent)
                    if parentIsOldButlerRepository:
                        parentCfg.mapperArgs = cfg.mapperArgs
                        self.log.info(("Butler is replacing an Old Butler parent repository path '{}' "
                                       "found in the parents list of a New Butler repositoryCfg: {} "
                                       "with a repositoryCfg that includes the child repository's "
                                       "mapperArgs: {}. This affects the instantiated RepositoryCfg "
                                       "but does not change the persisted child repositoryCfg.yaml file."
                                       ).format(parent, cfg, parentCfg))
                        cfg._parents[i] = cfg._normalizeParents(cfg.root, [parentCfg])[0]

                if 'w' in repoData.repoArgs.mode:
                    # if it's an output repository, the args must match the existing cfg.
                    if not cfgMatchesArgs(repoData.repoArgs, cfg):
                        raise RuntimeError(("The RepositoryArgs and RepositoryCfg must match for writable "
                                            "repositories, RepositoryCfg:{}, RepositoryArgs:{}").format(
                            cfg, repoData.repoArgs))
                    repoData.setCfg(cfg=cfg, origin='existing', root=repoData.repoArgs.cfgRoot,
                                    isV1Repository=isOldButlerRepository)
                else:
                    # if it's an input repository and the args match the cfg, use the cfg as-is; if they
                    # do not match, treat the cfg as nested.
                    if cfgMatchesArgs(repoData.repoArgs, cfg):
                        repoData.setCfg(cfg=cfg, origin='existing', root=repoData.repoArgs.cfgRoot,
                                        isV1Repository=isOldButlerRepository)
                    else:
                        repoData.setCfg(cfg=cfg, origin='nested', root=None,
                                        isV1Repository=isOldButlerRepository)
    def _addParents(self, repoDataList):
        """For each repoData in the input list, see if its parents are the next items in the list, and if not
        add the parent, so that the repoDataList includes parents and is in order to operate depth-first 0..n.

        Parameters
        ----------
        repoDataList : list of RepoData
            The RepoData for the Butler outputs + inputs.

        Raises
        ------
        RuntimeError
            Raised if a RepositoryCfg can not be found at a location where a parent repository should be.
        """
        repoDataIdx = 0
        while True:
            if repoDataIdx == len(repoDataList):
                break
            repoData = repoDataList[repoDataIdx]
            if 'r' not in repoData.repoArgs.mode:
                repoDataIdx += 1
                continue  # the repoData only needs parents if it is readable.
            if repoData.isNewRepository:
                repoDataIdx += 1
                continue  # if the repository is new, its parents will be the inputs of this butler.
            if repoData.cfg.parents is None:
                repoDataIdx += 1
                continue  # if there are no parents there is nothing to do.
            for repoParentIdx, repoParent in enumerate(repoData.cfg.parents):
                parentIdxInRepoDataList = repoDataIdx + repoParentIdx + 1
                if not isinstance(repoParent, RepositoryCfg):
                    repoParentCfg, isOldButlerRepository = self._getRepositoryCfg(repoParent)
                    if repoParentCfg is not None:
                        cfgOrigin = 'existing'
                    else:
                        raise RuntimeError(
                            "Could not get a RepositoryCfg for parent repository at {}".format(repoParent))
                else:
                    isOldButlerRepository = False
                    repoParentCfg = repoParent
                    cfgOrigin = 'nested'
                if (parentIdxInRepoDataList < len(repoDataList)
                        and repoDataList[parentIdxInRepoDataList].cfg == repoParentCfg):
                    continue
                args = RepositoryArgs(cfgRoot=repoParentCfg.root, mode='r')
                role = 'input' if repoData.role == 'output' else 'parent'
                newRepoInfo = RepoData(args, role)
                newRepoInfo.setCfg(cfg=repoParentCfg, origin=cfgOrigin, root=args.cfgRoot,
                                   isV1Repository=isOldButlerRepository)
                repoDataList.insert(parentIdxInRepoDataList, newRepoInfo)
            repoDataIdx += 1
    def _setAndVerifyParentsLists(self, repoDataList):
        """Make a list of all the input repositories of this Butler, these are the parents of the outputs.
        For new output repositories, set the parents in the RepositoryCfg. For existing output repositories
        verify that the RepositoryCfg's parents match the parents list.

        Parameters
        ----------
        repoDataList : list of RepoData
            All the RepoDatas loaded by this butler, in search order.

        Raises
        ------
        RuntimeError
            If an existing output repository is loaded and its parents do not match the parents of this
            Butler an error will be raised.
        """
        def getIOParents(ofRepoData, repoDataList):
            """make a parents list for repo in `ofRepoData` that is comprised of inputs and readable
            outputs (not parents-of-parents) of this butler
            """
            parents = []
            for repoData in repoDataList:
                if repoData.role == 'parent':
                    continue
                if repoData is ofRepoData:
                    continue
                if repoData.role == 'output':
                    if 'r' in repoData.repoArgs.mode:
                        raise RuntimeError("If an output is readable it must be the only output.")
                    continue
                parents.append(self._getParentVal(repoData))
            return parents

        for repoData in repoDataList:
            if repoData.role != 'output':
                continue
            parents = getIOParents(repoData, repoDataList)
            # if repoData is new, add the parent RepositoryCfgs to it.
            if repoData.cfgOrigin == 'new':
                repoData.cfg.addParents(parents)
            elif repoData.cfgOrigin in ('existing', 'nested'):
                if repoData.cfg.parents != parents:
                    try:
                        repoData.cfg.extendParents(parents)
                    except ParentsMismatch as e:
                        raise RuntimeError(("Inputs of this Butler:{} do not match parents of existing "
                                            "writable cfg:{} (ParentsMismatch exception: {})").format(
                            parents, repoData.cfg.parents, e))
    def _setDefaultMapper(self, repoDataList):
        """Establish a default mapper if there is one and assign it to outputs that do not have a mapper
        assigned.

        If all inputs have the same mapper it will be used as the default mapper.

        Parameters
        ----------
        repoDataList : list of RepoData
            All the RepoDatas loaded by this butler, in search order.

        Raises
        ------
        RuntimeError
            If a default mapper can not be established and there is an output that does not have a mapper.
        """
        needyOutputs = [rd for rd in repoDataList if rd.role == 'output' and rd.cfg.mapper is None]
        if len(needyOutputs) == 0:
            return
        mappers = set([rd.cfg.mapper for rd in repoDataList if rd.role == 'input'])
        if len(mappers) != 1:
            inputs = [rd for rd in repoDataList if rd.role == 'input']
            raise RuntimeError(
                ("No default mapper could be established from inputs:{} and no mapper specified "
                 "for outputs:{}").format(inputs, needyOutputs))
        defaultMapper = mappers.pop()
        for repoData in needyOutputs:
            repoData.cfg.mapper = defaultMapper
    def _connectParentRepoDatas(self, repoDataList):
        """For each RepoData in repoDataList, find its parent in the repoDataList and cache a reference to it.

        Parameters
        ----------
        repoDataList : list of RepoData
            All the RepoDatas loaded by this butler, in search order.

        Raises
        ------
        RuntimeError
            When a parent is listed in the parents list but not found in the repoDataList. This is not
            expected to ever happen and would indicate an internal Butler error.
        """
        for repoData in repoDataList:
            for parent in repoData.cfg.parents:
                parentToAdd = None
                for otherRepoData in repoDataList:
                    if isinstance(parent, RepositoryCfg):
                        if otherRepoData.cfg == parent:
                            parentToAdd = otherRepoData
                            break
                    elif otherRepoData.cfg.root == parent:
                        parentToAdd = otherRepoData
                        break
                if parentToAdd is None:
                    raise RuntimeError(
                        "Could not find a parent matching {} to add to {}".format(parent, repoData))
                repoData.addParentRepoData(parentToAdd)
    @staticmethod
    def _getParentRepoData(parent, repoDataList):
        """get a parent RepoData from a cfg from a list of RepoData

        Parameters
        ----------
        parent : string or RepositoryCfg
            cfgRoot of a repo or a cfg that describes the repo
        repoDataList : list of RepoData
            The RepoData list to search in.

        Returns
        -------
        RepoData or None
            A RepoData if one can be found, else None
        """
        repoData = None
        for otherRepoData in repoDataList:
            if isinstance(parent, RepositoryCfg):
                if otherRepoData.cfg == parent:
                    repoData = otherRepoData
                    break
            elif otherRepoData.cfg.root == parent:
                repoData = otherRepoData
                break
        return repoData
    def _setRepoDataTags(self):
        """Set the tags from each repoArgs into all its parent repoArgs so that they can be included in
        tagged searches."""
        def setTags(repoData, tags, context):
            if id(repoData) in context:
                return
            repoData.addTags(tags)
            context.add(id(repoData))
            for parentRepoData in repoData.parentRepoDatas:
                setTags(parentRepoData, tags, context)
        for repoData in self._repos.outputs() + self._repos.inputs():
            setTags(repoData, repoData.repoArgs.tags, set())
    def _convertV1Args(self, root, mapper, mapperArgs):
        """Convert Old Butler RepositoryArgs (root, mapper, mapperArgs) to New Butler RepositoryArgs
        (inputs, outputs)

        Parameters
        ----------
        root : string
            Posix path to repository root
        mapper : class, class instance, or string
            Instantiated class, a class object to be instantiated, or a string that refers to a class that
            can be imported & used as the mapper.
        mapperArgs : dict
            RepositoryArgs & their values used when instantiating the mapper.

        Returns
        -------
        tuple
            (inputs, outputs) - values to be used for inputs and outputs in Butler.__init__
        """
        if (mapper and not isinstance(mapper, str)
                and not inspect.isclass(mapper)):
            self.log.warn(preinitedMapperWarning)
        inputs = None
        if root is None:
            if hasattr(mapper, 'root'):
                # in legacy repositories, the mapper may be given the root directly.
                root = mapper.root
            else:
                # in the past root="None" could be used to mean root='.'
                root = '.'
        outputs = RepositoryArgs(mode='rw',
                                 root=root,
                                 mapper=mapper,
                                 mapperArgs=mapperArgs)
        return inputs, outputs
    def __repr__(self):
        return 'Butler(datasetTypeAliasDict=%s, repos=%s)' % (
            self.datasetTypeAliasDict, self._repos)
    def _getDefaultMapper(self):
        """Get the default mapper. Currently this means if all the repositories use exactly the same mapper,
        that mapper may be considered the default.

        This definition may be changing; mappers may be able to exclude themselves as candidates for default,
        and they may nominate a different mapper instead. Also, we may not want to look at *all* the
        repositories, but only a depth-first search on each of the input & output repositories, and use the
        first-found mapper for each of those. TBD.

        Returns
        -------
        Mapper class or None
            Returns the class type of the default mapper, or None if a default
            mapper can not be determined.
        """
        defaultMapper = None

        for inputRepoData in self._repos.inputs():
            mapper = None
            if inputRepoData.cfg.mapper is not None:
                mapper = inputRepoData.cfg.mapper
                # if the mapper is a string, import it; if it is a class instance, get its class type;
                # if it is already a class, use it as-is.
                if isinstance(mapper, str):
                    mapper = doImport(mapper)
                elif not inspect.isclass(mapper):
                    mapper = mapper.__class__
            # Note the first mapper found; if a later, non-matching mapper is found then there is no
            # default mapper.
            if defaultMapper is None:
                defaultMapper = mapper
            elif mapper == defaultMapper:
                continue
            elif mapper is not None:
                return None
        return defaultMapper
    def _assignDefaultMapper(self, defaultMapper):
        for repoData in self._repos.all():
            if repoData.cfg.mapper is None and (repoData.isNewRepository or repoData.isV1Repository):
                if defaultMapper is None:
                    raise RuntimeError(
                        "No mapper specified for %s and no default mapper could be determined." %
                        repoData)
                repoData.cfg.mapper = defaultMapper
1098 """posix-only; gets the mapper class at the path specified by root (if a file _mapper can be found at
1099 that location or in a parent location.
1101 As we abstract the storage
and support different types of storage locations this method will be
1102 moved entirely into Butler Access,
or made more dynamic,
and the API will very likely change.
"""
1103 return Storage.getMapperClass(root)
1106 """Register an alias that will be substituted in datasetTypes.
1111 The alias keyword. It may start with @
or not. It may
not contain @
except as the first character.
1112 datasetType - string
1113 The string that will be substituted when
@alias is passed into datasetType. It may
not contain
'@'
1117 atLoc = alias.rfind(
'@')
1119 alias =
"@" +
str(alias)
1121 raise RuntimeError(
"Badly formatted alias string: %s" % (alias,))
1124 if datasetType.count(
'@') != 0:
1125 raise RuntimeError(
"Badly formatted type string: %s" % (datasetType))
1130 if key.startswith(alias)
or alias.startswith(key):
1131 raise RuntimeError(
"Alias: %s overlaps with existing alias: %s" % (alias, key))
    def getKeys(self, datasetType=None, level=None, tag=None):
        """Get the valid data id keys at or above the given level of hierarchy for the dataset type or the
        entire collection if None. The dict values are the basic Python types corresponding to the keys (int,
        float, string).

        Parameters
        ----------
        datasetType - string
            The type of dataset to get keys for, entire collection if None.
        level - string
            The hierarchy level to descend to. None if it should not be restricted. Use an empty string if
            the mapper should lookup the default level.
        tag - any, or list of any
            If tag is specified then the repo will only be used if the tag or a tag in the list matches a
            tag used for that repository.

        Returns
        -------
        Returns a dict. The dict keys are the valid data id keys at or above the given level of hierarchy
        for the dataset type or the entire collection if None. The dict values are the basic Python types
        corresponding to the keys (int, float, string).
        """
        datasetType = self._resolveDatasetTypeAlias(datasetType)

        keys = None
        tag = setify(tag)
        for repoData in self._repos.inputs():
            if not tag or len(tag.intersection(repoData.tags)) > 0:
                keys = repoData.repo.getKeys(datasetType, level)
                # An empty dict is a valid "found" condition for keys. The only value for keys that should
                # cause the search to continue is None.
                if keys is not None:
                    break
        return keys
1171 """Get the valid dataset types for all known repos or those matching
1176 tag - any, or list of any
1177 If tag
is specified then the repo will only be used
if the tag
1178 or a tag
in the list matches a tag used
for that repository.
1182 Returns the dataset types
as a set of strings.
1184 datasetTypes = set()
1186 for repoData
in self.
_repos_repos.outputs() + self.
_repos_repos.inputs():
1187 if not tag
or len(tag.intersection(repoData.tags)) > 0:
1188 datasetTypes = datasetTypes.union(
1193 """Returns the valid values for one or more keys when given a partial
1194 input collection data id.
1198 datasetType - string
1199 The type of dataset to inquire about.
1201 Key or tuple of keys to be returned.
1202 dataId - DataId, dict
1203 The partial data id.
1205 Keyword arguments
for the partial data id.
1209 A list of valid values
or tuples of valid values
as specified by the
1214 dataId = DataId(dataId)
1215 dataId.update(**rest)
1219 for repoData
in self.
_repos_repos.inputs():
1220 if not dataId.tag
or len(dataId.tag.intersection(repoData.tags)) > 0:
1221 tuples = repoData.repo.queryMetadata(datasetType, format, dataId)
1228 if len(format) == 1:
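    # Usage sketch (key names depend on the mapper; the values shown are hypothetical).
    # A single format key yields a flat list; multiple keys yield a list of tuples:
    #
    #     >>> butler.queryMetadata('raw', 'visit', filter='r')
    #     [411657, 411658]
    #     >>> butler.queryMetadata('raw', ('visit', 'ccd'), filter='r')
    #     [(411657, 22), (411658, 23)]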
1240 """Determines if a dataset file exists.
1244 datasetType - string
1245 The type of dataset to inquire about.
1246 dataId - DataId, dict
1247 The data id of the dataset.
1249 If True, look only
in locations where the dataset could be written,
1250 and return True only
if it
is present
in all of them.
1251 **rest keyword arguments
for the data id.
1256 True if the dataset exists
or is non-file-based.
1259 dataId = DataId(dataId)
1260 dataId.update(**rest)
1261 locations = self._locate_locate(datasetType, dataId, write=write)
1263 if locations
is None:
1265 locations = [locations]
1270 for location
in locations:
1273 if isinstance(location, ButlerComposite):
1274 for name, componentInfo
in location.componentInfo.items():
1275 if componentInfo.subset:
1276 subset = self.
subsetsubset(datasetType=componentInfo.datasetType, dataId=location.dataId)
1277 exists =
all([obj.datasetExists()
for obj
in subset])
1279 exists = self.
datasetExistsdatasetExists(componentInfo.datasetType, location.dataId)
1283 if not location.repository.exists(location):
    def _locate(self, datasetType, dataId, write):
        """Get one or more ButlerLocations and/or ButlerComposites.

        Parameters
        ----------
        datasetType : string
            The datasetType that is being searched for. The datasetType may be followed by a dot and
            a component name (component names are specified in the policy). IE datasetType.componentName

        dataId : dict or DataId class instance
            The dataId to use when searching.

        write : bool
            True if this is a search to write an object. False if it is a search to read an object. This
            affects what type (an object or a container) is returned.

        Returns
        -------
        If write is False, will return either a single object or None. If write is True, will return a list
        (which may be empty)
        """
        repos = self._repos.outputs() if write else self._repos.inputs()
        locations = []
        for repoData in repos:
            # enforce dataId & repository tags when reading:
            if not write and dataId.tag and len(dataId.tag.intersection(repoData.tags)) == 0:
                continue
            components = datasetType.split('.')
            datasetType = components[0]
            components = components[1:]
            location = repoData.repo.map(datasetType, dataId, write=write)
            if location is None:
                continue
            location.datasetType = datasetType
            if len(components) > 0:
                if not isinstance(location, ButlerComposite):
                    raise RuntimeError("The location for a dotted datasetType must be a composite.")
                # replace the first component name with the datasetType
                components[0] = location.componentInfo[components[0]].datasetType
                # join components back into a dot-delimited string
                datasetType = '.'.join(components)
                location = self._locate(datasetType, dataId, write)
                if location is None:
                    return None
            if not write:
                # If there is a bypass function for this dataset type, evaluate it and record the result in
                # the bypass attribute of the location. The bypass function may fail for any reason; the
                # most common case is that a file does not exist. If it raises an exception indicating
                # such, continue the search in subsequent repositories and locations.
                if hasattr(location.mapper, "bypass_" + location.datasetType):
                    bypass = self._getBypassFunc(location, dataId)
                    try:
                        bypass = bypass()
                        location.bypass = bypass
                    except (NoResults, IOError):
                        self.log.debug("Continuing dataset search while evaluating "
                                       "bypass function for Dataset type:{} Data ID:{} at "
                                       "location {}".format(datasetType, dataId, location))
                # If a location was found but the location does not exist, keep looking in input
                # repositories (the registry may have had enough data for a lookup even though the object
                # exists in a different repository.)
                if (isinstance(location, ButlerComposite) or hasattr(location, 'bypass')
                        or location.repository.exists(location)):
                    return location
            else:
                try:
                    locations.extend(location)
                except TypeError:
                    locations.append(location)
        if not write:
            return None
        return locations
    @staticmethod
    def _getBypassFunc(location, dataId):
        pythonType = location.getPythonType()
        if pythonType is not None:
            if isinstance(pythonType, str):
                pythonType = doImport(pythonType)
        bypassFunc = getattr(location.mapper, "bypass_" + location.datasetType)
        return lambda: bypassFunc(location.datasetType, pythonType, location, dataId)
    def get(self, datasetType, dataId=None, immediate=True, **rest):
        """Retrieves a dataset given an input collection data id.

        Parameters
        ----------
        datasetType - string
            The type of dataset to retrieve.
        dataId - dict
            The data id.
        immediate - bool
            If False use a proxy for delayed loading.
        **rest
            Keyword arguments for the data id.

        Returns
        -------
        An object retrieved from the dataset (or a proxy for one).
        """
        datasetType = self._resolveDatasetTypeAlias(datasetType)
        dataId = DataId(dataId)
        dataId.update(**rest)

        location = self._locate(datasetType, dataId, write=False)
        if location is None:
            raise NoResults("No locations for get:", datasetType, dataId)
        self.log.debug("Get type=%s keys=%s from %s", datasetType, dataId, str(location))

        if hasattr(location, 'bypass'):
            def callback():
                return location.bypass
        else:
            def callback():
                return self._read(location)
        if location.mapper.canStandardize(location.datasetType):
            innerCallback = callback

            def callback():
                return location.mapper.standardize(location.datasetType, innerCallback(), dataId)
        if immediate:
            return callback()
        return ReadProxy(callback)
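    # Usage sketch of deferred loading; the data-id keys and the getMaskedImage accessor
    # are hypothetical examples. The underlying read happens on first use of the proxy:
    #
    #     >>> proxy = butler.get('calexp', visit=411657, ccd=22, immediate=False)
    #     >>> image = proxy.getMaskedImage()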
    def put(self, obj, datasetType, dataId={}, doBackup=False, **rest):
        """Persists a dataset given an output collection data id.

        Parameters
        ----------
        obj -
            The object to persist.
        datasetType - string
            The type of dataset to persist.
        dataId - dict
            The data id.
        doBackup - bool
            If True, rename existing instead of overwriting.
            WARNING: Setting doBackup=True is not safe for parallel processing, as it may be subject to race
            conditions.
        **rest
            Keyword arguments for the data id.
        """
        datasetType = self._resolveDatasetTypeAlias(datasetType)
        dataId = DataId(dataId)
        dataId.update(**rest)

        locations = self._locate(datasetType, dataId, write=True)
        if not locations:
            raise NoResults("No locations for put:", datasetType, dataId)
        for location in locations:
            if isinstance(location, ButlerComposite):
                disassembler = location.disassembler if location.disassembler else genericDisassembler
                disassembler(obj=obj, dataId=location.dataId, componentInfo=location.componentInfo)
                for name, info in location.componentInfo.items():
                    if not info.inputOnly:
                        self.put(info.obj, info.datasetType, location.dataId, doBackup=doBackup)
            else:
                if doBackup:
                    location.getRepository().backup(location.datasetType, dataId)
                location.getRepository().write(location, obj)
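    # Usage sketch; the dataset type and data-id keys are mapper-dependent, hypothetical values:
    #
    #     >>> butler.put(exposure, 'calexp', visit=411657, ccd=22)
    #     >>> butler.put(exposure, 'calexp', visit=411657, ccd=22, doBackup=True)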
    def subset(self, datasetType, level=None, dataId={}, **rest):
        """Return complete dataIds for a dataset type that match a partial (or empty) dataId.

        Given a partial (or empty) dataId specified in dataId and **rest, find all datasets that match the
        dataId. Optionally restrict the results to a given level specified by a dataId key (e.g. visit or
        sensor or amp for a camera). Return an iterable collection of complete dataIds as ButlerDataRefs.
        Datasets with the resulting dataIds may not exist; that needs to be tested with datasetExists().

        Parameters
        ----------
        datasetType - string
            The type of dataset collection to subset
        level - string
            The level of dataId at which to subset. Use an empty string if the mapper should look up the
            default level.
        dataId - dict
            The data id.
        **rest
            Keyword arguments for the data id.

        Returns
        -------
        subset - ButlerSubset
            Collection of ButlerDataRefs for datasets matching the data id.

        Examples
        --------
        To print the full dataIds for all r-band measurements in a source catalog
        (note that the subset call is equivalent to: `butler.subset('src', dataId={'filter':'r'})`):

        >>> subset = butler.subset('src', filter='r')
        >>> for data_ref in subset: print(data_ref.dataId)
        """
        datasetType = self._resolveDatasetTypeAlias(datasetType)

        # Currently expected behavior of subset is that if the specified level is None then the mapper's
        # default level should be used. Convention for level within Butler is that an empty string is used
        # to indicate the mapper should lookup the default level, so convert None to empty string.
        if level is None:
            level = ''

        dataId = DataId(dataId)
        dataId.update(**rest)
        return ButlerSubset(self, datasetType, level, dataId)
    def dataRef(self, datasetType, level=None, dataId={}, **rest):
        """Returns a single ButlerDataRef.

        Given a complete dataId specified in dataId and **rest, find the unique dataset at the given level
        specified by a dataId key (e.g. visit or sensor or amp for a camera) and return a ButlerDataRef.

        Parameters
        ----------
        datasetType - string
            The type of dataset collection to reference
        level - string
            The level of dataId at which to reference
        dataId - dict
            The data id.
        **rest
            Keyword arguments for the data id.

        Returns
        -------
        dataRef - ButlerDataRef
            ButlerDataRef for dataset matching the data id
        """
        datasetType = self._resolveDatasetTypeAlias(datasetType)
        dataId = DataId(dataId)
        subset = self.subset(datasetType, level, dataId, **rest)
        if len(subset) != 1:
            raise RuntimeError("No unique dataset for: Dataset type:%s Level:%s Data ID:%s Keywords:%s" %
                               (str(datasetType), str(level), str(dataId), str(rest)))
        return ButlerDataRef(subset, subset.cache[0])
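    # Usage sketch (data-id keys are hypothetical):
    #
    #     >>> ref = butler.dataRef('raw', visit=411657, ccd=22)
    #     >>> exposure = ref.get('raw')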
    def getUri(self, datasetType, dataId=None, write=False, **rest):
        """Return the URI for a dataset

        .. warning:: This is intended only for debugging. The URI should
            never be used for anything other than printing.

        .. note:: In the event there are multiple URIs for read, we return only
            the first.

        .. note:: getUri() does not currently support composite datasets.

        Parameters
        ----------
        datasetType : `str`
            The dataset type of interest.
        dataId : `dict`, optional
            The data identifier.
        write : `bool`, optional
            Return the URI for writing?
        rest : `dict`, optional
            Keyword arguments for the data id.

        Returns
        -------
        uri : `str`
            URI for dataset.
        """
        datasetType = self._resolveDatasetTypeAlias(datasetType)
        dataId = DataId(dataId)
        dataId.update(**rest)
        locations = self._locate(datasetType, dataId, write=write)
        if locations is None:
            raise NoResults("No locations for getUri: ", datasetType, dataId)

        if write:
            # Follow the write path; return the first valid write location.
            for location in locations:
                if isinstance(location, ButlerComposite):
                    for name, info in location.componentInfo.items():
                        if not info.inputOnly:
                            return self.getUri(info.datasetType, location.dataId, write=True)
                else:
                    return location.getLocationsWithRoot()[0]
            # fall back to raise
            raise NoResults("No locations for getUri(write=True): ", datasetType, dataId)
        else:
            # Follow the read path; for read, _locate returns a single location.
            return locations.getLocationsWithRoot()[0]
    def _read(self, location):
        """Unpersist an object using data inside a ButlerLocation or ButlerComposite object.

        Parameters
        ----------
        location : ButlerLocation or ButlerComposite
            A ButlerLocation or ButlerComposite instance populated with data needed to read the object.

        Returns
        -------
        An instance of the object specified by the location.
        """
        self.log.debug("Starting read from %s", location)

        if isinstance(location, ButlerComposite):
            for name, componentInfo in location.componentInfo.items():
                if componentInfo.subset:
                    subset = self.subset(datasetType=componentInfo.datasetType, dataId=location.dataId)
                    componentInfo.obj = [obj.get() for obj in subset]
                else:
                    obj = self.get(componentInfo.datasetType, location.dataId, immediate=True)
                    componentInfo.obj = obj
            assembler = location.assembler or genericAssembler
            results = assembler(dataId=location.dataId, componentInfo=location.componentInfo,
                                cls=location.python)
        else:
            results = location.repository.read(location)
            if len(results) == 1:
                results = results[0]
        self.log.debug("Ending read from %s", location)
        return results
    def _resolveDatasetTypeAlias(self, datasetType):
        """Replaces all the known alias keywords in the given string with the alias value.

        Parameters
        ----------
        datasetType - string
            A datasetType string to search & replace on

        Returns
        -------
        datasetType - string
            The de-aliased string
        """
        for key in self.datasetTypeAliasDict:
            # if all aliases have been replaced, bail out
            if datasetType.find('@') == -1:
                break
            datasetType = datasetType.replace(key, self.datasetTypeAliasDict[key])

        # If an alias specifier can not be resolved then throw.
        if datasetType.find('@') != -1:
            raise RuntimeError("Unresolvable alias specifier in datasetType: %s" % (datasetType))

        return datasetType
def _unreduce(initArgs, datasetTypeAliasDict):
    mapperArgs = initArgs.pop('mapperArgs')
    initArgs.update(mapperArgs)
    butler = Butler(**initArgs)
    butler.datasetTypeAliasDict = datasetTypeAliasDict
    return butler