butler.py
1 #!/usr/bin/env python
2 
3 #
4 # LSST Data Management System
5 # Copyright 2008-2015 LSST Corporation.
6 #
7 # This product includes software developed by the
8 # LSST Project (http://www.lsst.org/).
9 #
10 # This program is free software: you can redistribute it and/or modify
11 # it under the terms of the GNU General Public License as published by
12 # the Free Software Foundation, either version 3 of the License, or
13 # (at your option) any later version.
14 #
15 # This program is distributed in the hope that it will be useful,
16 # but WITHOUT ANY WARRANTY; without even the implied warranty of
17 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 # GNU General Public License for more details.
19 #
20 # You should have received a copy of the LSST License Statement and
21 # the GNU General Public License along with this program. If not,
22 # see <http://www.lsstcorp.org/LegalNotices/>.
23 #
24 
25 # -*- python -*-
26 
27 """This module defines the Butler class."""
28 import copy
29 import inspect
30 
31 import yaml
32 
33 from lsst.log import Log
34 from . import ReadProxy, ButlerSubset, ButlerDataRef, \
35  Storage, Policy, NoResults, Repository, DataId, RepositoryCfg, \
36  RepositoryArgs, listify, setify, sequencify, doImport, ButlerComposite, genericAssembler, \
37  genericDisassembler, PosixStorage, ParentsMismatch
38 
39 preinitedMapperWarning = ("Passing an instantiated mapper into "
40  "Butler.__init__ will prevent Butler from passing "
41  "parentRegistry or repositoryCfg information to "
42  "the mapper, which is done only at init time. "
43  "It is better to pass a importable string or "
44  "class object.")
45 
46 
47 class ButlerCfg(Policy, yaml.YAMLObject):
48  """Represents a Butler configuration.
49 
50  .. warning::
51 
52  cfg is 'wet paint' and very likely to change. Use of it in production
53  code other than via the 'old butler' API is strongly discouraged.
54  """
55  yaml_tag = u"!ButlerCfg"
56 
57  def __init__(self, cls, repoCfg):
58  super().__init__({'repoCfg': repoCfg, 'cls': cls})
59 
60 
61 class RepoData:
62  """Container object for repository data used by Butler
63 
64  Parameters
65  ----------
66  args : RepositoryArgs
67  The arguments that are used to find or create the RepositoryCfg.
68  role : string
69  "input", "output", or "parent", indicating why Butler loaded this repository.
70  * input: the Repository was passed as a Butler input.
71  * output: the Repository was passed as a Butler output.
72  * parent: the Repository was specified in the RepositoryCfg parents list of a readable repository.
73 
74  Attributes
75  ----------
76  cfg : RepositoryCfg
77  The configuration for the Repository.
78 
79  _cfgOrigin : string
80  "new", "existing", or "nested". Indicates the origin of the repository and its RepositoryCfg:
81  * new: it was created by this instance of Butler, and this instance of Butler will generate the
82  RepositoryCfg file.
83  * existing: it was found (via the root or cfgRoot argument).
84  * nested: the full RepositoryCfg was nested in another RepositoryCfg's parents list (this can happen
85  if the parameters of an input specified by RepositoryArgs or dict do not entirely match an existing
86  RepositoryCfg).
87 
88  cfgRoot : string
89  Path or URI to the location of the RepositoryCfg file.
90 
91  repo : lsst.daf.persistence.Repository
92  The Repository class instance.
93 
94  parentRepoDatas : list of RepoData
95  The parents of this Repository, as indicated in this Repository's RepositoryCfg. If this is a new
96  Repository then these are the inputs to this Butler (and will be saved in the RepositoryCfg). These
97  RepoData objects are not owned by this RepoData; they are references to peer RepoData objects in the
98  Butler's RepoDataContainer.
99 
100  isV1Repository : bool
101  True if this is an Old Butler repository. In this case the repository does not have a RepositoryCfg
102  file. It may have a _mapper file and may have a _parent symlink. It will never be treated as a "new"
103  repository, i.e. even though there is not a RepositoryCfg file, one will not be generated.
104  If False, this is a New Butler repository and is specified by a RepositoryCfg file.
105 
106  tags : set
107  These are values that may be used to restrict the search of input repositories. Details are available
108  in the RepositoryArgs and DataId classes.
109 
110  role : string
111  "input", "output", or "parent", indicating why Butler loaded this repository.
112  * input: the Repository was passed as a Butler input.
113  * output: the Repository was passed as a Butler output.
114  * parent: the Repository was specified in the RepositoryCfg parents list of a readable repository.
115 
116  _repoArgs : RepositoryArgs
117  Contains the arguments that were used to specify this Repository.
118  """
119 
120  def __init__(self, args, role):
121  self.cfg = None
122  self._cfgOrigin = None
123  self.cfgRoot = None
124  self.repo = None
125  self.parentRepoDatas = []
126  self.isV1Repository = False
127  self.tags = set()
128  self.role = role
129  self.parentRegistry = None
130  self._repoArgs = args
131 
132  @property
133  def repoArgs(self):
134  return self._repoArgs
135 
136  @property
137  def repoData(self):
138  return self
139 
140  def __repr__(self):
141  return ("{}(id={},"
142  "repoArgs={}"
143  "cfg={!r},"
144  "cfgOrigin={},"
145  "cfgRoot={},"
146  "repo={},"
147  "parentRepoDatas={},"
148  "isV1Repository={},"
149  "role={},"
150  "parentRegistry={})").format(
151  self.__class__.__name__,
152  id(self),
153  self.repoArgs,
154  self.cfg,
155  self.cfgOrigin,
156  self.cfgRoot,
157  self.repo,
158  [id(p) for p in self.parentRepoDatas],
159  self.isV1Repository,
160  self.role,
161  self.parentRegistry)
162 
163  def setCfg(self, cfg, origin, root, isV1Repository):
164  """Set information about the cfg into the RepoData
165 
166  Parameters
167  ----------
168  cfg : RepositoryCfg
169  The RepositoryCfg for the repo.
170  origin : string
171  'new', 'existing', or 'nested'
172  root : string
173  URI or absolute path to the location of the RepositoryCfg.yaml file.
174 
175  Returns
176  -------
177  None
178  """
179  if origin not in ('new', 'existing', 'nested'):
180  raise RuntimeError("Invalid value for origin: {}".format(origin))
181  self.cfg = cfg
182  self._cfgOrigin = origin
183  self.cfgRoot = root
184  self.isV1Repository = isV1Repository
185 
186  @property
187  def cfgOrigin(self):
188  return self._cfgOrigin
189 
190  @property
191  def isNewRepository(self):
192  return self.cfgOrigin == 'new'
193 
194  @property
195  def role(self):
196  return self._role
197 
198  @role.setter
199  def role(self, val):
200  if val not in ('input', 'output', 'parent'):
201  raise RuntimeError("Invalid value for role: {}".format(val))
202  self._role = val
203 
204  def getParentRepoDatas(self, context=None):
205  """Get the parents & grandparents etc of this repo data, in depth-first search order.
206 
207  Duplicate entries will be removed in cases where the same parent appears more than once in the parent
208  graph.
209 
210  Parameters
211  ----------
212  context : set, optional
213  Users should typically omit context and accept the default argument. Context is used to keep a set
214  of known RepoDatas when calling this function recursively, for duplicate elimination.
215 
216  Returns
217  -------
218  list of RepoData
219  A list of the parents & grandparents etc of a given repo data, in depth-first search order.
220  """
221  if context is None:
222  context = set()
223  parents = []
224  if id(self) in context:
225  return parents
226  context.add(id(self))
227  for parent in self.parentRepoDatas:
228  parents.append(parent)
229  parents += parent.getParentRepoDatas(context)
230  return parents
231 
232  def addParentRepoData(self, parentRepoData):
233  self.parentRepoDatas.append(parentRepoData)
234 
235  def addTags(self, tags):
236  self.tags = self.tags.union(tags)
237 
238 
240  """Container object for RepoData instances owned by a Butler instance.
241 
242  Parameters
243  ----------
244  repoDataList : list of RepoData
245  The RepoData instances to be stored in the container.
246  """
247 
248  def __init__(self, repoDataList):
249  self._inputs = None
250  self._outputs = None
251  self._all = repoDataList
252  self._buildLookupLists()
253 
254  def inputs(self):
255  """Get a list of RepoData that are used to as inputs to the Butler.
256  The list is created lazily as needed, and cached.
257 
258  Returns
259  -------
260  A list of RepoData with readable repositories, in the order to be used when searching.
261  """
262  if self._inputs is None:
263  raise RuntimeError("Inputs not yet initialized.")
264  return self._inputs
265 
266  def outputs(self):
267  """Get a list of RepoData that are used to as outputs to the Butler.
268  The list is created lazily as needed, and cached.
269 
270  Returns
271  -------
272  A list of RepoData with writable repositories, in the order to be used when searching.
273  """
274  if self._outputs is None:
275  raise RuntimeError("Outputs not yet initialized.")
276  return self._outputs
277 
278  def all(self):
279  """Get a list of all RepoData that are used to as by the Butler.
280  The list is created lazily as needed, and cached.
281 
282  Returns
283  -------
284  A list of all RepoData used by the Butler, in the order to be used when searching.
285  """
286  return self._all
287 
288  def __repr__(self):
289  return "%s(_inputs=%r, \n_outputs=%s, \n_all=%s)" % (
290  self.__class__.__name__,
291  self._inputs,
292  self._outputs,
293  self._all)
294 
295  def _buildLookupLists(self):
296  """Build the inputs and outputs lists based on the order of self.all()."""
297 
298  def addToList(repoData, lst):
299  """Add a repoData and each of its parents (depth first) to a list"""
300  if id(repoData) in alreadyAdded:
301  return
302  lst.append(repoData)
303  alreadyAdded.add(id(repoData))
304  for parent in repoData.parentRepoDatas:
305  addToList(parent, lst)
306 
307  if self._inputs is not None or self._outputs is not None:
308  raise RuntimeError("Lookup lists are already built.")
309  inputs = [repoData for repoData in self.all() if repoData.role == 'input']
310  outputs = [repoData for repoData in self.all() if repoData.role == 'output']
311  self._inputs = []
312  alreadyAdded = set()
313  for repoData in outputs:
314  if 'r' in repoData.repoArgs.mode:
315  addToList(repoData.repoData, self._inputs)
316  for repoData in inputs:
317  addToList(repoData.repoData, self._inputs)
318  self._outputs = [repoData.repoData for repoData in outputs]
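
# A small sketch of the resulting search order, assuming a readable ('rw') output
# repository whose parent is the single input repository (paths hypothetical):
# inputs() yields the readable output first, then its parent, each repository once.
def _exampleSearchOrder():
    out = RepoData(RepositoryArgs(root='/data/out', mode='rw'), 'output')
    inp = RepoData(RepositoryArgs(root='/data/in', mode='r'), 'input')
    out.addParentRepoData(inp)  # as _connectParentRepoDatas would do
    repos = RepoDataContainer([out, inp])
    assert repos.inputs() == [out, inp] and repos.outputs() == [out]
    return repos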
319 
320 
321 class Butler:
322  """Butler provides a generic mechanism for persisting and retrieving data using mappers.
323 
324  A Butler manages a collection of datasets known as a repository. Each dataset has a type representing its
325  intended usage and a location. Note that the dataset type is not the same as the C++ or Python type of the
326  object containing the data. For example, an ExposureF object might be used to hold the data for a raw
327  image, a post-ISR image, a calibrated science image, or a difference image. These would all be different
328  dataset types.
329 
330  A Butler can produce a collection of possible values for a key (or tuples of values for multiple keys) if
331  given a partial data identifier. It can check for the existence of a file containing a dataset given its
332  type and data identifier. The Butler can then retrieve the dataset. Similarly, it can persist an object to
333  an appropriate location when given its associated data identifier.
334 
335  Note that the Butler has two more advanced features when retrieving a dataset. First, the retrieval is
336  lazy. Input does not occur until the dataset is actually accessed. This allows datasets to be retrieved
337  and placed on a clipboard prospectively with little cost, even if the algorithm of a stage ends up not
338  using them. Second, the Butler will call a standardization hook upon retrieval of the dataset. This
339  function, contained in the input mapper object, must perform any necessary manipulations to force the
340  retrieved object to conform to standards, including translating metadata.
341 
342  Public methods:
343 
344  __init__(self, root, mapper=None, **mapperArgs)
345 
346  defineAlias(self, alias, datasetType)
347 
348  getKeys(self, datasetType=None, level=None)
349 
350  getDatasetTypes(self)
351 
352  queryMetadata(self, datasetType, format=None, dataId={}, **rest)
353 
354  datasetExists(self, datasetType, dataId={}, **rest)
355 
356  get(self, datasetType, dataId={}, immediate=False, **rest)
357 
358  put(self, obj, datasetType, dataId={}, **rest)
359 
360  subset(self, datasetType, level=None, dataId={}, **rest)
361 
362  dataRef(self, datasetType, level=None, dataId={}, **rest)
363 
364  Initialization:
365 
366  The preferred method of initialization is to use the `inputs` and `outputs` __init__ parameters. These
367  are described in the parameters section, below.
368 
369  For backward compatibility: this initialization method signature can take a posix root path, and
370  optionally a mapper class instance or class type that will be instantiated using the mapperArgs input
371  argument. However, for this to work in a backward compatible way it creates a single repository that is
372  used as both an input and an output repository. This is NOT preferred, and will likely break any
373  provenance system we have in place.
374 
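For example, an illustrative sketch (the repository paths are hypothetical):

>>> butler = Butler(inputs='/datasets/inputRepo', outputs='/datasets/outputRepo')
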
375  Parameters
376  ----------
377  root : string
378  .. note:: Deprecated in 12_0
379  `root` will be removed in TBD; it is replaced by `inputs` and `outputs` for
380  multiple-repository support.
381  A file system path. Will only work with a PosixRepository.
382  mapper : string or instance
383  .. note:: Deprecated in 12_0
384  `mapper` will be removed in TBD; it is replaced by `inputs` and `outputs` for
385  multiple-repository support.
386  Provides a mapper to be used with Butler.
387  mapperArgs : dict
388  .. note:: Deprecated in 12_0
389  `mapperArgs` will be removed in TBD; it is replaced by `inputs` and `outputs` for
390  multiple-repository support.
391  Provides arguments to be passed to the mapper if the mapper input argument is a class type to be
392  instantiated by Butler.
393  inputs : RepositoryArgs, dict, or string
394  Can be a single item or a list. Provides arguments to load an existing repository (or repositories).
395  String is assumed to be a URI and is used as the cfgRoot (URI to the location of the cfg file). (Local
396  file system URI does not have to start with 'file://' and in this way can be a relative path). The
397  `RepositoryArgs` class can be used to provide more parameters with which to initialize a repository
398  (such as `mapper`, `mapperArgs`, `tags`, etc. See the `RepositoryArgs` documentation for more
399  details). A dict may be used as shorthand for a `RepositoryArgs` class instance. The dict keys must
400  match parameters to the `RepositoryArgs.__init__` function.
401  outputs : RepositoryArgs, dict, or string
402  Provides arguments to load one or more existing repositories or create new ones. The different types
403  are handled the same as for `inputs`.
404 
405  The Butler init sequence loads all of the input and output repositories.
406  This creates the object hierarchy to read from and write to them. Each
407  repository can have 0 or more parents, which also get loaded as inputs.
408  This becomes a DAG of repositories. Ultimately, Butler creates a list of
409  these Repositories in the order that they are used.
410 
411  Initialization Sequence
412  =======================
413 
414  During initialization Butler creates a Repository class instance & support structure for each object
415  passed to `inputs` and `outputs` as well as the parent repositories recorded in the `RepositoryCfg` of
416  each existing readable repository.
417 
418  This process is complex. It is explained below to shed some light on the intent of each step.
419 
420  1. Input Argument Standardization
421  ---------------------------------
422 
423  In `Butler._processInputArguments` the input arguments are verified to be legal (and a RuntimeError is
424  raised if not), and they are converted into an expected format that is used for the rest of the Butler
425  init sequence. See the docstring for `_processInputArguments`.
426 
427  2. Create RepoData Objects
428  --------------------------
429 
430  Butler uses an object, called `RepoData`, to keep track of information about each repository; each
431  repository is contained in a single `RepoData`. The attributes are explained in its docstring.
432 
433  After `_processInputArguments`, a RepoData is instantiated and put in a list for each repository in
434  `outputs` and `inputs`. This list of RepoData, the `repoDataList`, now represents all the output and input
435  repositories (but not parent repositories) that this Butler instance will use.
436 
437  3. Get `RepositoryCfg`s
438  -----------------------
439 
440  `Butler._getCfgs` gets the `RepositoryCfg` for each repository in the `repoDataList`. The behavior is
441  described in the docstring.
442 
443  4. Add Parents
444  --------------
445 
446  `Butler._addParents` then considers the parents list in the `RepositoryCfg` of each `RepoData` in the
447  `repoDataList` and inserts new `RepoData` objects for each parent not represented in the proper location
448  in the `repoDataList`. Ultimately a flat list is built that represents the DAG of readable repositories
449  in depth-first order.
450 
451  5. Set and Verify Parents of Outputs
452  ------------------------------------
453 
454  To be able to load parent repositories when output repositories are used as inputs, the input repositories
455  are recorded as parents in the `RepositoryCfg` file of new output repositories. When an output repository
456  already exists, for consistency the Butler's inputs must match the list of parents specified in the
457  already-existing output repository's `RepositoryCfg` file.
458 
459  In `Butler._setAndVerifyParentsLists`, the list of parents is recorded in the `RepositoryCfg` of new
460  repositories. For existing repositories the list of parents is compared with the `RepositoryCfg`'s parents
461  list, and if they do not match a `RuntimeError` is raised.
462 
463  6. Set the Default Mapper
464  -------------------------
465 
466  If all the input repositories use the same mapper then we can assume that mapper to be the
467  "default mapper". If there are new output repositories whose `RepositoryArgs` do not specify a mapper and
468  there is a default mapper then the new output repository will be set to use that default mapper.
469 
470  This is handled in `Butler._setDefaultMapper`.
471 
472  7. Cache References to Parent RepoDatas
473  ---------------------------------------
474 
475  In `Butler._connectParentRepoDatas`, in each `RepoData` in `repoDataList`, a list of `RepoData` object
476  references is built that matches the parents specified in that `RepoData`'s `RepositoryCfg`.
477 
478  This list is used later to find things in that repository's parents, without considering peer
479  repositories' parents (e.g. finding the registry of a parent).
480 
481  8. Set Tags
482  -----------
483 
484  Tags are described at https://ldm-463.lsst.io/v/draft/#tagging
485 
486  In `Butler._setRepoDataTags`, for each `RepoData`, the tags specified by its `RepositoryArgs` are recorded
487  in a set, and added to the tags set in each of its parents, for ease of lookup when mapping.
488 
489  9. Find Parent Registry and Instantiate RepoData
490  ------------------------------------------------
491 
492  At this point there is enough information to instantiate the `Repository` instances. There is one final
493  step before instantiating the Repository, which is to try to get a parent registry that can be used by the
494  child repository. The criteria for "can be used" are spelled out in `Butler._setParentRegistry`. However,
495  to get the registry from the parent, the parent must be instantiated. The `repoDataList`, in depth-first
496  search order, is built so that the most-dependent repositories are first, and the least dependent
497  repositories are last. So the `repoDataList` is reversed and the Repositories are instantiated in that
498  order; for each RepoData a parent registry is searched for, and then the Repository is instantiated with
499  whatever registry could be found."""
500 
501  GENERATION = 2
502  """This is a Generation 2 Butler.
503  """
504 
505  def __init__(self, root=None, mapper=None, inputs=None, outputs=None, **mapperArgs):
506  self._initArgs = {'root': root, 'mapper': mapper, 'inputs': inputs, 'outputs': outputs,
507  'mapperArgs': mapperArgs}
508 
509  self.log = Log.getLogger("daf.persistence.butler")
510 
511  inputs, outputs = self._processInputArguments(
512  root=root, mapper=mapper, inputs=inputs, outputs=outputs, **mapperArgs)
513 
514  # convert the RepoArgs into RepoData
515  inputs = [RepoData(args, 'input') for args in inputs]
516  outputs = [RepoData(args, 'output') for args in outputs]
517  repoDataList = outputs + inputs
518 
519  self._getCfgs(repoDataList)
520 
521  self._addParents(repoDataList)
522 
523  self._setAndVerifyParentsLists(repoDataList)
524 
525  self._setDefaultMapper(repoDataList)
526 
527  self._connectParentRepoDatas(repoDataList)
528 
529  self._repos = RepoDataContainer(repoDataList)
530 
531  self._setRepoDataTags()
532 
533  for repoData in repoDataList:
534  self._initRepo(repoData)
535 
536  def _initRepo(self, repoData):
537  if repoData.repo is not None:
538  # this repository may have already been initialized by its children, in which case there is
539  # nothing more to do.
540  return
541  for parentRepoData in repoData.parentRepoDatas:
542  if parentRepoData.cfg.mapper != repoData.cfg.mapper:
543  continue
544  if parentRepoData.repo is None:
545  self._initRepo(parentRepoData)
546  parentRegistry = parentRepoData.repo.getRegistry()
547  repoData.parentRegistry = parentRegistry if parentRegistry else parentRepoData.parentRegistry
548  if repoData.parentRegistry:
549  break
550  repoData.repo = Repository(repoData)
551 
552  def _processInputArguments(self, root=None, mapper=None, inputs=None, outputs=None, **mapperArgs):
553  """Process, verify, and standardize the input arguments.
554  * Inputs can not be for Old Butler (root, mapper, mapperArgs) AND New Butler (inputs, outputs)
555  `root`, `mapper`, and `mapperArgs` are Old Butler init API.
556  `inputs` and `outputs` are New Butler init API.
557  Old Butler and New Butler init APIs may not be mixed; Butler may be initialized with either the Old
558  arguments or the New arguments, but not both.
559  * Verify that if there is a readable output, there is exactly one output. (This restriction is in
560  place because all readable repositories must be parents of writable repositories, and for
561  consistency the DAG of readable repositories must always be the same. Keeping the list of parents
562  becomes very complicated in the presence of multiple readable output repositories. It is better to
563  only write to output repositories, and then create a new Butler instance and use the outputs as
564  inputs, and write to new output repositories.)
565  * Make a copy of inputs & outputs so they may be modified without changing the passed-in arguments.
566  * Convert any input/output values that are URI strings to RepositoryArgs.
567  * Listify inputs & outputs.
568  * Set default RW mode on inputs & outputs as needed.
569 
570  Parameters
571  ----------
572  Same as Butler.__init__
573 
574  Returns
575  -------
576  (list of RepositoryArgs, list of RepositoryArgs)
577  First item is a list to use as inputs.
578  Second item is a list to use as outputs.
579 
580  Raises
581  ------
582  RuntimeError
583  If Old Butler and New Butler arguments are both used this will raise.
584  If an output is readable and there is more than one output this will raise.
585  """
586  # inputs and outputs may be modified, do not change the external value.
587  inputs = copy.deepcopy(inputs)
588  outputs = copy.deepcopy(outputs)
589 
590  isV1Args = inputs is None and outputs is None
591  if isV1Args:
592  inputs, outputs = self._convertV1Args(root=root,
593  mapper=mapper,
594  mapperArgs=mapperArgs or None)
595  elif root or mapper or mapperArgs:
596  raise RuntimeError(
597  'Butler version 1 API (root, mapper, **mapperArgs) may '
598  'not be used with version 2 API (inputs, outputs)')
599  self.datasetTypeAliasDict = {}
600 
601  self.storage = Storage()
602 
603  # make sure inputs and outputs are lists, and if a list item is a string convert it to RepositoryArgs.
604  inputs = listify(inputs)
605  outputs = listify(outputs)
606  inputs = [RepositoryArgs(cfgRoot=args)
607  if not isinstance(args, RepositoryArgs) else args for args in inputs]
608  outputs = [RepositoryArgs(cfgRoot=args)
609  if not isinstance(args, RepositoryArgs) else args for args in outputs]
610  # Set the default value of inputs & outputs, verify the required values ('r' for inputs, 'w' for
611  # outputs) and remove the 'w' from inputs if needed.
612  for args in inputs:
613  if args.mode is None:
614  args.mode = 'r'
615  elif 'rw' == args.mode:
616  args.mode = 'r'
617  elif 'r' != args.mode:
618  raise RuntimeError("The mode of an input should be readable.")
619  for args in outputs:
620  if args.mode is None:
621  args.mode = 'w'
622  elif 'w' not in args.mode:
623  raise RuntimeError("The mode of an output should be writable.")
624  # check for class instances in args.mapper (not allowed)
625  for args in inputs + outputs:
626  if (args.mapper and not isinstance(args.mapper, str)
627  and not inspect.isclass(args.mapper)):
628  self.log.warn(preinitedMapperWarning)
629  # if the output is readable, there must be only one output:
630  for o in outputs:
631  if 'r' in o.mode:
632  if len(outputs) > 1:
633  raise RuntimeError("Butler does not support multiple output repositories if any of the "
634  "outputs are readable.")
635 
636  # Handle the case where the output is readable and is also passed in as one of the inputs by removing
637  # the input. This supports a legacy use case in pipe_tasks where the input is also passed as the
638  # output, to the command line parser.
639  def inputIsInOutputs(inputArgs, outputArgsList):
640  for o in outputArgsList:
641  if ('r' in o.mode
642  and o.root == inputArgs.root
643  and o.mapper == inputArgs.mapper
644  and o.mapperArgs == inputArgs.mapperArgs
645  and o.tags == inputArgs.tags
646  and o.policy == inputArgs.policy):
647  self.log.debug(("Input repositoryArgs {} is also listed in outputs as readable; "
648  "throwing away the input.").format(inputArgs))
649  return True
650  return False
651 
652  inputs = [args for args in inputs if not inputIsInOutputs(args, outputs)]
653  return inputs, outputs
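
# A short sketch of the normalization performed above (paths hypothetical): an
# input requested as 'rw' is downgraded to 'r', and a bare output string becomes
# a RepositoryArgs with the string as its cfgRoot and the default 'w' mode.
def _exampleProcessedArguments(butler):
    inputs, outputs = butler._processInputArguments(
        inputs=RepositoryArgs(cfgRoot='/data/in', mode='rw'),
        outputs='/data/out')
    assert inputs[0].mode == 'r' and outputs[0].mode == 'w'
    return inputs, outputs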
654 
655  @staticmethod
656  def _getParentVal(repoData):
657  """Get the value of this repoData as it should appear in the parents
658  list of other repositories"""
659  if repoData.isV1Repository:
660  return repoData.cfg
661  if repoData.cfgOrigin == 'nested':
662  return repoData.cfg
663  else:
664  return repoData.cfg.root
665 
666  @staticmethod
667  def _getParents(ofRepoData, repoInfo):
668  """Create a parents list of repoData from inputs and (readable) outputs."""
669  parents = []
670  # get the parents list of repoData:
671  for repoData in repoInfo:
672  if repoData is ofRepoData:
673  continue
674  if 'r' not in repoData.repoArgs.mode:
675  continue
676  parents.append(Butler._getParentVal(repoData))
677  return parents
678 
679  @staticmethod
680  def _getOldButlerRepositoryCfg(repositoryArgs):
681  if not Storage.isPosix(repositoryArgs.cfgRoot):
682  return None
683  if not PosixStorage.v1RepoExists(repositoryArgs.cfgRoot):
684  return None
685  if not repositoryArgs.mapper:
686  repositoryArgs.mapper = PosixStorage.getMapperClass(repositoryArgs.cfgRoot)
687  cfg = RepositoryCfg.makeFromArgs(repositoryArgs)
688  parent = PosixStorage.getParentSymlinkPath(repositoryArgs.cfgRoot)
689  if parent:
690  parent = Butler._getOldButlerRepositoryCfg(RepositoryArgs(cfgRoot=parent, mode='r'))
691  if parent is not None:
692  cfg.addParents([parent])
693  return cfg
694 
695  def _getRepositoryCfg(self, repositoryArgs):
696  """Try to get a repository from the location described by cfgRoot.
697 
698  Parameters
699  ----------
700  repositoryArgs : RepositoryArgs or string
701  Provides arguments to load an existing repository (or repositories). String is assumed to be a URI
702  and is used as the cfgRoot (URI to the location of the cfg file).
703 
704  Returns
705  -------
706  (RepositoryCfg or None, bool)
707  The RepositoryCfg, or None if one cannot be found, and True if the RepositoryCfg was created by
708  reading an Old Butler repository, or False if it is a New Butler Repository.
709  """
710  if not isinstance(repositoryArgs, RepositoryArgs):
711  repositoryArgs = RepositoryArgs(cfgRoot=repositoryArgs, mode='r')
712 
713  cfg = self.storage.getRepositoryCfg(repositoryArgs.cfgRoot)
714  isOldButlerRepository = False
715  if cfg is None:
716  cfg = Butler._getOldButlerRepositoryCfg(repositoryArgs)
717  if cfg is not None:
718  isOldButlerRepository = True
719  return cfg, isOldButlerRepository
720 
721  def _getCfgs(self, repoDataList):
722  """Get or make a RepositoryCfg for each RepoData, and add the cfg to the RepoData.
723  If the cfg exists, compare values. If values match then use the cfg as an "existing" cfg. If the
724  values do not match, use the cfg as a "nested" cfg.
725  If the cfg does not exist, the RepositoryArgs must be for a writable repository.
726 
727  Parameters
728  ----------
729  repoDataList : list of RepoData
730  The RepoData that are output and inputs of this Butler
731 
732  Raises
733  ------
734  RuntimeError
735  If the passed-in RepositoryArgs indicate an existing repository but other cfg parameters in those
736  RepositoryArgs don't match the existing repository's cfg, a RuntimeError will be raised.
738  """
739  def cfgMatchesArgs(args, cfg):
740  """Test if there are any values in an RepositoryArgs that conflict with the values in a cfg"""
741  if args.mapper is not None and cfg.mapper != args.mapper:
742  return False
743  if args.mapperArgs is not None and cfg.mapperArgs != args.mapperArgs:
744  return False
745  if args.policy is not None and cfg.policy != args.policy:
746  return False
747  return True
748 
749  for repoData in repoDataList:
750  cfg, isOldButlerRepository = self._getRepositoryCfg(repoData.repoArgs)
751  if cfg is None:
752  if 'w' not in repoData.repoArgs.mode:
753  raise RuntimeError(
754  "No cfg found for read-only input repository at {}".format(repoData.repoArgs.cfgRoot))
755  repoData.setCfg(cfg=RepositoryCfg.makeFromArgs(repoData.repoArgs),
756  origin='new',
757  root=repoData.repoArgs.cfgRoot,
758  isV1Repository=isOldButlerRepository)
759  else:
760 
761  # This is a hack fix for an issue introduced by DM-11284; Old Butler parent repositories used
762  # to be stored as a path to the repository in the parents list and it was changed so that the
763  # whole RepositoryCfg, that described the Old Butler repository (including the mapperArgs that
764  # were used with it), was recorded as a "nested" repository cfg. That checkin did not account
765  # for the fact that there were repositoryCfg.yaml files in the world with only the path to
766  # Old Butler repositories in the parents list.
767  if cfg.parents:
768  for i, parent in enumerate(cfg.parents):
769  if isinstance(parent, RepositoryCfg):
770  continue
771  parentCfg, parentIsOldButlerRepository = self._getRepositoryCfg(parent)
772  if parentIsOldButlerRepository:
773  parentCfg.mapperArgs = cfg.mapperArgs
774  self.log.info(("Butler is replacing an Old Butler parent repository path '{}' "
775  "found in the parents list of a New Butler repositoryCfg: {} "
776  "with a repositoryCfg that includes the child repository's "
777  "mapperArgs: {}. This affects the instantiated RepositoryCfg "
778  "but does not change the persisted child repositoryCfg.yaml file."
779  ).format(parent, cfg, parentCfg))
780  cfg._parents[i] = cfg._normalizeParents(cfg.root, [parentCfg])[0]
781 
782  if 'w' in repoData.repoArgs.mode:
783  # if it's an output repository, the RepositoryArgs must match the existing cfg.
784  if not cfgMatchesArgs(repoData.repoArgs, cfg):
785  raise RuntimeError(("The RepositoryArgs and RepositoryCfg must match for writable "
786  "repositories, RepositoryCfg:{}, RepositoryArgs:{}").format(
787  cfg, repoData.repoArgs))
788  repoData.setCfg(cfg=cfg, origin='existing', root=repoData.repoArgs.cfgRoot,
789  isV1Repository=isOldButlerRepository)
790  else:
791  # if it's an input repository, the cfg can overwrite the in-repo cfg.
792  if cfgMatchesArgs(repoData.repoArgs, cfg):
793  repoData.setCfg(cfg=cfg, origin='existing', root=repoData.repoArgs.cfgRoot,
794  isV1Repository=isOldButlerRepository)
795  else:
796  repoData.setCfg(cfg=cfg, origin='nested', root=None,
797  isV1Repository=isOldButlerRepository)
798 
799  def _addParents(self, repoDataList):
800  """For each repoData in the input list, see if its parents are the next items in the list, and if not
801  add the parent, so that the repoDataList includes parents and is in order to operate depth-first 0..n.
802 
803  Parameters
804  ----------
805  repoDataList : list of RepoData
806  The RepoData for the Butler outputs + inputs.
807 
808  Raises
809  ------
810  RuntimeError
811  Raised if a RepositoryCfg can not be found at a location where a parent repository should be.
812  """
813  repoDataIdx = 0
814  while True:
815  if repoDataIdx == len(repoDataList):
816  break
817  repoData = repoDataList[repoDataIdx]
818  if 'r' not in repoData.repoArgs.mode:
819  repoDataIdx += 1
820  continue # the repoData only needs parents if it's readable.
821  if repoData.isNewRepository:
822  repoDataIdx += 1
823  continue # if it's new the parents will be the inputs of this butler.
824  if repoData.cfg.parents is None:
825  repoDataIdx += 1
826  continue # if there are no parents then there's nothing to do.
827  for repoParentIdx, repoParent in enumerate(repoData.cfg.parents):
828  parentIdxInRepoDataList = repoDataIdx + repoParentIdx + 1
829  if not isinstance(repoParent, RepositoryCfg):
830  repoParentCfg, isOldButlerRepository = self._getRepositoryCfg(repoParent)
831  if repoParentCfg is not None:
832  cfgOrigin = 'existing'
833  else:
834  isOldButlerRepository = False
835  repoParentCfg = repoParent
836  cfgOrigin = 'nested'
837  if (parentIdxInRepoDataList < len(repoDataList)
838  and repoDataList[parentIdxInRepoDataList].cfg == repoParentCfg):
839  continue
840  args = RepositoryArgs(cfgRoot=repoParentCfg.root, mode='r')
841  role = 'input' if repoData.role == 'output' else 'parent'
842  newRepoInfo = RepoData(args, role)
843  newRepoInfo.repoData.setCfg(cfg=repoParentCfg, origin=cfgOrigin, root=args.cfgRoot,
844  isV1Repository=isOldButlerRepository)
845  repoDataList.insert(parentIdxInRepoDataList, newRepoInfo)
846  repoDataIdx += 1
847 
848  def _setAndVerifyParentsLists(self, repoDataList):
849  """Make a list of all the input repositories of this Butler, these are the parents of the outputs.
850  For new output repositories, set the parents in the RepositoryCfg. For existing output repositories
851  verify that the RepositoryCfg's parents match the parents list.
852 
853  Parameters
854  ----------
855  repoDataList : list of RepoData
856  All the RepoDatas loaded by this butler, in search order.
857 
858  Raises
859  ------
860  RuntimeError
861  If an existing output repository is loaded and its parents do not match the parents of this Butler
862  an error will be raised.
863  """
864  def getIOParents(ofRepoData, repoDataList):
865  """make a parents list for repo in `ofRepoData` that is comprised of inputs and readable
866  outputs (not parents-of-parents) of this butler"""
867  parents = []
868  for repoData in repoDataList:
869  if repoData.role == 'parent':
870  continue
871  if repoData is ofRepoData:
872  continue
873  if repoData.role == 'output':
874  if 'r' in repoData.repoArgs.mode:
875  raise RuntimeError("If an output is readable it must be the only output.")
876  # and if this is the only output, this should have continued in
877  # "if repoData is ofRepoData"
878  continue
879  parents.append(self._getParentVal(repoData))
880  return parents
881 
882  for repoData in repoDataList:
883  if repoData.role != 'output':
884  continue
885  parents = getIOParents(repoData, repoDataList)
886  # if repoData is new, add the parent RepositoryCfgs to it.
887  if repoData.cfgOrigin == 'new':
888  repoData.cfg.addParents(parents)
889  elif repoData.cfgOrigin in ('existing', 'nested'):
890  if repoData.cfg.parents != parents:
891  try:
892  repoData.cfg.extendParents(parents)
893  except ParentsMismatch as e:
894  raise RuntimeError(("Inputs of this Butler:{} do not match parents of existing "
895  "writable cfg:{} (ParentMismatch exception: {}").format(
896  parents, repoData.cfg.parents, e))
897 
898  def _setDefaultMapper(self, repoDataList):
899  """Establish a default mapper if there is one and assign it to outputs that do not have a mapper
900  assigned.
901 
902  If all inputs have the same mapper it will be used as the default mapper.
903 
904  Parameters
905  ----------
906  repoDataList : list of RepoData
907  All the RepoDatas loaded by this butler, in search order.
908 
909  Raises
910  ------
911  RuntimeError
912  If a default mapper can not be established and there is an output that does not have a mapper.
913  """
914  needyOutputs = [rd for rd in repoDataList if rd.role == 'output' and rd.cfg.mapper is None]
915  if len(needyOutputs) == 0:
916  return
917  mappers = set([rd.cfg.mapper for rd in repoDataList if rd.role == 'input'])
918  if len(mappers) != 1:
919  inputs = [rd for rd in repoDataList if rd.role == 'input']
920  raise RuntimeError(
921  ("No default mapper could be established from inputs:{} and no mapper specified "
922  "for outputs:{}").format(inputs, needyOutputs))
923  defaultMapper = mappers.pop()
924  for repoData in needyOutputs:
925  repoData.cfg.mapper = defaultMapper
926 
927  def _connectParentRepoDatas(self, repoDataList):
928  """For each RepoData in repoDataList, find its parent in the repoDataList and cache a reference to it.
929 
930  Parameters
931  ----------
932  repoDataList : list of RepoData
933  All the RepoDatas loaded by this butler, in search order.
934 
935  Raises
936  ------
937  RuntimeError
938  When a parent is listed in the parents list but not found in the repoDataList. This is not
939  expected to ever happen and would indicate an internal Butler error.
940  """
941  for repoData in repoDataList:
942  for parent in repoData.cfg.parents:
943  parentToAdd = None
944  for otherRepoData in repoDataList:
945  if isinstance(parent, RepositoryCfg):
946  if otherRepoData.repoData.cfg == parent:
947  parentToAdd = otherRepoData.repoData
948  break
949  elif otherRepoData.repoData.cfg.root == parent:
950  parentToAdd = otherRepoData.repoData
951  break
952  if parentToAdd is None:
953  raise RuntimeError(
954  "Could not find a parent matching {} to add to {}".format(parent, repoData))
955  repoData.addParentRepoData(parentToAdd)
956 
957  @staticmethod
958  def _getParentRepoData(parent, repoDataList):
959  """get a parent RepoData from a cfg from a list of RepoData
960 
961  Parameters
962  ----------
963  parent : string or RepositoryCfg
964  cfgRoot of a repo or a cfg that describes the repo
965  repoDataList : list of RepoData
966  list to search in
967 
968  Returns
969  -------
970  RepoData or None
971  A RepoData if one can be found, else None
972  """
973  repoData = None
974  for otherRepoData in repoDataList:
975  if isinstance(parent, RepositoryCfg):
976  if otherRepoData.cfg == parent:
977  repoData = otherRepoData
978  break
979  elif otherRepoData.cfg.root == parent:
980  repoData = otherRepoData
981  break
982  return repoData
983 
984  def _setRepoDataTags(self):
985  """Set the tags from each repoArgs into all its parent repoArgs so that they can be included in tagged
986  searches."""
987  def setTags(repoData, tags, context):
988  if id(repoData) in context:
989  return
990  repoData.addTags(tags)
991  context.add(id(repoData))
992  for parentRepoData in repoData.parentRepoDatas:
993  setTags(parentRepoData, tags, context)
994  for repoData in self._repos.outputs() + self._repos.inputs():
995  setTags(repoData.repoData, repoData.repoArgs.tags, set())
996 
997  def _convertV1Args(self, root, mapper, mapperArgs):
998  """Convert Old Butler RepositoryArgs (root, mapper, mapperArgs) to New Butler RepositoryArgs
999  (inputs, outputs)
1000 
1001  Parameters
1002  ----------
1003  root : string
1004  Posix path to repository root
1005  mapper : class, class instance, or string
1006  Instantiated class, a class object to be instantiated, or a string that refers to a class that
1007  can be imported & used as the mapper.
1008  mapperArgs : dict
1009  RepositoryArgs & their values used when instantiating the mapper.
1010 
1011  Returns
1012  -------
1013  tuple
1014  (inputs, outputs) - values to be used for inputs and outputs in Butler.__init__
1015  """
1016  if (mapper and not isinstance(mapper, str)
1017  and not inspect.isclass(mapper)):
1018  self.log.warn(preinitedMapperWarning)
1019  inputs = None
1020  if root is None:
1021  if hasattr(mapper, 'root'):
1022  # in legacy repositories, the mapper may be given the root directly.
1023  root = mapper.root
1024  else:
1025  # in the past root="None" could be used to mean root='.'
1026  root = '.'
1027  outputs = RepositoryArgs(mode='rw',
1028  root=root,
1029  mapper=mapper,
1030  mapperArgs=mapperArgs)
1031  return inputs, outputs
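
# A sketch of the conversion (root hypothetical): the deprecated single-root form
# yields no inputs and one 'rw' repository that serves as both input and output.
def _exampleConvertV1(butler):
    inputs, outputs = butler._convertV1Args(root='/data/repo', mapper=None, mapperArgs=None)
    assert inputs is None and outputs.mode == 'rw'
    return outputs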
1032 
1033  def __repr__(self):
1034  return 'Butler(datasetTypeAliasDict=%s, repos=%s)' % (
1035  self.datasetTypeAliasDict, self._repos)
1036 
1037  def _getDefaultMapper(self):
1039  """Get the default mapper. Currently this means if all the repositories use exactly the same mapper,
1040  that mapper may be considered the default.
1041 
1042  This definition may be changing; mappers may be able to exclude themselves as candidates for default,
1043  and they may nominate a different mapper instead. Also, we may not want to look at *all* the
1044  repositories, but only a depth-first search on each of the input & output repositories, and use the
1045  first-found mapper for each of those. TBD.
1046 
1052  Returns
1053  -------
1054  Mapper class or None
1055  Returns the class type of the default mapper, or None if a default
1056  mapper can not be determined.
1057  """
1058  defaultMapper = None
1059 
1060  for inputRepoData in self._repos.inputs():
1061  mapper = None
1062  if inputRepoData.cfg.mapper is not None:
1063  mapper = inputRepoData.cfg.mapper
1064  # if the mapper is:
1065  # * a string, import it.
1066  # * a class instance, get its class type
1067  # * a class, do nothing; use it
1068  if isinstance(mapper, str):
1069  mapper = doImport(mapper)
1070  elif not inspect.isclass(mapper):
1071  mapper = mapper.__class__
1072  # If no mapper has been found, note the first found mapper.
1073  # Then, if a mapper has been found and each next mapper matches it,
1074  # continue looking for mappers.
1075  # If a mapper has been found and another non-matching mapper is
1076  # found then we have no default, return None.
1077  if defaultMapper is None:
1078  defaultMapper = mapper
1079  elif mapper == defaultMapper:
1080  continue
1081  elif mapper is not None:
1082  return None
1083  return defaultMapper
1084 
1085  def _assignDefaultMapper(self, defaultMapper):
1086  for repoData in self._repos.all():
1087  if repoData.cfg.mapper is None and (repoData.isNewRepository or repoData.isV1Repository):
1088  if defaultMapper is None:
1089  raise RuntimeError(
1090  "No mapper specified for %s and no default mapper could be determined." %
1091  repoData.repoArgs)
1092  repoData.cfg.mapper = defaultMapper
1093 
1094  @staticmethod
1095  def getMapperClass(root):
1096  """posix-only; gets the mapper class at the path specified by root (if a file _mapper can be found at
1097  that location or in a parent location.
1098 
1099  As we abstract the storage and support different types of storage locations this method will be
1100  moved entirely into Butler Access, or made more dynamic, and the API will very likely change."""
1101  return Storage.getMapperClass(root)
1102 
1103  def defineAlias(self, alias, datasetType):
1104  """Register an alias that will be substituted in datasetTypes.
1105 
1106  Parameters
1107  ----------
1108  alias - string
1109  The alias keyword. It may start with @ or not. It may not contain @ except as the first character.
1110  datasetType - string
1111  The string that will be substituted when @alias is passed into datasetType. It may not contain '@'
1112  """
1113  # verify formatting of alias:
1114  # it can have '@' as the first character (if not it's okay, we will add it) or not at all.
1115  atLoc = alias.rfind('@')
1116  if atLoc == -1:
1117  alias = "@" + str(alias)
1118  elif atLoc > 0:
1119  raise RuntimeError("Badly formatted alias string: %s" % (alias,))
1120 
1121  # verify that datasetType does not contain '@'
1122  if datasetType.count('@') != 0:
1123  raise RuntimeError("Badly formatted type string: %s" % (datasetType))
1124 
1125  # verify that the alias keyword does not start with another alias keyword,
1126  # and vice versa
1127  for key in self.datasetTypeAliasDict:
1128  if key.startswith(alias) or alias.startswith(key):
1129  raise RuntimeError("Alias: %s overlaps with existing alias: %s" % (alias, key))
1130 
1131  self.datasetTypeAliasDict[alias] = datasetType
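
# A sketch of alias usage; the dataset type 'src' and the dataId keys are
# hypothetical and depend on the mapper in use.
def _exampleDefineAlias(butler):
    butler.defineAlias('goodSrc', 'src')  # the leading '@' is added if omitted
    return butler.get('@goodSrc', dataId={'visit': 1, 'ccd': 2})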
1132 
1133  def getKeys(self, datasetType=None, level=None, tag=None):
1134  """Get the valid data id keys at or above the given level of hierarchy for the dataset type or the
1135  entire collection if None. The dict values are the basic Python types corresponding to the keys (int,
1136  float, string).
1137 
1138  Parameters
1139  ----------
1140  datasetType - string
1141  The type of dataset to get keys for, entire collection if None.
1142  level - string
1143  The hierarchy level to descend to. None if it should not be restricted. Use an empty string if the
1144  mapper should look up the default level.
1145  tag - any, or list of any
1146  If tag is specified then the repo will only be used if the tag
1147  or a tag in the list matches a tag used for that repository.
1148 
1149  Returns
1150  -------
1151  Returns a dict. The dict keys are the valid data id keys at or above the given level of hierarchy for
1152  the dataset type or the entire collection if None. The dict values are the basic Python types
1153  corresponding to the keys (int, float, string).
1154  """
1155  datasetType = self._resolveDatasetTypeAlias(datasetType)
1156 
1157  keys = None
1158  tag = setify(tag)
1159  for repoData in self._repos.inputs():
1160  if not tag or len(tag.intersection(repoData.tags)) > 0:
1161  keys = repoData.repo.getKeys(datasetType, level)
1162  # An empty dict is a valid "found" condition for keys. The only value for keys that should
1163  # cause the search to continue is None
1164  if keys is not None:
1165  break
1166  return keys
1167 
1168  def getDatasetTypes(self, tag=None):
1169  """Get the valid dataset types for all known repos or those matching
1170  the tags.
1171 
1172  Parameters
1173  ----------
1174  tag - any, or list of any
1175  If tag is specified then the repo will only be used if the tag
1176  or a tag in the list matches a tag used for that repository.
1177 
1178  Returns
1179  -------
1180  Returns the dataset types as a set of strings.
1181  """
1182  datasetTypes = set()
1183  tag = setify(tag)
1184  for repoData in self._repos.outputs() + self._repos.inputs():
1185  if not tag or len(tag.intersection(repoData.tags)) > 0:
1186  datasetTypes = datasetTypes.union(
1187  repoData.repo.mappers()[0].getDatasetTypes())
1188  return datasetTypes
1189 
1190  def queryMetadata(self, datasetType, format, dataId={}, **rest):
1191  """Returns the valid values for one or more keys when given a partial
1192  input collection data id.
1193 
1194  Parameters
1195  ----------
1196  datasetType - string
1197  The type of dataset to inquire about.
1198  format - str, tuple
1199  Key or tuple of keys to be returned.
1200  dataId - DataId, dict
1201  The partial data id.
1202  **rest -
1203  Keyword arguments for the partial data id.
1204 
1205  Returns
1206  -------
1207  A list of valid values or tuples of valid values as specified by the
1208  format.
1209  """
1210 
1211  datasetType = self._resolveDatasetTypeAlias(datasetType)
1212  dataId = DataId(dataId)
1213  dataId.update(**rest)
1214  format = sequencify(format)
1215 
1216  tuples = None
1217  for repoData in self._repos.inputs():
1218  if not dataId.tag or len(dataId.tag.intersection(repoData.tags)) > 0:
1219  tuples = repoData.repo.queryMetadata(datasetType, format, dataId)
1220  if tuples:
1221  break
1222 
1223  if not tuples:
1224  return []
1225 
1226  if len(format) == 1:
1227  ret = []
1228  for x in tuples:
1229  try:
1230  ret.append(x[0])
1231  except TypeError:
1232  ret.append(x)
1233  return ret
1234 
1235  return tuples
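
# A sketch of both return shapes; the dataset type 'raw' and the keys
# visit/ccd/filter are hypothetical and depend on the mapper in use.
def _exampleQueryMetadata(butler):
    pairs = butler.queryMetadata('raw', ('visit', 'ccd'), dataId={'filter': 'r'})  # list of tuples
    visits = butler.queryMetadata('raw', 'visit', dataId={'filter': 'r'})  # list of scalars
    return pairs, visits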
1236 
1237  def datasetExists(self, datasetType, dataId={}, write=False, **rest):
1238  """Determines if a dataset file exists.
1239 
1240  Parameters
1241  ----------
1242  datasetType - string
1243  The type of dataset to inquire about.
1244  dataId - DataId, dict
1245  The data id of the dataset.
1246  write - bool
1247  If True, look only in locations where the dataset could be written,
1248  and return True only if it is present in all of them.
1249  **rest - Keyword arguments for the data id.
1250 
1251  Returns
1252  -------
1253  exists - bool
1254  True if the dataset exists or is non-file-based.
1255  """
1256  datasetType = self._resolveDatasetTypeAlias(datasetType)
1257  dataId = DataId(dataId)
1258  dataId.update(**rest)
1259  locations = self._locate(datasetType, dataId, write=write)
1260  if not write: # when write=False, locations is not a sequence
1261  if locations is None:
1262  return False
1263  locations = [locations]
1264 
1265  if not locations: # empty list
1266  return False
1267 
1268  for location in locations:
1269  # If the location is a ButlerComposite (as opposed to a ButlerLocation),
1270  # verify the component objects exist.
1271  if isinstance(location, ButlerComposite):
1272  for name, componentInfo in location.componentInfo.items():
1273  if componentInfo.subset:
1274  subset = self.subset(datasetType=componentInfo.datasetType, dataId=location.dataId)
1275  exists = all([obj.datasetExists() for obj in subset])
1276  else:
1277  exists = self.datasetExists(componentInfo.datasetType, location.dataId)
1278  if exists is False:
1279  return False
1280  else:
1281  if not location.repository.exists(location):
1282  return False
1283  return True
1284 
1285  def _locate(self, datasetType, dataId, write):
1286  """Get one or more ButlerLocations and/or ButlercComposites.
1287 
1288  Parameters
1289  ----------
1290  datasetType : string
1291  The datasetType that is being searched for. The datasetType may be followed by a dot and
1292  a component name (component names are specified in the policy), e.g. datasetType.componentName.
1293 
1294  dataId : dict or DataId class instance
1295  The dataId
1296 
1297  write : bool
1298  True if this is a search to write an object. False if it is a search to read an object. This
1299  affects what type (an object or a container) is returned.
1300 
1301  Returns
1302  -------
1303  If write is False, will return either a single object or None. If write is True, will return a list
1304  (which may be empty)
1305  """
1306  repos = self._repos.outputs() if write else self._repos.inputs()
1307  locations = []
1308  for repoData in repos:
1309  # enforce dataId & repository tags when reading:
1310  if not write and dataId.tag and len(dataId.tag.intersection(repoData.tags)) == 0:
1311  continue
1312  components = datasetType.split('.')
1313  datasetType = components[0]
1314  components = components[1:]
1315  try:
1316  location = repoData.repo.map(datasetType, dataId, write=write)
1317  except NoResults:
1318  continue
1319  if location is None:
1320  continue
1321  location.datasetType = datasetType # todo is there a better way than monkey patching here?
1322  if len(components) > 0:
1323  if not isinstance(location, ButlerComposite):
1324  raise RuntimeError("The location for a dotted datasetType must be a composite.")
1325  # replace the first component name with the datasetType
1326  components[0] = location.componentInfo[components[0]].datasetType
1327  # join components back into a dot-delimited string
1328  datasetType = '.'.join(components)
1329  location = self._locate(datasetType, dataId, write)
1330  # if a component location is not found, we can not continue with this repo, move to next repo.
1331  if location is None:
1332  break
1333  # if reading, only one location is desired.
1334  if location:
1335  if not write:
1336  # If there is a bypass function for this dataset type, we can't test to see if the object
1337  # exists in storage, because the bypass function may not actually use the location
1338  # according to the template. Instead, execute the bypass function and include its results
1339  # in the bypass attribute of the location. The bypass function may fail for any reason,
1340  # the most common case being that a file does not exist. If it raises an exception
1341  # indicating such, we ignore the bypass function and proceed as though it does not exist.
1342  if hasattr(location.mapper, "bypass_" + location.datasetType):
1343  bypass = self._getBypassFunc(location, dataId)
1344  try:
1345  bypass = bypass()
1346  location.bypass = bypass
1347  except (NoResults, IOError):
1348  self.log.debug("Continuing dataset search while evaluating "
1349  "bypass function for Dataset type:{} Data ID:{} at "
1350  "location {}".format(datasetType, dataId, location))
1351  # If a location was found but the location does not exist, keep looking in input
1352  repositories (the registry may have had enough data for a lookup even though the object
1353  # exists in a different repository.)
1354  if (isinstance(location, ButlerComposite) or hasattr(location, 'bypass')
1355  or location.repository.exists(location)):
1356  return location
1357  else:
1358  try:
1359  locations.extend(location)
1360  except TypeError:
1361  locations.append(location)
1362  if not write:
1363  return None
1364  return locations
1365 
1366  @staticmethod
1367  def _getBypassFunc(location, dataId):
1368  pythonType = location.getPythonType()
1369  if pythonType is not None:
1370  if isinstance(pythonType, str):
1371  pythonType = doImport(pythonType)
1372  bypassFunc = getattr(location.mapper, "bypass_" + location.datasetType)
1373  return lambda: bypassFunc(location.datasetType, pythonType, location, dataId)
1374 
1375  def get(self, datasetType, dataId=None, immediate=True, **rest):
1376  """Retrieves a dataset given an input collection data id.
1377 
1378  Parameters
1379  ----------
1380  datasetType - string
1381  The type of dataset to retrieve.
1382  dataId - dict
1383  The data id.
1384  immediate - bool
1385  If False use a proxy for delayed loading.
1386  **rest
1387  keyword arguments for the data id.
1388 
1389  Returns
1390  -------
1391  An object retrieved from the dataset (or a proxy for one).
1392  """
1393  datasetType = self._resolveDatasetTypeAlias(datasetType)
1394  dataId = DataId(dataId)
1395  dataId.update(**rest)
1396 
1397  location = self._locate(datasetType, dataId, write=False)
1398  if location is None:
1399  raise NoResults("No locations for get:", datasetType, dataId)
1400  self.log.debug("Get type=%s keys=%s from %s", datasetType, dataId, str(location))
1401 
1402  if hasattr(location, 'bypass'):
1403  # this type loader block should get moved into a helper someplace, and duplications removed.
1404  def callback():
1405  return location.bypass
1406  else:
1407  def callback():
1408  return self._read(location)
1409  if location.mapper.canStandardize(location.datasetType):
1410  innerCallback = callback
1411 
1412  def callback():
1413  return location.mapper.standardize(location.datasetType, innerCallback(), dataId)
1414  if immediate:
1415  return callback()
1416  return ReadProxy(callback)
1417 
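 # Example (illustrative; the dataset type 'calexp' and the visit/ccd keys are
 # assumptions about the repository's mapper, not guarantees of this module):
 #
 #     >>> exposure = butler.get('calexp', visit=123, ccd=45)
 #     >>> proxy = butler.get('calexp', visit=123, ccd=45, immediate=False)
 #     >>> # `proxy` is a ReadProxy; the underlying object is loaded on first use.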
1418  def put(self, obj, datasetType, dataId={}, doBackup=False, **rest):
1419  """Persists a dataset given an output collection data id.
1420 
1421  Parameters
1422  ----------
1423  obj -
1424  The object to persist.
1425  datasetType - string
1426  The type of dataset to persist.
1427  dataId - dict
1428  The data id.
1429  doBackup - bool
1430  If True, rename existing instead of overwriting.
1431  WARNING: Setting doBackup=True is not safe for parallel processing, as it may be subject to race
1432  conditions.
1433  **rest
1434  Keyword arguments for the data id.
1435  """
1436  datasetType = self._resolveDatasetTypeAlias(datasetType)
1437  dataId = DataId(dataId)
1438  dataId.update(**rest)
1439 
1440  locations = self._locate(datasetType, dataId, write=True)
1441  if not locations:
1442  raise NoResults("No locations for put:", datasetType, dataId)
1443  for location in locations:
1444  if isinstance(location, ButlerComposite):
1445  disassembler = location.disassembler if location.disassembler else genericDisassembler
1446  disassembler(obj=obj, dataId=location.dataId, componentInfo=location.componentInfo)
1447  for name, info in location.componentInfo.items():
1448  if not info.inputOnly:
1449  self.put(info.obj, info.datasetType, location.dataId, doBackup=doBackup)
1450  else:
1451  if doBackup:
1452  location.getRepository().backup(location.datasetType, dataId)
1453  location.getRepository().write(location, obj)
1454 
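 # Example (illustrative; dataset type and dataId keys are assumptions):
 #
 #     >>> butler.put(exposure, 'calexp', visit=123, ccd=45)
 #     >>> # doBackup=True renames any existing dataset instead of overwriting it,
 #     >>> # but is not safe under parallel processing (see warning above).
 #     >>> butler.put(exposure, 'calexp', visit=123, ccd=45, doBackup=True)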
1455  def subset(self, datasetType, level=None, dataId={}, **rest):
1456  """Return complete dataIds for a dataset type that match a partial (or empty) dataId.
1457 
1458  Given a partial (or empty) dataId specified in dataId and **rest, find all datasets that match the
1459  dataId. Optionally restrict the results to a given level specified by a dataId key (e.g. visit or
1460  sensor or amp for a camera). Return an iterable collection of complete dataIds as ButlerDataRefs.
1461  Datasets with the resulting dataIds may not exist; that needs to be tested with datasetExists().
1462 
1463  Parameters
1464  ----------
1465  datasetType - string
1466  The type of dataset collection to subset.
1467  level - string
1468  The level of dataId at which to subset. Use an empty string if the mapper should look up the
1469  default level.
1470  dataId - dict
1471  The data id.
1472  **rest
1473  Keyword arguments for the data id.
1474 
1475  Returns
1476  -------
1477  subset - ButlerSubset
1478  Collection of ButlerDataRefs for datasets matching the data id.
1479 
1480  Examples
1481  --------
1482  To print the full dataIds for all r-band measurements in a source catalog
1483  (note that the subset call is equivalent to: `butler.subset('src', dataId={'filter':'r'})`):
1484 
1485  >>> subset = butler.subset('src', filter='r')
1486  >>> for data_ref in subset: print(data_ref.dataId)
1487  """
1488  datasetType = self._resolveDatasetTypeAlias(datasetType)
1489 
1490  # Currently expected behavior of subset is that if specified level is None then the mapper's default
1491  # level should be used. Convention for level within Butler is that an empty string is used to indicate
1492  # 'get default'.
1493  if level is None:
1494  level = ''
1495 
1496  dataId = DataId(dataId)
1497  dataId.update(**rest)
1498  return ButlerSubset(self, datasetType, level, dataId)
1499 
1500  def dataRef(self, datasetType, level=None, dataId={}, **rest):
1501  """Returns a single ButlerDataRef.
1502 
1503  Given a complete dataId specified in dataId and **rest, find the unique dataset at the given level
1504  specified by a dataId key (e.g. visit or sensor or amp for a camera) and return a ButlerDataRef.
1505 
1506  Parameters
1507  ----------
1508  datasetType - string
1509  The type of dataset collection to reference.
1510  level - string
1511  The level of dataId at which to reference.
1512  dataId - dict
1513  The data id.
1514  **rest
1515  Keyword arguments for the data id.
1516 
1517  Returns
1518  -------
1519  dataRef - ButlerDataRef
1520  ButlerDataRef for dataset matching the data id
1521  """
1522 
1523  datasetType = self._resolveDatasetTypeAlias(datasetType)
1524  dataId = DataId(dataId)
1525  subset = self.subset(datasetType, level, dataId, **rest)
1526  if len(subset) != 1:
1527  raise RuntimeError("No unique dataset for: Dataset type:%s Level:%s Data ID:%s Keywords:%s" %
1528  (str(datasetType), str(level), str(dataId), str(rest)))
1529  return ButlerDataRef(subset, subset.cache[0])
1530 
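 # Example (illustrative; assumes the dataId resolves to exactly one dataset,
 # otherwise a RuntimeError is raised as above):
 #
 #     >>> ref = butler.dataRef('raw', visit=123, ccd=45)
 #     >>> exposure = ref.get('raw')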
1531  def getUri(self, datasetType, dataId=None, write=False, **rest):
1532  """Return the URI for a dataset.
1533 
1534  .. warning:: This is intended only for debugging. The URI should
1535  never be used for anything other than printing.
1536 
1537  .. note:: In the event there are multiple URIs for read, we return only
1538  the first.
1539 
1540  .. note:: getUri() does not currently support composite datasets.
1541 
1542  Parameters
1543  ----------
1544  datasetType : `str`
1545  The dataset type of interest.
1546  dataId : `dict`, optional
1547  The data identifier.
1548  write : `bool`, optional
1549  Return the URI for writing?
1550  rest : `dict`, optional
1551  Keyword arguments for the data id.
1552 
1553  Returns
1554  -------
1555  uri : `str`
1556  URI for dataset.
1557  """
1558  datasetType = self._resolveDatasetTypeAlias(datasetType)
1559  dataId = DataId(dataId)
1560  dataId.update(**rest)
1561  locations = self._locate(datasetType, dataId, write=write)
1562  if locations is None:
1563  raise NoResults("No locations for getUri: ", datasetType, dataId)
1564 
1565  if write:
1566  # Follow the write path
1567  # Return the first valid write location.
1568  for location in locations:
1569  if isinstance(location, ButlerComposite):
1570  for name, info in location.componentInfo.items():
1571  if not info.inputOnly:
1572  return self.getUri(info.datasetType, location.dataId, write=True)
1573  else:
1574  return location.getLocationsWithRoot()[0]
1575  # fall back to raise
1576  raise NoResults("No locations for getUri(write=True): ", datasetType, dataId)
1577  else:
1578  # Follow the read path, only return the first valid read
1579  return locations.getLocationsWithRoot()[0]
1580 
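 # Example (illustrative; per the warning above, suitable for debugging output only):
 #
 #     >>> print(butler.getUri('calexp', visit=123, ccd=45))               # read URI
 #     >>> print(butler.getUri('calexp', visit=123, ccd=45, write=True))   # write URI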
1581  def _read(self, location):
1582  """Unpersist an object using data inside a ButlerLocation or ButlerComposite object.
1583 
1584  Parameters
1585  ----------
1586  location : ButlerLocation or ButlerComposite
1587  A ButlerLocation or ButlerComposite instance populated with data needed to read the object.
1588 
1589  Returns
1590  -------
1591  object
1592  An instance of the object specified by the location.
1593  """
1594  self.log.debug("Starting read from %s", location)
1595 
1596  if isinstance(location, ButlerComposite):
1597  for name, componentInfo in location.componentInfo.items():
1598  if componentInfo.subset:
1599  subset = self.subset(datasetType=componentInfo.datasetType, dataId=location.dataId)
1600  componentInfo.obj = [obj.get() for obj in subset]
1601  else:
1602  obj = self.get(componentInfo.datasetType, location.dataId, immediate=True)
1603  componentInfo.obj = obj
1604  assembler = location.assembler or genericAssembler
1605  results = assembler(dataId=location.dataId, componentInfo=location.componentInfo,
1606  cls=location.python)
1607  return results
1608  else:
1609  results = location.repository.read(location)
1610  if len(results) == 1:
1611  results = results[0]
1612  self.log.debug("Ending read from %s", location)
1613  return results
1614 
1615  def __reduce__(self):
1616  ret = (_unreduce, (self._initArgs, self.datasetTypeAliasDict))
1617  return ret
1618 
1619  def _resolveDatasetTypeAlias(self, datasetType):
1620  """Replaces all the known alias keywords in the given string with the alias value.
1621 
1622  Parameters
1623  ----------
1624  datasetType - string
1625  A datasetType string to search & replace on
1626 
1627  Returns
1628  -------
1629  datasetType - string
1630  The de-aliased string
1631  """
1632  for key in self.datasetTypeAliasDict:
1633  # if all aliases have been replaced, bail out
1634  if datasetType.find('@') == -1:
1635  break
1636  datasetType = datasetType.replace(key, self.datasetTypeAliasDict[key])
1637 
1638  # If an alias specifier cannot be resolved, raise.
1639  if datasetType.find('@') != -1:
1640  raise RuntimeError("Unresolvable alias specifier in datasetType: %s" % (datasetType))
1641 
1642  return datasetType
1643 
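 # Example (illustrative): aliases are registered via defineAlias() and referenced
 # with a leading '@' in any datasetType argument; _resolveDatasetTypeAlias then
 # substitutes them before lookup:
 #
 #     >>> butler.defineAlias('@sources', 'src')
 #     >>> subset = butler.subset('@sources', filter='r')  # resolved to 'src'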
1644 
1645 def _unreduce(initArgs, datasetTypeAliasDict):
1646  mapperArgs = initArgs.pop('mapperArgs')
1647  initArgs.update(mapperArgs)
1648  butler = Butler(**initArgs)
1649  butler.datasetTypeAliasDict = datasetTypeAliasDict
1650  return butler
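# Example (illustrative): __reduce__ and _unreduce make Butler instances picklable,
# reconstructing an equivalent Butler from its saved init arguments and alias dict:
#
#     >>> import pickle
#     >>> restored = pickle.loads(pickle.dumps(butler))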