butler.py
1 #!/usr/bin/env python
2 
3 #
4 # LSST Data Management System
5 # Copyright 2008-2015 LSST Corporation.
6 #
7 # This product includes software developed by the
8 # LSST Project (http://www.lsst.org/).
9 #
10 # This program is free software: you can redistribute it and/or modify
11 # it under the terms of the GNU General Public License as published by
12 # the Free Software Foundation, either version 3 of the License, or
13 # (at your option) any later version.
14 #
15 # This program is distributed in the hope that it will be useful,
16 # but WITHOUT ANY WARRANTY; without even the implied warranty of
17 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 # GNU General Public License for more details.
19 #
20 # You should have received a copy of the LSST License Statement and
21 # the GNU General Public License along with this program. If not,
22 # see <http://www.lsstcorp.org/LegalNotices/>.
23 #
24 
25 # -*- python -*-
26 
27 """This module defines the Butler class."""
28 from future import standard_library
29 standard_library.install_aliases()
30 from builtins import str
31 from past.builtins import basestring
32 from builtins import object
33 
34 import collections
35 import copy
36 import inspect
37 import os
38 
39 import yaml
40 
41 from lsst.log import Log
42 import lsst.pex.policy as pexPolicy
43 from . import LogicalLocation, ReadProxy, ButlerSubset, ButlerDataRef, Persistence, \
44  Storage, Policy, NoResults, Repository, DataId, RepositoryCfg, \
45  RepositoryArgs, listify, setify, sequencify, doImport
46 
47 
48 class ButlerCfg(Policy, yaml.YAMLObject):
49  """Represents a Butler configuration.
50 
51  .. warning::
52 
53  cfg is 'wet paint' and very likely to change. Use of it in production
54  code other than via the 'old butler' API is strongly discouraged.
55  """
56  yaml_tag = u"!ButlerCfg"
57 
58  def __init__(self, cls, repoCfg):
59  super(ButlerCfg, self).__init__({'repoCfg': repoCfg, 'cls': cls})
60 
61 
62 class RepoData(object):
63  """Container object for repository data used by Butler"""
64 
65  def __init__(self, args, cfg, repo, tags):
66  """Initializer for RepoData
67 
68  @param args (RepositoryArgs) Arguments used to initialize self.repo
69  @param cfg (RepositoryCfg) Configuration of repository (this is persisted)
70  @param repo (Repository) The repository class instance
71  @param tags (set) The tags that apply to this repository, if any
72  """
73  self.args = args
74  self.cfg = cfg
75  self.repo = repo
76  self.mode = args.mode
77  # self.tags is used to keep track of *all* the tags applicable to this Repo, not just the tags in
78  # the cfg (e.g. parents inherit their children's tags)
79  if not isinstance(tags, set):
80  raise RuntimeError("tags passed into RepoData must be in a set.")
81  self.tags = tags
82 
83  def __reduce__(self):
84  return (RepoData, (self.args, self.cfg, self.repo, self.tags))
85 
86  def __repr__(self):
87  return "RepoData(args=%s cfg=%s repo=%s tags=%s)" % (self.args, self.cfg, self.repo, self.tags)
88 
89 
90 class RepoDataContainer(object):
91  """Container object for RepoData instances owned by a Butler instance."""
92 
93  def __init__(self):
94  self.byRepoRoot = collections.OrderedDict()  # {repo root: RepoData}
95  self.byCfgRoot = {}  # {repo cfgRoot: RepoData}
96  self._inputs = None
97  self._outputs = None
98  self._all = None
99 
100  def add(self, repoData):
101  """Add a RepoData to the container
102 
103  @param repoData (RepoData) RepoData instance to add
104  """
105  self._inputs = None
106  self._outputs = None
107  self._all = None
108  self.byRepoRoot[repoData.cfg.root] = repoData
109  self.byCfgRoot[repoData.args.cfgRoot] = repoData
110 
111  def inputs(self):
112  """Get a list of the RepoData that are used as inputs to the Butler.
113 
114  The list is created lazily as needed, and cached.
115  @return a list of RepoData for readable repositories, in the order to be used when searching.
116  """
117  if self._inputs is None:
118  self._inputs = [rd for rd in self.byRepoRoot.values() if 'r' in rd.mode]
119  return self._inputs
120 
121  def outputs(self):
122  """Get a list of the RepoData that are used as outputs of the Butler.
123 
124  The list is created lazily as needed, and cached.
125  @return a list of RepoData for writable repositories, in the order to be used when searching.
126  """
127  if self._outputs is None:
128  self._outputs = [rd for rd in self.byRepoRoot.values() if 'w' in rd.mode]
129  return self._outputs
130 
131  def all(self):
132  """Get a list of all RepoData used by the Butler.
133 
134  The list is created lazily as needed, and cached.
135  @return a list of RepoData for all repositories, in the order to be used when searching.
136  """
137  if self._all is None:
138  self._all = [rd for rd in self.byRepoRoot.values()]
139  return self._all
140 
141  def __repr__(self):
142  return "%s(\nbyRepoRoot=%r, \nbyCfgRoot=%r, \n_inputs=%r, \n_outputs=%s, \n_all=%s)" % (
143  self.__class__.__name__,
144  self.byRepoRoot,
145  self.byCfgRoot,
146  self._inputs,
147  self._outputs,
148  self._all)
149 
150 
151 class Butler(object):
152  """Butler provides a generic mechanism for persisting and retrieving data using mappers.
153 
154  A Butler manages a collection of datasets known as a repository. Each
155  dataset has a type representing its intended usage and a location. Note
156  that the dataset type is not the same as the C++ or Python type of the
157  object containing the data. For example, an ExposureF object might be
158  used to hold the data for a raw image, a post-ISR image, a calibrated
159  science image, or a difference image. These would all be different
160  dataset types.
161 
162  A Butler can produce a collection of possible values for a key (or tuples
163  of values for multiple keys) if given a partial data identifier. It can
164  check for the existence of a file containing a dataset given its type and
165  data identifier. The Butler can then retrieve the dataset. Similarly, it
166  can persist an object to an appropriate location when given its associated
167  data identifier.
168 
169  Note that the Butler has two more advanced features when retrieving a data
170  set. First, the retrieval is lazy. Input does not occur until the data
171  set is actually accessed. This allows datasets to be retrieved and
172  placed on a clipboard prospectively with little cost, even if the
173  algorithm of a stage ends up not using them. Second, the Butler will call
174  a standardization hook upon retrieval of the dataset. This function,
175  contained in the input mapper object, must perform any necessary
176  manipulations to force the retrieved object to conform to standards,
177  including translating metadata.
178 
179  Public methods:
180 
181  __init__(self, root=None, mapper=None, inputs=None, outputs=None, **mapperArgs)
182 
183  defineAlias(self, alias, datasetType)
184 
185  getKeys(self, datasetType=None, level=None, tag=None)
186 
187  queryMetadata(self, datasetType, format=None, dataId={}, **rest)
188 
189  datasetExists(self, datasetType, dataId={}, **rest)
190 
191  get(self, datasetType, dataId={}, immediate=False, **rest)
192 
193  put(self, obj, datasetType, dataId={}, doBackup=False, **rest)
194 
195  subset(self, datasetType, level=None, dataId={}, **rest)
196 
197  dataRef(self, datasetType, level=None, dataId={}, **rest)
198  """
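 # A minimal usage sketch (illustrative only): the repository paths, the 'calexp'
 # dataset type, and the 'visit'/'ccd' dataId keys below are hypothetical and depend
 # on the mapper configured for the repository.
 #
 #     from lsst.daf.persistence import Butler
 #
 #     butler = Butler(inputs='/path/to/input/repo', outputs='/path/to/output/repo')
 #     if butler.datasetExists('calexp', visit=1234, ccd=42):
 #         exposure = butler.get('calexp', visit=1234, ccd=42)  # lazy ReadProxy by default
 #         butler.put(exposure, 'calexp', visit=1234, ccd=42)   # persisted to the output repository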
199 
200  def __init__(self, root=None, mapper=None, inputs=None, outputs=None, **mapperArgs):
201  """Initializer for the Class.
202 
203  The preferred initialization is to pass a single argument, a cfg created by Butler.cfg():
204  butler = Butler(Butler.cfg(repoCfg))
205  For backward compatibility: this initialization method signature can take a posix root path, and
206  optionally a mapper class instance or class type that will be instantiated using the mapperArgs input
207  argument.
208  However, for this to work in a backward compatible way it creates a single repository that is used as
209  both an input and an output repository. This is NOT preferred, and will likely break any provenance
210  system we have in place.
211 
212  @param root (str) Best practice is to pass in a cfg created by Butler.cfg(). But for backward
213  compatibility this can also be a filesystem path. Will only work with a
214  PosixRepository.
215  @param mapper Deprecated. Provides a mapper to be used with Butler.
216  @param mapperArgs Deprecated. Provides arguments to be passed to the mapper if the mapper input arg
217  is a class type to be instantiated by Butler.
218  @param inputs (RepositoryArgs or string) Can be a single item or a list. Provides arguments to load an
219  existing repository (or repositories). String is assumed to be
220  a URI and is used as the cfgRoot (URI to the location of the
221  cfg file). (Local file system URI does not have to start with
222  'file://' and in this way can be a relative path).
223  @param outputs (RepositoryArgs or string) Can be a single item or a list. Provides arguments to load
224  one or more existing repositories or create new ones. String is assumed to be a
225  URI and is used as the repository root.
226 
227  :return:
228  """
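 # The two construction styles described above, sketched with hypothetical paths and
 # a hypothetical mapper class / mapper argument:
 #
 #     # Preferred: explicit inputs/outputs (a bare string is treated as a repository URI)
 #     butler = Butler(inputs='/data/input/repo', outputs='/data/output/repo')
 #
 #     # Backward-compatible 'old butler' form: one root used as both input and output
 #     butler = Butler(root='/data/repo', mapper=MyMapper, calibRoot='/data/calib')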
229  self._initArgs = {'root': root, 'mapper': mapper, 'inputs': inputs, 'outputs': outputs,
230  'mapperArgs': mapperArgs}
231 
232  isLegacyRepository = inputs is None and outputs is None
233 
234  if root is not None and not isLegacyRepository:
235  raise RuntimeError(
236  'root is a deprecated parameter and may not be used with the parameters inputs and outputs')
237 
238  if isLegacyRepository:
239  if root is None:
240  if hasattr(mapper, 'root'):
241  # in legacy repos, the mapper may be given the root directly.
242  root = mapper.root
243  else:
244  # in the past root="None" could be used to mean root='.'
245  root = '.'
246  outputs = RepositoryArgs(mode='rw',
247  root=root,
248  mapper=mapper,
249  mapperArgs=mapperArgs)
250  outputs.isLegacyRepository = True
251 
251 
252  self.datasetTypeAliasDict = {}
253 
254  # Always use an empty Persistence policy until we can get rid of it
255  persistencePolicy = pexPolicy.Policy()
256  self.persistence = Persistence.getPersistence(persistencePolicy)
257  self.log = Log.getLogger("daf.persistence.butler")
258 
259  inputs = listify(inputs)
260  outputs = listify(outputs)
261 
262  # if only a string is passed for inputs or outputs, assumption is that it's a URI;
263  # place it in a RepositoryArgs instance; cfgRoot for inputs, root for outputs.
264  inputs = [RepositoryArgs(cfgRoot=i) if isinstance(i, basestring) else i for i in inputs]
265  outputs = [RepositoryArgs(root=o) if isinstance(o, basestring) else o for o in outputs]
266 
267  # set default rw modes on input and output args as needed
268  for i in inputs:
269  if i.mode is None:
270  i.mode = 'r'
271  for o in outputs:
272  if o.mode is None:
273  o.mode = 'w'
274 
275  self._repos = RepoDataContainer()
276 
277  defaultMapper = self._getDefaultMapper(inputs)
278 
279  butlerIOParents = collections.OrderedDict()
280  for args in outputs + inputs:
281  if 'r' in args.mode:
282  butlerIOParents[args.cfgRoot] = args
283 
284  for args in outputs:
285  self._addRepo(args, inout='out', defaultMapper=defaultMapper, butlerIOParents=butlerIOParents)
286 
287  for args in inputs:
288  self._addRepo(args, inout='in', butlerIOParents=butlerIOParents)
289 
290  def _addRepo(self, args, inout, defaultMapper=None, butlerIOParents=None, tags=None):
291  """Create a Repository and related infrastructure and add it to the list of repositories.
292 
293  @param args (RepositoryArgs) settings used to create the repository.
294  @param inout (string) must be 'in' or 'out', indicates how the repository is to be used. Input repos
295  are only read from, and output repositories may be read from and/or written to
296  (w/rw of output repos depends on args.mode)
297  @param defaultMapper (mapper class or None ) if a default mapper could be inferred from inputs then
298  this will be a class object that can be used for any
299  outputs that do not explicitly define their mapper. If
300  None then a mapper class could not be inferred and a
301  mapper must be defined by each output.
302  @param butlerIOParents (ordered dict) The keys are cfgRoot of repoArgs, val is the repoArgs.
303  This dict holds all the explicit input and output repositories passed to the
304  butler __init__ function; it is used when determining what the
305  parents of writable repositories are.
306  @param tags (any or list of any) Any object that can be tested to be the same as the tag in a dataId
307  passed into butler input functions. Applies only to input
308  repositories: If tag is specified by the dataId then the repo will
309  only be read from if the tag in the dataId matches a tag used
310  for that repository.
311  """
312  if butlerIOParents is None:
313  butlerIOParents = {}
314 
315  tags = copy.copy(setify(tags))
316  tags.update(args.tags)
317 
318  # If this repository has already been loaded/created, compare the input args to the existing repo;
319  # if they don't match raise an exception - the caller has to sort this out.
320  if args.cfgRoot in self._repos.byCfgRoot:
321  repoData = self._repos.byCfgRoot[args.cfgRoot]
322  if not repoData.cfg.matchesArgs(args):
323  raise RuntimeError(("Mismatched repository configurations passed in or loaded:" +
324  "\n\tnew args:%s" +
325  "\n\texisting repoData:%s") %
326  (args, repoData))
327  repoData.tags.update(setify(tags))
328  else:
329  # If we are here, the repository is not yet loaded by butler. Do the loading:
330 
331  # Verify mode is legal (must be writeable for outputs, readable for inputs).
332  # And set according to the default value if needed.
333  if inout == 'out':
334  if 'w' not in args.mode:
335  raise RuntimeError('Output repositories must be writable.')
336  elif inout == 'in':
337  if 'r' not in args.mode:
338  raise RuntimeError('Input repositories must be readable.')
339  else:
340  raise RuntimeError('Unrecognized value for inout: %s' % inout)
341 
342  # Try to get the cfg.
343  # If it exists, verify no mismatch with args.
344  # If it does not exist, make the cfg and save the cfg at cfgRoot.
345  cfg = Storage.getRepositoryCfg(args.cfgRoot)
346 
347  parentsToAdd = []
348  if cfg is not None:
349  if inout == 'out':
350  # Parents used by this butler instance must match the parents of any existing output
351  # repositories used by this butler. (If there is a configuration change a new output
352  # repository should be created).
353  # IE cfg.parents can have parents that are not passed in as butler inputs or readable
354  # outputs, but the butler may not have readable i/o that is not a parent of an already
355  # existing output.
356  for cfgRoot in butlerIOParents:
357  if cfgRoot not in cfg.parents and cfgRoot != args.cfgRoot:
358  raise RuntimeError(
359  "Existing output repository parents do not match butler's inputs.")
360  if not cfg.matchesArgs(args):
361  raise RuntimeError(
362  "Persisted RepositoryCfg and passed-in RepositoryArgs have"
363  " conflicting parameters:\n\t%s\n\t%s" % (cfg, args))
364  if args.mapperArgs is not None:
365  if cfg.mapperArgs is None:
366  cfg.mapperArgs = args.mapperArgs
367  else:
368  cfg.mapperArgs.update(args.mapperArgs)
369  if 'r' in args.mode:
370  parentsToAdd = copy.copy(cfg.parents)
371  else:
372  if args.mapper is None:
373  if defaultMapper is None:
374  raise RuntimeError(
375  "Could not infer mapper and one not specified in repositoryArgs:%s" % args)
376  args.mapper = defaultMapper
377  parents = [cfgRoot for cfgRoot in list(butlerIOParents.keys()) if cfgRoot != args.cfgRoot]
378  cfg = RepositoryCfg.makeFromArgs(args, parents)
379  Storage.putRepositoryCfg(cfg, args.cfgRoot)
380 
381  repo = Repository(cfg)
382  self._repos.add(RepoData(args, cfg, repo, tags))
383  for parent in parentsToAdd:
384  if parent in butlerIOParents:
385  args = butlerIOParents[parent]
386  else:
387  args = RepositoryArgs(cfgRoot=parent, mode='r')
388  self._addRepo(args=args, inout='in', tags=tags)
389 
390  def __repr__(self):
391  return 'Butler(datasetTypeAliasDict=%s, repos=%s, persistence=%s)' % (
392  self.datasetTypeAliasDict, self._repos, self.persistence)
393 
394  @staticmethod
395  def _getDefaultMapper(inputs):
396  mappers = set()
397  for args in inputs:
398  if args.mapper is not None:
399  mapper = args.mapper
400  # if the mapper is:
401  # * a string, import it.
402  # * a class instance, get its class type
403  # * a class, do nothing; use it
404  if isinstance(mapper, basestring):
405  mapper = doImport(args.mapper)
406  elif not inspect.isclass(mapper):
407  mapper = mapper.__class__
408  else:
409  cfgRoot = args.cfgRoot
410  mapper = Butler.getMapperClass(cfgRoot)
411  mappers.add(mapper)
412 
413  if len(mappers) == 1:
414  return mappers.pop()
415  else:
416  return None
417 
418  @staticmethod
419  def getMapperClass(root):
420  """posix-only; gets the mapper class at the path specified by root (if a file _mapper can be found at
421  that location or in a parent location).
422 
423  As we abstract the storage and support different types of storage locations this method will be
424  moved entirely into Butler Access, or made more dynamic, and the API will very likely change."""
425  return Storage.getMapperClass(root)
426 
427  def defineAlias(self, alias, datasetType):
428  """Register an alias that will be substituted in datasetTypes.
429 
430  @param alias (str) the alias keyword. It may start with @ or not. It may not contain @ except as the
431  first character.
432  @param datasetType (str) the string that will be substituted when @alias is passed into datasetType.
433  It may not contain '@'
434  """
435 
436  # verify formatting of alias:
437  # it can have '@' as the first character (if not it's okay, we will add it) or not at all.
438  atLoc = alias.rfind('@')
439  if atLoc == -1:
440  alias = "@" + str(alias)
441  elif atLoc > 0:
442  raise RuntimeError("Badly formatted alias string: %s" % (alias,))
443 
444  # verify that datasetType does not contain '@'
445  if datasetType.count('@') != 0:
446  raise RuntimeError("Badly formatted type string: %s" % (datasetType))
447 
448  # verify that the alias keyword does not start with another alias keyword,
449  # and vice versa
450  for key in self.datasetTypeAliasDict:
451  if key.startswith(alias) or alias.startswith(key):
452  raise RuntimeError("Alias: %s overlaps with existing alias: %s" % (alias, key))
453 
454  self.datasetTypeAliasDict[alias] = datasetType
455 
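 # Example of alias use (the alias name and dataset type are hypothetical):
 #
 #     butler.defineAlias('calexpType', 'calexp')
 #     exposure = butler.get('@calexpType', visit=1234, ccd=42)  # '@calexpType' resolves to 'calexp'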
456  def getKeys(self, datasetType=None, level=None, tag=None):
457  """Returns a dict. The dict keys are the valid data id keys at or above the given level of hierarchy
458  for the dataset type or the entire collection if None. The dict values are the basic Python types
459  corresponding to the keys (int, float, str).
460 
461  @param datasetType (str) the type of dataset to get keys for, entire collection if None.
462  @param level (str) the hierarchy level to descend to. None if it should not be restricted. Use an
463  empty string if the mapper should lookup the default level.
464  @param tag (any or list of any) Any object that can be tested to be the same as the tag in a dataId
465  passed into butler input functions. Applies only to input
466  repositories: If tag is specified by the dataId then the repo will
467  only be read from if the tag in the dataId matches a tag used
468  for that repository.
469  @returns (dict) valid data id keys; values are corresponding types.
470  """
471  datasetType = self._resolveDatasetTypeAlias(datasetType)
472 
473  keys = None
474  tag = setify(tag)
475  for repoData in self._repos.inputs():
476  if not tag or len(tag.intersection(repoData.tags)) > 0:
477  keys = repoData.repo.getKeys(datasetType, level)
478  # An empty dict is a valid "found" condition for keys. The only value for keys that should
479  # cause the search to continue is None
480  if keys is not None:
481  break
482  return keys
483 
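 # Example (the available keys and their types depend entirely on the mapper; the
 # values shown are hypothetical):
 #
 #     keys = butler.getKeys('raw')
 #     # e.g. {'visit': int, 'ccd': int, 'filter': str}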
484  def queryMetadata(self, datasetType, format=None, dataId={}, **rest):
485  """Returns the valid values for one or more keys when given a partial
486  input collection data id.
487 
488  @param datasetType (str) the type of dataset to inquire about.
489  @param format (str, tuple) a key or tuple of keys giving the level of granularity of the inquiry;
490  the values (or tuples of values) for these keys are returned.
491  @param dataId (DataId, dict) the partial data id.
492  @param **rest keyword arguments for the partial data id.
493  @returns (list) a list of valid values or tuples of valid values as
494  specified by the format, at the format keys' level of
495  granularity.
496  """
497 
498  datasetType = self._resolveDatasetTypeAlias(datasetType)
499  dataId = DataId(dataId)
500  dataId.update(**rest)
501 
502  if format is None:
503  raise RuntimeError("queryMetadata requires a format (a key or tuple of keys) to be specified.")
504  else:
505  format = sequencify(format)
506 
507  tuples = None
508  for repoData in self._repos.inputs():
509  if not dataId.tag or len(dataId.tag.intersection(repoData.tags)) > 0:
510  tuples = repoData.repo.queryMetadata(datasetType, format, dataId)
511  if tuples:
512  break
513 
514  if not tuples:
515  return []
516 
517  if len(format) == 1:
518  ret = []
519  for x in tuples:
520  try:
521  ret.append(x[0])
522  except TypeError:
523  ret.append(x)
524  return ret
525 
526  return tuples
527 
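 # Example (the dataset type, keys, and dataId are hypothetical):
 #
 #     # a single key yields a list of values
 #     visits = butler.queryMetadata('raw', 'visit', dataId={'filter': 'r'})
 #     # a tuple of keys yields a list of tuples
 #     pairs = butler.queryMetadata('raw', ('visit', 'ccd'), dataId={'filter': 'r'})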
528  def datasetExists(self, datasetType, dataId={}, **rest):
529  """Determines if a dataset file exists.
530 
531  @param datasetType (str) the type of dataset to inquire about.
532  @param dataId (DataId, dict) the data id of the dataset.
533  @param **rest keyword arguments for the data id.
534  @returns (bool) True if the dataset exists or is non-file-based.
535  """
536 
537  datasetType = self._resolveDatasetTypeAlias(datasetType)
538  dataId = DataId(dataId)
539  dataId.update(**rest)
540 
541  location = None
542  for repoData in self._repos.inputs():
543  if not dataId.tag or len(dataId.tag.intersection(repoData.tags)) > 0:
544  location = repoData.repo.map(datasetType, dataId)
545  if location:
546  break
547 
548  if location is None:
549  return False
550 
551  additionalData = location.getAdditionalData()
552  storageName = location.getStorageName()
553  if storageName in ('BoostStorage', 'FitsStorage', 'PafStorage',
554  'PickleStorage', 'ConfigStorage', 'FitsCatalogStorage'):
555  locations = location.getLocations()
556  for locationString in locations:
557  logLoc = LogicalLocation(locationString, additionalData).locString()
558  if storageName == 'FitsStorage':
559  # Strip off directives for cfitsio (in square brackets, e.g., extension name)
560  bracket = logLoc.find('[')
561  if bracket > 0:
562  logLoc = logLoc[:bracket]
563  if not os.path.exists(logLoc):
564  return False
565  return True
566  self.log.warn("datasetExists() for non-file storage %s, dataset type=%s, keys=%s",
567  storageName, datasetType, str(dataId))
568  return True
569 
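 # Example (dataset type and dataId keys are hypothetical). Note that for non-file
 # storages this method logs a warning and returns True without checking:
 #
 #     haveCalexp = butler.datasetExists('calexp', visit=1234, ccd=42)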
570  def get(self, datasetType, dataId=None, immediate=False, **rest):
571  """Retrieves a dataset given an input collection data id.
572 
573  @param datasetType (str) the type of dataset to retrieve.
574  @param dataId (dict) the data id.
575  @param immediate (bool) don't use a proxy for delayed loading.
576  @param **rest keyword arguments for the data id.
577  @returns an object retrieved from the dataset (or a proxy for one).
578  """
579 
580  datasetType = self._resolveDatasetTypeAlias(datasetType)
581  dataId = DataId(dataId)
582  dataId.update(**rest)
583  location = None
584  for repoData in self._repos.inputs():
585  if not dataId.tag or len(dataId.tag.intersection(repoData.tags)) > 0:
586  location = repoData.repo.map(datasetType, dataId)
587  if location:
588  break
589  if location is None:
590  raise NoResults("No locations for get:", datasetType, dataId)
591 
592  self.log.debug("Get type=%s keys=%s from %s", datasetType, dataId, str(location))
593 
594  if hasattr(location.mapper, "bypass_" + datasetType):
595  # this type loader block should get moved into a helper someplace, and duplications removed.
596  pythonType = location.getPythonType()
597  if pythonType is not None:
598  if isinstance(pythonType, basestring):
599  # import this pythonType dynamically
600  pythonTypeTokenList = location.getPythonType().split('.')
601  importClassString = pythonTypeTokenList.pop()
602  importClassString = importClassString.strip()
603  importPackage = ".".join(pythonTypeTokenList)
604  importType = __import__(importPackage, globals(), locals(), [importClassString], 0)
605  pythonType = getattr(importType, importClassString)
606  bypassFunc = getattr(location.mapper, "bypass_" + datasetType)
607  callback = lambda: bypassFunc(datasetType, pythonType, location, dataId)
608  else:
609  callback = lambda: self._read(location)
610  if location.mapper.canStandardize(datasetType):
611  innerCallback = callback
612  callback = lambda: location.mapper.standardize(datasetType, innerCallback(), dataId)
613  if immediate:
614  return callback()
615  return ReadProxy(callback)
616 
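 # Example of lazy vs. immediate retrieval (dataset type, dataId, and the getWidth()
 # accessor are hypothetical):
 #
 #     proxy = butler.get('calexp', visit=1234, ccd=42)   # ReadProxy; no I/O yet
 #     width = proxy.getWidth()                           # accessing the proxy triggers the read
 #     exposure = butler.get('calexp', visit=1234, ccd=42, immediate=True)  # read immediately, no proxy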
617  def put(self, obj, datasetType, dataId={}, doBackup=False, **rest):
618  """Persists a dataset given an output collection data id.
619 
620  @param obj the object to persist.
621  @param datasetType (str) the type of dataset to persist.
622  @param dataId (dict) the data id.
623  @param doBackup if True, rename existing instead of overwriting
624  @param **rest keyword arguments for the data id.
625 
626  WARNING: Setting doBackup=True is not safe for parallel processing, as it
627  may be subject to race conditions.
628  """
629 
630  datasetType = self._resolveDatasetTypeAlias(datasetType)
631  dataId = DataId(dataId)
632  dataId.update(**rest)
633 
634  for repoData in self._repos.outputs():
635  location = repoData.repo.map(datasetType, dataId, write=True)
636  if location:
637  if doBackup:
638  repoData.repo.backup(datasetType, dataId)
639  repoData.repo.write(location, obj)
640 
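 # Example (object, dataset type, and dataId are hypothetical):
 #
 #     butler.put(exposure, 'calexp', visit=1234, ccd=42)
 #     # with doBackup=True an existing dataset is renamed rather than overwritten
 #     butler.put(exposure, 'calexp', visit=1234, ccd=42, doBackup=True)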
641  def subset(self, datasetType, level=None, dataId={}, **rest):
642  """Extracts a subset of a dataset collection.
643 
644  Given a partial dataId specified in dataId and **rest, find all
645  datasets at a given level specified by a dataId key (e.g. visit or
646  sensor or amp for a camera) and return a collection of their dataIds
647  as ButlerDataRefs.
648 
649  @param datasetType (str) the type of dataset collection to subset
650  @param level (str) the level of dataId at which to subset. Use an empty string if the mapper
651  should look up the default level.
652  @param dataId (dict) the data id.
653  @param **rest keyword arguments for the data id.
654  @returns (ButlerSubset) collection of ButlerDataRefs for datasets
655  matching the data id.
656  """
657 
658  datasetType = self._resolveDatasetTypeAlias(datasetType)
659 
660  # Currently expected behavior of subset is that if specified level is None then the mapper's default
661  # level should be used. Convention for level within Butler is that an empty string is used to indicate
662  # 'get default'.
663  if level is None:
664  level = ''
665 
666  dataId = DataId(dataId)
667  dataId.update(**rest)
668  return ButlerSubset(self, datasetType, level, dataId)
669 
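 # Example of iterating over a subset, assuming the ButlerDataRef get/datasetExists
 # interface (dataset type and dataId are hypothetical):
 #
 #     for dataRef in butler.subset('raw', dataId={'filter': 'r'}):
 #         if dataRef.datasetExists('raw'):
 #             raw = dataRef.get('raw')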
670  def dataRef(self, datasetType, level=None, dataId={}, **rest):
671  """Returns a single ButlerDataRef.
672 
673  Given a complete dataId specified in dataId and **rest, find the
674  unique dataset at the given level specified by a dataId key (e.g.
675  visit or sensor or amp for a camera) and return a ButlerDataRef.
676 
677  @param datasetType (str) the type of dataset collection to reference
678  @param level (str) the level of dataId at which to reference
679  @param dataId (dict) the data id.
680  @param **rest keyword arguments for the data id.
681  @returns (ButlerDataRef) ButlerDataRef for dataset matching the data id
682  """
683 
684  datasetType = self._resolveDatasetTypeAlias(datasetType)
685  dataId = DataId(dataId)
686  subset = self.subset(datasetType, level, dataId, **rest)
687  if len(subset) != 1:
688  raise RuntimeError("No unique dataset for: Dataset type:%s Level:%s Data ID:%s Keywords:%s" %
689  (str(datasetType), str(level), str(dataId), str(rest)))
690  return ButlerDataRef(subset, subset.cache[0])
691 
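 # Example (dataset type and dataId are hypothetical); the dataId must identify exactly
 # one dataset at the given level or a RuntimeError is raised:
 #
 #     ref = butler.dataRef('raw', dataId={'visit': 1234, 'ccd': 42})
 #     raw = ref.get('raw')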
692  def _read(self, location):
693  self.log.debug("Starting read from %s", location)
694  results = location.repository.read(location)
695  if len(results) == 1:
696  results = results[0]
697  self.log.debug("Ending read from %s", location)
698  return results
699 
700  def __reduce__(self):
701  ret = (_unreduce, (self._initArgs, self.datasetTypeAliasDict))
702  return ret
703 
704  def _resolveDatasetTypeAlias(self, datasetType):
705  """ Replaces all the known alias keywords in the given string with the alias value.
706  @param datasetType (str) the dataset type string, possibly containing @alias specifiers
707  @return (str) the de-aliased string
708  """
709 
710  for key in self.datasetTypeAliasDict:
711  # if all aliases have been replaced, bail out
712  if datasetType.find('@') == -1:
713  break
714  datasetType = datasetType.replace(key, self.datasetTypeAliasDict[key])
715 
716  # If an alias specifier can not be resolved then throw.
717  if datasetType.find('@') != -1:
718  raise RuntimeError("Unresolvable alias specifier in datasetType: %s" % (datasetType))
719 
720  return datasetType
721 
722 
723 def _unreduce(initArgs, datasetTypeAliasDict):
724  mapperArgs = initArgs.pop('mapperArgs')
725  initArgs.update(mapperArgs)
726  butler = Butler(**initArgs)
727  butler.datasetTypeAliasDict = datasetTypeAliasDict
728  return butler