27 """This module defines the Butler class.""" 34 from .
import ReadProxy, ButlerSubset, ButlerDataRef, \
35 Storage, Policy, NoResults, Repository, DataId, RepositoryCfg, \
36 RepositoryArgs, listify, setify, sequencify, doImport, ButlerComposite, genericAssembler, \
37 genericDisassembler, PosixStorage, ParentsMismatch
preinitedMapperWarning = ("Passing an instantiated mapper into "
                          "Butler.__init__ will prevent Butler from passing "
                          "parentRegistry or repositoryCfg information to "
                          "the mapper, which is done only at init time. "
                          "It is better to pass an importable string or "
                          "class object.")
48 """Represents a Butler configuration. 52 cfg is 'wet paint' and very likely to change. Use of it in production 53 code other than via the 'old butler' API is strongly discouraged. 55 yaml_tag =
u"!ButlerCfg" 58 super().
__init__({
'repoCfg': repoCfg,
'cls': cls})
62 """Container object for repository data used by Butler 67 The arguments that are used to find or create the RepositoryCfg. 69 "input", "output", or "parent", indicating why Butler loaded this repository. 70 * input: the Repository was passed as a Butler input. 71 * output: the Repository was passed as a Butler output. 72 * parent: the Repository was specified in the RepositoryCfg parents list of a readable repository. 77 The configuration for the Repository. 80 "new", "existing", or "nested". Indicates the origin of the repository and its RepositoryCfg: 81 * new: it was created by this instance of Butler, and this instance of Butler will generate the 83 * existing: it was found (via the root or cfgRoot argument) 84 * nested: the full RepositoryCfg was nested in another RepositoryCfg's parents list (this can happen 85 if parameters of an input specified by RepositoryArgs or dict does not entirely match an existing 89 Path or URI to the location of the RepositoryCfg file. 91 repo : lsst.daf.persistence.Repository 92 The Repository class instance. 94 parentRepoDatas : list of RepoData 95 The parents of this Repository, as indicated this Repository's RepositoryCfg. If this is a new 96 Repository then these are the inputs to this Butler (and will be saved in the RepositoryCfg). These 97 RepoData objects are not owned by this RepoData, these are references to peer RepoData objects in the 98 Butler's RepoDataContainer. 100 isV1Repository : bool 101 True if this is an Old Butler repository. In this case the repository does not have a RepositoryCfg 102 file. It may have a _mapper file and may have a _parent symlink. It will never be treated as a "new" 103 repository, i.e. even though there is not a RepositoryCfg file, one will not be generated. 104 If False, this is a New Butler repository and is specified by RepositoryCfg file. 107 These are values that may be used to restrict the search of input repositories. Details are available 108 in the RepositoryArgs and DataId classes. 111 "input", "output", or "parent", indicating why Butler loaded this repository. 112 * input: the Repository was passed as a Butler input. 113 * output: the Repository was passed as a Butler output. 114 * parent: the Repository was specified in the RepositoryCfg parents list of a readable repository. 116 _repoArgs : RepositoryArgs 117 Contains the arguments that were used to specify this Repository. 147 "parentRepoDatas={}," +
150 "parentRegistry={})").
format(
151 self.__class__.__name__,
    def setCfg(self, cfg, origin, root, isV1Repository):
        """Set information about the cfg into the RepoData

        Parameters
        ----------
        cfg : RepositoryCfg
            The RepositoryCfg for the repo.
        origin : string
            'new', 'existing', or 'nested'
        root : string
            URI or absolute path to the location of the RepositoryCfg.yaml file.
        isV1Repository : bool
            True if this is an Old Butler repository.
        """
        if origin not in ('new', 'existing', 'nested'):
            raise RuntimeError("Invalid value for origin:{}".format(origin))
        self.cfg = cfg
        self._cfgOrigin = origin
        self.cfgRoot = root
        self.isV1Repository = isV1Repository

    @property
    def cfgOrigin(self):
        return self._cfgOrigin

    @property
    def isNewRepository(self):
        return self.cfgOrigin == 'new'

    @property
    def role(self):
        return self._role

    @role.setter
    def role(self, val):
        if val not in ('input', 'output', 'parent'):
            raise RuntimeError("Invalid value for role: {}".format(val))
        self._role = val
205 """Get the parents & grandparents etc of this repo data, in depth-first search order. 207 Duplicate entries will be removed in cases where the same parent appears more than once in the parent 212 context : set, optional 213 Users should typically omit context and accept the default argument. Context is used to keep a set 214 of known RepoDatas when calling this function recursively, for duplicate elimination. 219 A list of the parents & grandparents etc of a given repo data, in depth-first search order. 224 if id(self)
in context:
226 context.add(
id(self))
228 parents.append(parent)
229 parents += parent.getParentRepoDatas(context)
240 """Container object for RepoData instances owned by a Butler instance. 244 repoDataList : list of RepoData 245 repoData - RepoData instance to add 251 self.
_all = repoDataList
255 """Get a list of RepoData that are used to as inputs to the Butler. 256 The list is created lazily as needed, and cached. 260 A list of RepoData with readable repositories, in the order to be used when searching. 263 raise RuntimeError(
"Inputs not yet initialized.")
267 """Get a list of RepoData that are used to as outputs to the Butler. 268 The list is created lazily as needed, and cached. 272 A list of RepoData with writable repositories, in the order to be use when searching. 275 raise RuntimeError(
"Outputs not yet initialized.")
279 """Get a list of all RepoData that are used to as by the Butler. 280 The list is created lazily as needed, and cached. 284 A list of RepoData with writable repositories, in the order to be use when searching. 289 return "%s(_inputs=%r, \n_outputs=%s, \n_all=%s)" % (
290 self.__class__.__name__,
    def _buildLookupLists(self):
        """Build the inputs and outputs lists based on the order of self.all()."""
        def addToList(repoData, lst):
            """Add a repoData and each of its parents (depth first) to a list."""
            if id(repoData) in alreadyAdded:
                return
            lst.append(repoData)
            alreadyAdded.add(id(repoData))
            for parent in repoData.parentRepoDatas:
                addToList(parent, lst)

        if self._inputs is not None or self._outputs is not None:
            raise RuntimeError("Lookup lists are already built.")
        inputs = [repoData for repoData in self.all() if repoData.role == 'input']
        outputs = [repoData for repoData in self.all() if repoData.role == 'output']
        self._inputs = []
        alreadyAdded = set()
        for repoData in outputs:
            if 'r' in repoData.repoArgs.mode:
                addToList(repoData, self._inputs)
        for repoData in inputs:
            addToList(repoData, self._inputs)
        self._outputs = [repoData for repoData in outputs]
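# Illustrative sketch (hypothetical repositories, not part of the module API): given an
# output O created with mode 'rw' whose cfg lists parent P, plus a plain input I, the
# lookup lists built by RepoDataContainer._buildLookupLists() would be:
#
#   inputs()  -> [O, P, I]   # readable outputs first, each followed depth-first by its parents
#   outputs() -> [O]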
322 """Butler provides a generic mechanism for persisting and retrieving data using mappers. 324 A Butler manages a collection of datasets known as a repository. Each dataset has a type representing its 325 intended usage and a location. Note that the dataset type is not the same as the C++ or Python type of the 326 object containing the data. For example, an ExposureF object might be used to hold the data for a raw 327 image, a post-ISR image, a calibrated science image, or a difference image. These would all be different 330 A Butler can produce a collection of possible values for a key (or tuples of values for multiple keys) if 331 given a partial data identifier. It can check for the existence of a file containing a dataset given its 332 type and data identifier. The Butler can then retrieve the dataset. Similarly, it can persist an object to 333 an appropriate location when given its associated data identifier. 335 Note that the Butler has two more advanced features when retrieving a data set. First, the retrieval is 336 lazy. Input does not occur until the data set is actually accessed. This allows datasets to be retrieved 337 and placed on a clipboard prospectively with little cost, even if the algorithm of a stage ends up not 338 using them. Second, the Butler will call a standardization hook upon retrieval of the dataset. This 339 function, contained in the input mapper object, must perform any necessary manipulations to force the 340 retrieved object to conform to standards, including translating metadata. 344 __init__(self, root, mapper=None, **mapperArgs) 346 defineAlias(self, alias, datasetType) 348 getKeys(self, datasetType=None, level=None) 350 queryMetadata(self, datasetType, format=None, dataId={}, **rest) 352 datasetExists(self, datasetType, dataId={}, **rest) 354 get(self, datasetType, dataId={}, immediate=False, **rest) 356 put(self, obj, datasetType, dataId={}, **rest) 358 subset(self, datasetType, level=None, dataId={}, **rest) 360 dataRef(self, datasetType, level=None, dataId={}, **rest) 364 The preferred method of initialization is to use the `inputs` and `outputs` __init__ parameters. These 365 are described in the parameters section, below. 367 For backward compatibility: this initialization method signature can take a posix root path, and 368 optionally a mapper class instance or class type that will be instantiated using the mapperArgs input 369 argument. However, for this to work in a backward compatible way it creates a single repository that is 370 used as both an input and an output repository. This is NOT preferred, and will likely break any 371 provenance system we have in place. 376 .. note:: Deprecated in 12_0 377 `root` will be removed in TBD, it is replaced by `inputs` and `outputs` for 378 multiple-repository support. 379 A file system path. Will only work with a PosixRepository. 380 mapper : string or instance 381 .. note:: Deprecated in 12_0 382 `mapper` will be removed in TBD, it is replaced by `inputs` and `outputs` for 383 multiple-repository support. 384 Provides a mapper to be used with Butler. 386 .. note:: Deprecated in 12_0 387 `mapperArgs` will be removed in TBD, it is replaced by `inputs` and `outputs` for 388 multiple-repository support. 389 Provides arguments to be passed to the mapper if the mapper input argument is a class type to be 390 instantiated by Butler. 391 inputs : RepositoryArgs, dict, or string 392 Can be a single item or a list. Provides arguments to load an existing repository (or repositories). 
393 String is assumed to be a URI and is used as the cfgRoot (URI to the location of the cfg file). (Local 394 file system URI does not have to start with 'file://' and in this way can be a relative path). The 395 `RepositoryArgs` class can be used to provide more parameters with which to initialize a repository 396 (such as `mapper`, `mapperArgs`, `tags`, etc. See the `RepositoryArgs` documentation for more 397 details). A dict may be used as shorthand for a `RepositoryArgs` class instance. The dict keys must 398 match parameters to the `RepositoryArgs.__init__` function. 399 outputs : RepositoryArgs, dict, or string 400 Provides arguments to load one or more existing repositories or create new ones. The different types 401 are handled the same as for `inputs`. 403 The Butler init sequence loads all of the input and output repositories. 404 This creates the object hierarchy to read from and write to them. Each 405 repository can have 0 or more parents, which also get loaded as inputs. 406 This becomes a DAG of repositories. Ultimately, Butler creates a list of 407 these Repositories in the order that they are used. 409 Initialization Sequence 410 ======================= 412 During initialization Butler creates a Repository class instance & support structure for each object 413 passed to `inputs` and `outputs` as well as the parent repositories recorded in the `RepositoryCfg` of 414 each existing readable repository. 416 This process is complex. It is explained below to shed some light on the intent of each step. 418 1. Input Argument Standardization 419 --------------------------------- 421 In `Butler._processInputArguments` the input arguments are verified to be legal (and a RuntimeError is 422 raised if not), and they are converted into an expected format that is used for the rest of the Butler 423 init sequence. See the docstring for `_processInputArguments`. 425 2. Create RepoData Objects 426 -------------------------- 428 Butler uses an object, called `RepoData`, to keep track of information about each repository; each 429 repository is contained in a single `RepoData`. The attributes are explained in its docstring. 431 After `_processInputArguments`, a RepoData is instantiated and put in a list for each repository in 432 `outputs` and `inputs`. This list of RepoData, the `repoDataList`, now represents all the output and input 433 repositories (but not parent repositories) that this Butler instance will use. 435 3. Get `RepositoryCfg`s 436 ----------------------- 438 `Butler._getCfgs` gets the `RepositoryCfg` for each repository the `repoDataList`. The behavior is 439 described in the docstring. 444 `Butler._addParents` then considers the parents list in the `RepositoryCfg` of each `RepoData` in the 445 `repoDataList` and inserts new `RepoData` objects for each parent not represented in the proper location 446 in the `repoDataList`. Ultimately a flat list is built to represent the DAG of readable repositories 447 represented in depth-first order. 449 5. Set and Verify Parents of Outputs 450 ------------------------------------ 452 To be able to load parent repositories when output repositories are used as inputs, the input repositories 453 are recorded as parents in the `RepositoryCfg` file of new output repositories. When an output repository 454 already exists, for consistency the Butler's inputs must match the list of parents specified the already- 455 existing output repository's `RepositoryCfg` file. 
457 In `Butler._setAndVerifyParentsLists`, the list of parents is recorded in the `RepositoryCfg` of new 458 repositories. For existing repositories the list of parents is compared with the `RepositoryCfg`'s parents 459 list, and if they do not match a `RuntimeError` is raised. 461 6. Set the Default Mapper 462 ------------------------- 464 If all the input repositories use the same mapper then we can assume that mapper to be the 465 "default mapper". If there are new output repositories whose `RepositoryArgs` do not specify a mapper and 466 there is a default mapper then the new output repository will be set to use that default mapper. 468 This is handled in `Butler._setDefaultMapper`. 470 7. Cache References to Parent RepoDatas 471 --------------------------------------- 473 In `Butler._connectParentRepoDatas`, in each `RepoData` in `repoDataList`, a list of `RepoData` object 474 references is built that matches the parents specified in that `RepoData`'s `RepositoryCfg`. 476 This list is used later to find things in that repository's parents, without considering peer repository's 477 parents. (e.g. finding the registry of a parent) 482 Tags are described at https://ldm-463.lsst.io/v/draft/#tagging 484 In `Butler._setRepoDataTags`, for each `RepoData`, the tags specified by its `RepositoryArgs` are recorded 485 in a set, and added to the tags set in each of its parents, for ease of lookup when mapping. 487 9. Find Parent Registry and Instantiate RepoData 488 ------------------------------------------------ 490 At this point there is enough information to instantiate the `Repository` instances. There is one final 491 step before instantiating the Repository, which is to try to get a parent registry that can be used by the 492 child repository. The criteria for "can be used" is spelled out in `Butler._setParentRegistry`. However, 493 to get the registry from the parent, the parent must be instantiated. The `repoDataList`, in depth-first 494 search order, is built so that the most-dependent repositories are first, and the least dependent 495 repositories are last. So the `repoDataList` is reversed and the Repositories are instantiated in that 496 order; for each RepoData a parent registry is searched for, and then the Repository is instantiated with 497 whatever registry could be found.""" 500 """This is a Generation 2 Butler. 503 def __init__(self, root=None, mapper=None, inputs=None, outputs=None, **mapperArgs):
        self._initArgs = {'root': root, 'mapper': mapper, 'inputs': inputs, 'outputs': outputs,
                          'mapperArgs': mapperArgs}

        self.log = Log.getLogger("daf.persistence.butler")

        self.datasetTypeAliasDict = {}
        self.storage = Storage()

        inputs, outputs = self._processInputArguments(
            root=root, mapper=mapper, inputs=inputs, outputs=outputs, **mapperArgs)
        # convert the RepoArgs into RepoData
        inputs = [RepoData(args, 'input') for args in inputs]
        outputs = [RepoData(args, 'output') for args in outputs]
        repoDataList = outputs + inputs

        self._getCfgs(repoDataList)
        self._addParents(repoDataList)
        self._setAndVerifyParentsLists(repoDataList)
        self._setDefaultMapper(repoDataList)
        self._connectParentRepoDatas(repoDataList)

        self._repos = RepoDataContainer(repoDataList)
        self._repos._buildLookupLists()

        self._setRepoDataTags()

        for repoData in reversed(repoDataList):
            self._initRepo(repoData)
    def _initRepo(self, repoData):
        if repoData.repo is not None:
            # this repository may have already been initialized by its children, in which case there is
            # nothing more to do.
            return
        for parentRepoData in repoData.parentRepoDatas:
            if parentRepoData.cfg.mapper != repoData.cfg.mapper:
                continue
            if parentRepoData.repo is None:
                self._initRepo(parentRepoData)
            parentRegistry = parentRepoData.repo.getRegistry()
            repoData.parentRegistry = parentRegistry if parentRegistry else parentRepoData.parentRegistry
            if repoData.parentRegistry:
                break
        repoData.repo = Repository(repoData)
    def _processInputArguments(self, root=None, mapper=None, inputs=None, outputs=None, **mapperArgs):
        """Process, verify, and standardize the input arguments.

        * Inputs can not be for Old Butler (root, mapper, mapperArgs) AND New Butler (inputs, outputs).
          `root`, `mapper`, and `mapperArgs` are Old Butler init API.
          `inputs` and `outputs` are New Butler init API.
          Old Butler and New Butler init API may not be mixed, Butler may be initialized with only the Old
          arguments or the New arguments.
        * Verify that if there is a readable output that there is exactly one output. (This restriction is in
          place because all readable repositories must be parents of writable repositories, and for
          consistency the DAG of readable repositories must always be the same. Keeping the list of parents
          becomes very complicated in the presence of multiple readable output repositories. It is better to
          only write to output repositories, and then create a new Butler instance and use the outputs as
          inputs, and write to new output repositories.)
        * Make a copy of inputs & outputs so they may be modified without changing the passed-in arguments.
        * Convert any input/output values that are URI strings to RepositoryArgs.
        * Listify inputs & outputs.
        * Set default RW mode on inputs & outputs as needed.

        Parameters
        ----------
        Same as Butler.__init__

        Returns
        -------
        (list of RepositoryArgs, list of RepositoryArgs)
            First item is a list to use as inputs.
            Second item is a list to use as outputs.

        Raises
        ------
        RuntimeError
            If Old Butler and New Butler arguments are both used this will raise.
            If an output is readable and there is more than one output this will raise.
        """
        # inputs and outputs may be modified; do not change the external value.
        inputs = copy.deepcopy(inputs)
        outputs = copy.deepcopy(outputs)

        isV1Args = inputs is None and outputs is None
        if isV1Args:
            inputs, outputs = self._convertV1Args(root=root,
                                                  mapper=mapper,
                                                  mapperArgs=mapperArgs or None)
        elif root or mapper or mapperArgs:
            raise RuntimeError(
                'Butler version 1 API (root, mapper, **mapperArgs) may '
                'not be used with version 2 API (inputs, outputs)')

        inputs = listify(inputs)
        outputs = listify(outputs)
        inputs = [RepositoryArgs(cfgRoot=args)
                  if not isinstance(args, RepositoryArgs) else args for args in inputs]
        outputs = [RepositoryArgs(cfgRoot=args)
                   if not isinstance(args, RepositoryArgs) else args for args in outputs]
        # set default rw modes as needed, and verify the mode values.
        for args in inputs:
            if args.mode is None:
                args.mode = 'r'
            elif 'rw' == args.mode:
                args.mode = 'r'
            elif 'r' != args.mode:
                raise RuntimeError("The mode of an input should be readable.")
        for args in outputs:
            if args.mode is None:
                args.mode = 'w'
            elif 'w' not in args.mode:
                raise RuntimeError("The mode of an output should be writable.")
        # check for class instances in args.mapper (not allowed)
        for args in inputs + outputs:
            if (args.mapper and not isinstance(args.mapper, str)
                    and not inspect.isclass(args.mapper)):
                self.log.warn(preinitedMapperWarning)
        # if an output is readable, there must be only one output:
        for o in outputs:
            if 'r' in o.mode and len(outputs) > 1:
                raise RuntimeError("Butler does not support multiple output repositories if any of the "
                                   "outputs are readable.")

        # Handle the case where a readable output is also passed in as one of the inputs by removing the
        # duplicate input.
        def inputIsInOutputs(inputArgs, outputArgsList):
            for o in outputArgsList:
                if ('r' in o.mode
                        and o.root == inputArgs.root
                        and o.mapper == inputArgs.mapper
                        and o.mapperArgs == inputArgs.mapperArgs
                        and o.tags == inputArgs.tags
                        and o.policy == inputArgs.policy):
                    self.log.debug(("Input repositoryArgs {} is also listed in outputs as readable; "
                                    "throwing away the input.").format(inputArgs))
                    return True
            return False

        inputs = [args for args in inputs if not inputIsInOutputs(args, outputs)]
        return inputs, outputs
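    # Illustrative calls (hypothetical paths, not part of the module API), showing the
    # argument combinations _processInputArguments accepts or rejects:
    #
    #   Butler(root='/data/repo')                         # Old Butler API
    #   Butler(inputs='/data/in', outputs='/data/out')    # New Butler API
    #   Butler(root='/data/repo', inputs='/data/in')      # raises RuntimeError (mixed APIs)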
    @staticmethod
    def _getParentVal(repoData):
        """Get the value of this repoData as it should appear in the parents
        list of other repositories."""
        if repoData.isV1Repository:
            return repoData.cfg
        if repoData.cfgOrigin == 'nested':
            return repoData.cfg
        return repoData.cfg.root

    @staticmethod
    def _getParents(ofRepoData, repoInfo):
        """Create a parents list of repoData from inputs and (readable) outputs."""
        parents = []
        for repoData in repoInfo:
            if repoData is ofRepoData:
                break
            if 'r' not in repoData.repoArgs.mode:
                continue
            parents.append(Butler._getParentVal(repoData))
        return parents
    @staticmethod
    def _getOldButlerRepositoryCfg(repositoryArgs):
        if not Storage.isPosix(repositoryArgs.cfgRoot):
            return None
        if not PosixStorage.v1RepoExists(repositoryArgs.cfgRoot):
            return None
        if not repositoryArgs.mapper:
            repositoryArgs.mapper = PosixStorage.getMapperClass(repositoryArgs.cfgRoot)
        cfg = RepositoryCfg.makeFromArgs(repositoryArgs)
        parent = PosixStorage.getParentSymlinkPath(repositoryArgs.cfgRoot)
        if parent:
            parent = Butler._getOldButlerRepositoryCfg(RepositoryArgs(cfgRoot=parent, mode='r'))
            if parent is not None:
                cfg.addParents([parent])
        return cfg

    def _getRepositoryCfg(self, repositoryArgs):
        """Try to get a repository from the location described by cfgRoot.

        Parameters
        ----------
        repositoryArgs : RepositoryArgs or string
            Provides arguments to load an existing repository (or repositories). String is assumed to be a URI
            and is used as the cfgRoot (URI to the location of the cfg file).

        Returns
        -------
        (RepositoryCfg or None, bool)
            The RepositoryCfg, or None if one cannot be found, and True if the RepositoryCfg was created by
            reading an Old Butler repository, or False if it is a New Butler Repository.
        """
        if not isinstance(repositoryArgs, RepositoryArgs):
            repositoryArgs = RepositoryArgs(cfgRoot=repositoryArgs)
        cfg = self.storage.getRepositoryCfg(repositoryArgs.cfgRoot)
        isOldButlerRepository = False
        if cfg is None:
            cfg = Butler._getOldButlerRepositoryCfg(repositoryArgs)
            if cfg is not None:
                isOldButlerRepository = True
        return cfg, isOldButlerRepository
    def _getCfgs(self, repoDataList):
        """Get or make a RepositoryCfg for each RepoData, and add the cfg to the RepoData.

        If the cfg exists, compare values. If the values match then use the cfg as an "existing" cfg. If the
        values do not match, use the cfg as a "nested" cfg.
        If the cfg does not exist, the RepositoryArgs must be for a writable repository.

        Parameters
        ----------
        repoDataList : list of RepoData
            The RepoData that are output and inputs of this Butler.

        Raises
        ------
        RuntimeError
            If the passed-in RepositoryArgs indicate an existing repository but other cfg parameters in those
            RepositoryArgs do not match the existing repository's cfg a RuntimeError will be raised.
        """
        def cfgMatchesArgs(args, cfg):
            """Test if there are any values in RepositoryArgs that conflict with the values in a cfg."""
            if args.mapper is not None and cfg.mapper != args.mapper:
                return False
            if args.mapperArgs is not None and cfg.mapperArgs != args.mapperArgs:
                return False
            if args.policy is not None and cfg.policy != args.policy:
                return False
            return True

        for repoData in repoDataList:
            cfg, isOldButlerRepository = self._getRepositoryCfg(repoData.repoArgs)
            if cfg is None:
                if 'w' not in repoData.repoArgs.mode:
                    raise RuntimeError(
                        "No cfg found for read-only input repository at {}".format(repoData.repoArgs.cfgRoot))
                repoData.setCfg(cfg=RepositoryCfg.makeFromArgs(repoData.repoArgs),
                                origin='new',
                                root=repoData.repoArgs.cfgRoot,
                                isV1Repository=isOldButlerRepository)
            else:
                # replace any Old Butler parent paths in the cfg's parents list with a cfg that carries the
                # child repository's mapperArgs.
                for i, parent in enumerate(cfg.parents):
                    if isinstance(parent, RepositoryCfg):
                        continue
                    parentCfg, parentIsOldButlerRepository = self._getRepositoryCfg(parent)
                    if parentIsOldButlerRepository:
                        parentCfg.mapperArgs = cfg.mapperArgs
                        self.log.info(("Butler is replacing an Old Butler parent repository path '{}' "
                                       "found in the parents list of a New Butler repositoryCfg: {} "
                                       "with a repositoryCfg that includes the child repository's "
                                       "mapperArgs: {}. This affects the instantiated RepositoryCfg "
                                       "but does not change the persisted child repositoryCfg.yaml file."
                                       ).format(parent, cfg, parentCfg))
                        cfg._parents[i] = cfg._normalizeParents(cfg.root, [parentCfg])[0]

                if 'w' in repoData.repoArgs.mode:
                    # if it's writable, the RepositoryArgs must match the existing cfg.
                    if not cfgMatchesArgs(repoData.repoArgs, cfg):
                        raise RuntimeError(("The RepositoryArgs and RepositoryCfg must match for writable "
                                            "repositories, RepositoryCfg:{}, RepositoryArgs:{}").format(
                                                cfg, repoData.repoArgs))
                    repoData.setCfg(cfg=cfg, origin='existing', root=repoData.repoArgs.cfgRoot,
                                    isV1Repository=isOldButlerRepository)
                else:
                    if cfgMatchesArgs(repoData.repoArgs, cfg):
                        repoData.setCfg(cfg=cfg, origin='existing', root=repoData.repoArgs.cfgRoot,
                                        isV1Repository=isOldButlerRepository)
                    else:
                        repoData.setCfg(cfg=cfg, origin='nested', root=None,
                                        isV1Repository=isOldButlerRepository)
    def _addParents(self, repoDataList):
        """For each repoData in the input list, see if its parents are the next items in the list, and if not
        add the parent, so that the repoDataList includes parents and is in order to operate depth-first 0..n.

        Parameters
        ----------
        repoDataList : list of RepoData
            The RepoData for the Butler outputs + inputs.

        Raises
        ------
        RuntimeError
            Raised if a RepositoryCfg can not be found at a location where a parent repository should be.
        """
        repoDataIdx = 0
        while True:
            if repoDataIdx == len(repoDataList):
                break
            repoData = repoDataList[repoDataIdx]
            if 'r' not in repoData.repoArgs.mode:
                repoDataIdx += 1
                continue
            if repoData.isNewRepository:
                repoDataIdx += 1
                continue
            if repoData.cfg.parents is None:
                repoDataIdx += 1
                continue
            for repoParentIdx, repoParent in enumerate(repoData.cfg.parents):
                parentIdxInRepoDataList = repoDataIdx + repoParentIdx + 1
                if not isinstance(repoParent, RepositoryCfg):
                    repoParentCfg, isOldButlerRepository = self._getRepositoryCfg(repoParent)
                    if repoParentCfg is not None:
                        cfgOrigin = 'existing'
                    else:
                        raise RuntimeError(
                            "Could not find a RepositoryCfg for parent repository at {}".format(repoParent))
                else:
                    isOldButlerRepository = False
                    repoParentCfg = repoParent
                    cfgOrigin = 'nested'
                if (parentIdxInRepoDataList < len(repoDataList)
                        and repoDataList[parentIdxInRepoDataList].cfg == repoParentCfg):
                    continue
                args = RepositoryArgs(cfgRoot=repoParentCfg.root, mode='r')
                role = 'input' if repoData.role == 'output' else 'parent'
                newRepoInfo = RepoData(args, role)
                newRepoInfo.setCfg(cfg=repoParentCfg, origin=cfgOrigin, root=args.cfgRoot,
                                   isV1Repository=isOldButlerRepository)
                repoDataList.insert(parentIdxInRepoDataList, newRepoInfo)
            repoDataIdx += 1
    def _setAndVerifyParentsLists(self, repoDataList):
        """Make a list of all the input repositories of this Butler, these are the parents of the outputs.
        For new output repositories, set the parents in the RepositoryCfg. For existing output repositories
        verify that the RepositoryCfg's parents match the parents list.

        Parameters
        ----------
        repoDataList : list of RepoData
            All the RepoDatas loaded by this butler, in search order.

        Raises
        ------
        RuntimeError
            If an existing output repository is loaded and its parents do not match the parents of this Butler
            an error will be raised.
        """
        def getIOParents(ofRepoData, repoDataList):
            """Make a parents list for the repo in `ofRepoData` that is comprised of inputs and readable
            outputs (not parents-of-parents) of this butler."""
            parents = []
            for repoData in repoDataList:
                if repoData.role == 'parent':
                    continue
                if repoData is ofRepoData:
                    continue
                if repoData.role == 'output':
                    if 'r' in repoData.repoArgs.mode:
                        raise RuntimeError("If an output is readable it must be the only output.")
                    continue
                parents.append(self._getParentVal(repoData))
            return parents

        for repoData in repoDataList:
            if repoData.role != 'output':
                continue
            parents = getIOParents(repoData, repoDataList)
            # if repoData is new, add the parent RepositoryCfgs to it.
            if repoData.cfgOrigin == 'new':
                repoData.cfg.addParents(parents)
            elif repoData.cfgOrigin in ('existing', 'nested'):
                if repoData.cfg.parents != parents:
                    try:
                        repoData.cfg.extendParents(parents)
                    except ParentsMismatch as e:
                        raise RuntimeError(("Inputs of this Butler:{} do not match parents of existing "
                                            "writable cfg:{} (ParentsMismatch exception: {})").format(
                                                parents, repoData.cfg.parents, e))
    def _setDefaultMapper(self, repoDataList):
        """Establish a default mapper if there is one and assign it to outputs that do not have a mapper
        assigned.

        If all inputs have the same mapper it will be used as the default mapper.

        Parameters
        ----------
        repoDataList : list of RepoData
            All the RepoDatas loaded by this butler, in search order.

        Raises
        ------
        RuntimeError
            If a default mapper can not be established and there is an output that does not have a mapper.
        """
        needyOutputs = [rd for rd in repoDataList if rd.role == 'output' and rd.cfg.mapper is None]
        if len(needyOutputs) == 0:
            return
        mappers = set([rd.cfg.mapper for rd in repoDataList if rd.role == 'input'])
        if len(mappers) != 1:
            inputs = [rd for rd in repoDataList if rd.role == 'input']
            raise RuntimeError(
                ("No default mapper could be established from inputs:{} and no mapper specified "
                 "for outputs:{}").format(inputs, needyOutputs))
        defaultMapper = mappers.pop()
        for repoData in needyOutputs:
            repoData.cfg.mapper = defaultMapper
    def _connectParentRepoDatas(self, repoDataList):
        """For each RepoData in repoDataList, find its parent in the repoDataList and cache a reference to it.

        Parameters
        ----------
        repoDataList : list of RepoData
            All the RepoDatas loaded by this butler, in search order.

        Raises
        ------
        RuntimeError
            When a parent is listed in the parents list but not found in the repoDataList. This is not
            expected to ever happen and would indicate an internal Butler error.
        """
        for repoData in repoDataList:
            for parent in repoData.cfg.parents:
                parentToAdd = None
                for otherRepoData in repoDataList:
                    if isinstance(parent, RepositoryCfg):
                        if otherRepoData.cfg == parent:
                            parentToAdd = otherRepoData
                            break
                    elif otherRepoData.cfg.root == parent:
                        parentToAdd = otherRepoData
                        break
                if parentToAdd is None:
                    raise RuntimeError(
                        "Could not find a parent matching {} to add to {}".format(parent, repoData))
                repoData.addParentRepoData(parentToAdd)
    @staticmethod
    def _getParentRepoData(parent, repoDataList):
        """Get a parent RepoData from a cfg from a list of RepoData.

        Parameters
        ----------
        parent : string or RepositoryCfg
            cfgRoot of a repo or a cfg that describes the repo
        repoDataList : list of RepoData
            The list to search in.

        Returns
        -------
        RepoData or None
            A RepoData if one can be found, else None
        """
        repoData = None
        for otherRepoData in repoDataList:
            if isinstance(parent, RepositoryCfg):
                if otherRepoData.cfg == parent:
                    repoData = otherRepoData
                    break
            elif otherRepoData.cfg.root == parent:
                repoData = otherRepoData
                break
        return repoData
    def _setRepoDataTags(self):
        """Set the tags from each repoArgs into all its parent repoArgs so that they can be included in tagged
        searches."""
        def setTags(repoData, tags, context):
            if id(repoData) in context:
                return
            repoData.addTags(tags)
            context.add(id(repoData))
            for parentRepoData in repoData.parentRepoDatas:
                setTags(parentRepoData, tags, context)
        for repoData in self._repos.outputs() + self._repos.inputs():
            setTags(repoData, repoData.repoArgs.tags, set())
    def _convertV1Args(self, root, mapper, mapperArgs):
        """Convert Old Butler RepositoryArgs (root, mapper, mapperArgs) to New Butler RepositoryArgs
        (inputs, outputs).

        Parameters
        ----------
        root : string
            Posix path to repository root.
        mapper : class, class instance, or string
            Instantiated class, a class object to be instantiated, or a string that refers to a class that
            can be imported & used as the mapper.
        mapperArgs : dict
            RepositoryArgs & their values used when instantiating the mapper.

        Returns
        -------
        tuple
            (inputs, outputs) - values to be used for inputs and outputs in Butler.__init__
        """
        if (mapper and not isinstance(mapper, str)
                and not inspect.isclass(mapper)):
            self.log.warn(preinitedMapperWarning)
        inputs = None
        if root is None:
            if hasattr(mapper, 'root'):
                # in legacy usage the mapper instance may carry the root.
                root = mapper.root
            else:
                root = '.'
        outputs = RepositoryArgs(mode='rw',
                                 root=root,
                                 mapper=mapper,
                                 mapperArgs=mapperArgs)
        return inputs, outputs
    def __repr__(self):
        return 'Butler(datasetTypeAliasDict=%s, repos=%s)' % (
            self.datasetTypeAliasDict, self._repos)
    def _getDefaultMapper(self):
        """Get the default mapper. Currently this means if all the repositories use exactly the same mapper,
        that mapper may be considered the default.

        This definition may be changing; mappers may be able to exclude themselves as candidates for default,
        and they may nominate a different mapper instead. Also, we may not want to look at *all* the
        repositories, but only a depth-first search on each of the input & output repositories, and use the
        first-found mapper for each of those. TBD.

        Returns
        -------
        Mapper class or None
            Returns the class type of the default mapper, or None if a default
            mapper can not be determined.
        """
        defaultMapper = None

        for inputRepoData in self._repos.inputs():
            mapper = None
            if inputRepoData.cfg.mapper is not None:
                mapper = inputRepoData.cfg.mapper
                # if the mapper is a string, import it; if it is a class instance, get its class type.
                if isinstance(mapper, str):
                    mapper = doImport(mapper)
                elif not inspect.isclass(mapper):
                    mapper = mapper.__class__
            # note the first mapper found; if a non-matching mapper is found later there is no default.
            if defaultMapper is None:
                defaultMapper = mapper
            elif mapper == defaultMapper:
                continue
            elif mapper is not None:
                return None
        return defaultMapper
    def _assignDefaultMapper(self, defaultMapper):
        for repoData in self._repos.all():
            if repoData.cfg.mapper is None and (repoData.isNewRepository or repoData.isV1Repository):
                if defaultMapper is None:
                    raise RuntimeError(
                        "No mapper specified for %s and no default mapper could be determined." %
                        repoData)
                repoData.cfg.mapper = defaultMapper
1094 """posix-only; gets the mapper class at the path specified by root (if a file _mapper can be found at 1095 that location or in a parent location. 1097 As we abstract the storage and support different types of storage locations this method will be 1098 moved entirely into Butler Access, or made more dynamic, and the API will very likely change.""" 1099 return Storage.getMapperClass(root)
1102 """Register an alias that will be substituted in datasetTypes. 1107 The alias keyword. It may start with @ or not. It may not contain @ except as the first character. 1108 datasetType - string 1109 The string that will be substituted when @alias is passed into datasetType. It may not contain '@' 1113 atLoc = alias.rfind(
'@')
1115 alias =
"@" + str(alias)
1117 raise RuntimeError(
"Badly formatted alias string: %s" % (alias,))
1120 if datasetType.count(
'@') != 0:
1121 raise RuntimeError(
"Badly formatted type string: %s" % (datasetType))
1126 if key.startswith(alias)
or alias.startswith(key):
1127 raise RuntimeError(
"Alias: %s overlaps with existing alias: %s" % (alias, key))
    def getKeys(self, datasetType=None, level=None, tag=None):
        """Get the valid data id keys at or above the given level of hierarchy for the dataset type or the
        entire collection if None. The dict values are the basic Python types corresponding to the keys (int,
        float, string).

        Parameters
        ----------
        datasetType - string
            The type of dataset to get keys for, entire collection if None.
        level - string
            The hierarchy level to descend to. None if it should not be restricted. Use an empty string if the
            mapper should lookup the default level.
        tag - any, or list of any
            Any object that can be tested to be the same as the tag in a dataId passed into butler input
            functions. Applies only to input repositories: If tag is specified by the dataId then the repo
            will only be read from if the tag in the dataId matches a tag used for that repository.

        Returns
        -------
        Returns a dict. The dict keys are the valid data id keys at or above the given level of hierarchy for
        the dataset type or the entire collection if None. The dict values are the basic Python types
        corresponding to the keys (int, float, string).
        """
        datasetType = self._resolveDatasetTypeAlias(datasetType)

        keys = None
        tag = setify(tag)
        for repoData in self._repos.inputs():
            if not tag or len(tag.intersection(repoData.tags)) > 0:
                keys = repoData.repo.getKeys(datasetType, level)
                # An empty dict is a valid "found" condition for keys. The only value for keys that should
                # cause the search to continue is None.
                if keys is not None:
                    break
        return keys
1168 """Returns the valid values for one or more keys when given a partial 1169 input collection data id. 1173 datasetType - string 1174 The type of dataset to inquire about. 1176 Key or tuple of keys to be returned. 1177 dataId - DataId, dict 1178 The partial data id. 1180 Keyword arguments for the partial data id. 1184 A list of valid values or tuples of valid values as specified by the 1190 dataId.update(**rest)
1194 for repoData
in self.
_repos.inputs():
1195 if not dataId.tag
or len(dataId.tag.intersection(repoData.tags)) > 0:
1196 tuples = repoData.repo.queryMetadata(datasetType, format, dataId)
1203 if len(format) == 1:
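    # Illustrative usage (hypothetical keys and values): with a single format key the
    # returned list holds bare values; with multiple keys it holds tuples:
    #
    #   butler.queryMetadata('raw', 'visit', dataId={'filter': 'r'})  # e.g. [903334, 903336]
    #   butler.queryMetadata('raw', ('visit', 'ccd'))                 # e.g. [(903334, 22), ...]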
1215 """Determines if a dataset file exists. 1219 datasetType - string 1220 The type of dataset to inquire about. 1221 dataId - DataId, dict 1222 The data id of the dataset. 1224 If True, look only in locations where the dataset could be written, 1225 and return True only if it is present in all of them. 1226 **rest keyword arguments for the data id. 1231 True if the dataset exists or is non-file-based. 1235 dataId.update(**rest)
1236 locations = self.
_locate(datasetType, dataId, write=write)
1238 if locations
is None:
1240 locations = [locations]
1245 for location
in locations:
1248 if isinstance(location, ButlerComposite):
1249 for name, componentInfo
in location.componentInfo.items():
1250 if componentInfo.subset:
1251 subset = self.
subset(datasetType=componentInfo.datasetType, dataId=location.dataId)
1252 exists =
all([obj.datasetExists()
for obj
in subset])
1254 exists = self.
datasetExists(componentInfo.datasetType, location.dataId)
1258 if not location.repository.exists(location):
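    # Illustrative usage (hypothetical dataset type and dataId): check for existence
    # before reading, to avoid a NoResults exception from get():
    #
    #   if butler.datasetExists('calexp', dataId={'visit': 1, 'ccd': 2}):
    #       exposure = butler.get('calexp', dataId={'visit': 1, 'ccd': 2})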
    def _locate(self, datasetType, dataId, write):
        """Get one or more ButlerLocations and/or ButlerComposites.

        Parameters
        ----------
        datasetType : string
            The datasetType that is being searched for. The datasetType may be followed by a dot and
            a component name (component names are specified in the policy). IE datasetType.componentName

        dataId : dict or DataId class instance
            The dataId

        write : bool
            True if this is a search to write an object. False if it is a search to read an object. This
            affects what type (an object or a container) is returned.

        Returns
        -------
        If write is False, will return either a single object or None. If write is True, will return a list
        (which may be empty)
        """
        repos = self._repos.outputs() if write else self._repos.inputs()
        locations = []
        for repoData in repos:
            # enforce dataId & repository tags when reading:
            if not write and dataId.tag and len(dataId.tag.intersection(repoData.tags)) == 0:
                continue
            components = datasetType.split('.')
            datasetType = components[0]
            components = components[1:]
            location = repoData.repo.map(datasetType, dataId, write=write)
            if location is None:
                continue
            location.datasetType = datasetType
            if len(components) > 0:
                if not isinstance(location, ButlerComposite):
                    raise RuntimeError("The location for a dotted datasetType must be a composite.")
                # replace the first component name with the datasetType
                components[0] = location.componentInfo[components[0]].datasetType
                # join components back into a dot-delimited string
                datasetType = '.'.join(components)
                location = self._locate(datasetType, dataId, write)
                # if a component location is not found we can not continue with this repo; move to the
                # next repo.
                if location is None:
                    break
            # if a bypass function is defined for this dataset type, evaluate it now. If it fails (most
            # commonly because a file does not exist) ignore it and continue the search.
            if hasattr(location.mapper, "bypass_" + location.datasetType):
                bypass = self._getBypassFunc(location, dataId)
                try:
                    bypass = bypass()
                    location.bypass = bypass
                except (NoResults, IOError):
                    self.log.debug("Continuing dataset search while evaluating "
                                   "bypass function for Dataset type:{} Data ID:{} at "
                                   "location {}".format(datasetType, dataId, location))
            # If a location was found but the object does not exist at that location, keep looking in
            # input repositories (the registry may have had enough data for a lookup even though the
            # object exists in a different repository).
            if (isinstance(location, ButlerComposite) or hasattr(location, 'bypass')
                    or location.repository.exists(location)):
                if not write:
                    # in read mode, only the first found location is desired.
                    return location
                if isinstance(location, (list, tuple)):
                    locations.extend(location)
                else:
                    locations.append(location)
        if not write:
            return None
        return locations
    @staticmethod
    def _getBypassFunc(location, dataId):
        pythonType = location.getPythonType()
        if pythonType is not None:
            if isinstance(pythonType, str):
                pythonType = doImport(pythonType)
        bypassFunc = getattr(location.mapper, "bypass_" + location.datasetType)
        return lambda: bypassFunc(location.datasetType, pythonType, location, dataId)
    def get(self, datasetType, dataId=None, immediate=True, **rest):
        """Retrieves a dataset given an input collection data id.

        Parameters
        ----------
        datasetType - string
            The type of dataset to retrieve.
        dataId - DataId, dict
            The data id.
        immediate - bool
            If False use a proxy for delayed loading.
        **rest -
            Keyword arguments for the data id.

        Returns
        -------
        An object retrieved from the dataset (or a proxy for one).
        """
        datasetType = self._resolveDatasetTypeAlias(datasetType)
        dataId = DataId(dataId)
        dataId.update(**rest)

        location = self._locate(datasetType, dataId, write=False)
        if location is None:
            raise NoResults("No locations for get:", datasetType, dataId)
        self.log.debug("Get type=%s keys=%s from %s", datasetType, dataId, str(location))

        if hasattr(location, 'bypass'):
            # the value has already been read by the bypass function; just return it.
            def callback():
                return location.bypass
        else:
            def callback():
                return self._read(location)
        if location.mapper.canStandardize(location.datasetType):
            innerCallback = callback

            def callback():
                return location.mapper.standardize(location.datasetType, innerCallback(), dataId)
        if immediate:
            return callback()
        return ReadProxy(callback)
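    # Illustrative usage (hypothetical dataset type and dataId): with immediate=False the
    # returned ReadProxy defers the actual read until the object is first used:
    #
    #   proxy = butler.get('calexp', dataId={'visit': 1, 'ccd': 2}, immediate=False)
    #   image = proxy.getMaskedImage()  # the dataset is read here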
    def put(self, obj, datasetType, dataId={}, doBackup=False, **rest):
        """Persists a dataset given an output collection data id.

        Parameters
        ----------
        obj -
            The object to persist.
        datasetType - string
            The type of dataset to persist.
        dataId - DataId, dict
            The data id.
        doBackup - bool
            If True, rename existing instead of overwriting.
            WARNING: Setting doBackup=True is not safe for parallel processing, as it may be subject to race
            conditions.
        **rest -
            Keyword arguments for the data id.
        """
        datasetType = self._resolveDatasetTypeAlias(datasetType)
        dataId = DataId(dataId)
        dataId.update(**rest)

        locations = self._locate(datasetType, dataId, write=True)
        if not locations:
            raise NoResults("No locations for put:", datasetType, dataId)
        for location in locations:
            if isinstance(location, ButlerComposite):
                disassembler = location.disassembler if location.disassembler else genericDisassembler
                disassembler(obj=obj, dataId=location.dataId, componentInfo=location.componentInfo)
                for name, info in location.componentInfo.items():
                    if not info.inputOnly:
                        self.put(info.obj, info.datasetType, location.dataId, doBackup=doBackup)
            else:
                if doBackup:
                    location.getRepository().backup(location.datasetType, dataId)
                location.getRepository().write(location, obj)
    def subset(self, datasetType, level=None, dataId={}, **rest):
        """Return complete dataIds for a dataset type that match a partial (or empty) dataId.

        Given a partial (or empty) dataId specified in dataId and **rest, find all datasets that match the
        dataId. Optionally restrict the results to a given level specified by a dataId key (e.g. visit or
        sensor or amp for a camera). Return an iterable collection of complete dataIds as ButlerDataRefs.
        Datasets with the resulting dataIds may not exist; that needs to be tested with datasetExists().

        Parameters
        ----------
        datasetType - string
            The type of dataset collection to subset
        level - string
            The level of dataId at which to subset. Use an empty string if the mapper should look up the
            default level.
        dataId - DataId, dict
            The data id.
        **rest -
            Keyword arguments for the data id.

        Returns
        -------
        subset - ButlerSubset
            Collection of ButlerDataRefs for datasets matching the data id.

        Examples
        --------
        To print the full dataIds for all r-band measurements in a source catalog
        (note that the subset call is equivalent to: `butler.subset('src', dataId={'filter':'r'})`):

        >>> subset = butler.subset('src', filter='r')
        >>> for data_ref in subset: print(data_ref.dataId)
        """
        datasetType = self._resolveDatasetTypeAlias(datasetType)

        # Currently expected behavior of subset is that if specified level is None then the mapper's default
        # level should be used. Convention for level within Butler is that an empty string is used to indicate
        # 'the default level'.
        if level is None:
            level = ''

        dataId = DataId(dataId)
        dataId.update(**rest)
        return ButlerSubset(self, datasetType, level, dataId)

    def dataRef(self, datasetType, level=None, dataId={}, **rest):
        """Returns a single ButlerDataRef.

        Given a complete dataId specified in dataId and **rest, find the unique dataset at the given level
        specified by a dataId key (e.g. visit or sensor or amp for a camera) and return a ButlerDataRef.

        Parameters
        ----------
        datasetType - string
            The type of dataset collection to reference
        level - string
            The level of dataId at which to reference
        dataId - DataId, dict
            The data id.
        **rest -
            Keyword arguments for the data id.

        Returns
        -------
        dataRef - ButlerDataRef
            ButlerDataRef for dataset matching the data id
        """
        datasetType = self._resolveDatasetTypeAlias(datasetType)
        dataId = DataId(dataId)
        subset = self.subset(datasetType, level, dataId, **rest)
        if len(subset) != 1:
            raise RuntimeError("No unique dataset for: Dataset type:%s Level:%s Data ID:%s Keywords:%s" %
                               (str(datasetType), str(level), str(dataId), str(rest)))
        return ButlerDataRef(subset, subset.cache[0])

    def getUri(self, datasetType, dataId=None, write=False, **rest):
        """Return the URI for a dataset

        .. warning:: This is intended only for debugging. The URI should
        never be used for anything other than printing.

        .. note:: In the event there are multiple URIs for read, we return only
        the first.

        .. note:: getUri() does not currently support composite datasets.

        Parameters
        ----------
        datasetType : `str`
            The dataset type of interest.
        dataId : `dict`, optional
            The data identifier.
        write : `bool`, optional
            Return the URI for writing?
        rest : `dict`, optional
            Keyword arguments for the data id.

        Returns
        -------
        uri : `str`
            URI for the dataset.
        """
        datasetType = self._resolveDatasetTypeAlias(datasetType)
        dataId = DataId(dataId)
        dataId.update(**rest)
        locations = self._locate(datasetType, dataId, write=write)
        if locations is None:
            raise NoResults("No locations for getUri: ", datasetType, dataId)

        if write:
            # Follow the write path.
            # Return the first valid write location.
            for location in locations:
                if isinstance(location, ButlerComposite):
                    for name, info in location.componentInfo.items():
                        if not info.inputOnly:
                            return self.getUri(info.datasetType, location.dataId, write=True)
                else:
                    return location.getLocationsWithRoot()[0]
            # fall back to raise
            raise NoResults("No locations for getUri(write=True): ", datasetType, dataId)
        else:
            # Follow the read path, only return the first valid read.
            return locations.getLocationsWithRoot()[0]

    def _read(self, location):
        """Unpersist an object using data inside a ButlerLocation or ButlerComposite object.

        Parameters
        ----------
        location : ButlerLocation or ButlerComposite
            A ButlerLocation or ButlerComposite instance populated with data needed to read the object.

        Returns
        -------
        object
            An instance of the object specified by the location.
        """
        self.log.debug("Starting read from %s", location)

        if isinstance(location, ButlerComposite):
            for name, componentInfo in location.componentInfo.items():
                if componentInfo.subset:
                    subset = self.subset(datasetType=componentInfo.datasetType, dataId=location.dataId)
                    componentInfo.obj = [obj.get() for obj in subset]
                else:
                    obj = self.get(componentInfo.datasetType, location.dataId, immediate=True)
                    componentInfo.obj = obj
            assembler = location.assembler or genericAssembler
            results = assembler(dataId=location.dataId, componentInfo=location.componentInfo,
                                cls=location.python)
        else:
            results = location.repository.read(location)
            if len(results) == 1:
                results = results[0]
        self.log.debug("Ending read from %s", location)
        return results

    def __reduce__(self):
        ret = (_unreduce, (self._initArgs, self.datasetTypeAliasDict))
        return ret

    def _resolveDatasetTypeAlias(self, datasetType):
        """Replaces all the known alias keywords in the given string with the alias value.

        Parameters
        ----------
        datasetType - string
            A datasetType string to search & replace on

        Returns
        -------
        datasetType - string
            The de-aliased string
        """
        for key in self.datasetTypeAliasDict:
            # if all aliases have been replaced, bail out
            if datasetType.find('@') == -1:
                break
            datasetType = datasetType.replace(key, self.datasetTypeAliasDict[key])

        # If an alias specifier can not be resolved then throw.
        if datasetType.find('@') != -1:
            raise RuntimeError("Unresolvable alias specifier in datasetType: %s" % (datasetType))

        return datasetType


def _unreduce(initArgs, datasetTypeAliasDict):
    mapperArgs = initArgs.pop('mapperArgs')
    initArgs.update(mapperArgs)
    butler = Butler(**initArgs)
    butler.datasetTypeAliasDict = datasetTypeAliasDict
    return butler
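# Illustrative sketch (hypothetical paths): __reduce__ and _unreduce make Butler
# picklable; unpickling rebuilds the Butler from its original init arguments and
# restores the dataset type aliases.
#
#   import pickle
#   butler = Butler(inputs='/data/in', outputs='/data/out')
#   butler2 = pickle.loads(pickle.dumps(butler))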