butler.py
#!/usr/bin/env python

#
# LSST Data Management System
# Copyright 2008-2015 LSST Corporation.
#
# This product includes software developed by the
# LSST Project (http://www.lsst.org/).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the LSST License Statement and
# the GNU General Public License along with this program. If not,
# see <http://www.lsstcorp.org/LegalNotices/>.
#

# -*- python -*-
27 """This module defines the Butler class."""
28 import copy
29 import inspect
30 
31 import yaml
32 
33 from lsst.log import Log
34 from .deprecation import deprecate_class
35 from . import ReadProxy, ButlerSubset, ButlerDataRef, \
36  Storage, Policy, NoResults, Repository, DataId, RepositoryCfg, \
37  RepositoryArgs, listify, setify, sequencify, doImport, ButlerComposite, genericAssembler, \
38  genericDisassembler, PosixStorage, ParentsMismatch
39 
preinitedMapperWarning = ("Passing an instantiated mapper into "
                          "Butler.__init__ will prevent Butler from passing "
                          "parentRegistry or repositoryCfg information to "
                          "the mapper, which is done only at init time. "
                          "It is better to pass an importable string or "
                          "class object.")


class ButlerCfg(Policy, yaml.YAMLObject):
    """Represents a Butler configuration.

    .. warning::

        cfg is 'wet paint' and very likely to change. Use of it in production
        code other than via the 'old butler' API is strongly discouraged.
    """
    yaml_tag = u"!ButlerCfg"

    def __init__(self, cls, repoCfg):
        super().__init__({'repoCfg': repoCfg, 'cls': cls})


class RepoData:
    """Container object for repository data used by Butler

    Parameters
    ----------
    args : RepositoryArgs
        The arguments that are used to find or create the RepositoryCfg.
    role : string
        "input", "output", or "parent", indicating why Butler loaded this repository.
        * input: the Repository was passed as a Butler input.
        * output: the Repository was passed as a Butler output.
        * parent: the Repository was specified in the RepositoryCfg parents list of a readable repository.

    Attributes
    ----------
    cfg : RepositoryCfg
        The configuration for the Repository.

    _cfgOrigin : string
        "new", "existing", or "nested". Indicates the origin of the repository and its RepositoryCfg:
        * new: it was created by this instance of Butler, and this instance of Butler will generate the
          RepositoryCfg file.
        * existing: it was found (via the root or cfgRoot argument)
        * nested: the full RepositoryCfg was nested in another RepositoryCfg's parents list (this can
          happen if the parameters of an input specified by RepositoryArgs or dict do not entirely match
          an existing RepositoryCfg).

    cfgRoot : string
        Path or URI to the location of the RepositoryCfg file.

    repo : lsst.daf.persistence.Repository
        The Repository class instance.

    parentRepoDatas : list of RepoData
        The parents of this Repository, as indicated in this Repository's RepositoryCfg. If this is a new
        Repository then these are the inputs to this Butler (and will be saved in the RepositoryCfg). These
        RepoData objects are not owned by this RepoData; they are references to peer RepoData objects in
        the Butler's RepoDataContainer.

    isV1Repository : bool
        True if this is an Old Butler repository. In this case the repository does not have a RepositoryCfg
        file. It may have a _mapper file and may have a _parent symlink. It will never be treated as a
        "new" repository, i.e. even though there is not a RepositoryCfg file, one will not be generated.
        If False, this is a New Butler repository and is specified by a RepositoryCfg file.

    tags : set
        These are values that may be used to restrict the search of input repositories. Details are
        available in the RepositoryArgs and DataId classes.

    role : string
        "input", "output", or "parent", indicating why Butler loaded this repository.
        * input: the Repository was passed as a Butler input.
        * output: the Repository was passed as a Butler output.
        * parent: the Repository was specified in the RepositoryCfg parents list of a readable repository.

    _repoArgs : RepositoryArgs
        Contains the arguments that were used to specify this Repository.
    """

    def __init__(self, args, role):
        self.cfg = None
        self._cfgOrigin = None
        self.cfgRoot = None
        self.repo = None
        self.parentRepoDatas = []
        self.isV1Repository = False
        self.tags = set()
        self.role = role
        self.parentRegistry = None
        self._repoArgs = args

    @property
    def repoArgs(self):
        return self._repoArgs

    @property
    def repoData(self):
        return self

    def __repr__(self):
        return ("{}(id={},"
                "repoArgs={},"
                "cfg={!r},"
                "cfgOrigin={},"
                "cfgRoot={},"
                "repo={},"
                "parentRepoDatas={},"
                "isV1Repository={},"
                "role={},"
                "parentRegistry={})").format(
                    self.__class__.__name__,
                    id(self),
                    self.repoArgs,
                    self.cfg,
                    self.cfgOrigin,
                    self.cfgRoot,
                    self.repo,
                    [id(p) for p in self.parentRepoDatas],
                    self.isV1Repository,
                    self.role,
                    self.parentRegistry)

    def setCfg(self, cfg, origin, root, isV1Repository):
        """Set information about the cfg into the RepoData

        Parameters
        ----------
        cfg : RepositoryCfg
            The RepositoryCfg for the repo.
        origin : string
            'new', 'existing', or 'nested'
        root : string
            URI or absolute path to the location of the RepositoryCfg.yaml file.
        isV1Repository : bool
            True if this is an Old Butler (V1) repository, False if it is a New Butler repository.

        Returns
        -------
        None
        """
        if origin not in ('new', 'existing', 'nested'):
            raise RuntimeError("Invalid value for origin: {}".format(origin))
        self.cfg = cfg
        self._cfgOrigin = origin
        self.cfgRoot = root
        self.isV1Repository = isV1Repository

    @property
    def cfgOrigin(self):
        return self._cfgOrigin

    @property
    def isNewRepository(self):
        return self.cfgOrigin == 'new'

    @property
    def role(self):
        return self._role

    @role.setter
    def role(self, val):
        if val not in ('input', 'output', 'parent'):
            raise RuntimeError("Invalid value for role: {}".format(val))
        self._role = val

    def getParentRepoDatas(self, context=None):
        """Get the parents & grandparents etc of this repo data, in depth-first search order.

        Duplicate entries will be removed in cases where the same parent appears more than once in the
        parent graph.

        Parameters
        ----------
        context : set, optional
            Users should typically omit context and accept the default argument. Context is used to keep a
            set of known RepoDatas when calling this function recursively, for duplicate elimination.

        Returns
        -------
        list of RepoData
            A list of the parents & grandparents etc of a given repo data, in depth-first search order.
        """
        if context is None:
            context = set()
        parents = []
        if id(self) in context:
            return parents
        context.add(id(self))
        for parent in self.parentRepoDatas:
            parents.append(parent)
            parents += parent.getParentRepoDatas(context)
        return parents

    def addParentRepoData(self, parentRepoData):
        self.parentRepoDatas.append(parentRepoData)

    def addTags(self, tags):
        self.tags = self.tags.union(tags)


class RepoDataContainer:
    """Container object for RepoData instances owned by a Butler instance.

    Parameters
    ----------
    repoDataList : list of RepoData
        The RepoData instances to add.
    """

    def __init__(self, repoDataList):
        self._inputs = None
        self._outputs = None
        self._all = repoDataList
        self._buildLookupLists()

    def inputs(self):
        """Get a list of RepoData that are used as inputs to the Butler.
        The list is created lazily as needed, and cached.

        Returns
        -------
        A list of RepoData with readable repositories, in the order to be used when searching.
        """
        if self._inputs is None:
            raise RuntimeError("Inputs not yet initialized.")
        return self._inputs

    def outputs(self):
        """Get a list of RepoData that are used as outputs of the Butler.
        The list is created lazily as needed, and cached.

        Returns
        -------
        A list of RepoData with writable repositories, in the order to be used when searching.
        """
        if self._outputs is None:
            raise RuntimeError("Outputs not yet initialized.")
        return self._outputs

    def all(self):
        """Get a list of all RepoData used by the Butler.
        The list is created lazily as needed, and cached.

        Returns
        -------
        A list of all RepoData, in the order to be used when searching.
        """
        return self._all

    def __repr__(self):
        return "%s(_inputs=%r, \n_outputs=%s, \n_all=%s)" % (
            self.__class__.__name__,
            self._inputs,
            self._outputs,
            self._all)

    def _buildLookupLists(self):
        """Build the inputs and outputs lists based on the order of self.all()."""

        def addToList(repoData, lst):
            """Add a repoData and each of its parents (depth first) to a list"""
            if id(repoData) in alreadyAdded:
                return
            lst.append(repoData)
            alreadyAdded.add(id(repoData))
            for parent in repoData.parentRepoDatas:
                addToList(parent, lst)

        if self._inputs is not None or self._outputs is not None:
            raise RuntimeError("Lookup lists are already built.")
        inputs = [repoData for repoData in self.all() if repoData.role == 'input']
        outputs = [repoData for repoData in self.all() if repoData.role == 'output']
        self._inputs = []
        alreadyAdded = set()
        for repoData in outputs:
            if 'r' in repoData.repoArgs.mode:
                addToList(repoData.repoData, self._inputs)
        for repoData in inputs:
            addToList(repoData.repoData, self._inputs)
        self._outputs = [repoData.repoData for repoData in outputs]


@deprecate_class
class Butler:
    """Butler provides a generic mechanism for persisting and retrieving data using mappers.

    A Butler manages a collection of datasets known as a repository. Each dataset has a type representing
    its intended usage and a location. Note that the dataset type is not the same as the C++ or Python type
    of the object containing the data. For example, an ExposureF object might be used to hold the data for
    a raw image, a post-ISR image, a calibrated science image, or a difference image. These would all be
    different dataset types.

    A Butler can produce a collection of possible values for a key (or tuples of values for multiple keys)
    if given a partial data identifier. It can check for the existence of a file containing a dataset given
    its type and data identifier. The Butler can then retrieve the dataset. Similarly, it can persist an
    object to an appropriate location when given its associated data identifier.

    Note that the Butler has two more advanced features when retrieving a dataset. First, the retrieval is
    lazy. Input does not occur until the dataset is actually accessed. This allows datasets to be retrieved
    and placed on a clipboard prospectively with little cost, even if the algorithm of a stage ends up not
    using them. Second, the Butler will call a standardization hook upon retrieval of the dataset. This
    function, contained in the input mapper object, must perform any necessary manipulations to force the
    retrieved object to conform to standards, including translating metadata.

    Public methods:

    __init__(self, root=None, mapper=None, inputs=None, outputs=None, **mapperArgs)

    defineAlias(self, alias, datasetType)

    getKeys(self, datasetType=None, level=None)

    getDatasetTypes(self)

    queryMetadata(self, datasetType, format=None, dataId={}, **rest)

    datasetExists(self, datasetType, dataId={}, **rest)

    get(self, datasetType, dataId={}, immediate=True, **rest)

    put(self, obj, datasetType, dataId={}, **rest)

    subset(self, datasetType, level=None, dataId={}, **rest)

    dataRef(self, datasetType, level=None, dataId={}, **rest)

    Initialization:

    The preferred method of initialization is to use the `inputs` and `outputs` __init__ parameters. These
    are described in the parameters section, below.

    For backward compatibility: this initialization method signature can take a posix root path, and
    optionally a mapper class instance or class type that will be instantiated using the mapperArgs input
    argument. However, for this to work in a backward compatible way it creates a single repository that is
    used as both an input and an output repository. This is NOT preferred, and will likely break any
    provenance system we have in place.

    Parameters
    ----------
    root : string
        .. note:: Deprecated in 12_0
            `root` will be removed in TBD, it is replaced by `inputs` and `outputs` for
            multiple-repository support.
        A file system path. Will only work with a PosixRepository.
    mapper : string or instance
        .. note:: Deprecated in 12_0
            `mapper` will be removed in TBD, it is replaced by `inputs` and `outputs` for
            multiple-repository support.
        Provides a mapper to be used with Butler.
    mapperArgs : dict
        .. note:: Deprecated in 12_0
            `mapperArgs` will be removed in TBD, it is replaced by `inputs` and `outputs` for
            multiple-repository support.
        Provides arguments to be passed to the mapper if the mapper input argument is a class type to be
        instantiated by Butler.
    inputs : RepositoryArgs, dict, or string
        Can be a single item or a list. Provides arguments to load an existing repository (or
        repositories). A string is assumed to be a URI and is used as the cfgRoot (URI to the location of
        the cfg file). (A local file system URI does not have to start with 'file://' and in this way can
        be a relative path.) The `RepositoryArgs` class can be used to provide more parameters with which
        to initialize a repository (such as `mapper`, `mapperArgs`, `tags`, etc. See the `RepositoryArgs`
        documentation for more details). A dict may be used as shorthand for a `RepositoryArgs` class
        instance. The dict keys must match parameters to the `RepositoryArgs.__init__` function.
    outputs : RepositoryArgs, dict, or string
        Provides arguments to load one or more existing repositories or create new ones. The different
        types are handled the same as for `inputs`.

    The Butler init sequence loads all of the input and output repositories.
    This creates the object hierarchy to read from and write to them. Each
    repository can have 0 or more parents, which also get loaded as inputs.
    This becomes a DAG of repositories. Ultimately, Butler creates a list of
    these Repositories in the order that they are used.

    Initialization Sequence
    =======================

    During initialization Butler creates a Repository class instance & support structure for each object
    passed to `inputs` and `outputs` as well as the parent repositories recorded in the `RepositoryCfg` of
    each existing readable repository.

    This process is complex. It is explained below to shed some light on the intent of each step.

    1. Input Argument Standardization
    ---------------------------------

    In `Butler._processInputArguments` the input arguments are verified to be legal (and a RuntimeError is
    raised if not), and they are converted into an expected format that is used for the rest of the Butler
    init sequence. See the docstring for `_processInputArguments`.

    2. Create RepoData Objects
    --------------------------

    Butler uses an object, called `RepoData`, to keep track of information about each repository; each
    repository is contained in a single `RepoData`. The attributes are explained in its docstring.

    After `_processInputArguments`, a RepoData is instantiated and put in a list for each repository in
    `outputs` and `inputs`. This list of RepoData, the `repoDataList`, now represents all the output and
    input repositories (but not parent repositories) that this Butler instance will use.

    3. Get `RepositoryCfg`s
    -----------------------

    `Butler._getCfgs` gets the `RepositoryCfg` for each repository in the `repoDataList`. The behavior is
    described in the docstring.

    4. Add Parents
    --------------

    `Butler._addParents` then considers the parents list in the `RepositoryCfg` of each `RepoData` in the
    `repoDataList` and inserts new `RepoData` objects for each parent not represented in the proper
    location in the `repoDataList`. Ultimately a flat list is built to represent the DAG of readable
    repositories represented in depth-first order.

    5. Set and Verify Parents of Outputs
    ------------------------------------

    To be able to load parent repositories when output repositories are used as inputs, the input
    repositories are recorded as parents in the `RepositoryCfg` file of new output repositories. When an
    output repository already exists, for consistency the Butler's inputs must match the list of parents
    specified in the already-existing output repository's `RepositoryCfg` file.

    In `Butler._setAndVerifyParentsLists`, the list of parents is recorded in the `RepositoryCfg` of new
    repositories. For existing repositories the list of parents is compared with the `RepositoryCfg`'s
    parents list, and if they do not match a `RuntimeError` is raised.

    6. Set the Default Mapper
    -------------------------

    If all the input repositories use the same mapper then we can assume that mapper to be the
    "default mapper". If there are new output repositories whose `RepositoryArgs` do not specify a mapper
    and there is a default mapper then the new output repository will be set to use that default mapper.

    This is handled in `Butler._setDefaultMapper`.

    7. Cache References to Parent RepoDatas
    ---------------------------------------

    In `Butler._connectParentRepoDatas`, in each `RepoData` in `repoDataList`, a list of `RepoData` object
    references is built that matches the parents specified in that `RepoData`'s `RepositoryCfg`.

    This list is used later to find things in that repository's parents, without considering peer
    repositories' parents. (e.g. finding the registry of a parent)

    8. Set Tags
    -----------

    Tags are described at https://ldm-463.lsst.io/v/draft/#tagging

    In `Butler._setRepoDataTags`, for each `RepoData`, the tags specified by its `RepositoryArgs` are
    recorded in a set, and added to the tags set in each of its parents, for ease of lookup when mapping.

    9. Find Parent Registry and Instantiate RepoData
    ------------------------------------------------

    At this point there is enough information to instantiate the `Repository` instances. There is one
    final step before instantiating the Repository, which is to try to get a parent registry that can be
    used by the child repository. The criteria for "can be used" is spelled out in
    `Butler._setParentRegistry`. However, to get the registry from the parent, the parent must be
    instantiated. The `repoDataList`, in depth-first search order, is built so that the most-dependent
    repositories are first, and the least-dependent repositories are last. So the `repoDataList` is
    reversed and the Repositories are instantiated in that order; for each RepoData a parent registry is
    searched for, and then the Repository is instantiated with whatever registry could be found."""

    GENERATION = 2
    """This is a Generation 2 Butler.
    """
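
    # A minimal usage sketch, assuming a repository already exists on disk;
    # the paths and the dataId keys ('visit', 'ccd') are hypothetical and
    # depend on the mapper configured for the repository:
    #
    #     butler = Butler(inputs='/path/to/inputRepo', outputs='/path/to/outputRepo')
    #     exposure = butler.get('calexp', dataId={'visit': 1234, 'ccd': 56})
    #     butler.put(exposure, 'calexp', dataId={'visit': 1234, 'ccd': 56})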

    def __init__(self, root=None, mapper=None, inputs=None, outputs=None, **mapperArgs):
        self._initArgs = {'root': root, 'mapper': mapper, 'inputs': inputs, 'outputs': outputs,
                          'mapperArgs': mapperArgs}

        self.log = Log.getLogger("daf.persistence.butler")

        inputs, outputs = self._processInputArguments(
            root=root, mapper=mapper, inputs=inputs, outputs=outputs, **mapperArgs)

        # convert the RepoArgs into RepoData
        inputs = [RepoData(args, 'input') for args in inputs]
        outputs = [RepoData(args, 'output') for args in outputs]
        repoDataList = outputs + inputs

        self._getCfgs(repoDataList)

        self._addParents(repoDataList)

        self._setAndVerifyParentsLists(repoDataList)

        self._setDefaultMapper(repoDataList)

        self._connectParentRepoDatas(repoDataList)

        self._repos = RepoDataContainer(repoDataList)

        self._setRepoDataTags()

        for repoData in repoDataList:
            self._initRepo(repoData)

    def _initRepo(self, repoData):
        if repoData.repo is not None:
            # this repository may have already been initialized by its children, in which case there is
            # nothing more to do.
            return
        for parentRepoData in repoData.parentRepoDatas:
            if parentRepoData.cfg.mapper != repoData.cfg.mapper:
                continue
            if parentRepoData.repo is None:
                self._initRepo(parentRepoData)
            parentRegistry = parentRepoData.repo.getRegistry()
            repoData.parentRegistry = parentRegistry if parentRegistry else parentRepoData.parentRegistry
            if repoData.parentRegistry:
                break
        repoData.repo = Repository(repoData)

    def _processInputArguments(self, root=None, mapper=None, inputs=None, outputs=None, **mapperArgs):
        """Process, verify, and standardize the input arguments.

        * Inputs can not be for Old Butler (root, mapper, mapperArgs) AND New Butler (inputs, outputs).
          `root`, `mapper`, and `mapperArgs` are Old Butler init API.
          `inputs` and `outputs` are New Butler init API.
          Old Butler and New Butler init API may not be mixed; Butler may be initialized with only the Old
          arguments or the New arguments.
        * Verify that if there is a readable output that there is exactly one output. (This restriction is
          in place because all readable repositories must be parents of writable repositories, and for
          consistency the DAG of readable repositories must always be the same. Keeping the list of parents
          becomes very complicated in the presence of multiple readable output repositories. It is better
          to only write to output repositories, and then create a new Butler instance and use the outputs
          as inputs, and write to new output repositories.)
        * Make a copy of inputs & outputs so they may be modified without changing the passed-in arguments.
        * Convert any input/output values that are URI strings to RepositoryArgs.
        * Listify inputs & outputs.
        * Set default RW mode on inputs & outputs as needed.

        Parameters
        ----------
        Same as Butler.__init__

        Returns
        -------
        (list of RepositoryArgs, list of RepositoryArgs)
            First item is a list to use as inputs.
            Second item is a list to use as outputs.

        Raises
        ------
        RuntimeError
            If Old Butler and New Butler arguments are both used this will raise.
            If an output is readable and there is more than one output this will raise.
        """
        # inputs and outputs may be modified; do not change the external value.
        inputs = copy.deepcopy(inputs)
        outputs = copy.deepcopy(outputs)

        isV1Args = inputs is None and outputs is None
        if isV1Args:
            inputs, outputs = self._convertV1Args(root=root,
                                                  mapper=mapper,
                                                  mapperArgs=mapperArgs or None)
        elif root or mapper or mapperArgs:
            raise RuntimeError(
                'Butler version 1 API (root, mapper, **mapperArgs) may '
                'not be used with version 2 API (inputs, outputs)')
        self.datasetTypeAliasDict = {}

        self.storage = Storage()

        # make sure inputs and outputs are lists, and if a list item is a string convert it to
        # RepositoryArgs.
        inputs = listify(inputs)
        outputs = listify(outputs)
        inputs = [RepositoryArgs(cfgRoot=args)
                  if not isinstance(args, RepositoryArgs) else args for args in inputs]
        outputs = [RepositoryArgs(cfgRoot=args)
                   if not isinstance(args, RepositoryArgs) else args for args in outputs]
        # Set the default value of inputs & outputs, verify the required values ('r' for inputs, 'w' for
        # outputs) and remove the 'w' from inputs if needed.
        for args in inputs:
            if args.mode is None:
                args.mode = 'r'
            elif 'rw' == args.mode:
                args.mode = 'r'
            elif 'r' != args.mode:
                raise RuntimeError("The mode of an input should be readable.")
        for args in outputs:
            if args.mode is None:
                args.mode = 'w'
            elif 'w' not in args.mode:
                raise RuntimeError("The mode of an output should be writable.")
        # check for class instances in args.mapper (not allowed)
        for args in inputs + outputs:
            if (args.mapper and not isinstance(args.mapper, str)
                    and not inspect.isclass(args.mapper)):
                self.log.warn(preinitedMapperWarning)
        # if the output is readable, there must be only one output:
        for o in outputs:
            if 'r' in o.mode:
                if len(outputs) > 1:
                    raise RuntimeError("Butler does not support multiple output repositories if any of "
                                       "the outputs are readable.")

        # Handle the case where the output is readable and is also passed in as one of the inputs by
        # removing the input. This supports a legacy use case in pipe_tasks where the input is also passed
        # as the output, to the command line parser.
        def inputIsInOutputs(inputArgs, outputArgsList):
            for o in outputArgsList:
                if ('r' in o.mode
                        and o.root == inputArgs.root
                        and o.mapper == inputArgs.mapper
                        and o.mapperArgs == inputArgs.mapperArgs
                        and o.tags == inputArgs.tags
                        and o.policy == inputArgs.policy):
                    self.log.debug(("Input repositoryArgs {} is also listed in outputs as readable; "
                                    "throwing away the input.").format(inputArgs))
                    return True
            return False

        inputs = [args for args in inputs if not inputIsInOutputs(args, outputs)]
        return inputs, outputs
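
    # For illustration, these are equivalent ways to specify a single input
    # repository (the path is hypothetical); a plain string is treated as the
    # cfgRoot URI, and a dict is shorthand for RepositoryArgs:
    #
    #     Butler(inputs='/path/to/repo')
    #     Butler(inputs={'cfgRoot': '/path/to/repo'})
    #     Butler(inputs=RepositoryArgs(cfgRoot='/path/to/repo'))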

    @staticmethod
    def _getParentVal(repoData):
        """Get the value of this repoData as it should appear in the parents
        list of other repositories"""
        if repoData.isV1Repository:
            return repoData.cfg
        if repoData.cfgOrigin == 'nested':
            return repoData.cfg
        else:
            return repoData.cfg.root

    @staticmethod
    def _getParents(ofRepoData, repoInfo):
        """Create a parents list of repoData from inputs and (readable) outputs."""
        parents = []
        # get the parents list of repoData:
        for repoData in repoInfo:
            if repoData is ofRepoData:
                continue
            if 'r' not in repoData.repoArgs.mode:
                continue
            parents.append(Butler._getParentVal(repoData))
        return parents

    @staticmethod
    def _getOldButlerRepositoryCfg(repositoryArgs):
        if not Storage.isPosix(repositoryArgs.cfgRoot):
            return None
        if not PosixStorage.v1RepoExists(repositoryArgs.cfgRoot):
            return None
        if not repositoryArgs.mapper:
            repositoryArgs.mapper = PosixStorage.getMapperClass(repositoryArgs.cfgRoot)
        cfg = RepositoryCfg.makeFromArgs(repositoryArgs)
        parent = PosixStorage.getParentSymlinkPath(repositoryArgs.cfgRoot)
        if parent:
            parent = Butler._getOldButlerRepositoryCfg(RepositoryArgs(cfgRoot=parent, mode='r'))
            if parent is not None:
                cfg.addParents([parent])
        return cfg

    def _getRepositoryCfg(self, repositoryArgs):
        """Try to get a repository from the location described by cfgRoot.

        Parameters
        ----------
        repositoryArgs : RepositoryArgs or string
            Provides arguments to load an existing repository (or repositories). String is assumed to be a
            URI and is used as the cfgRoot (URI to the location of the cfg file).

        Returns
        -------
        (RepositoryCfg or None, bool)
            The RepositoryCfg, or None if one cannot be found, and True if the RepositoryCfg was created by
            reading an Old Butler repository, or False if it is a New Butler Repository.
        """
        if not isinstance(repositoryArgs, RepositoryArgs):
            repositoryArgs = RepositoryArgs(cfgRoot=repositoryArgs, mode='r')

        cfg = self.storage.getRepositoryCfg(repositoryArgs.cfgRoot)
        isOldButlerRepository = False
        if cfg is None:
            cfg = Butler._getOldButlerRepositoryCfg(repositoryArgs)
            if cfg is not None:
                isOldButlerRepository = True
        return cfg, isOldButlerRepository

    def _getCfgs(self, repoDataList):
        """Get or make a RepositoryCfg for each RepoData, and add the cfg to the RepoData.

        If the cfg exists, compare values. If the values match then use the cfg as an "existing" cfg. If
        the values do not match, use the cfg as a "nested" cfg.
        If the cfg does not exist, the RepositoryArgs must be for a writable repository.

        Parameters
        ----------
        repoDataList : list of RepoData
            The RepoData that are the outputs and inputs of this Butler.

        Raises
        ------
        RuntimeError
            If the passed-in RepositoryArgs indicate an existing repository but other cfg parameters in
            those RepositoryArgs do not match the existing repository's cfg, a RuntimeError will be raised.
        """
        def cfgMatchesArgs(args, cfg):
            """Test if there are any values in a RepositoryArgs that conflict with the values in a cfg"""
            if args.mapper is not None and cfg.mapper != args.mapper:
                return False
            if args.mapperArgs is not None and cfg.mapperArgs != args.mapperArgs:
                return False
            if args.policy is not None and cfg.policy != args.policy:
                return False
            return True

        for repoData in repoDataList:
            cfg, isOldButlerRepository = self._getRepositoryCfg(repoData.repoArgs)
            if cfg is None:
                if 'w' not in repoData.repoArgs.mode:
                    raise RuntimeError(
                        "No cfg found for read-only input repository at {}".format(
                            repoData.repoArgs.cfgRoot))
                repoData.setCfg(cfg=RepositoryCfg.makeFromArgs(repoData.repoArgs),
                                origin='new',
                                root=repoData.repoArgs.cfgRoot,
                                isV1Repository=isOldButlerRepository)
            else:

                # This is a hack fix for an issue introduced by DM-11284; Old Butler parent repositories
                # used to be stored as a path to the repository in the parents list and it was changed so
                # that the whole RepositoryCfg, that described the Old Butler repository (including the
                # mapperArgs that were used with it), was recorded as a "nested" repository cfg. That
                # checkin did not account for the fact that there were repositoryCfg.yaml files in the
                # world with only the path to Old Butler repositories in the parents list.
                if cfg.parents:
                    for i, parent in enumerate(cfg.parents):
                        if isinstance(parent, RepositoryCfg):
                            continue
                        parentCfg, parentIsOldButlerRepository = self._getRepositoryCfg(parent)
                        if parentIsOldButlerRepository:
                            parentCfg.mapperArgs = cfg.mapperArgs
                            self.log.info(("Butler is replacing an Old Butler parent repository path '{}' "
                                           "found in the parents list of a New Butler repositoryCfg: {} "
                                           "with a repositoryCfg that includes the child repository's "
                                           "mapperArgs: {}. This affects the instantiated RepositoryCfg "
                                           "but does not change the persisted child repositoryCfg.yaml "
                                           "file.").format(parent, cfg, parentCfg))
                            cfg._parents[i] = cfg._normalizeParents(cfg.root, [parentCfg])[0]

                if 'w' in repoData.repoArgs.mode:
                    # if it's an output repository, the RepositoryArgs must match the existing cfg.
                    if not cfgMatchesArgs(repoData.repoArgs, cfg):
                        raise RuntimeError(("The RepositoryArgs and RepositoryCfg must match for writable "
                                            "repositories, RepositoryCfg:{}, RepositoryArgs:{}").format(
                                                cfg, repoData.repoArgs))
                    repoData.setCfg(cfg=cfg, origin='existing', root=repoData.repoArgs.cfgRoot,
                                    isV1Repository=isOldButlerRepository)
                else:
                    # if it's an input repository, the cfg can overwrite the in-repo cfg.
                    if cfgMatchesArgs(repoData.repoArgs, cfg):
                        repoData.setCfg(cfg=cfg, origin='existing', root=repoData.repoArgs.cfgRoot,
                                        isV1Repository=isOldButlerRepository)
                    else:
                        repoData.setCfg(cfg=cfg, origin='nested', root=None,
                                        isV1Repository=isOldButlerRepository)

    def _addParents(self, repoDataList):
        """For each repoData in the input list, see if its parents are the next items in the list, and if
        not, add the parent, so that the repoDataList includes parents and is ordered to operate
        depth-first 0..n.

        Parameters
        ----------
        repoDataList : list of RepoData
            The RepoData for the Butler outputs + inputs.

        Raises
        ------
        RuntimeError
            Raised if a RepositoryCfg can not be found at a location where a parent repository should be.
        """
        repoDataIdx = 0
        while True:
            if repoDataIdx == len(repoDataList):
                break
            repoData = repoDataList[repoDataIdx]
            if 'r' not in repoData.repoArgs.mode:
                repoDataIdx += 1
                continue  # the repoData only needs parents if it's readable.
            if repoData.isNewRepository:
                repoDataIdx += 1
                continue  # if it's new the parents will be the inputs of this butler.
            if repoData.cfg.parents is None:
                repoDataIdx += 1
                continue  # if there are no parents then there's nothing to do.
            for repoParentIdx, repoParent in enumerate(repoData.cfg.parents):
                parentIdxInRepoDataList = repoDataIdx + repoParentIdx + 1
                if not isinstance(repoParent, RepositoryCfg):
                    repoParentCfg, isOldButlerRepository = self._getRepositoryCfg(repoParent)
                    if repoParentCfg is not None:
                        cfgOrigin = 'existing'
                else:
                    isOldButlerRepository = False
                    repoParentCfg = repoParent
                    cfgOrigin = 'nested'
                if (parentIdxInRepoDataList < len(repoDataList)
                        and repoDataList[parentIdxInRepoDataList].cfg == repoParentCfg):
                    continue
                args = RepositoryArgs(cfgRoot=repoParentCfg.root, mode='r')
                role = 'input' if repoData.role == 'output' else 'parent'
                newRepoInfo = RepoData(args, role)
                newRepoInfo.repoData.setCfg(cfg=repoParentCfg, origin=cfgOrigin, root=args.cfgRoot,
                                            isV1Repository=isOldButlerRepository)
                repoDataList.insert(parentIdxInRepoDataList, newRepoInfo)
            repoDataIdx += 1

    def _setAndVerifyParentsLists(self, repoDataList):
        """Make a list of all the input repositories of this Butler; these are the parents of the outputs.
        For new output repositories, set the parents in the RepositoryCfg. For existing output
        repositories verify that the RepositoryCfg's parents match the parents list.

        Parameters
        ----------
        repoDataList : list of RepoData
            All the RepoDatas loaded by this butler, in search order.

        Raises
        ------
        RuntimeError
            If an existing output repository is loaded and its parents do not match the parents of this
            Butler an error will be raised.
        """
        def getIOParents(ofRepoData, repoDataList):
            """Make a parents list for the repo in `ofRepoData` that is comprised of inputs and readable
            outputs (not parents-of-parents) of this butler"""
            parents = []
            for repoData in repoDataList:
                if repoData.role == 'parent':
                    continue
                if repoData is ofRepoData:
                    continue
                if repoData.role == 'output':
                    if 'r' in repoData.repoArgs.mode:
                        raise RuntimeError("If an output is readable it must be the only output.")
                        # and if this is the only output, this should have continued in
                        # "if repoData is ofRepoData"
                    continue
                parents.append(self._getParentVal(repoData))
            return parents

        for repoData in repoDataList:
            if repoData.role != 'output':
                continue
            parents = getIOParents(repoData, repoDataList)
            # if repoData is new, add the parent RepositoryCfgs to it.
            if repoData.cfgOrigin == 'new':
                repoData.cfg.addParents(parents)
            elif repoData.cfgOrigin in ('existing', 'nested'):
                if repoData.cfg.parents != parents:
                    try:
                        repoData.cfg.extendParents(parents)
                    except ParentsMismatch as e:
                        raise RuntimeError(("Inputs of this Butler:{} do not match parents of existing "
                                            "writable cfg:{} (ParentsMismatch exception: {})").format(
                                                parents, repoData.cfg.parents, e))

    def _setDefaultMapper(self, repoDataList):
        """Establish a default mapper if there is one and assign it to outputs that do not have a mapper
        assigned.

        If all inputs have the same mapper it will be used as the default mapper.

        Parameters
        ----------
        repoDataList : list of RepoData
            All the RepoDatas loaded by this butler, in search order.

        Raises
        ------
        RuntimeError
            If a default mapper can not be established and there is an output that does not have a mapper.
        """
        needyOutputs = [rd for rd in repoDataList if rd.role == 'output' and rd.cfg.mapper is None]
        if len(needyOutputs) == 0:
            return
        mappers = set([rd.cfg.mapper for rd in repoDataList if rd.role == 'input'])
        if len(mappers) != 1:
            inputs = [rd for rd in repoDataList if rd.role == 'input']
            raise RuntimeError(
                ("No default mapper could be established from inputs:{} and no mapper specified "
                 "for outputs:{}").format(inputs, needyOutputs))
        defaultMapper = mappers.pop()
        for repoData in needyOutputs:
            repoData.cfg.mapper = defaultMapper

    def _connectParentRepoDatas(self, repoDataList):
        """For each RepoData in repoDataList, find its parents in the repoDataList and cache references to
        them.

        Parameters
        ----------
        repoDataList : list of RepoData
            All the RepoDatas loaded by this butler, in search order.

        Raises
        ------
        RuntimeError
            When a parent is listed in the parents list but not found in the repoDataList. This is not
            expected to ever happen and would indicate an internal Butler error.
        """
        for repoData in repoDataList:
            for parent in repoData.cfg.parents:
                parentToAdd = None
                for otherRepoData in repoDataList:
                    if isinstance(parent, RepositoryCfg):
                        if otherRepoData.repoData.cfg == parent:
                            parentToAdd = otherRepoData.repoData
                            break
                    elif otherRepoData.repoData.cfg.root == parent:
                        parentToAdd = otherRepoData.repoData
                        break
                if parentToAdd is None:
                    raise RuntimeError(
                        "Could not find a parent matching {} to add to {}".format(parent, repoData))
                repoData.addParentRepoData(parentToAdd)

    @staticmethod
    def _getParentRepoData(parent, repoDataList):
        """Get a parent RepoData, matching a cfg, from a list of RepoData.

        Parameters
        ----------
        parent : string or RepositoryCfg
            cfgRoot of a repo or a cfg that describes the repo
        repoDataList : list of RepoData
            list to search in

        Returns
        -------
        RepoData or None
            A RepoData if one can be found, else None
        """
        repoData = None
        for otherRepoData in repoDataList:
            if isinstance(parent, RepositoryCfg):
                if otherRepoData.cfg == parent:
                    repoData = otherRepoData
                    break
            elif otherRepoData.cfg.root == parent:
                repoData = otherRepoData
                break
        return repoData

    def _setRepoDataTags(self):
        """Set the tags from each repoArgs into all its parent repoArgs so that they can be included in
        tagged searches."""
        def setTags(repoData, tags, context):
            if id(repoData) in context:
                return
            repoData.addTags(tags)
            context.add(id(repoData))
            for parentRepoData in repoData.parentRepoDatas:
                setTags(parentRepoData, tags, context)
        for repoData in self._repos.outputs() + self._repos.inputs():
            setTags(repoData.repoData, repoData.repoArgs.tags, set())

    def _convertV1Args(self, root, mapper, mapperArgs):
        """Convert Old Butler RepositoryArgs (root, mapper, mapperArgs) to New Butler RepositoryArgs
        (inputs, outputs)

        Parameters
        ----------
        root : string
            Posix path to repository root
        mapper : class, class instance, or string
            Instantiated class, a class object to be instantiated, or a string that refers to a class that
            can be imported & used as the mapper.
        mapperArgs : dict
            RepositoryArgs & their values used when instantiating the mapper.

        Returns
        -------
        tuple
            (inputs, outputs) - values to be used for inputs and outputs in Butler.__init__
        """
        if (mapper and not isinstance(mapper, str)
                and not inspect.isclass(mapper)):
            self.log.warn(preinitedMapperWarning)
        inputs = None
        if root is None:
            if hasattr(mapper, 'root'):
                # in legacy repositories, the mapper may be given the root directly.
                root = mapper.root
            else:
                # in the past root="None" could be used to mean root='.'
                root = '.'
        outputs = RepositoryArgs(mode='rw',
                                 root=root,
                                 mapper=mapper,
                                 mapperArgs=mapperArgs)
        return inputs, outputs
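
    # Old Butler initialization sketch (deprecated; the path and mapper name
    # 'myObsPackage.MyMapper' are hypothetical):
    #
    #     butler = Butler(root='/path/to/repo', mapper='myObsPackage.MyMapper')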

    def __repr__(self):
        return 'Butler(datasetTypeAliasDict=%s, repos=%s)' % (
            self.datasetTypeAliasDict, self._repos)

    def _getDefaultMapper(self):
        """Get the default mapper. Currently this means if all the repositories use exactly the same
        mapper, that mapper may be considered the default.

        This definition may be changing; mappers may be able to exclude themselves as candidates for
        default, and they may nominate a different mapper instead. Also, we may not want to look at *all*
        the repositories, but only a depth-first search on each of the input & output repositories, and
        use the first-found mapper for each of those. TBD.

        Returns
        -------
        Mapper class or None
            Returns the class type of the default mapper, or None if a default
            mapper can not be determined.
        """
        defaultMapper = None

        for inputRepoData in self._repos.inputs():
            mapper = None
            if inputRepoData.cfg.mapper is not None:
                mapper = inputRepoData.cfg.mapper
                # if the mapper is:
                # * a string, import it.
                # * a class instance, get its class type
                # * a class, do nothing; use it
                if isinstance(mapper, str):
                    mapper = doImport(mapper)
                elif not inspect.isclass(mapper):
                    mapper = mapper.__class__
            # If no mapper has been found, note the first found mapper.
            # Then, if a mapper has been found and each next mapper matches it,
            # continue looking for mappers.
            # If a mapper has been found and another non-matching mapper is
            # found then we have no default, return None.
            if defaultMapper is None:
                defaultMapper = mapper
            elif mapper == defaultMapper:
                continue
            elif mapper is not None:
                return None
        return defaultMapper

    def _assignDefaultMapper(self, defaultMapper):
        # note: all() returns a list, and RepoData exposes repoArgs (not args);
        # the original iterated all().values() and formatted repoData.args.
        for repoData in self._repos.all():
            if repoData.cfg.mapper is None and (repoData.isNewRepository or repoData.isV1Repository):
                if defaultMapper is None:
                    raise RuntimeError(
                        "No mapper specified for %s and no default mapper could be determined." %
                        repoData.repoArgs)
                repoData.cfg.mapper = defaultMapper

    @staticmethod
    def getMapperClass(root):
        """posix-only; gets the mapper class at the path specified by root (if a file _mapper can be found
        at that location or in a parent location).

        As we abstract the storage and support different types of storage locations this method will be
        moved entirely into Butler Access, or made more dynamic, and the API will very likely change."""
        return Storage.getMapperClass(root)

    def defineAlias(self, alias, datasetType):
        """Register an alias that will be substituted in datasetTypes.

        Parameters
        ----------
        alias - string
            The alias keyword. It may start with @ or not. It may not contain @ except as the first
            character.
        datasetType - string
            The string that will be substituted when @alias is passed into datasetType. It may not contain
            '@'.
        """
        # verify formatting of alias:
        # it can have '@' as the first character (if not it's okay, we will add it) or not at all.
        atLoc = alias.rfind('@')
        if atLoc == -1:
            alias = "@" + str(alias)
        elif atLoc > 0:
            raise RuntimeError("Badly formatted alias string: %s" % (alias,))

        # verify that datasetType does not contain '@'
        if datasetType.count('@') != 0:
            raise RuntimeError("Badly formatted type string: %s" % (datasetType))

        # verify that the alias keyword does not start with another alias keyword,
        # and vice versa
        for key in self.datasetTypeAliasDict:
            if key.startswith(alias) or alias.startswith(key):
                raise RuntimeError("Alias: %s overlaps with existing alias: %s" % (alias, key))

        self.datasetTypeAliasDict[alias] = datasetType
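
    # Usage sketch (the alias and dataset type names are illustrative):
    #
    #     butler.defineAlias('@exp', 'calexp')
    #     exposure = butler.get('@exp', dataId={'visit': 1234, 'ccd': 56})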

    def getKeys(self, datasetType=None, level=None, tag=None):
        """Get the valid data id keys at or above the given level of hierarchy for the dataset type or the
        entire collection if None. The dict values are the basic Python types corresponding to the keys
        (int, float, string).

        Parameters
        ----------
        datasetType - string
            The type of dataset to get keys for, entire collection if None.
        level - string
            The hierarchy level to descend to. None if it should not be restricted. Use an empty string if
            the mapper should lookup the default level.
        tag - any, or list of any
            If tag is specified then the repo will only be used if the tag or a tag in the list matches a
            tag used for that repository.

        Returns
        -------
        Returns a dict. The dict keys are the valid data id keys at or above the given level of hierarchy
        for the dataset type or the entire collection if None. The dict values are the basic Python types
        corresponding to the keys (int, float, string).
        """
        datasetType = self._resolveDatasetTypeAlias(datasetType)

        keys = None
        tag = setify(tag)
        for repoData in self._repos.inputs():
            if not tag or len(tag.intersection(repoData.tags)) > 0:
                keys = repoData.repo.getKeys(datasetType, level)
                # An empty dict is a valid "found" condition for keys. The only value for keys that should
                # cause the search to continue is None
                if keys is not None:
                    break
        return keys
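
    # Sketch (the returned keys depend on the mapper; 'visit'/'ccd' are
    # illustrative):
    #
    #     keys = butler.getKeys('raw')   # e.g. {'visit': int, 'ccd': int, ...}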

    def getDatasetTypes(self, tag=None):
        """Get the valid dataset types for all known repos or those matching
        the tags.

        Parameters
        ----------
        tag - any, or list of any
            If tag is specified then the repo will only be used if the tag or a tag in the list matches a
            tag used for that repository.

        Returns
        -------
        Returns the dataset types as a set of strings.
        """
        datasetTypes = set()
        tag = setify(tag)
        for repoData in self._repos.outputs() + self._repos.inputs():
            if not tag or len(tag.intersection(repoData.tags)) > 0:
                datasetTypes = datasetTypes.union(
                    repoData.repo.mappers()[0].getDatasetTypes())
        return datasetTypes

    def queryMetadata(self, datasetType, format, dataId={}, **rest):
        """Returns the valid values for one or more keys when given a partial
        input collection data id.

        Parameters
        ----------
        datasetType - string
            The type of dataset to inquire about.
        format - str, tuple
            Key or tuple of keys to be returned.
        dataId - DataId, dict
            The partial data id.
        **rest -
            Keyword arguments for the partial data id.

        Returns
        -------
        A list of valid values or tuples of valid values as specified by the
        format.
        """

        datasetType = self._resolveDatasetTypeAlias(datasetType)
        dataId = DataId(dataId)
        dataId.update(**rest)
        format = sequencify(format)

        tuples = None
        for repoData in self._repos.inputs():
            if not dataId.tag or len(dataId.tag.intersection(repoData.tags)) > 0:
                tuples = repoData.repo.queryMetadata(datasetType, format, dataId)
                if tuples:
                    break

        if not tuples:
            return []

        if len(format) == 1:
            ret = []
            for x in tuples:
                try:
                    ret.append(x[0])
                except TypeError:
                    ret.append(x)
            return ret

        return tuples
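
    # Sketch (the dataset type and keys are mapper-dependent and illustrative):
    #
    #     visits = butler.queryMetadata('raw', 'visit', dataId={'filter': 'r'})
    #     pairs = butler.queryMetadata('raw', ('visit', 'ccd'))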

    def datasetExists(self, datasetType, dataId={}, write=False, **rest):
        """Determines if a dataset file exists.

        Parameters
        ----------
        datasetType - string
            The type of dataset to inquire about.
        dataId - DataId, dict
            The data id of the dataset.
        write - bool
            If True, look only in locations where the dataset could be written, and return True only if it
            is present in all of them.
        **rest
            Keyword arguments for the data id.

        Returns
        -------
        exists - bool
            True if the dataset exists or is non-file-based.
        """
        datasetType = self._resolveDatasetTypeAlias(datasetType)
        dataId = DataId(dataId)
        dataId.update(**rest)
        locations = self._locate(datasetType, dataId, write=write)
        if not write:  # when write=False, locations is not a sequence
            if locations is None:
                return False
            locations = [locations]

        if not locations:  # empty list
            return False

        for location in locations:
            # If the location is a ButlerComposite (as opposed to a ButlerLocation),
            # verify the component objects exist.
            if isinstance(location, ButlerComposite):
                for name, componentInfo in location.componentInfo.items():
                    if componentInfo.subset:
                        subset = self.subset(datasetType=componentInfo.datasetType, dataId=location.dataId)
                        exists = all([obj.datasetExists() for obj in subset])
                    else:
                        exists = self.datasetExists(componentInfo.datasetType, location.dataId)
                    if exists is False:
                        return False
            else:
                if not location.repository.exists(location):
                    return False
        return True
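
    # Sketch (dataId keys are illustrative):
    #
    #     if butler.datasetExists('calexp', dataId={'visit': 1234, 'ccd': 56}):
    #         exposure = butler.get('calexp', dataId={'visit': 1234, 'ccd': 56})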

    def _locate(self, datasetType, dataId, write):
        """Get one or more ButlerLocations and/or ButlerComposites.

        Parameters
        ----------
        datasetType : string
            The datasetType that is being searched for. The datasetType may be followed by a dot and
            a component name (component names are specified in the policy). IE datasetType.componentName

        dataId : dict or DataId class instance
            The dataId

        write : bool
            True if this is a search to write an object. False if it is a search to read an object. This
            affects what type (an object or a container) is returned.

        Returns
        -------
        If write is False, will return either a single object or None. If write is True, will return a
        list (which may be empty)
        """
        repos = self._repos.outputs() if write else self._repos.inputs()
        locations = []
        for repoData in repos:
            # enforce dataId & repository tags when reading:
            if not write and dataId.tag and len(dataId.tag.intersection(repoData.tags)) == 0:
                continue
            components = datasetType.split('.')
            datasetType = components[0]
            components = components[1:]
            try:
                location = repoData.repo.map(datasetType, dataId, write=write)
            except NoResults:
                continue
            if location is None:
                continue
            location.datasetType = datasetType  # todo is there a better way than monkey patching here?
            if len(components) > 0:
                if not isinstance(location, ButlerComposite):
                    raise RuntimeError("The location for a dotted datasetType must be a composite.")
                # replace the first component name with the datasetType
                components[0] = location.componentInfo[components[0]].datasetType
                # join components back into a dot-delimited string
                datasetType = '.'.join(components)
                location = self._locate(datasetType, dataId, write)
                # if a component location is not found, we can not continue with this repo; move to the
                # next repo.
                if location is None:
                    break
            # if reading, only one location is desired.
            if location:
                if not write:
                    # If there is a bypass function for this dataset type, we can't test to see if the
                    # object exists in storage, because the bypass function may not actually use the
                    # location according to the template. Instead, execute the bypass function and include
                    # its results in the bypass attribute of the location. The bypass function may fail
                    # for any reason, the most common case being that a file does not exist. If it raises
                    # an exception indicating such, we ignore the bypass function and proceed as though it
                    # does not exist.
                    if hasattr(location.mapper, "bypass_" + location.datasetType):
                        bypass = self._getBypassFunc(location, dataId)
                        try:
                            bypass = bypass()
                            location.bypass = bypass
                        except (NoResults, IOError):
                            self.log.debug("Continuing dataset search while evaluating "
                                           "bypass function for Dataset type:{} Data ID:{} at "
                                           "location {}".format(datasetType, dataId, location))
                    # If a location was found but the location does not exist, keep looking in input
                    # repositories (the registry may have had enough data for a lookup even though the
                    # object exists in a different repository.)
                    if (isinstance(location, ButlerComposite) or hasattr(location, 'bypass')
                            or location.repository.exists(location)):
                        return location
                else:
                    try:
                        locations.extend(location)
                    except TypeError:
                        locations.append(location)
        if not write:
            return None
        return locations
1367 
1368  @staticmethod
1369  def _getBypassFunc(location, dataId):
1370  pythonType = location.getPythonType()
1371  if pythonType is not None:
1372  if isinstance(pythonType, str):
1373  pythonType = doImport(pythonType)
1374  bypassFunc = getattr(location.mapper, "bypass_" + location.datasetType)
1375  return lambda: bypassFunc(location.datasetType, pythonType, location, dataId)
1376 
1377  def get(self, datasetType, dataId=None, immediate=True, **rest):
1378  """Retrieves a dataset given an input collection data id.
1379 
1380  Parameters
1381  ----------
1382  datasetType - string
1383  The type of dataset to retrieve.
1384  dataId - dict
1385  The data id.
1386  immediate - bool
1387  If False, use a proxy for delayed loading.
1388  **rest
1389  keyword arguments for the data id.
1390 
1391  Returns
1392  -------
1393  An object retrieved from the dataset (or a proxy for one).
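 Examples
 --------
 A minimal usage sketch; the 'calexp' dataset type and the visit/ccd keys are
 hypothetical and depend on the mapper in use:

 >>> exposure = butler.get('calexp', visit=123, ccd=4)
 >>> proxy = butler.get('calexp', visit=123, ccd=4, immediate=False)  # ReadProxy; loads on first access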
1394  """
1395  datasetType = self._resolveDatasetTypeAlias(datasetType)
1396  dataId = DataId(dataId)
1397  dataId.update(**rest)
1398 
1399  location = self._locate(datasetType, dataId, write=False)
1400  if location is None:
1401  raise NoResults("No locations for get:", datasetType, dataId)
1402  self.log.debug("Get type=%s keys=%s from %s", datasetType, dataId, str(location))
1403 
1404  if hasattr(location, 'bypass'):
1405  # this type loader block should get moved into a helper someplace, and duplications removed.
1406  def callback():
1407  return location.bypass
1408  else:
1409  def callback():
1410  return self._read(location)
1411  if location.mapper.canStandardize(location.datasetType):
1412  innerCallback = callback
1413 
1414  def callback():
1415  return location.mapper.standardize(location.datasetType, innerCallback(), dataId)
1416  if immediate:
1417  return callback()
1418  return ReadProxy(callback)
1419 
1420  def put(self, obj, datasetType, dataId={}, doBackup=False, **rest):
1421  """Persists a dataset given an output collection data id.
1422 
1423  Parameters
1424  ----------
1425  obj -
1426  The object to persist.
1427  datasetType - string
1428  The type of dataset to persist.
1429  dataId - dict
1430  The data id.
1431  doBackup - bool
1432  If True, rename existing instead of overwriting.
1433  WARNING: Setting doBackup=True is not safe for parallel processing, as it may be subject to race
1434  conditions.
1435  **rest
1436  Keyword arguments for the data id.
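 Examples
 --------
 A minimal usage sketch; 'calexp' and the dataId keys are hypothetical, and the
 Butler must have been constructed with an output repository:

 >>> butler.put(exposure, 'calexp', visit=123, ccd=4)
 >>> butler.put(exposure, 'calexp', visit=123, ccd=4, doBackup=True)  # rename any existing file first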
1437  """
1438  datasetType = self._resolveDatasetTypeAlias(datasetType)
1439  dataId = DataId(dataId)
1440  dataId.update(**rest)
1441 
1442  locations = self._locate(datasetType, dataId, write=True)
1443  if not locations:
1444  raise NoResults("No locations for put:", datasetType, dataId)
1445  for location in locations:
1446  if isinstance(location, ButlerComposite):
1447  disassembler = location.disassembler if location.disassembler else genericDisassembler
1448  disassembler(obj=obj, dataId=location.dataId, componentInfo=location.componentInfo)
1449  for name, info in location.componentInfo.items():
1450  if not info.inputOnly:
1451  self.put(info.obj, info.datasetType, location.dataId, doBackup=doBackup)
1452  else:
1453  if doBackup:
1454  location.getRepository().backup(location.datasetType, dataId)
1455  location.getRepository().write(location, obj)
1456 
1457  def subset(self, datasetType, level=None, dataId={}, **rest):
1458  """Return complete dataIds for a dataset type that match a partial (or empty) dataId.
1459 
1460  Given a partial (or empty) dataId specified in dataId and **rest, find all datasets that match the
1461  dataId. Optionally restrict the results to a given level specified by a dataId key (e.g. visit or
1462  sensor or amp for a camera). Return an iterable collection of complete dataIds as ButlerDataRefs.
1463  Datasets with the resulting dataIds may not exist; that needs to be tested with datasetExists().
1464 
1465  Parameters
1466  ----------
1467  datasetType - string
1468  The type of dataset collection to subset
1469  level - string
1470  The level of dataId at which to subset. Use an empty string if the mapper should look up the
1471  default level.
1472  dataId - dict
1473  The data id.
1474  **rest
1475  Keyword arguments for the data id.
1476 
1477  Returns
1478  -------
1479  subset - ButlerSubset
1480  Collection of ButlerDataRefs for datasets matching the data id.
1481 
1482  Examples
1483  --------
1484  To print the full dataIds for all r-band measurements in a source catalog
1485  (note that the subset call is equivalent to: `butler.subset('src', dataId={'filter':'r'})`):
1486 
1487  >>> subset = butler.subset('src', filter='r')
1488  >>> for data_ref in subset: print(data_ref.dataId)
1489  """
1490  datasetType = self._resolveDatasetTypeAlias(datasetType)
1491 
1492  # Currently expected behavior of subset is that if specified level is None then the mapper's default
1493  # level should be used. Convention for level within Butler is that an empty string is used to indicate
1494  # 'get default'.
1495  if level is None:
1496  level = ''
1497 
1498  dataId = DataId(dataId)
1499  dataId.update(**rest)
1500  return ButlerSubset(self, datasetType, level, dataId)
1501 
1502  def dataRef(self, datasetType, level=None, dataId={}, **rest):
1503  """Returns a single ButlerDataRef.
1504 
1505  Given a complete dataId specified in dataId and **rest, find the unique dataset at the given level
1506  specified by a dataId key (e.g. visit or sensor or amp for a camera) and return a ButlerDataRef.
1507 
1508  Parameters
1509  ----------
1510  datasetType - string
1511  The type of dataset collection to reference
1512  level - string
1513  The level of dataId at which to reference
1514  dataId - dict
1515  The data id.
1516  **rest
1517  Keyword arguments for the data id.
1518 
1519  Returns
1520  -------
1521  dataRef - ButlerDataRef
1522  ButlerDataRef for dataset matching the data id
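 Examples
 --------
 A minimal sketch with a hypothetical dataset type and dataId; the dataId must
 match exactly one dataset or a RuntimeError is raised:

 >>> ref = butler.dataRef('raw', visit=123, ccd=4)
 >>> image = ref.get('raw')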
1523  """
1524 
1525  datasetType = self._resolveDatasetTypeAlias(datasetType)
1526  dataId = DataId(dataId)
1527  subset = self.subset(datasetType, level, dataId, **rest)
1528  if len(subset) != 1:
1529  raise RuntimeError("No unique dataset for: Dataset type:%s Level:%s Data ID:%s Keywords:%s" %
1530  (str(datasetType), str(level), str(dataId), str(rest)))
1531  return ButlerDataRef(subset, subset.cache[0])
1532 
1533  def getUri(self, datasetType, dataId=None, write=False, **rest):
1534  """Return the URI for a dataset
1535 
1536  .. warning:: This is intended only for debugging. The URI should
1537  never be used for anything other than printing.
1538 
1539  .. note:: In the event there are multiple URIs for read, we return only
1540  the first.
1541 
1542  .. note:: getUri() does not currently support composite datasets.
1543 
1544  Parameters
1545  ----------
1546  datasetType : `str`
1547  The dataset type of interest.
1548  dataId : `dict`, optional
1549  The data identifier.
1550  write : `bool`, optional
1551  Return the URI for writing?
1552  rest : `dict`, optional
1553  Keyword arguments for the data id.
1554 
1555  Returns
1556  -------
1557  uri : `str`
1558  URI for dataset.
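 Examples
 --------
 For debugging output only; the dataset type and dataId keys are hypothetical:

 >>> print(butler.getUri('calexp', visit=123, ccd=4))
 >>> print(butler.getUri('calexp', visit=123, ccd=4, write=True))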
1559  """
1560  datasetType = self._resolveDatasetTypeAlias(datasetType)
1561  dataId = DataId(dataId)
1562  dataId.update(**rest)
1563  locations = self._locate(datasetType, dataId, write=write)
1564  if locations is None:
1565  raise NoResults("No locations for getUri: ", datasetType, dataId)
1566 
1567  if write:
1568  # Follow the write path
1569  # Return the first valid write location.
1570  for location in locations:
1571  if isinstance(location, ButlerComposite):
1572  for name, info in location.componentInfo.items():
1573  if not info.inputOnly:
1574  return self.getUri(info.datasetType, location.dataId, write=True)
1575  else:
1576  return location.getLocationsWithRoot()[0]
1577  # fall back to raise
1578  raise NoResults("No locations for getUri(write=True): ", datasetType, dataId)
1579  else:
1580  # Follow the read path; return only the first valid read location.
1581  return locations.getLocationsWithRoot()[0]
1582 
1583  def _read(self, location):
1584  """Unpersist an object using data inside a ButlerLocation or ButlerComposite object.
1585 
1586  Parameters
1587  ----------
1588  location : ButlerLocation or ButlerComposite
1589  A ButlerLocation or ButlerComposite instance populated with data needed to read the object.
1590 
1591  Returns
1592  -------
1593  object
1594  An instance of the object specified by the location.
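 Examples
 --------
 Internal helper, normally reached via get(); a sketch with hypothetical names:

 >>> location = butler._locate('calexp', DataId({'visit': 123}), write=False)
 >>> exposure = butler._read(location)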
1595  """
1596  self.loglog.debug("Starting read from %s", location)
1597 
1598  if isinstance(location, ButlerComposite):
1599  for name, componentInfo in location.componentInfo.items():
1600  if componentInfo.subset:
1601  subset = self.subset(datasetType=componentInfo.datasetType, dataId=location.dataId)
1602  componentInfo.obj = [obj.get() for obj in subset]
1603  else:
1604  obj = self.get(componentInfo.datasetType, location.dataId, immediate=True)
1605  componentInfo.obj = obj
1606  assembler = location.assembler or genericAssembler
1607  results = assembler(dataId=location.dataId, componentInfo=location.componentInfo,
1608  cls=location.python)
1609  return results
1610  else:
1611  results = location.repository.read(location)
1612  if len(results) == 1:
1613  results = results[0]
1614  self.loglog.debug("Ending read from %s", location)
1615  return results
1616 
1617  def __reduce__(self):
1618  ret = (_unreduce, (self._initArgs, self.datasetTypeAliasDict))
1619  return ret
1620 
1621  def _resolveDatasetTypeAlias(self, datasetType):
1622  """Replaces all the known alias keywords in the given string with the alias value.
1623 
1624  Parameters
1625  ----------
1626  datasetType - string
1627  A datasetType string to search & replace on
1628 
1629  Returns
1630  -------
1631  datasetType - string
1632  The de-aliased string
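 Examples
 --------
 A sketch assuming an alias was first registered with defineAlias (the alias
 and dataset type names are hypothetical):

 >>> butler.defineAlias('srcAlias', 'src')
 >>> butler._resolveDatasetTypeAlias('@srcAlias')
 'src'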
1633  """
1634  for key in self.datasetTypeAliasDict:
1635  # if all aliases have been replaced, bail out
1636  if datasetType.find('@') == -1:
1637  break
1638  datasetType = datasetType.replace(key, self.datasetTypeAliasDict[key])
1639 
1640  # If an alias specifier cannot be resolved, raise.
1641  if datasetType.find('@') != -1:
1642  raise RuntimeError("Unresolvable alias specifier in datasetType: %s" % (datasetType))
1643 
1644  return datasetType
1645 
1646 
1647 def _unreduce(initArgs, datasetTypeAliasDict):
1648  mapperArgs = initArgs.pop('mapperArgs')
1649  initArgs.update(mapperArgs)
1650  butler = Butler(**initArgs)
1651  butler.datasetTypeAliasDict = datasetTypeAliasDict
1652  return butler
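# Pickling sketch: Butler.__reduce__ delegates to _unreduce, so instances can be
# round-tripped with the standard library (assumes `butler` is an existing
# Butler instance):
#
#     import pickle
#     restored = pickle.loads(pickle.dumps(butler))
#     assert restored.datasetTypeAliasDict == butler.datasetTypeAliasDict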