butler.py
#!/usr/bin/env python

#
# LSST Data Management System
# Copyright 2008-2015 LSST Corporation.
#
# This product includes software developed by the
# LSST Project (http://www.lsst.org/).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the LSST License Statement and
# the GNU General Public License along with this program. If not,
# see <http://www.lsstcorp.org/LegalNotices/>.
#

# -*- python -*-

"""This module defines the Butler class."""
import copy
import inspect

import yaml

from lsst.log import Log
from . import ReadProxy, ButlerSubset, ButlerDataRef, \
    Storage, Policy, NoResults, Repository, DataId, RepositoryCfg, \
    RepositoryArgs, listify, setify, sequencify, doImport, ButlerComposite, genericAssembler, \
    genericDisassembler, PosixStorage, ParentsMismatch

preinitedMapperWarning = ("Passing an instantiated mapper into "
                          "Butler.__init__ will prevent Butler from passing "
                          "parentRegistry or repositoryCfg information to "
                          "the mapper, which is done only at init time. "
                          "It is better to pass an importable string or "
                          "class object.")


class ButlerCfg(Policy, yaml.YAMLObject):
    """Represents a Butler configuration.

    .. warning::

        cfg is 'wet paint' and very likely to change. Use of it in production
        code other than via the 'old butler' API is strongly discouraged.
    """
    yaml_tag = u"!ButlerCfg"

    def __init__(self, cls, repoCfg):
        super().__init__({'repoCfg': repoCfg, 'cls': cls})


class RepoData:
    """Container object for repository data used by Butler

    Parameters
    ----------
    args : RepositoryArgs
        The arguments that are used to find or create the RepositoryCfg.
    role : string
        "input", "output", or "parent", indicating why Butler loaded this repository.
        * input: the Repository was passed as a Butler input.
        * output: the Repository was passed as a Butler output.
        * parent: the Repository was specified in the RepositoryCfg parents list of a readable repository.

    Attributes
    ----------
    cfg : RepositoryCfg
        The configuration for the Repository.

    _cfgOrigin : string
        "new", "existing", or "nested". Indicates the origin of the repository and its RepositoryCfg:
        * new: it was created by this instance of Butler, and this instance of Butler will generate the
          RepositoryCfg file.
        * existing: it was found (via the root or cfgRoot argument).
        * nested: the full RepositoryCfg was nested in another RepositoryCfg's parents list (this can happen
          if the parameters of an input specified by RepositoryArgs or dict do not entirely match an
          existing RepositoryCfg).

    cfgRoot : string
        Path or URI to the location of the RepositoryCfg file.

    repo : lsst.daf.persistence.Repository
        The Repository class instance.

    parentRepoDatas : list of RepoData
        The parents of this Repository, as indicated in this Repository's RepositoryCfg. If this is a new
        Repository then these are the inputs to this Butler (and will be saved in the RepositoryCfg). These
        RepoData objects are not owned by this RepoData; they are references to peer RepoData objects in the
        Butler's RepoDataContainer.

    isV1Repository : bool
        True if this is an Old Butler repository. In this case the repository does not have a RepositoryCfg
        file. It may have a _mapper file and may have a _parent symlink. It will never be treated as a "new"
        repository, i.e. even though there is not a RepositoryCfg file, one will not be generated.
        If False, this is a New Butler repository and is specified by a RepositoryCfg file.

    tags : set
        These are values that may be used to restrict the search of input repositories. Details are available
        in the RepositoryArgs and DataId classes.

    role : string
        "input", "output", or "parent", indicating why Butler loaded this repository.
        * input: the Repository was passed as a Butler input.
        * output: the Repository was passed as a Butler output.
        * parent: the Repository was specified in the RepositoryCfg parents list of a readable repository.

    _repoArgs : RepositoryArgs
        Contains the arguments that were used to specify this Repository.
    """

    def __init__(self, args, role):
        self.cfg = None
        self._cfgOrigin = None
        self.cfgRoot = None
        self.repo = None
        self.parentRepoDatas = []
        self.isV1Repository = False
        self.tags = set()
        self.role = role
        self.parentRegistry = None
        self._repoArgs = args

    @property
    def repoArgs(self):
        return self._repoArgs

    @property
    def repoData(self):
        return self

    def __repr__(self):
        return ("{}(id={},"
                "repoArgs={},"
                "cfg={!r},"
                "cfgOrigin={},"
                "cfgRoot={},"
                "repo={},"
                "parentRepoDatas={},"
                "isV1Repository={},"
                "role={},"
                "parentRegistry={})").format(
                    self.__class__.__name__,
                    id(self),
                    self.repoArgs,
                    self.cfg,
                    self.cfgOrigin,
                    self.cfgRoot,
                    self.repo,
                    [id(p) for p in self.parentRepoDatas],
                    self.isV1Repository,
                    self.role,
                    self.parentRegistry)

    def setCfg(self, cfg, origin, root, isV1Repository):
        """Set information about the cfg into the RepoData

        Parameters
        ----------
        cfg : RepositoryCfg
            The RepositoryCfg for the repo.
        origin : string
            'new', 'existing', or 'nested'
        root : string
            URI or absolute path to the location of the RepositoryCfg.yaml file.
        isV1Repository : bool
            True if this is an Old Butler repository without a RepositoryCfg file.

        Returns
        -------
        None
        """
        if origin not in ('new', 'existing', 'nested'):
            raise RuntimeError("Invalid value for origin: {}".format(origin))
        self.cfg = cfg
        self._cfgOrigin = origin
        self.cfgRoot = root
        self.isV1Repository = isV1Repository

    @property
    def cfgOrigin(self):
        return self._cfgOrigin

    @property
    def isNewRepository(self):
        return self.cfgOrigin == 'new'

    @property
    def role(self):
        return self._role

    @role.setter
    def role(self, val):
        if val not in ('input', 'output', 'parent'):
            raise RuntimeError("Invalid value for role: {}".format(val))
        self._role = val

    def getParentRepoDatas(self, context=None):
        """Get the parents & grandparents etc of this repo data, in depth-first search order.

        Duplicate entries will be removed in cases where the same parent appears more than once in the parent
        graph.

        Parameters
        ----------
        context : set, optional
            Users should typically omit context and accept the default argument. Context is used to keep a
            set of known RepoDatas when calling this function recursively, for duplicate elimination.

        Returns
        -------
        list of RepoData
            A list of the parents & grandparents etc of a given repo data, in depth-first search order.
        """
        if context is None:
            context = set()
        parents = []
        if id(self) in context:
            return parents
        context.add(id(self))
        for parent in self.parentRepoDatas:
            parents.append(parent)
            parents += parent.getParentRepoDatas(context)
        return parents

    def addParentRepoData(self, parentRepoData):
        self.parentRepoDatas.append(parentRepoData)

    def addTags(self, tags):
        self.tags = self.tags.union(tags)


class RepoDataContainer:
    """Container object for RepoData instances owned by a Butler instance.

    Parameters
    ----------
    repoDataList : list of RepoData
        The RepoData instances to be contained, in search order.
    """

    def __init__(self, repoDataList):
        self._inputs = None
        self._outputs = None
        self._all = repoDataList
        self._buildLookupLists()

    def inputs(self):
        """Get a list of RepoData that are used as inputs to the Butler.
        The list is built during Butler initialization and cached.

        Returns
        -------
        A list of RepoData with readable repositories, in the order to be used when searching.
        """
        if self._inputs is None:
            raise RuntimeError("Inputs not yet initialized.")
        return self._inputs

    def outputs(self):
        """Get a list of RepoData that are used as outputs by the Butler.
        The list is built during Butler initialization and cached.

        Returns
        -------
        A list of RepoData with writable repositories, in the order to be used when searching.
        """
        if self._outputs is None:
            raise RuntimeError("Outputs not yet initialized.")
        return self._outputs

    def all(self):
        """Get a list of all RepoData used by the Butler.

        Returns
        -------
        A list of all RepoData, in the order to be used when searching.
        """
        return self._all

    def __repr__(self):
        return "%s(_inputs=%r, \n_outputs=%s, \n_all=%s)" % (
            self.__class__.__name__,
            self._inputs,
            self._outputs,
            self._all)

    def _buildLookupLists(self):
        """Build the inputs and outputs lists based on the order of self.all()."""

        def addToList(repoData, lst):
            """Add a repoData and each of its parents (depth first) to a list."""
            if id(repoData) in alreadyAdded:
                return
            lst.append(repoData)
            alreadyAdded.add(id(repoData))
            for parent in repoData.parentRepoDatas:
                addToList(parent, lst)

        if self._inputs is not None or self._outputs is not None:
            raise RuntimeError("Lookup lists are already built.")
        inputs = [repoData for repoData in self.all() if repoData.role == 'input']
        outputs = [repoData for repoData in self.all() if repoData.role == 'output']
        self._inputs = []
        alreadyAdded = set()
        for repoData in outputs:
            if 'r' in repoData.repoArgs.mode:
                addToList(repoData.repoData, self._inputs)
        for repoData in inputs:
            addToList(repoData.repoData, self._inputs)
        self._outputs = [repoData.repoData for repoData in outputs]


class Butler:
    """Butler provides a generic mechanism for persisting and retrieving data using mappers.

    A Butler manages a collection of datasets known as a repository. Each dataset has a type representing its
    intended usage and a location. Note that the dataset type is not the same as the C++ or Python type of the
    object containing the data. For example, an ExposureF object might be used to hold the data for a raw
    image, a post-ISR image, a calibrated science image, or a difference image. These would all be different
    dataset types.

    A Butler can produce a collection of possible values for a key (or tuples of values for multiple keys) if
    given a partial data identifier. It can check for the existence of a file containing a dataset given its
    type and data identifier. The Butler can then retrieve the dataset. Similarly, it can persist an object to
    an appropriate location when given its associated data identifier.

    Note that the Butler has two more advanced features when retrieving a data set. First, the retrieval is
    lazy. Input does not occur until the data set is actually accessed. This allows datasets to be retrieved
    and placed on a clipboard prospectively with little cost, even if the algorithm of a stage ends up not
    using them. Second, the Butler will call a standardization hook upon retrieval of the dataset. This
    function, contained in the input mapper object, must perform any necessary manipulations to force the
    retrieved object to conform to standards, including translating metadata.

    Public methods:

    __init__(self, root, mapper=None, **mapperArgs)

    defineAlias(self, alias, datasetType)

    getKeys(self, datasetType=None, level=None)

    queryMetadata(self, datasetType, format=None, dataId={}, **rest)

    datasetExists(self, datasetType, dataId={}, **rest)

    get(self, datasetType, dataId={}, immediate=False, **rest)

    put(self, obj, datasetType, dataId={}, **rest)

    subset(self, datasetType, level=None, dataId={}, **rest)

    dataRef(self, datasetType, level=None, dataId={}, **rest)

    Initialization:

    The preferred method of initialization is to use the `inputs` and `outputs` __init__ parameters. These
    are described in the parameters section, below.

    For backward compatibility: this initialization method signature can take a posix root path, and
    optionally a mapper class instance or class type that will be instantiated using the mapperArgs input
    argument. However, for this to work in a backward compatible way it creates a single repository that is
    used as both an input and an output repository. This is NOT preferred, and will likely break any
    provenance system we have in place.

    Parameters
    ----------
    root : string
        .. note:: Deprecated in 12_0
            `root` will be removed in TBD, it is replaced by `inputs` and `outputs` for
            multiple-repository support.
        A file system path. Will only work with a PosixRepository.
    mapper : string or instance
        .. note:: Deprecated in 12_0
            `mapper` will be removed in TBD, it is replaced by `inputs` and `outputs` for
            multiple-repository support.
        Provides a mapper to be used with Butler.
    mapperArgs : dict
        .. note:: Deprecated in 12_0
            `mapperArgs` will be removed in TBD, it is replaced by `inputs` and `outputs` for
            multiple-repository support.
        Provides arguments to be passed to the mapper if the mapper input argument is a class type to be
        instantiated by Butler.
    inputs : RepositoryArgs, dict, or string
        Can be a single item or a list. Provides arguments to load an existing repository (or repositories).
        String is assumed to be a URI and is used as the cfgRoot (URI to the location of the cfg file). (Local
        file system URI does not have to start with 'file://' and in this way can be a relative path). The
        `RepositoryArgs` class can be used to provide more parameters with which to initialize a repository
        (such as `mapper`, `mapperArgs`, `tags`, etc. See the `RepositoryArgs` documentation for more
        details). A dict may be used as shorthand for a `RepositoryArgs` class instance. The dict keys must
        match parameters to the `RepositoryArgs.__init__` function.
    outputs : RepositoryArgs, dict, or string
        Provides arguments to load one or more existing repositories or create new ones. The different types
        are handled the same as for `inputs`.

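    As a minimal, illustrative sketch (the paths here are hypothetical), a typical initialization reads
    from existing repositories and writes to a new one:

    >>> butler = Butler(inputs='/datasets/inputRepo',
    ...                 outputs={'root': '/scratch/outputRepo', 'mode': 'rw'})

    A bare string is used as the cfgRoot URI; the dict form is shorthand for a `RepositoryArgs` instance.
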
    The Butler init sequence loads all of the input and output repositories.
    This creates the object hierarchy to read from and write to them. Each
    repository can have 0 or more parents, which also get loaded as inputs.
    This becomes a DAG of repositories. Ultimately, Butler creates a list of
    these Repositories in the order that they are used.

    Initialization Sequence
    =======================

    During initialization Butler creates a Repository class instance & support structure for each object
    passed to `inputs` and `outputs` as well as the parent repositories recorded in the `RepositoryCfg` of
    each existing readable repository.

    This process is complex. It is explained below to shed some light on the intent of each step.

    1. Input Argument Standardization
    ---------------------------------

    In `Butler._processInputArguments` the input arguments are verified to be legal (and a RuntimeError is
    raised if not), and they are converted into an expected format that is used for the rest of the Butler
    init sequence. See the docstring for `_processInputArguments`.

    2. Create RepoData Objects
    --------------------------

    Butler uses an object, called `RepoData`, to keep track of information about each repository; each
    repository is contained in a single `RepoData`. The attributes are explained in its docstring.

    After `_processInputArguments`, a RepoData is instantiated and put in a list for each repository in
    `outputs` and `inputs`. This list of RepoData, the `repoDataList`, now represents all the output and input
    repositories (but not parent repositories) that this Butler instance will use.

    3. Get `RepositoryCfg`s
    -----------------------

    `Butler._getCfgs` gets the `RepositoryCfg` for each repository in the `repoDataList`. The behavior is
    described in the docstring.

    4. Add Parents
    --------------

    `Butler._addParents` then considers the parents list in the `RepositoryCfg` of each `RepoData` in the
    `repoDataList` and inserts new `RepoData` objects for each parent not represented in the proper location
    in the `repoDataList`. Ultimately a flat list is built to represent the DAG of readable repositories
    represented in depth-first order.

    5. Set and Verify Parents of Outputs
    ------------------------------------

    To be able to load parent repositories when output repositories are used as inputs, the input repositories
    are recorded as parents in the `RepositoryCfg` file of new output repositories. When an output repository
    already exists, for consistency the Butler's inputs must match the list of parents specified in the
    already-existing output repository's `RepositoryCfg` file.

    In `Butler._setAndVerifyParentsLists`, the list of parents is recorded in the `RepositoryCfg` of new
    repositories. For existing repositories the list of parents is compared with the `RepositoryCfg`'s parents
    list, and if they do not match a `RuntimeError` is raised.

    6. Set the Default Mapper
    -------------------------

    If all the input repositories use the same mapper then we can assume that mapper to be the
    "default mapper". If there are new output repositories whose `RepositoryArgs` do not specify a mapper and
    there is a default mapper then the new output repository will be set to use that default mapper.

    This is handled in `Butler._setDefaultMapper`.

    7. Cache References to Parent RepoDatas
    ---------------------------------------

    In `Butler._connectParentRepoDatas`, in each `RepoData` in `repoDataList`, a list of `RepoData` object
    references is built that matches the parents specified in that `RepoData`'s `RepositoryCfg`.

    This list is used later to find things in that repository's parents, without considering peer
    repositories' parents. (e.g. finding the registry of a parent)

    8. Set Tags
    -----------

    Tags are described at https://ldm-463.lsst.io/v/draft/#tagging

    In `Butler._setRepoDataTags`, for each `RepoData`, the tags specified by its `RepositoryArgs` are recorded
    in a set, and added to the tags set in each of its parents, for ease of lookup when mapping.

    9. Find Parent Registry and Instantiate RepoData
    ------------------------------------------------

    At this point there is enough information to instantiate the `Repository` instances. There is one final
    step before instantiating the Repository, which is to try to get a parent registry that can be used by the
    child repository. The criteria for "can be used" are spelled out in `Butler._setParentRegistry`. However,
    to get the registry from the parent, the parent must be instantiated. The `repoDataList`, in depth-first
    search order, is built so that the most-dependent repositories are first, and the least dependent
    repositories are last. So the `repoDataList` is reversed and the Repositories are instantiated in that
    order; for each RepoData a parent registry is searched for, and then the Repository is instantiated with
    whatever registry could be found."""

    GENERATION = 2
    """This is a Generation 2 Butler.
    """

    def __init__(self, root=None, mapper=None, inputs=None, outputs=None, **mapperArgs):
        self._initArgs = {'root': root, 'mapper': mapper, 'inputs': inputs, 'outputs': outputs,
                          'mapperArgs': mapperArgs}

        self.log = Log.getLogger("daf.persistence.butler")

        inputs, outputs = self._processInputArguments(
            root=root, mapper=mapper, inputs=inputs, outputs=outputs, **mapperArgs)

        # convert the RepoArgs into RepoData
        inputs = [RepoData(args, 'input') for args in inputs]
        outputs = [RepoData(args, 'output') for args in outputs]
        repoDataList = outputs + inputs

        self._getCfgs(repoDataList)

        self._addParents(repoDataList)

        self._setAndVerifyParentsLists(repoDataList)

        self._setDefaultMapper(repoDataList)

        self._connectParentRepoDatas(repoDataList)

        self._repos = RepoDataContainer(repoDataList)

        self._setRepoDataTags()

        for repoData in repoDataList:
            self._initRepo(repoData)

    def _initRepo(self, repoData):
        if repoData.repo is not None:
            # this repository may have already been initialized by its children, in which case there is
            # nothing more to do.
            return
        for parentRepoData in repoData.parentRepoDatas:
            if parentRepoData.cfg.mapper != repoData.cfg.mapper:
                continue
            if parentRepoData.repo is None:
                self._initRepo(parentRepoData)
            parentRegistry = parentRepoData.repo.getRegistry()
            repoData.parentRegistry = parentRegistry if parentRegistry else parentRepoData.parentRegistry
            if repoData.parentRegistry:
                break
        repoData.repo = Repository(repoData)

    def _processInputArguments(self, root=None, mapper=None, inputs=None, outputs=None, **mapperArgs):
        """Process, verify, and standardize the input arguments.

        * Inputs can not be for Old Butler (root, mapper, mapperArgs) AND New Butler (inputs, outputs).
          `root`, `mapper`, and `mapperArgs` are Old Butler init API.
          `inputs` and `outputs` are New Butler init API.
          Old Butler and New Butler init API may not be mixed; Butler may be initialized with only the Old
          arguments or the New arguments.
        * Verify that if there is a readable output that there is exactly one output. (This restriction is in
          place because all readable repositories must be parents of writable repositories, and for
          consistency the DAG of readable repositories must always be the same. Keeping the list of parents
          becomes very complicated in the presence of multiple readable output repositories. It is better to
          only write to output repositories, and then create a new Butler instance and use the outputs as
          inputs, and write to new output repositories.)
        * Make a copy of inputs & outputs so they may be modified without changing the passed-in arguments.
        * Convert any input/output values that are URI strings to RepositoryArgs.
        * Listify inputs & outputs.
        * Set default RW mode on inputs & outputs as needed.

        Parameters
        ----------
        Same as Butler.__init__

        Returns
        -------
        (list of RepositoryArgs, list of RepositoryArgs)
            First item is a list to use as inputs.
            Second item is a list to use as outputs.

        Raises
        ------
        RuntimeError
            If Old Butler and New Butler arguments are both used this will raise.
            If an output is readable and there is more than one output this will raise.
        """
        # inputs and outputs may be modified, do not change the external value.
        inputs = copy.deepcopy(inputs)
        outputs = copy.deepcopy(outputs)

        isV1Args = inputs is None and outputs is None
        if isV1Args:
            inputs, outputs = self._convertV1Args(root=root,
                                                  mapper=mapper,
                                                  mapperArgs=mapperArgs or None)
        elif root or mapper or mapperArgs:
            raise RuntimeError(
                'Butler version 1 API (root, mapper, **mapperArgs) may '
                'not be used with version 2 API (inputs, outputs)')
        self.datasetTypeAliasDict = {}

        self.storage = Storage()

        # make sure inputs and outputs are lists, and if a list item is a string convert it to
        # RepositoryArgs.
        inputs = listify(inputs)
        outputs = listify(outputs)
        inputs = [RepositoryArgs(cfgRoot=args)
                  if not isinstance(args, RepositoryArgs) else args for args in inputs]
        outputs = [RepositoryArgs(cfgRoot=args)
                   if not isinstance(args, RepositoryArgs) else args for args in outputs]
        # Set the default value of inputs & outputs, verify the required values ('r' for inputs, 'w' for
        # outputs) and remove the 'w' from inputs if needed.
        for args in inputs:
            if args.mode is None:
                args.mode = 'r'
            elif 'rw' == args.mode:
                args.mode = 'r'
            elif 'r' != args.mode:
                raise RuntimeError("The mode of an input should be readable.")
        for args in outputs:
            if args.mode is None:
                args.mode = 'w'
            elif 'w' not in args.mode:
                raise RuntimeError("The mode of an output should be writable.")
        # check for class instances in args.mapper (not allowed)
        for args in inputs + outputs:
            if (args.mapper and not isinstance(args.mapper, str) and
                    not inspect.isclass(args.mapper)):
                self.log.warn(preinitedMapperWarning)
        # if the output is readable, there must be only one output:
        for o in outputs:
            if 'r' in o.mode:
                if len(outputs) > 1:
                    raise RuntimeError("Butler does not support multiple output repositories if any of the "
                                       "outputs are readable.")

        # Handle the case where the output is readable and is also passed in as one of the inputs by removing
        # the input. This supports a legacy use case in pipe_tasks where the input is also passed as the
        # output, to the command line parser.
        def inputIsInOutputs(inputArgs, outputArgsList):
            for o in outputArgsList:
                if ('r' in o.mode and
                        o.root == inputArgs.root and
                        o.mapper == inputArgs.mapper and
                        o.mapperArgs == inputArgs.mapperArgs and
                        o.tags == inputArgs.tags and
                        o.policy == inputArgs.policy):
                    self.log.debug(("Input repositoryArgs {} is also listed in outputs as readable; "
                                    "throwing away the input.").format(inputArgs))
                    return True
            return False

        inputs = [args for args in inputs if not inputIsInOutputs(args, outputs)]
        return inputs, outputs

    @staticmethod
    def _getParentVal(repoData):
        """Get the value of this repoData as it should appear in the parents
        list of other repositories."""
        if repoData.isV1Repository:
            return repoData.cfg
        if repoData.cfgOrigin == 'nested':
            return repoData.cfg
        else:
            return repoData.cfg.root

    @staticmethod
    def _getParents(ofRepoData, repoInfo):
        """Create a parents list of repoData from inputs and (readable) outputs."""
        parents = []
        # get the parents list of repoData:
        for repoData in repoInfo:
            if repoData is ofRepoData:
                continue
            if 'r' not in repoData.repoArgs.mode:
                continue
            parents.append(Butler._getParentVal(repoData))
        return parents

    @staticmethod
    def _getOldButlerRepositoryCfg(repositoryArgs):
        if not Storage.isPosix(repositoryArgs.cfgRoot):
            return None
        if not PosixStorage.v1RepoExists(repositoryArgs.cfgRoot):
            return None
        if not repositoryArgs.mapper:
            repositoryArgs.mapper = PosixStorage.getMapperClass(repositoryArgs.cfgRoot)
        cfg = RepositoryCfg.makeFromArgs(repositoryArgs)
        parent = PosixStorage.getParentSymlinkPath(repositoryArgs.cfgRoot)
        if parent:
            parent = Butler._getOldButlerRepositoryCfg(RepositoryArgs(cfgRoot=parent, mode='r'))
            if parent is not None:
                cfg.addParents([parent])
        return cfg

    def _getRepositoryCfg(self, repositoryArgs):
        """Try to get a repository from the location described by cfgRoot.

        Parameters
        ----------
        repositoryArgs : RepositoryArgs or string
            Provides arguments to load an existing repository (or repositories). String is assumed to be a
            URI and is used as the cfgRoot (URI to the location of the cfg file).

        Returns
        -------
        (RepositoryCfg or None, bool)
            The RepositoryCfg, or None if one cannot be found, and True if the RepositoryCfg was created by
            reading an Old Butler repository, or False if it is a New Butler Repository.
        """
        if not isinstance(repositoryArgs, RepositoryArgs):
            repositoryArgs = RepositoryArgs(cfgRoot=repositoryArgs, mode='r')

        cfg = self.storage.getRepositoryCfg(repositoryArgs.cfgRoot)
        isOldButlerRepository = False
        if cfg is None:
            cfg = Butler._getOldButlerRepositoryCfg(repositoryArgs)
            if cfg is not None:
                isOldButlerRepository = True
        return cfg, isOldButlerRepository

    def _getCfgs(self, repoDataList):
        """Get or make a RepositoryCfg for each RepoData, and add the cfg to the RepoData.

        If the cfg exists, compare values. If values match then use the cfg as an "existing" cfg. If the
        values do not match, use the cfg as a "nested" cfg.
        If the cfg does not exist, the RepositoryArgs must be for a writable repository.

        Parameters
        ----------
        repoDataList : list of RepoData
            The RepoData that are outputs and inputs of this Butler.

        Raises
        ------
        RuntimeError
            If the passed-in RepositoryArgs indicate an existing repository but other cfg parameters in those
            RepositoryArgs do not match the existing repository's cfg, a RuntimeError will be raised.
        """
        def cfgMatchesArgs(args, cfg):
            """Test if there are any values in a RepositoryArgs that conflict with the values in a cfg."""
            if args.mapper is not None and cfg.mapper != args.mapper:
                return False
            if args.mapperArgs is not None and cfg.mapperArgs != args.mapperArgs:
                return False
            if args.policy is not None and cfg.policy != args.policy:
                return False
            return True

        for repoData in repoDataList:
            cfg, isOldButlerRepository = self._getRepositoryCfg(repoData.repoArgs)
            if cfg is None:
                if 'w' not in repoData.repoArgs.mode:
                    raise RuntimeError(
                        "No cfg found for read-only input repository at {}".format(repoData.repoArgs.cfgRoot))
                repoData.setCfg(cfg=RepositoryCfg.makeFromArgs(repoData.repoArgs),
                                origin='new',
                                root=repoData.repoArgs.cfgRoot,
                                isV1Repository=isOldButlerRepository)
            else:

                # This is a hack fix for an issue introduced by DM-11284; Old Butler parent repositories used
                # to be stored as a path to the repository in the parents list and it was changed so that the
                # whole RepositoryCfg, that described the Old Butler repository (including the mapperArgs that
                # were used with it), was recorded as a "nested" repository cfg. That checkin did not account
                # for the fact that there were repositoryCfg.yaml files in the world with only the path to
                # Old Butler repositories in the parents list.
                if cfg.parents:
                    for i, parent in enumerate(cfg.parents):
                        if isinstance(parent, RepositoryCfg):
                            continue
                        parentCfg, parentIsOldButlerRepository = self._getRepositoryCfg(parent)
                        if parentIsOldButlerRepository:
                            parentCfg.mapperArgs = cfg.mapperArgs
                            self.log.info(("Butler is replacing an Old Butler parent repository path '{}' "
                                           "found in the parents list of a New Butler repositoryCfg: {} "
                                           "with a repositoryCfg that includes the child repository's "
                                           "mapperArgs: {}. This affects the instantiated RepositoryCfg "
                                           "but does not change the persisted child repositoryCfg.yaml file."
                                           ).format(parent, cfg, parentCfg))
                            cfg._parents[i] = cfg._normalizeParents(cfg.root, [parentCfg])[0]

                if 'w' in repoData.repoArgs.mode:
                    # if it's an output repository, the RepositoryArgs must match the existing cfg.
                    if not cfgMatchesArgs(repoData.repoArgs, cfg):
                        raise RuntimeError(("The RepositoryArgs and RepositoryCfg must match for writable "
                                            "repositories, RepositoryCfg:{}, RepositoryArgs:{}").format(
                                                cfg, repoData.repoArgs))
                    repoData.setCfg(cfg=cfg, origin='existing', root=repoData.repoArgs.cfgRoot,
                                    isV1Repository=isOldButlerRepository)
                else:
                    # if it's an input repository, the cfg can overwrite the in-repo cfg.
                    if cfgMatchesArgs(repoData.repoArgs, cfg):
                        repoData.setCfg(cfg=cfg, origin='existing', root=repoData.repoArgs.cfgRoot,
                                        isV1Repository=isOldButlerRepository)
                    else:
                        repoData.setCfg(cfg=cfg, origin='nested', root=None,
                                        isV1Repository=isOldButlerRepository)

    def _addParents(self, repoDataList):
        """For each repoData in the input list, see if its parents are the next items in the list, and if not
        add the parent, so that the repoDataList includes parents and is in order to operate depth-first 0..n.

        Parameters
        ----------
        repoDataList : list of RepoData
            The RepoData for the Butler outputs + inputs.

        Raises
        ------
        RuntimeError
            Raised if a RepositoryCfg can not be found at a location where a parent repository should be.
        """
        repoDataIdx = 0
        while True:
            if repoDataIdx == len(repoDataList):
                break
            repoData = repoDataList[repoDataIdx]
            if 'r' not in repoData.repoArgs.mode:
                repoDataIdx += 1
                continue  # the repoData only needs parents if it's readable.
            if repoData.isNewRepository:
                repoDataIdx += 1
                continue  # if it's new the parents will be the inputs of this butler.
            if repoData.cfg.parents is None:
                repoDataIdx += 1
                continue  # if there are no parents then there's nothing to do.
            for repoParentIdx, repoParent in enumerate(repoData.cfg.parents):
                parentIdxInRepoDataList = repoDataIdx + repoParentIdx + 1
                if not isinstance(repoParent, RepositoryCfg):
                    repoParentCfg, isOldButlerRepository = self._getRepositoryCfg(repoParent)
                    if repoParentCfg is not None:
                        cfgOrigin = 'existing'
                else:
                    isOldButlerRepository = False
                    repoParentCfg = repoParent
                    cfgOrigin = 'nested'
                if (parentIdxInRepoDataList < len(repoDataList) and
                        repoDataList[parentIdxInRepoDataList].cfg == repoParentCfg):
                    continue
                args = RepositoryArgs(cfgRoot=repoParentCfg.root, mode='r')
                role = 'input' if repoData.role == 'output' else 'parent'
                newRepoInfo = RepoData(args, role)
                newRepoInfo.repoData.setCfg(cfg=repoParentCfg, origin=cfgOrigin, root=args.cfgRoot,
                                            isV1Repository=isOldButlerRepository)
                repoDataList.insert(parentIdxInRepoDataList, newRepoInfo)
            repoDataIdx += 1

    def _setAndVerifyParentsLists(self, repoDataList):
        """Make a list of all the input repositories of this Butler, these are the parents of the outputs.
        For new output repositories, set the parents in the RepositoryCfg. For existing output repositories
        verify that the RepositoryCfg's parents match the parents list.

        Parameters
        ----------
        repoDataList : list of RepoData
            All the RepoDatas loaded by this butler, in search order.

        Raises
        ------
        RuntimeError
            If an existing output repository is loaded and its parents do not match the parents of this
            Butler an error will be raised.
        """
        def getIOParents(ofRepoData, repoDataList):
            """Make a parents list for the repo in `ofRepoData` that is comprised of inputs and readable
            outputs (not parents-of-parents) of this butler."""
            parents = []
            for repoData in repoDataList:
                if repoData.role == 'parent':
                    continue
                if repoData is ofRepoData:
                    continue
                if repoData.role == 'output':
                    if 'r' in repoData.repoArgs.mode:
                        raise RuntimeError("If an output is readable it must be the only output.")
                        # and if this is the only output, this should have continued in
                        # "if repoData is ofRepoData"
                    continue
                parents.append(self._getParentVal(repoData))
            return parents

        for repoData in repoDataList:
            if repoData.role != 'output':
                continue
            parents = getIOParents(repoData, repoDataList)
            # if repoData is new, add the parent RepositoryCfgs to it.
            if repoData.cfgOrigin == 'new':
                repoData.cfg.addParents(parents)
            elif repoData.cfgOrigin in ('existing', 'nested'):
                if repoData.cfg.parents != parents:
                    try:
                        repoData.cfg.extendParents(parents)
                    except ParentsMismatch as e:
                        raise RuntimeError(("Inputs of this Butler:{} do not match parents of existing "
                                            "writable cfg:{} (ParentsMismatch exception: {})").format(
                                                parents, repoData.cfg.parents, e))

    def _setDefaultMapper(self, repoDataList):
        """Establish a default mapper if there is one and assign it to outputs that do not have a mapper
        assigned.

        If all inputs have the same mapper it will be used as the default mapper.

        Parameters
        ----------
        repoDataList : list of RepoData
            All the RepoDatas loaded by this butler, in search order.

        Raises
        ------
        RuntimeError
            If a default mapper can not be established and there is an output that does not have a mapper.
        """
        needyOutputs = [rd for rd in repoDataList if rd.role == 'output' and rd.cfg.mapper is None]
        if len(needyOutputs) == 0:
            return
        mappers = set([rd.cfg.mapper for rd in repoDataList if rd.role == 'input'])
        if len(mappers) != 1:
            inputs = [rd for rd in repoDataList if rd.role == 'input']
            raise RuntimeError(
                ("No default mapper could be established from inputs:{} and no mapper specified "
                 "for outputs:{}").format(inputs, needyOutputs))
        defaultMapper = mappers.pop()
        for repoData in needyOutputs:
            repoData.cfg.mapper = defaultMapper

    def _connectParentRepoDatas(self, repoDataList):
        """For each RepoData in repoDataList, find its parent in the repoDataList and cache a reference to it.

        Parameters
        ----------
        repoDataList : list of RepoData
            All the RepoDatas loaded by this butler, in search order.

        Raises
        ------
        RuntimeError
            When a parent is listed in the parents list but not found in the repoDataList. This is not
            expected to ever happen and would indicate an internal Butler error.
        """
        for repoData in repoDataList:
            for parent in repoData.cfg.parents:
                parentToAdd = None
                for otherRepoData in repoDataList:
                    if isinstance(parent, RepositoryCfg):
                        if otherRepoData.repoData.cfg == parent:
                            parentToAdd = otherRepoData.repoData
                            break
                    elif otherRepoData.repoData.cfg.root == parent:
                        parentToAdd = otherRepoData.repoData
                        break
                if parentToAdd is None:
                    raise RuntimeError(
                        "Could not find a parent matching {} to add to {}".format(parent, repoData))
                repoData.addParentRepoData(parentToAdd)

    @staticmethod
    def _getParentRepoData(parent, repoDataList):
        """Get a parent RepoData from a cfg from a list of RepoData.

        Parameters
        ----------
        parent : string or RepositoryCfg
            cfgRoot of a repo or a cfg that describes the repo
        repoDataList : list of RepoData
            list to search in

        Returns
        -------
        RepoData or None
            A RepoData if one can be found, else None
        """
        repoData = None
        for otherRepoData in repoDataList:
            if isinstance(parent, RepositoryCfg):
                if otherRepoData.cfg == parent:
                    repoData = otherRepoData
                    break
            elif otherRepoData.cfg.root == parent:
                repoData = otherRepoData
                break
        return repoData

    def _setRepoDataTags(self):
        """Set the tags from each repoArgs into all its parent repoArgs so that they can be included in
        tagged searches."""
        def setTags(repoData, tags, context):
            if id(repoData) in context:
                return
            repoData.addTags(tags)
            context.add(id(repoData))
            for parentRepoData in repoData.parentRepoDatas:
                setTags(parentRepoData, tags, context)
        for repoData in self._repos.outputs() + self._repos.inputs():
            setTags(repoData.repoData, repoData.repoArgs.tags, set())

    def _convertV1Args(self, root, mapper, mapperArgs):
        """Convert Old Butler RepositoryArgs (root, mapper, mapperArgs) to New Butler RepositoryArgs
        (inputs, outputs).

        Parameters
        ----------
        root : string
            Posix path to repository root
        mapper : class, class instance, or string
            Instantiated class, a class object to be instantiated, or a string that refers to a class that
            can be imported & used as the mapper.
        mapperArgs : dict
            RepositoryArgs & their values used when instantiating the mapper.

        Returns
        -------
        tuple
            (inputs, outputs) - values to be used for inputs and outputs in Butler.__init__
        """
        if (mapper and not isinstance(mapper, str) and
                not inspect.isclass(mapper)):
            self.log.warn(preinitedMapperWarning)
        inputs = None
        if root is None:
            if hasattr(mapper, 'root'):
                # in legacy repositories, the mapper may be given the root directly.
                root = mapper.root
            else:
                # in the past root="None" could be used to mean root='.'
                root = '.'
        outputs = RepositoryArgs(mode='rw',
                                 root=root,
                                 mapper=mapper,
                                 mapperArgs=mapperArgs)
        return inputs, outputs

    def __repr__(self):
        return 'Butler(datasetTypeAliasDict=%s, repos=%s)' % (
            self.datasetTypeAliasDict, self._repos)

    def _getDefaultMapper(self):
        """Get the default mapper. Currently this means if all the repositories use exactly the same mapper,
        that mapper may be considered the default.

        This definition may be changing; mappers may be able to exclude themselves as candidates for default,
        and they may nominate a different mapper instead. Also, we may not want to look at *all* the
        repositories, but only a depth-first search on each of the input & output repositories, and use the
        first-found mapper for each of those. TBD.

        Returns
        -------
        Mapper class or None
            Returns the class type of the default mapper, or None if a default
            mapper can not be determined.
        """
        defaultMapper = None

        for inputRepoData in self._repos.inputs():
            mapper = None
            if inputRepoData.cfg.mapper is not None:
                mapper = inputRepoData.cfg.mapper
                # if the mapper is:
                # * a string, import it.
                # * a class instance, get its class type
                # * a class, do nothing; use it
                if isinstance(mapper, str):
                    mapper = doImport(mapper)
                elif not inspect.isclass(mapper):
                    mapper = mapper.__class__
            # If no mapper has been found, note the first found mapper.
            # Then, if a mapper has been found and each next mapper matches it,
            # continue looking for mappers.
            # If a mapper has been found and another non-matching mapper is
            # found then we have no default, return None.
            if defaultMapper is None:
                defaultMapper = mapper
            elif mapper == defaultMapper:
                continue
            elif mapper is not None:
                return None
        return defaultMapper

    def _assignDefaultMapper(self, defaultMapper):
        for repoData in self._repos.all():
            if repoData.cfg.mapper is None and (repoData.isNewRepository or repoData.isV1Repository):
                if defaultMapper is None:
                    raise RuntimeError(
                        "No mapper specified for %s and no default mapper could be determined." %
                        repoData.repoArgs)
                repoData.cfg.mapper = defaultMapper

    @staticmethod
    def getMapperClass(root):
        """posix-only; gets the mapper class at the path specified by root (if a file _mapper can be found at
        that location or in a parent location).

        As we abstract the storage and support different types of storage locations this method will be
        moved entirely into Butler Access, or made more dynamic, and the API will very likely change."""
        return Storage.getMapperClass(root)

    def defineAlias(self, alias, datasetType):
        """Register an alias that will be substituted in datasetTypes.

        Parameters
        ----------
        alias - string
            The alias keyword. It may start with @ or not. It may not contain @ except as the first character.
        datasetType - string
            The string that will be substituted when @alias is passed into datasetType. It may not contain '@'
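
        Examples
        --------
        An illustrative sketch (the alias and dataset type here are hypothetical):

        >>> butler.defineAlias('mySource', 'src')
        >>> keys = butler.getKeys('@mySource')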
        """
        # verify formatting of alias:
        # it can have '@' as the first character (if not it's okay, we will add it) or not at all.
        atLoc = alias.rfind('@')
        if atLoc == -1:
            alias = "@" + str(alias)
        elif atLoc > 0:
            raise RuntimeError("Badly formatted alias string: %s" % (alias,))

        # verify that datasetType does not contain '@'
        if datasetType.count('@') != 0:
            raise RuntimeError("Badly formatted type string: %s" % (datasetType))

        # verify that the alias keyword does not start with another alias keyword,
        # and vice versa
        for key in self.datasetTypeAliasDict:
            if key.startswith(alias) or alias.startswith(key):
                raise RuntimeError("Alias: %s overlaps with existing alias: %s" % (alias, key))

        self.datasetTypeAliasDict[alias] = datasetType

    def getKeys(self, datasetType=None, level=None, tag=None):
        """Get the valid data id keys at or above the given level of hierarchy for the dataset type or the
        entire collection if None. The dict values are the basic Python types corresponding to the keys (int,
        float, string).

        Parameters
        ----------
        datasetType - string
            The type of dataset to get keys for, entire collection if None.
        level - string
            The hierarchy level to descend to. None if it should not be restricted. Use an empty string if
            the mapper should lookup the default level.
        tag - any, or list of any
            Any object that can be tested to be the same as the tag in a dataId passed into butler input
            functions. Applies only to input repositories: if tag is specified by the dataId then the repo
            will only be read from if the tag in the dataId matches a tag used for that repository.

        Returns
        -------
        Returns a dict. The dict keys are the valid data id keys at or above the given level of hierarchy for
        the dataset type or the entire collection if None. The dict values are the basic Python types
        corresponding to the keys (int, float, string).
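
        Examples
        --------
        An illustrative sketch; the available keys depend on the mapper in use:

        >>> keys = butler.getKeys('raw')  # e.g. may yield {'visit': int, 'ccd': int}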
        """
        datasetType = self._resolveDatasetTypeAlias(datasetType)

        keys = None
        tag = setify(tag)
        for repoData in self._repos.inputs():
            if not tag or len(tag.intersection(repoData.tags)) > 0:
                keys = repoData.repo.getKeys(datasetType, level)
                # An empty dict is a valid "found" condition for keys. The only value for keys that should
                # cause the search to continue is None
                if keys is not None:
                    break
        return keys

    def queryMetadata(self, datasetType, format, dataId={}, **rest):
        """Returns the valid values for one or more keys when given a partial
        input collection data id.

        Parameters
        ----------
        datasetType - string
            The type of dataset to inquire about.
        format - str, tuple
            Key or tuple of keys to be returned.
        dataId - DataId, dict
            The partial data id.
        **rest -
            Keyword arguments for the partial data id.

        Returns
        -------
        A list of valid values or tuples of valid values as specified by the
        format.
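
        Examples
        --------
        An illustrative sketch (the key names here are hypothetical and depend on the mapper):

        >>> visits = butler.queryMetadata('raw', 'visit', dataId={'filter': 'r'})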
        """

        datasetType = self._resolveDatasetTypeAlias(datasetType)
        dataId = DataId(dataId)
        dataId.update(**rest)
        format = sequencify(format)

        tuples = None
        for repoData in self._repos.inputs():
            if not dataId.tag or len(dataId.tag.intersection(repoData.tags)) > 0:
                tuples = repoData.repo.queryMetadata(datasetType, format, dataId)
                if tuples:
                    break

        if not tuples:
            return []

        if len(format) == 1:
            ret = []
            for x in tuples:
                try:
                    ret.append(x[0])
                except TypeError:
                    ret.append(x)
            return ret

        return tuples

    def datasetExists(self, datasetType, dataId={}, write=False, **rest):
        """Determines if a dataset file exists.

        Parameters
        ----------
        datasetType - string
            The type of dataset to inquire about.
        dataId - DataId, dict
            The data id of the dataset.
        write - bool
            If True, look only in locations where the dataset could be written,
            and return True only if it is present in all of them.
        **rest
            Keyword arguments for the data id.

        Returns
        -------
        exists - bool
            True if the dataset exists or is non-file-based.
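
        Examples
        --------
        An illustrative sketch (the dataId keys are hypothetical and mapper-dependent):

        >>> if butler.datasetExists('calexp', visit=1234, ccd=5):
        ...     exposure = butler.get('calexp', visit=1234, ccd=5)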
        """
        datasetType = self._resolveDatasetTypeAlias(datasetType)
        dataId = DataId(dataId)
        dataId.update(**rest)
        locations = self._locate(datasetType, dataId, write=write)
        if not write:  # when write=False, locations is not a sequence
            if locations is None:
                return False
            locations = [locations]

        if not locations:  # empty list
            return False

        for location in locations:
            # If the location is a ButlerComposite (as opposed to a ButlerLocation),
            # verify the component objects exist.
            if isinstance(location, ButlerComposite):
                for name, componentInfo in location.componentInfo.items():
                    if componentInfo.subset:
                        subset = self.subset(datasetType=componentInfo.datasetType, dataId=location.dataId)
                        exists = all([obj.datasetExists() for obj in subset])
                    else:
                        exists = self.datasetExists(componentInfo.datasetType, location.dataId)
                    if exists is False:
                        return False
            else:
                if not location.repository.exists(location):
                    return False
        return True

    def _locate(self, datasetType, dataId, write):
        """Get one or more ButlerLocations and/or ButlerComposites.

        Parameters
        ----------
        datasetType : string
            The datasetType that is being searched for. The datasetType may be followed by a dot and
            a component name (component names are specified in the policy), i.e. datasetType.componentName

        dataId : dict or DataId class instance
            The dataId

        write : bool
            True if this is a search to write an object. False if it is a search to read an object. This
            affects what type (an object or a container) is returned.

        Returns
        -------
        If write is False, will return either a single object or None. If write is True, will return a list
        (which may be empty)
        """
        repos = self._repos.outputs() if write else self._repos.inputs()
        locations = []
        for repoData in repos:
            # enforce dataId & repository tags when reading:
            if not write and dataId.tag and len(dataId.tag.intersection(repoData.tags)) == 0:
                continue
            components = datasetType.split('.')
            datasetType = components[0]
            components = components[1:]
            try:
                location = repoData.repo.map(datasetType, dataId, write=write)
            except NoResults:
                continue
            if location is None:
                continue
            location.datasetType = datasetType  # todo is there a better way than monkey patching here?
            if len(components) > 0:
                if not isinstance(location, ButlerComposite):
                    raise RuntimeError("The location for a dotted datasetType must be a composite.")
                # replace the first component name with the datasetType
                components[0] = location.componentInfo[components[0]].datasetType
                # join components back into a dot-delimited string
                datasetType = '.'.join(components)
                location = self._locate(datasetType, dataId, write)
                # if a component location is not found, we can not continue with this repo, move to next
                # repo.
                if location is None:
                    break
            # if reading, only one location is desired.
            if location:
                if not write:
                    # If there is a bypass function for this dataset type, we can't test to see if the object
                    # exists in storage, because the bypass function may not actually use the location
                    # according to the template. Instead, execute the bypass function and include its results
                    # in the bypass attribute of the location. The bypass function may fail for any reason,
                    # the most common case being that a file does not exist. If it raises an exception
                    # indicating such, we ignore the bypass function and proceed as though it does not exist.
                    if hasattr(location.mapper, "bypass_" + location.datasetType):
                        bypass = self._getBypassFunc(location, dataId)
                        try:
                            bypass = bypass()
                            location.bypass = bypass
                        except (NoResults, IOError):
                            self.log.debug("Continuing dataset search while evaluating "
                                           "bypass function for Dataset type:{} Data ID:{} at "
                                           "location {}".format(datasetType, dataId, location))
                    # If a location was found but the location does not exist, keep looking in input
                    # repositories (the registry may have had enough data for a lookup even though the object
                    # exists in a different repository.)
                    if (isinstance(location, ButlerComposite) or hasattr(location, 'bypass') or
                            location.repository.exists(location)):
                        return location
                else:
                    try:
                        locations.extend(location)
                    except TypeError:
                        locations.append(location)
        if not write:
            return None
        return locations

    @staticmethod
    def _getBypassFunc(location, dataId):
        pythonType = location.getPythonType()
        if pythonType is not None:
            if isinstance(pythonType, str):
                pythonType = doImport(pythonType)
        bypassFunc = getattr(location.mapper, "bypass_" + location.datasetType)
        return lambda: bypassFunc(location.datasetType, pythonType, location, dataId)

    def get(self, datasetType, dataId=None, immediate=True, **rest):
        """Retrieves a dataset given an input collection data id.

        Parameters
        ----------
        datasetType - string
            The type of dataset to retrieve.
        dataId - dict
            The data id.
        immediate - bool
            If False use a proxy for delayed loading.
        **rest
            keyword arguments for the data id.

        Returns
        -------
        An object retrieved from the dataset (or a proxy for one).
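
        Examples
        --------
        An illustrative sketch (the dataId keys are hypothetical and mapper-dependent):

        >>> exposure = butler.get('calexp', visit=1234, ccd=5)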
        """
        datasetType = self._resolveDatasetTypeAlias(datasetType)
        dataId = DataId(dataId)
        dataId.update(**rest)

        location = self._locate(datasetType, dataId, write=False)
        if location is None:
            raise NoResults("No locations for get:", datasetType, dataId)
        self.log.debug("Get type=%s keys=%s from %s", datasetType, dataId, str(location))

        if hasattr(location, 'bypass'):
            # this type loader block should get moved into a helper someplace, and duplications removed.
            def callback():
                return location.bypass
        else:
            def callback():
                return self._read(location)
        if location.mapper.canStandardize(location.datasetType):
            innerCallback = callback

            def callback():
                return location.mapper.standardize(location.datasetType, innerCallback(), dataId)
        if immediate:
            return callback()
        return ReadProxy(callback)

1395  def put(self, obj, datasetType, dataId={}, doBackup=False, **rest):
1396  """Persists a dataset given an output collection data id.
1397 
1398  Parameters
1399  ----------
1400  obj : object
1401  The object to persist.
1402  datasetType : `str`
1403  The type of dataset to persist.
1404  dataId : `dict`, optional
1405  The data id.
1406  doBackup : `bool`, optional
1407  If True, rename any existing dataset instead of overwriting it.
1408  WARNING: Setting doBackup=True is not safe for parallel processing, as it may be subject to race
1409  conditions.
1410  **rest
1411  Keyword arguments for the data id.
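
Examples
--------
A minimal sketch; the dataset type and data id keys here are illustrative:

>>> butler.put(catalog, 'src', visit=1, ccd=2)
>>> butler.put(catalog, 'src', visit=1, ccd=2, doBackup=True)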
1412  """
1413  datasetType = self._resolveDatasetTypeAlias(datasetType)
1414  dataId = DataId(dataId)
1415  dataId.update(**rest)
1416 
1417  locations = self._locate(datasetType, dataId, write=True)
1418  if not locations:
1419  raise NoResults("No locations for put:", datasetType, dataId)
1420  for location in locations:
1421  if isinstance(location, ButlerComposite):
1422  disassembler = location.disassembler if location.disassembler else genericDisassembler
1423  disassembler(obj=obj, dataId=location.dataId, componentInfo=location.componentInfo)
1424  for name, info in location.componentInfo.items():
1425  if not info.inputOnly:
1426  self.put(info.obj, info.datasetType, location.dataId, doBackup=doBackup)
1427  else:
1428  if doBackup:
1429  location.getRepository().backup(location.datasetType, dataId)
1430  location.getRepository().write(location, obj)
1431 
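# For composite datasets, put() relies on a disassembler to split the object
# into per-component datasets before writing each one. An illustrative custom
# disassembler (hypothetical component names; genericDisassembler is used
# when the composite does not configure one):
#
#     def exposureDisassembler(obj, dataId, componentInfo):
#         componentInfo['wcs'].obj = obj.getWcs()
#         componentInfo['image'].obj = obj.getImage()
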
1432  def subset(self, datasetType, level=None, dataId={}, **rest):
1433  """Return complete dataIds for a dataset type that match a partial (or empty) dataId.
1434 
1435  Given a partial (or empty) dataId specified in dataId and **rest, find all datasets that match the
1436  dataId. Optionally restrict the results to a given level specified by a dataId key (e.g. visit or
1437  sensor or amp for a camera). Return an iterable collection of complete dataIds as ButlerDataRefs.
1438  Datasets with the resulting dataIds may not exist; that needs to be tested with datasetExists().
1439 
1440  Parameters
1441  ----------
1442  datasetType : `str`
1443  The type of dataset collection to subset.
1444  level : `str`, optional
1445  The level of dataId at which to subset. Use an empty string if the mapper should look up the
1446  default level.
1447  dataId : `dict`, optional
1448  The data id.
1449  **rest
1450  Keyword arguments for the data id.
1451 
1452  Returns
1453  -------
1454  subset : `ButlerSubset`
1455  Collection of ButlerDataRefs for datasets matching the data id.
1456 
1457  Examples
1458  --------
1459  To print the full dataIds for all r-band measurements in a source catalog
1460  (note that the subset call is equivalent to: `butler.subset('src', dataId={'filter':'r'})`):
1461 
1462  >>> subset = butler.subset('src', filter='r')
1463  >>> for data_ref in subset: print(data_ref.dataId)
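
Restricting the subset to a different level (assuming a camera mapper that
defines a 'sensor' level):

>>> subset = butler.subset('raw', level='sensor', visit=1)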
1464  """
1465  datasetType = self._resolveDatasetTypeAlias(datasetType)
1466 
1467  # Currently expected behavior of subset is that if specified level is None then the mapper's default
1468  # level should be used. Convention for level within Butler is that an empty string is used to indicate
1469  # 'get default'.
1470  if level is None:
1471  level = ''
1472 
1473  dataId = DataId(dataId)
1474  dataId.update(**rest)
1475  return ButlerSubset(self, datasetType, level, dataId)
1476 
1477  def dataRef(self, datasetType, level=None, dataId={}, **rest):
1478  """Returns a single ButlerDataRef.
1479 
1480  Given a complete dataId specified in dataId and **rest, find the unique dataset at the given level
1481  specified by a dataId key (e.g. visit or sensor or amp for a camera) and return a ButlerDataRef.
1482 
1483  Parameters
1484  ----------
1485  datasetType : `str`
1486  The type of dataset collection to reference.
1487  level : `str`, optional
1488  The level of dataId at which to reference.
1489  dataId : `dict`, optional
1490  The data id.
1491  **rest
1492  Keyword arguments for the data id.
1493 
1494  Returns
1495  -------
1496  dataRef : `ButlerDataRef`
1497  ButlerDataRef for the dataset matching the data id.
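
Examples
--------
A minimal sketch; the data id must identify a unique dataset, and the keys
here are illustrative:

>>> dataRef = butler.dataRef('calexp', visit=1, ccd=2)
>>> exposure = dataRef.get('calexp')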
1498  """
1499 
1500  datasetType = self._resolveDatasetTypeAlias(datasetType)
1501  dataId = DataId(dataId)
1502  subset = self.subset(datasetType, level, dataId, **rest)
1503  if len(subset) != 1:
1504  raise RuntimeError("No unique dataset for: Dataset type:%s Level:%s Data ID:%s Keywords:%s" %
1505  (str(datasetType), str(level), str(dataId), str(rest)))
1506  return ButlerDataRef(subset, subset.cache[0])
1507 
1508  def getUri(self, datasetType, dataId=None, write=False, **rest):
1509  """Return the URI for a dataset
1510 
1511  .. warning:: This is intended only for debugging. The URI should
1512  never be used for anything other than printing.
1513 
1514  .. note:: In the event there are multiple URIs for read, we return only
1515  the first.
1516 
1517  .. note:: getUri() does not currently support composite datasets.
1518 
1519  Parameters
1520  ----------
1521  datasetType : `str`
1522  The dataset type of interest.
1523  dataId : `dict`, optional
1524  The data identifier.
1525  write : `bool`, optional
1526  Return the URI for writing?
1527  rest : `dict`, optional
1528  Keyword arguments for the data id.
1529 
1530  Returns
1531  -------
1532  uri : `str`
1533  URI for dataset.
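
Examples
--------
For debugging output only (illustrative dataset type and data id):

>>> print(butler.getUri('calexp', visit=1, ccd=2))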
1534  """
1535  datasetType = self._resolveDatasetTypeAlias(datasetType)
1536  dataId = DataId(dataId)
1537  dataId.update(**rest)
1538  locations = self._locate(datasetType, dataId, write=write)
1539  if locations is None:
1540  raise NoResults("No locations for getUri: ", datasetType, dataId)
1541 
1542  if write:
1543  # Follow the write path
1544  # Return the first valid write location.
1545  for location in locations:
1546  if isinstance(location, ButlerComposite):
1547  for name, info in location.componentInfo.items():
1548  if not info.inputOnly:
1549  return self.getUri(info.datasetType, location.dataId, write=True)
1550  else:
1551  return location.getLocationsWithRoot()[0]
1552  # No valid write location was found; fall through to raise.
1553  raise NoResults("No locations for getUri(write=True): ", datasetType, dataId)
1554  else:
1555  # Follow the read path; return only the first valid read location.
1556  return locations.getLocationsWithRoot()[0]
1557 
1558  def _read(self, location):
1559  """Unpersist an object using data inside a ButlerLocation or ButlerComposite object.
1560 
1561  Parameters
1562  ----------
1563  location : ButlerLocation or ButlerComposite
1564  A ButlerLocation or ButlerComposite instance populated with data needed to read the object.
1565 
1566  Returns
1567  -------
1568  object
1569  An instance of the object specified by the location.
1570  """
1571  self.log.debug("Starting read from %s", location)
1572 
1573  if isinstance(location, ButlerComposite):
1574  for name, componentInfo in location.componentInfo.items():
1575  if componentInfo.subset:
1576  subset = self.subset(datasetType=componentInfo.datasetType, dataId=location.dataId)
1577  componentInfo.obj = [obj.get() for obj in subset]
1578  else:
1579  obj = self.get(componentInfo.datasetType, location.dataId, immediate=True)
1580  componentInfo.obj = obj
1581  assembler = location.assembler or genericAssembler
1582  results = assembler(dataId=location.dataId, componentInfo=location.componentInfo,
1583  cls=location.python)
1584  return results
1585  else:
1586  results = location.repository.read(location)
1587  if len(results) == 1:
1588  results = results[0]
1589  self.log.debug("Ending read from %s", location)
1590  return results
1591 
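# The assembler is the read-side counterpart of the disassembler used by
# put(): it receives the already-read components and builds the composite
# object. An illustrative custom assembler (hypothetical component names;
# genericAssembler is used when the composite does not configure one):
#
#     def exposureAssembler(dataId, componentInfo, cls):
#         obj = cls()
#         obj.setWcs(componentInfo['wcs'].obj)
#         return obj
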
1592  def __reduce__(self):
1593  ret = (_unreduce, (self._initArgs, self.datasetTypeAliasDict))
1594  return ret
1595 
1596  def _resolveDatasetTypeAlias(self, datasetType):
1597  """Replaces all the known alias keywords in the given string with the alias value.
1598 
1599  Parameters
1600  ----------
1601  datasetType : `str`
1602  A datasetType string to search and replace on.
1603 
1604  Returns
1605  -------
1606  datasetType : `str`
1607  The de-aliased string.
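
Examples
--------
Assuming an alias has been registered with defineAlias:

>>> butler.defineAlias('@goodSrc', 'src')
>>> butler._resolveDatasetTypeAlias('@goodSrc')
'src'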
1608  """
1609  for key in self.datasetTypeAliasDict:
1610  # if all aliases have been replaced, bail out
1611  if datasetType.find('@') == -1:
1612  break
1613  datasetType = datasetType.replace(key, self.datasetTypeAliasDict[key])
1614 
1615  # If an alias specifier cannot be resolved, raise.
1616  if datasetType.find('@') != -1:
1617  raise RuntimeError("Unresolvable alias specifier in datasetType: %s" % (datasetType))
1618 
1619  return datasetType
1620 
1621 
1622 def _unreduce(initArgs, datasetTypeAliasDict):
1623  mapperArgs = initArgs.pop('mapperArgs')
1624  initArgs.update(mapperArgs)
1625  butler = Butler(**initArgs)
1626  butler.datasetTypeAliasDict = datasetTypeAliasDict
1627  return butler
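
# Butler instances can be pickled via __reduce__/_unreduce above. A minimal
# sketch of the round trip (assuming `butler` is an existing Butler):
#
#     import pickle
#     restored = pickle.loads(pickle.dumps(butler))
#     assert restored.datasetTypeAliasDict == butler.datasetTypeAliasDict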