butler.py
1 #!/usr/bin/env python
2 
3 #
4 # LSST Data Management System
5 # Copyright 2008-2015 LSST Corporation.
6 #
7 # This product includes software developed by the
8 # LSST Project (http://www.lsst.org/).
9 #
10 # This program is free software: you can redistribute it and/or modify
11 # it under the terms of the GNU General Public License as published by
12 # the Free Software Foundation, either version 3 of the License, or
13 # (at your option) any later version.
14 #
15 # This program is distributed in the hope that it will be useful,
16 # but WITHOUT ANY WARRANTY; without even the implied warranty of
17 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 # GNU General Public License for more details.
19 #
20 # You should have received a copy of the LSST License Statement and
21 # the GNU General Public License along with this program. If not,
22 # see <http://www.lsstcorp.org/LegalNotices/>.
23 #
24 
25 # -*- python -*-
26 
27 """This module defines the Butler class."""
28 from future import standard_library
29 standard_library.install_aliases()
30 from builtins import str
31 from past.builtins import basestring
32 from builtins import object
33 
34 import collections
35 import copy
36 import inspect
37 import json
38 import os
39 import weakref
40 
41 import yaml
42 
43 from lsst.log import Log
44 import lsst.pex.policy as pexPolicy
45 from . import LogicalLocation, ReadProxy, ButlerSubset, ButlerDataRef, Persistence, \
46  Storage, Policy, NoResults, Repository, DataId, RepositoryCfg, \
47  RepositoryArgs, listify, setify, sequencify, doImport, ButlerComposite, genericAssembler, \
48  genericDisassembler, PosixStorage
49 
50 
51 class ButlerCfg(Policy, yaml.YAMLObject):
52  """Represents a Butler configuration.
53 
54  .. warning::
55 
56  cfg is 'wet paint' and very likely to change. Use of it in production
57  code other than via the 'old butler' API is strongly discouraged.
58  """
59  yaml_tag = u"!ButlerCfg"
60 
61  def __init__(self, cls, repoCfg):
62  super(ButlerCfg, self).__init__({'repoCfg': repoCfg, 'cls': cls})
63 
64 
65 class RepoData(object):
66  """Container object for repository data used by Butler
67 
68  Parameters
69  ----------
70  args - RepositoryArgs
71  Arguments used to initialize self.repo
72  cfg - RepositoryCfg
73  Configuration of repository
74  storedCfg - RepositoryCfg or None
75  If the cfg at root and the RepositoryArgs don't match then a new cfg is kept in cfg and the cfg that
76  was read from root is kept in storedCfg.
77  repo - Repository
78  The repository class instance
79  tags - set
80  The tags that apply to this repository, if any
81  """
82 
83  def __init__(self, args, cfg, storedCfg=None, isNewRepository=False, isV1Repository=True):
84  self.args = args
85  self.cfg = cfg
86  self.storedCfg = storedCfg
87  self.repo = None
88  self.mode = args.mode
89  # self.tags is used to keep track of *all* the applicable tags to the Repo, not just the tags in
90  # the cfg (e.g. parents inherit their children's tags)
91  self.tags = set()
92  self.isNewRepository = isNewRepository
93  self.isV1Repository = isV1Repository
94 
95  def __reduce__(self):
96  return (RepoData, (self.args, self.cfg, self.repo, self.mode, self.tags))
97 
98  def __repr__(self):
99  return "RepoData(args=%s cfg=%s repo=%s tags=%s)" % (self.args, self.cfg, self.repo, self.tags)
100 
101  def addTags(self, tags):
102  self.tags = self.tags.union(tags)
103 
104 class RepoDataContainer(object):
105  """Container object for RepoData instances owned by a Butler instance."""
106 
107  def __init__(self):
108  self.byRepoRoot = {} # {args.root: RepoData}
109  self.byCfgRoot = {} # {args.cfgRoot: RepoData}
110  self._inputs = None
111  self._outputs = None
112  self._all = None # {cfg.root: RepoData}
113 
114  def add(self, repoData):
115  """Add a RepoData to the container
116 
117  Parameters
118  ----------
119  repoData - RepoData instance to add
120  """
121  self.byRepoRoot[repoData.cfg.root] = repoData
122  self.byCfgRoot[repoData.args.cfgRoot] = repoData
123 
124  def inputs(self):
125  """Get a list of RepoData that are used as inputs to the Butler.
126  The list is built by _buildLookupList and cached; accessing it before then raises RuntimeError.
127 
128  Returns
129  -------
130  A list of RepoData with readable repositories, in the order to be used when searching.
131  """
132  if self._inputs is None:
133  raise RuntimeError("Inputs not yet initialized.")
134  return self._inputs
135 
136  def outputs(self):
137  """Get a list of RepoData that are used as outputs of the Butler.
138  The list is built by _buildLookupList and cached; accessing it before then raises RuntimeError.
139 
140  Returns
141  -------
142  A list of RepoData with writable repositories, in the order to be used when searching.
143  """
144  if self._outputs is None:
145  raise RuntimeError("Outputs not yet initialized.")
146  return self._outputs
147 
148  def all(self):
149  """Get a list of all RepoData used by the Butler.
150  The list is built by _buildLookupList and cached; accessing it before then raises RuntimeError.
151 
152  Returns
153  -------
154  A list of all RepoData used by the Butler, in the order to be used when searching.
155  """
156  if self._all is None:
157  raise RuntimeError("The all list is not yet initialized.")
158  return self._all
159 
160  def __repr__(self):
161  return "%s(\nbyRepoRoot=%r, \nbyCfgRoot=%r, \n_inputs=%r, \n_outputs=%s, \n_all=%s)" % (
162  self.__class__.__name__,
163  self.byRepoRoot,
164  self.byCfgRoot,
165  self._inputs,
166  self._outputs,
167  self._all)
168 
169  def _buildLookupList(self, inputs, outputs):
170  """Build the lists of inputs, outputs, and all repo data in lookup
171  order.
172 
173  Parameters
174  ----------
175  inputs : list of RepositoryArgs
176  The input RepositoryArgs, in order.
177  outputs : list of RepositoryArgs
178  The output RepositoryArgs, in order.
179 
180  Returns
181  -------
182  None
183  """
184  def addRepoDataToLists(repoData, inout):
185  """Adds the cfg represented by repoData to the _all dict/list, as
186  well as the _inputs or _outputs list, as indicated by inout. Then,
187  adds all the parents of the cfg to the lists."""
188  if repoData.cfg.root in self._all:
189  return
190  self._all[repoData.cfg.root] = repoData
191  if inout == 'in':
192  self._inputs.append(repoData)
193  elif inout == 'out':
194  self._outputs.append(repoData)
195  if 'r' in repoData.args.mode:
196  self._inputs.append(repoData)
197  else:
198  raise RuntimeError("'inout' must be 'in' or 'out', not %s" % inout)
199  for parent in repoData.cfg.parents:
200  if 'r' in repoData.args.mode:
201  addRepoDataToLists(self.byRepoRoot[parent], 'in')
202 
203  self._all = collections.OrderedDict()
204  self._inputs = []
205  self._outputs = []
206 
207  for repoArgs in outputs:
208  repoData = self.byCfgRoot[repoArgs.cfgRoot]
209  addRepoDataToLists(repoData, 'out')
210  for repoArgs in inputs:
211  repoData = self.byCfgRoot[repoArgs.cfgRoot]
212  addRepoDataToLists(repoData, 'in')
213 
214 
215 class Butler(object):
216  """Butler provides a generic mechanism for persisting and retrieving data using mappers.
217 
218  A Butler manages a collection of datasets known as a repository. Each dataset has a type representing its
219  intended usage and a location. Note that the dataset type is not the same as the C++ or Python type of the
220  object containing the data. For example, an ExposureF object might be used to hold the data for a raw
221  image, a post-ISR image, a calibrated science image, or a difference image. These would all be different
222  dataset types.
223 
224  A Butler can produce a collection of possible values for a key (or tuples of values for multiple keys) if
225  given a partial data identifier. It can check for the existence of a file containing a dataset given its
226  type and data identifier. The Butler can then retrieve the dataset. Similarly, it can persist an object to
227  an appropriate location when given its associated data identifier.
228 
229  Note that the Butler has two more advanced features when retrieving a data set. First, the retrieval is
230  lazy. Input does not occur until the data set is actually accessed. This allows datasets to be retrieved
231  and placed on a clipboard prospectively with little cost, even if the algorithm of a stage ends up not
232  using them. Second, the Butler will call a standardization hook upon retrieval of the dataset. This
233  function, contained in the input mapper object, must perform any necessary manipulations to force the
234  retrieved object to conform to standards, including translating metadata.
235 
236  Public methods:
237 
238  __init__(self, root, mapper=None, **mapperArgs)
239 
240  defineAlias(self, alias, datasetType)
241 
242  getKeys(self, datasetType=None, level=None)
243 
244  queryMetadata(self, datasetType, format=None, dataId={}, **rest)
245 
246  datasetExists(self, datasetType, dataId={}, **rest)
247 
248  get(self, datasetType, dataId={}, immediate=False, **rest)
249 
250  put(self, obj, datasetType, dataId={}, **rest)
251 
252  subset(self, datasetType, level=None, dataId={}, **rest)
253 
254  dataRef(self, datasetType, level=None, dataId={}, **rest)
255 
256  Initialization:
257 
258  The preferred method of initialization is to pass in a RepositoryArgs instance, or a list of
259  RepositoryArgs to inputs and/or outputs.
260 
261  For backward compatibility: this initialization method signature can take a posix root path, and
262  optionally a mapper class instance or class type that will be instantiated using the mapperArgs input
263  argument. However, for this to work in a backward compatible way it creates a single repository that is
264  used as both an input and an output repository. This is NOT preferred, and will likely break any
265  provenance system we have in place.
266 
267  Parameters
268  ----------
269  root - string
270  .. note:: Deprecated in 12_0
271  `root` will be removed in TBD, it is replaced by `inputs` and `outputs` for
272  multiple-repository support.
273  A filesystem path. Will only work with a PosixRepository.
274  mapper - string or instance
275  .. note:: Deprecated in 12_0
276  `mapper` will be removed in TBD, it is replaced by `inputs` and `outputs` for
277  multiple-repository support.
278  Provides a mapper to be used with Butler.
279  mapperArgs - dict
280  .. note:: Deprecated in 12_0
281  `mapperArgs` will be removed in TBD, it is replaced by `inputs` and `outputs` for
282  multiple-repository support.
283  Provides arguments to be passed to the mapper if the mapper input arg is a class type to be
284  instantiated by Butler.
285  inputs - RepositoryArgs or string
286  Can be a single item or a list. Provides arguments to load an existing repository (or repositories).
287  String is assumed to be a URI and is used as the cfgRoot (URI to the location of the cfg file). (Local
288  file system URI does not have to start with 'file://' and in this way can be a relative path).
289  outputs - RepositoryArgs or string
290  Can be a single item or a list. Provides arguments to load one or more existing repositories or create
291  new ones. String is assumed to be a URI and is used as the repository root.
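
 Examples
 --------
 A minimal sketch of the preferred (version 2) initialization, with hypothetical repository
 paths; it assumes an existing input repository whose mapper can be discovered from its cfg:

 >>> from lsst.daf.persistence import Butler
 >>> butler = Butler(inputs='/data/inputRepo', outputs='/data/outputRepo')

 A RepositoryArgs instance may be used instead of a plain string when a mode, mapper, or tags
 must be given explicitly:

 >>> from lsst.daf.persistence import RepositoryArgs
 >>> args = RepositoryArgs(root='/data/outputRepo', mode='rw')
 >>> butler = Butler(inputs='/data/inputRepo', outputs=args)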
292  """
293 
294  def __init__(self, root=None, mapper=None, inputs=None, outputs=None, **mapperArgs):
295 
296  self.log = Log.getLogger("daf.persistence.butler")
297 
298  self._initArgs = {'root': root, 'mapper': mapper, 'inputs': inputs, 'outputs': outputs,
299  'mapperArgs': mapperArgs}
300 
301  # inputs and outputs may be modified, do not change the external value.
302  inputs = copy.deepcopy(inputs)
303  outputs = copy.deepcopy(outputs)
304 
305  isV1Args = inputs is None and outputs is None
306  if isV1Args:
307  inputs, outputs = self._convertV1Args(root=root, mapper=mapper, mapperArgs=mapperArgs)
308  elif root or mapper or mapperArgs:
309  raise RuntimeError(
310  'Butler version 1 API (root, mapper, **mapperArgs) may ' +
311  'not be used with version 2 API (inputs, outputs)')
312 
314 
315  # make sure inputs and outputs are lists, and if list items are strings convert them to RepositoryArgs.
316  inputs = listify(inputs)
317  outputs = listify(outputs)
318  inputs = [RepositoryArgs(cfgRoot=args) if isinstance(args, basestring) else args for args in inputs]
319  outputs = [RepositoryArgs(cfgRoot=args) if isinstance(args, basestring) else args for args in outputs]
320  # Set default rw modes on input and output args as needed
321  for args in inputs:
322  if args.mode is None:
323  args.mode = 'r'
324  elif 'r' not in args.mode:
325  raise RuntimeError("The mode of an input should be readable.")
326  for args in outputs:
327  if args.mode is None:
328  args.mode = 'w'
329  elif 'w' not in args.mode:
330  raise RuntimeError("The mode of an output should be writable.")
331 
332  # Always use an empty Persistence policy until we can get rid of it
333  persistencePolicy = pexPolicy.Policy()
334  self.persistence = Persistence.getPersistence(persistencePolicy)
335 
336  self._createRepoDatas(inputs, outputs)
337 
338  self._repos._buildLookupList(inputs, outputs)
339 
340  self._setRepoDataTags()
341 
342  defaultMapper = self._getDefaultMapper()
343  self._assignDefaultMapper(defaultMapper)
344 
345  for repoData in self._repos.all().values():
346  repoData.repo = Repository(repoData)
347 
348  self.objectCache = weakref.WeakValueDictionary()
349 
350 
351  def _setRepoDataTags(self):
352  """Set the tags from each repoArgs into all its parent repoArgs so that they can be included in tagged
353  searches.
354 
355  Returns
356  -------
357  None
358  """
359  def setTags(butler, repoData, tags):
360  tags.update(repoData.args.tags)
361  repoData.addTags(tags)
362  for parent in repoData.cfg.parents:
363  setTags(butler, butler._repos.byRepoRoot[parent], copy.copy(tags))
364 
365  for repoData in self._repos.all().values():
366  setTags(self, repoData, set())
367 
368 
369  def _createRepoData(self, args, inout, instanceParents):
370  """Make a RepoData object for args, adding it to the RepoDataContainer.
371 
372  Parameters
373  ----------
374  args : RepoArgs
375  A RepositoryArgs that describes a new or existing Repository.
376  inout : 'in' or 'out'
377  Indicates if this Repository should be used by the Butler as an input or an output.
378  instanceParents : list of string
379  URI/path to the RepositoryCfg of the parents in this instance of Butler, i.e. the inputs and
380  readable outputs (but not their parents; grandparents are looked up when the parents are loaded).
381 
382  Returns
383  -------
384  None
385  """
386  # if only a string is passed for inputs or outputs, assumption is that it's a URI;
387  # place it in a RepositoryArgs instance; cfgRoot for inputs, root for outputs.
388  if inout not in ('in', 'out'):
389  raise RuntimeError("inout must be either 'in' or 'out'")
390  # if we already have RepoData for these repoArgs, we're done with that repo and its parents.
391  if args.cfgRoot in self._repos.byCfgRoot:
392  return
393  # Get the RepositoryCfg, if it exists:
394  cfg = Storage.getRepositoryCfg(args.cfgRoot)
395  # Handle the case where the Repository exists and contains a RepositoryCfg file:
396  if cfg:
397  if not cfg.matchesArgs(args):
398  if cfg.parents != instanceParents:
399  raise RuntimeError("Parents do not match.") # maybe this is ok to capture in an intermediate cfg?
400  storedCfg = cfg
401  cfg = RepositoryCfg.makeFromArgs(args)
402  else:
403  storedCfg = None
404  repoData = RepoData(args=args, cfg=cfg, storedCfg=storedCfg)
405  self._repos.add(repoData)
406  for parentArgs in cfg.parents:
407  self._createRepoData(RepositoryArgs(parentArgs, mode='r'), 'in', instanceParents)
408  # Handle the case where a RepositoryCfg file does not exist:
409  else:
410  # Posix repos might be Butler V1 Repos, requires special handling:
411  if Storage.isPosix(args.cfgRoot):
412  v1RepoExists = PosixStorage.v1RepoExists(args.cfgRoot)
413  if not v1RepoExists and inout == 'in':
414  msg = "Input repositories must exist; no repo found at " \
415  "%s. (A Butler V1 Repository 'exists' if the root " \
416  "folder exists AND contains items.)" % args.cfgRoot
417  raise RuntimeError(msg)
418  if inout == 'out' and not v1RepoExists:
419  p = instanceParents
420  if args.cfgRoot in p:
421  p.remove(args.cfgRoot)
422  else:
423  p = None
424  if v1RepoExists:
425  if not args.mapper:
426  args.mapper = PosixStorage.getMapperClass(args.cfgRoot)
427  cfg = RepositoryCfg.makeFromArgs(args, p)
428  repoData = RepoData(args=args, cfg=cfg, isNewRepository=not v1RepoExists,
429  isV1Repository=v1RepoExists)
430  self._repos.add(repoData)
431  if v1RepoExists:
432  parent = PosixStorage.getParentSymlinkPath(args.cfgRoot)
433  if parent:
434  parent = os.path.relpath(os.path.join(cfg.root, parent), '.')
435  cfg.addParents(parent)
436  # TODO the parent path can be (is always) relative;
437  # resolving path-to-parent-from-here should be handled
438  # by Storage and/or the cfg.
439  self._createRepoData(RepositoryArgs(parent, mode='r'), 'in', instanceParents)
440  # Do not need to check for Butler V1 Repos in non-posix Storages:
441  else:
442  if inout == 'in':
443  msg = "Input repositories must exist; no repo found at " \
444  "%s." % args.cfgRoot
445  raise RuntimeError(msg)
446  cfg = RepositoryCfg.makeFromArgs(args, instanceParents)
447  repoData = RepoData(args=args, cfg=cfg, isNewRepository=True)
448  self._repos.add(repoData)
449 
450  @staticmethod
451  def _getParentsList(inputs, outputs):
452  parents = []
453  for args in outputs + inputs:
454  if 'r' in args.mode and args.cfgRoot not in parents:
455  parents.append(args.cfgRoot)
456  return parents
457 
458  def _createRepoDatas(self, inputs, outputs):
459  """Create the RepoDataContainer and put a RepoData object in it for each repository listed in inputs
460  and outputs as well as each parent of each repository.
461 
462  After this function runs, there will be a RepoData for any Repository that may be used by this Butler
463  instance.
464 
465  Parameters
466  ----------
467  inputs : list of RepoArgs
468  Repositories to be used by the Butler as input repositories.
469  outputs : list of RepoArgs
470  Repositories to be used by the Butler as output repositories.
471 
472  Returns
473  -------
474  None
475  """
476  try:
477  if self._repos:
478  raise RuntimeError("Must not call _createRepoDatas twice.")
479  except AttributeError:
480  pass
481  self._repos = RepoDataContainer()
482  parents = self._getParentsList(inputs, outputs)
483 
484  for outputArgs in outputs:
485  self._createRepoData(outputArgs, 'out', parents)
486  for inputArgs in inputs:
487  self._createRepoData(inputArgs, 'in', parents)
488 
489  def _convertV1Args(self, root, mapper, mapperArgs):
490  """Convert Butler V1 args (root, mapper, mapperArgs) to V2 args (inputs, outputs)
491 
492  Parameters
493  ----------
494  root : string
495  Posix path to repository root
496  mapper : class, class instance, or string
497  Instantiated class, a class object to be instantiated, or a string that refers to a class that
498  can be imported & used as the mapper.
499  mapperArgs : dict
500  Args & their values used when instantiating the mapper.
501 
502  Returns
503  -------
504  tuple
505  (inputs, outputs) - values to be used for inputs and outputs in Butler.__init__
506  """
507  # mapper ought to be an importable string or a class object (not a mapper class instance)
508  if not isinstance(mapper, basestring) and not inspect.isclass(mapper):
509  err = "mapper ought to be an importable string or a class object (not a mapper class instance)"
510  # TBD we might have to handle this. It'll be complicated because of e.g. outputRoot & calibRoot
511  self.log.warn(err)
512  inputs = None
513 
514  if root is None:
515  if hasattr(mapper, 'root'):
516  # in legacy repos, the mapper may be given the root directly.
517  root = mapper.root
518  else:
519  # in the past root="None" could be used to mean root='.'
520  root = '.'
521  outputs = RepositoryArgs(mode='rw',
522  root=root,
523  mapper=mapper,
524  mapperArgs=mapperArgs)
525  return inputs, outputs
526 
527  def __repr__(self):
528  return 'Butler(datasetTypeAliasDict=%s, repos=%s, persistence=%s)' % (
529  self.datasetTypeAliasDict, self._repos, self.persistence)
530 
531  def _getDefaultMapper(self):
532  """Get the default mapper. Currently this means if all the repos use
533  exactly the same mapper, that mapper may be considered the default.
534 
535  This definition may be changing; mappers may be able to exclude
536  themselves as candidates for default, and they may nominate a different
537  mapper instead. Also, we may not want to look at *all* the repos, but
538  only a depth-first search on each of the input & output repos, and
539  use the first-found mapper for each of those. TBD.
540 
546  Returns
547  -------
548  Mapper class or None
549  Returns the class type of the default mapper, or None if a default
550  mapper can not be determined.
551  """
552  defaultMapper = None
553 
554  for inputRepoData in self._repos.inputs():
555  mapper = None
556  if inputRepoData.cfg.mapper is not None:
557  mapper = inputRepoData.cfg.mapper
558  # if the mapper is:
559  # * a string, import it.
560  # * a class instance, get its class type
561  # * a class, do nothing; use it
562  if isinstance(mapper, basestring):
563  mapper = doImport(mapper)
564  elif not inspect.isclass(mapper):
565  mapper = mapper.__class__
566  # If no mapper has been found, note the first found mapper.
567  # Then, if a mapper has been found and each next mapper matches it,
568  # continue looking for mappers.
569  # If a mapper has been found and another non-matching mapper is
570  # found then we have no default, return None.
571  if defaultMapper is None:
572  defaultMapper = mapper
573  elif mapper == defaultMapper:
574  continue
575  elif mapper is not None:
576  return None
577  return defaultMapper
578 
579  def _assignDefaultMapper(self, defaultMapper):
580  for repoData in self._repos.all().values():
581  if repoData.cfg.mapper is None and (repoData.isNewRepository or repoData.isV1Repository):
582  if defaultMapper is None:
583  raise RuntimeError(
584  "No mapper specified for %s and no default mapper could be determined." %
585  repoData.args)
586  repoData.cfg.mapper = defaultMapper
587 
588  @staticmethod
589  def getMapperClass(root):
590  """posix-only; gets the mapper class at the path specified by root (if a file _mapper can be found
591  at that location or in a parent location).
592 
593  As we abstract the storage and support different types of storage locations this method will be
594  moved entirely into Butler Access, or made more dynamic, and the API will very likely change."""
595  return Storage.getMapperClass(root)
596 
597  def defineAlias(self, alias, datasetType):
598  """Register an alias that will be substituted in datasetTypes.
599 
600  Parameters
601  ----------
602  alias - str
603  The alias keyword. It may start with @ or not. It may not contain @ except as the first character.
604  datasetType - str
605  The string that will be substituted when @alias is passed into datasetType. It may not contain '@'
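
 For example (hypothetical alias name and data id keys; any dataset type known to the mapper
 could be aliased):

 >>> butler.defineAlias('@goodSrc', 'src')
 >>> catalog = butler.get('@goodSrc', visit=1234)  # '@goodSrc' is replaced by 'src'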
606  """
607  # verify formatting of alias:
608  # it can have '@' as the first character (if not it's okay, we will add it) or not at all.
609  atLoc = alias.rfind('@')
610  if atLoc == -1:
611  alias = "@" + str(alias)
612  elif atLoc > 0:
613  raise RuntimeError("Badly formatted alias string: %s" % (alias,))
614 
615  # verify that datasetType does not contain '@'
616  if datasetType.count('@') != 0:
617  raise RuntimeError("Badly formatted type string: %s" % (datasetType))
618 
619  # verify that the alias keyword does not start with another alias keyword,
620  # and vice versa
621  for key in self.datasetTypeAliasDict:
622  if key.startswith(alias) or alias.startswith(key):
623  raise RuntimeError("Alias: %s overlaps with existing alias: %s" % (alias, key))
624 
625  self.datasetTypeAliasDict[alias] = datasetType
626 
627  def getKeys(self, datasetType=None, level=None, tag=None):
628  """Get the valid data id keys at or above the given level of hierarchy for the dataset type or the
629  entire collection if None. The dict values are the basic Python types corresponding to the keys (int,
630  float, str).
631 
632  Parameters
633  ----------
634  datasetType - str
635  The type of dataset to get keys for, entire collection if None.
636  level - str
637  The hierarchy level to descend to. None if it should not be restricted. Use an empty string if the
638  mapper should lookup the default level.
639  tag - any, or list of any
640  Any object that can be tested to be the same as the tag in a dataId passed into butler input
641  functions. Applies only to input repositories: If tag is specified by the dataId then the repo
642  will only be read from if the tag in the dataId matches a tag used for that repository.
643 
644  Returns
645  -------
646  Returns a dict. The dict keys are the valid data id keys at or above the given level of hierarchy for
647  the dataset type or the entire collection if None. The dict values are the basic Python types
648  corresponding to the keys (int, float, str).
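
 For example, assuming a mapper that defines a 'raw' dataset type with visit/ccd-style keys:

 >>> keys = butler.getKeys('raw')

 Here `keys` would map each data id key (e.g. 'visit', 'ccd') to its basic Python type.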
649  """
650  datasetType = self._resolveDatasetTypeAlias(datasetType)
651 
652  keys = None
653  tag = setify(tag)
654  for repoData in self._repos.inputs():
655  if not tag or len(tag.intersection(repoData.tags)) > 0:
656  keys = repoData.repo.getKeys(datasetType, level)
657  # An empty dict is a valid "found" condition for keys. The only value for keys that should
658  # cause the search to continue is None
659  if keys is not None:
660  break
661  return keys
662 
663  def queryMetadata(self, datasetType, format=None, dataId={}, **rest):
664  """Returns the valid values for one or more keys when given a partial
665  input collection data id.
666 
667  Parameters
668  ----------
669  datasetType - str
670  The type of dataset to inquire about.
671  key - str
672  A key giving the level of granularity of the inquiry.
673  format - str, tuple
674  An optional key or tuple of keys to be returned.
675  dataId - DataId, dict
676  The partial data id.
677  **rest -
678  Keyword arguments for the partial data id.
679 
680  Returns
681  -------
682  A list of valid values or tuples of valid values as specified by the format (defaulting to the same as
683  the key) at the key's level of granularity.
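
 For example (hypothetical dataset type and keys; the valid keys come from the mapper):

 >>> visits = butler.queryMetadata('raw', 'visit', dataId={'filter': 'r'})
 >>> visitCcdPairs = butler.queryMetadata('raw', ('visit', 'ccd'), filter='r')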
684  """
685 
686  datasetType = self._resolveDatasetTypeAlias(datasetType)
687  dataId = DataId(dataId)
688  dataId.update(**rest)
689 
690  if format is None:
691  format = (key,)
692  else:
693  format = sequencify(format)
694 
695  tuples = None
696  for repoData in self._repos.inputs():
697  if not dataId.tag or len(dataId.tag.intersection(repoData.tags)) > 0:
698  tuples = repoData.repo.queryMetadata(datasetType, format, dataId)
699  if tuples:
700  break
701 
702  if not tuples:
703  return []
704 
705  if len(format) == 1:
706  ret = []
707  for x in tuples:
708  try:
709  ret.append(x[0])
710  except TypeError:
711  ret.append(x)
712  return ret
713 
714  return tuples
715 
716  def datasetExists(self, datasetType, dataId={}, **rest):
717  """Determines if a dataset file exists.
718 
719  Parameters
720  ----------
721  datasetType - str
722  The type of dataset to inquire about.
723  dataId - DataId, dict
724  The data id of the dataset.
725  **rest - Keyword arguments for the data id.
726 
727  Returns
728  -------
729  exists - bool
730  True if the dataset exists or is non-file-based.
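
 For example (hypothetical data id keys; the valid keys depend on the mapper):

 >>> if butler.datasetExists('raw', visit=1234, ccd=3):
 ...     raw = butler.get('raw', visit=1234, ccd=3)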
731  """
732  datasetType = self._resolveDatasetTypeAlias(datasetType)
733  dataId = DataId(dataId)
734  dataId.update(**rest)
735 
736  location = None
737  for repoData in self._repos.inputs():
738  if not dataId.tag or len(dataId.tag.intersection(repoData.tags)) > 0:
739  location = repoData.repo.map(datasetType, dataId)
740  if location:
741  break
742 
743  if location is None:
744  return False
745 
746  additionalData = location.getAdditionalData()
747  storageName = location.getStorageName()
748  if storageName in ('BoostStorage', 'FitsStorage', 'PafStorage',
749  'PickleStorage', 'ConfigStorage', 'FitsCatalogStorage'):
750  locations = location.getLocations()
751  for locationString in locations:
752  logLoc = LogicalLocation(locationString, additionalData).locString()
753  if storageName == 'FitsStorage':
754  # Strip off directives for cfitsio (in square brackets, e.g., extension name)
755  bracket = logLoc.find('[')
756  if bracket > 0:
757  logLoc = logLoc[:bracket]
758  if not os.path.exists(logLoc):
759  return False
760  return True
761  self.log.warn("datasetExists() for non-file storage %s, dataset type=%s, keys=%s",
762  storageName, datasetType, str(dataId))
763  return True
764 
765  def _locate(self, datasetType, dataId, write):
766  """Get one or more ButlerLocations and/or ButlerComposites.
767 
768  Parameters
769  ----------
770  datasetType : string
771  The datasetType that is being searched for. The datasetType may be followed by a dot and
771  a component name (component names are specified in the policy), i.e. datasetType.componentName
773 
774  dataId : dict or DataId class instance
775  The dataId
776 
777  write : bool
778  True if this is a search to write an object. False if it is a search to read an object. This
779  affects what type (an object or a container) is returned.
780 
781  Returns
782  -------
783  If write is False, will return either a single object or None. If write is True, will return a list
784  (which may be empty)
785  """
786  repos = self._repos.outputs() if write else self._repos.inputs()
787  locations = []
788  for repoData in repos:
789  # enforce dataId & repository tags when reading:
790  if not write and dataId.tag and len(dataId.tag.intersection(repoData.tags)) == 0:
791  continue
792  components = datasetType.split('.')
793  datasetType = components[0]
794  components = components[1:]
795  location = repoData.repo.map(datasetType, dataId, write=write)
796  if location is None:
797  continue
798  location.datasetType = datasetType # todo is there a better way than monkey patching here?
799  if len(components) > 0:
800  if not isinstance(location, ButlerComposite):
801  raise RuntimeError("The location for a dotted datasetType must be a composite.")
802  # replace the first component name with the datasetType
803  components[0] = location.componentInfo[components[0]].datasetType
804  # join components back into a dot-delimited string
805  datasetType = '.'.join(components)
806  location = self._locate(datasetType, dataId, write)
807  # if a component location is not found, we cannot continue with this repo; move to the next repo.
808  if location is None:
809  break
810  # if reading, only one location is desired.
811  if location:
812  if not write:
813  return location
814  else:
815  try:
816  locations.extend(location)
817  except TypeError:
818  locations.append(location)
819  if not write:
820  return None
821  return locations
822 
823  def get(self, datasetType, dataId=None, immediate=True, **rest):
824  """Retrieves a dataset given an input collection data id.
825 
826  Parameters
827  ----------
828  datasetType - str
829  The type of dataset to retrieve.
830  dataId - dict
831  The data id.
832  immediate - bool
833  If False use a proxy for delayed loading.
834  **rest
835  keyword arguments for the data id.
836 
837  Returns
838  -------
839  An object retrieved from the dataset (or a proxy for one).
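
 For example (hypothetical dataset type and data id keys):

 >>> calexp = butler.get('calexp', visit=1234, ccd=3)  # read immediately (the default)
 >>> proxy = butler.get('calexp', visit=1234, ccd=3, immediate=False)  # ReadProxy; loads on first use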
840  """
841  datasetType = self._resolveDatasetTypeAlias(datasetType)
842  dataId = DataId(dataId)
843  dataId.update(**rest)
844 
845  location = self._locate(datasetType, dataId, write=False)
846  if location is None:
847  raise NoResults("No locations for get:", datasetType, dataId)
848  self.log.debug("Get type=%s keys=%s from %s", datasetType, dataId, str(location))
849 
850  if isinstance(location, ButlerComposite):
851  for name, componentInfo in location.componentInfo.items():
852  if componentInfo.subset:
853  subset = self.subset(datasetType=componentInfo.datasetType, dataId=location.dataId)
854  componentInfo.obj = [obj.get() for obj in subset]
855  else:
856  obj = self.get(componentInfo.datasetType, location.dataId, immediate=True)
857  componentInfo.obj = obj
858  assembler = location.assembler or genericAssembler
859  obj = assembler(dataId=location.dataId, componentInfo=location.componentInfo, cls=location.python)
860  return obj
861 
862  if location.datasetType and hasattr(location.mapper, "bypass_" + location.datasetType):
863  # this type loader block should get moved into a helper someplace, and duplications removed.
864  pythonType = location.getPythonType()
865  if pythonType is not None:
866  if isinstance(pythonType, basestring):
867  pythonType = doImport(pythonType)
868  bypassFunc = getattr(location.mapper, "bypass_" + location.datasetType)
869  callback = lambda: bypassFunc(location.datasetType, pythonType, location, dataId)
870  else:
871  callback = lambda: self._read(location)
872  if location.mapper.canStandardize(location.datasetType):
873  innerCallback = callback
874  callback = lambda: location.mapper.standardize(location.datasetType, innerCallback(), dataId)
875  if immediate:
876  return callback()
877  return ReadProxy(callback)
878 
879  def put(self, obj, datasetType, dataId={}, doBackup=False, **rest):
880  """Persists a dataset given an output collection data id.
881 
882  Parameters
883  ----------
884  obj -
885  The object to persist.
886  datasetType - str
887  The type of dataset to persist.
888  dataId - dict
889  The data id.
890  doBackup - bool
891  If True, rename existing instead of overwriting.
892  WARNING: Setting doBackup=True is not safe for parallel processing, as it may be subject to race
893  conditions.
894  **rest
895  Keyword arguments for the data id.
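
 For example (hypothetical dataset type and data id keys):

 >>> butler.put(calexp, 'calexp', visit=1234, ccd=3)
 >>> butler.put(calexp, 'calexp', visit=1234, ccd=3, doBackup=True)  # keep a backup of any existing file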
896  """
897  datasetType = self._resolveDatasetTypeAlias(datasetType)
898  dataId = DataId(dataId)
899  dataId.update(**rest)
900 
901  for location in self._locate(datasetType, dataId, write=True):
902  if isinstance(location, ButlerComposite):
903  disassembler = location.disassembler if location.disassembler else genericDisassembler
904  disassembler(obj=obj, dataId=location.dataId, componentInfo=location.componentInfo)
905  for name, info in location.componentInfo.items():
906  if not info.inputOnly:
907  self.put(info.obj, info.datasetType, location.dataId, doBackup=doBackup)
908  else:
909  if doBackup:
910  location.getRepository().backup(location.datasetType, dataId)
911  location.getRepository().write(location, obj)
912 
913  def subset(self, datasetType, level=None, dataId={}, **rest):
914  """Return complete dataIds for a dataset type that match a partial (or empty) dataId.
915 
916  Given a partial (or empty) dataId specified in dataId and **rest, find all datasets that match the
917  dataId. Optionally restrict the results to a given level specified by a dataId key (e.g. visit or
918  sensor or amp for a camera). Return an iterable collection of complete dataIds as ButlerDataRefs.
919  Datasets with the resulting dataIds may not exist; that needs to be tested with datasetExists().
920 
921  Parameters
922  ----------
923  datasetType - str
924  The type of dataset collection to subset
925  level - str
926  The level of dataId at which to subset. Use an empty string if the mapper should look up the
927  default level.
928  dataId - dict
929  The data id.
930  **rest
931  Keyword arguments for the data id.
932 
933  Returns
934  -------
935  subset - ButlerSubset
936  Collection of ButlerDataRefs for datasets matching the data id.
937 
938  Examples
939  --------
940  To print the full dataIds for all r-band measurements in a source catalog
941  (note that the subset call is equivalent to: `butler.subset('src', dataId={'filter':'r'})`):
942 
943  >>> subset = butler.subset('src', filter='r')
944  >>> for data_ref in subset: print(data_ref.dataId)
945  """
946  datasetType = self._resolveDatasetTypeAlias(datasetType)
947 
948  # Currently expected behavior of subset is that if specified level is None then the mapper's default
949  # level should be used. Convention for level within Butler is that an empty string is used to indicate
950  # 'get default'.
951  if level is None:
952  level = ''
953 
954  dataId = DataId(dataId)
955  dataId.update(**rest)
956  return ButlerSubset(self, datasetType, level, dataId)
957 
958  def dataRef(self, datasetType, level=None, dataId={}, **rest):
959  """Returns a single ButlerDataRef.
960 
961  Given a complete dataId specified in dataId and **rest, find the unique dataset at the given level
962  specified by a dataId key (e.g. visit or sensor or amp for a camera) and return a ButlerDataRef.
963 
964  Parameters
965  ----------
966  datasetType - str
967  The type of dataset collection to reference
968  level - str
969  The level of dataId at which to reference
970  dataId - dict
971  The data id.
972  **rest
973  Keyword arguments for the data id.
974 
975  Returns
976  -------
977  dataRef - ButlerDataRef
978  ButlerDataRef for dataset matching the data id
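
 For example (hypothetical data id; the dataId plus keyword arguments must identify exactly one
 dataset at the given level):

 >>> ref = butler.dataRef('raw', visit=1234, ccd=3)
 >>> raw = ref.get('raw')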
979  """
980 
981  datasetType = self._resolveDatasetTypeAlias(datasetType)
982  dataId = DataId(dataId)
983  subset = self.subset(datasetType, level, dataId, **rest)
984  if len(subset) != 1:
985  raise RuntimeError("No unique dataset for: Dataset type:%s Level:%s Data ID:%s Keywords:%s" %
986  (str(datasetType), str(level), str(dataId), str(rest)))
987  return ButlerDataRef(subset, subset.cache[0])
988 
989  def _read(self, location):
990  """Unpersist an object using data inside a butlerLocation object.
991 
992  A weakref to loaded objects is cached here. If the object specified by the butlerLocation has been
993  loaded before and still exists then the object will not be re-read. A ref to the already-existing
994  object will be returned instead.
995 
996  Parameters
997  ----------
998  location - ButlerLocation
999  A butlerLocation instance populated with data needed to read the object.
1000 
1001  Returns
1002  -------
1003  object - an instance of the object specified by the butlerLocation.
1004  The object specified by the butlerLocation will either be loaded from persistent storage or will
1005  be fetched from the object cache (if it has already been read before).
1006  """
1007  def hasher(butlerLocation):
1008  """Hash a butler location for use as a key in the object cache.
1009 
1010  This requires that the dataId that was used to find the location is set in the usedDataId
1011  parameter. If this is not set, the dataId that was used to do the mapping is not known and we
1012  can't create a complete hash for comparison of like-objects.
1013  """
1014  if butlerLocation.usedDataId is None:
1015  return None
1016  return hash((butlerLocation.storageName, id(butlerLocation.mapper), id(butlerLocation.storage),
1017  tuple(butlerLocation.locationList), repr(sorted(butlerLocation.usedDataId.items())),
1018  butlerLocation.datasetType))
1019 
1020  locationHash = hasher(location)
1021  results = self.objectCache.get(locationHash, None) if locationHash is not None else None
1022  if not results:
1023  self.log.debug("Starting read from %s", location)
1024  results = location.repository.read(location)
1025  if len(results) == 1:
1026  results = results[0]
1027  self.log.debug("Ending read from %s", location)
1028  try:
1029  self.objectCache[locationHash] = results
1030  except TypeError:
1031  # some object types (e.g. builtins, like list) do not support weakref, and will raise a
1032  # TypeError when we try to create the weakref. This is ok, we simply will not keep those
1033  # types of objects in the cache.
1034  pass
1035  return results
1036 
1037  def __reduce__(self):
1038  ret = (_unreduce, (self._initArgs, self.datasetTypeAliasDict))
1039  return ret
1040 
1041  def _resolveDatasetTypeAlias(self, datasetType):
1042  """Replaces all the known alias keywords in the given string with the alias value.
1043 
1044  Parameters
1045  ----------
1046  datasetType - str
1047  A datasetType string to search & replace on
1048 
1049  Returns
1050  -------
1051  datasetType - str
1052  The de-aliased string
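
 For example, if '@goodSrc' was registered via defineAlias('@goodSrc', 'src'), then passing
 '@goodSrc_schema' to this method returns 'src_schema'.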
1053  """
1054  for key in self.datasetTypeAliasDict:
1055  # if all aliases have been replaced, bail out
1056  if datasetType.find('@') == -1:
1057  break
1058  datasetType = datasetType.replace(key, self.datasetTypeAliasDict[key])
1059 
1060  # If an alias specifier can not be resolved then throw.
1061  if datasetType.find('@') != -1:
1062  raise RuntimeError("Unresolvable alias specifier in datasetType: %s" % (datasetType))
1063 
1064  return datasetType
1065 
1066 
1067 def _unreduce(initArgs, datasetTypeAliasDict):
1068  mapperArgs = initArgs.pop('mapperArgs')
1069  initArgs.update(mapperArgs)
1070  butler = Butler(**initArgs)
1071  butler.datasetTypeAliasDict = datasetTypeAliasDict
1072  return butler