LSSTApplications  18.0.0+106,18.0.0+50,19.0.0,19.0.0+1,19.0.0+10,19.0.0+11,19.0.0+13,19.0.0+17,19.0.0+2,19.0.0-1-g20d9b18+6,19.0.0-1-g425ff20,19.0.0-1-g5549ca4,19.0.0-1-g580fafe+6,19.0.0-1-g6fe20d0+1,19.0.0-1-g7011481+9,19.0.0-1-g8c57eb9+6,19.0.0-1-gb5175dc+11,19.0.0-1-gdc0e4a7+9,19.0.0-1-ge272bc4+6,19.0.0-1-ge3aa853,19.0.0-10-g448f008b,19.0.0-12-g6990b2c,19.0.0-2-g0d9f9cd+11,19.0.0-2-g3d9e4fb2+11,19.0.0-2-g5037de4,19.0.0-2-gb96a1c4+3,19.0.0-2-gd955cfd+15,19.0.0-3-g2d13df8,19.0.0-3-g6f3c7dc,19.0.0-4-g725f80e+11,19.0.0-4-ga671dab3b+1,19.0.0-4-gad373c5+3,19.0.0-5-ga2acb9c+2,19.0.0-5-gfe96e6c+2,w.2020.01
LSSTDataManagementBasePackage
butlerQuantumContext.py
Go to the documentation of this file.
1 # This file is part of pipe_base.
2 #
3 # Developed for the LSST Data Management System.
4 # This product includes software developed by the LSST Project
5 # (http://www.lsst.org).
6 # See the COPYRIGHT file at the top-level directory of this distribution
7 # for details of code ownership.
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation, either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the GNU General Public License
20 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 
22 """Module defining a butler like object specialized to a specific quantum.
23 """
24 
25 __all__ = ("ButlerQuantumContext",)
26 
27 import types
28 import typing
29 
30 from .connections import InputQuantizedConnection, OutputQuantizedConnection, DeferredDatasetRef
31 from .struct import Struct
32 from lsst.daf.butler import DatasetRef, Butler, Quantum
33 
34 
36  """Butler like class specialized for a single quantum
37 
38  A ButlerQuantumContext class wraps a standard butler interface and
39  specializes it to the context of a given quantum. What this means
40  in practice is that the only gets and puts that this class allows
41  are DatasetRefs that are contained in the quantum.
42 
43  In the future this class will also be used to record provenance on
44  which datasets were actually retrieved and stored during execution.
45  This is in contrast to what preflight expects to be retrieved and
46  stored, as determined by inspecting the graph before execution.
47 
48  Parameters
49  ----------
50  butler : `lsst.daf.butler.Butler`
51  Butler object from/to which datasets will be get/put
52  quantum : `lsst.daf.butler.core.Quantum`
53  Quantum object that describes the datasets which will
54  be get/put by a single execution of this node in the
55  pipeline graph.
56  """
def __init__(self, butler: Butler, quantum: Quantum):
    """Bind the context to one quantum and build the private accessors.

    Parameters
    ----------
    butler : `lsst.daf.butler.Butler`
        Butler whose ``get``, ``getDeferred`` and ``put`` are invoked by
        the private accessors created here.
    quantum : `lsst.daf.butler.core.Quantum`
        Quantum whose predicted inputs and outputs define which dataset
        references may be read or written through this object.
    """
    self.quantum = quantum
    self.registry = butler.registry

    # Membership is tracked as (datasetType, dataId) pairs so that later
    # checks are simple set lookups.
    self.allInputs = {(ref.datasetType, ref.dataId)
                      for refList in quantum.predictedInputs.values()
                      for ref in refList}
    self.allOutputs = {(ref.datasetType, ref.dataId)
                       for refList in quantum.outputs.values()
                       for ref in refList}

    # Create closures over butler to discourage anyone from directly
    # using a butler reference; the butler itself is never stored as an
    # attribute of this object.
    def _get(self, ref):
        if not isinstance(ref, DeferredDatasetRef):
            self._checkMembership(ref, self.allInputs)
            return butler.get(ref)

        # Deferred references wrap the real reference in ``datasetRef``
        wrapped = ref.datasetRef
        self._checkMembership(wrapped, self.allInputs)
        return butler.getDeferred(wrapped)

    def _put(self, value, ref):
        self._checkMembership(ref, self.allOutputs)
        butler.put(value, ref)

    # Bind the closures as instance-level methods
    self._get = types.MethodType(_get, self)
    self._put = types.MethodType(_put, self)
86 
def get(self, dataset: typing.Union[InputQuantizedConnection,
                                    typing.List[typing.Union[DatasetRef, DeferredDatasetRef]],
                                    DatasetRef,
                                    DeferredDatasetRef]) -> object:
    """Fetches data from the butler

    Parameters
    ----------
    dataset
        This argument may either be an `InputQuantizedConnection` which
        describes all the inputs of a quantum, a list of
        `~lsst.daf.butler.DatasetRef`, or a single
        `~lsst.daf.butler.DatasetRef`. A `DeferredDatasetRef` is accepted
        anywhere a `~lsst.daf.butler.DatasetRef` is. The function will get
        and return the corresponding datasets from the butler.

    Returns
    -------
    return : `object`
        This function returns arbitrary objects fetched from the butler.
        The structure these objects are returned in depends on the type of
        the input argument. If the input dataset argument is an
        `InputQuantizedConnection`, then the return type will be a
        dictionary with keys corresponding to the attributes of the
        `InputQuantizedConnection` (which in turn are the attribute
        identifiers of the connections); each value is a list of objects
        when the connection holds a list of references, otherwise a single
        object. If the input argument is of type `list` then the return
        type will be a list of objects. If the input argument is a single
        reference then a single object will be returned.

    Raises
    ------
    ValueError
        If a `DatasetRef` is passed to get that is not defined in the
        quantum object
    TypeError
        If ``dataset`` is not one of the supported argument types
    """
    if isinstance(dataset, InputQuantizedConnection):
        retVal = {}
        for name, ref in dataset:
            # A connection holds either a list of references or a single one
            if isinstance(ref, list):
                retVal[name] = [self._get(r) for r in ref]
            else:
                retVal[name] = self._get(ref)
        return retVal
    if isinstance(dataset, list):
        return [self._get(ref) for ref in dataset]
    if isinstance(dataset, (DatasetRef, DeferredDatasetRef)):
        return self._get(dataset)
    raise TypeError("Dataset argument is not a type that can be used to get")
133 
def put(self, values: typing.Union[Struct, typing.List[typing.Any], object],
        dataset: typing.Union[OutputQuantizedConnection, typing.List[DatasetRef], DatasetRef]):
    """Puts data into the butler

    Parameters
    ----------
    values : `Struct` or `list` of `object` or `object`
        The data that should be put with the butler. If the type of the
        dataset is `OutputQuantizedConnection` then this argument should be
        a `Struct` with corresponding attribute names. Each attribute should
        then correspond to either a list of objects or a single object
        depending on the type of the corresponding attribute on dataset.
        I.e. if dataset.calexp is [datasetRef1, datasetRef2] then
        values.calexp should be [calexp1, calexp2]. Likewise if there is a
        single ref, then only a single object need be passed. The same
        restriction applies if dataset is directly a `list` of `DatasetRef`
        or a single `DatasetRef`.
    dataset
        This argument may either be an `OutputQuantizedConnection` which
        describes all the outputs of a quantum, a list of
        `lsst.daf.butler.DatasetRef`, or a single
        `lsst.daf.butler.DatasetRef`. The function will put each value
        into the butler under the corresponding reference.

    Raises
    ------
    ValueError
        If a `DatasetRef` is passed to put that is not defined in the
        quantum object, or the type or number of values does not match what
        is expected from the type of dataset.
    TypeError
        If ``dataset`` is not one of the supported argument types
    AttributeError
        Raised by `getattr` if ``values`` has no attribute named after one
        of the connections in an `OutputQuantizedConnection`.
    """
    if isinstance(dataset, OutputQuantizedConnection):
        if not isinstance(values, Struct):
            raise ValueError("dataset is a OutputQuantizedConnection, a Struct with corresponding"
                             " attributes must be passed as the values to put")
        for name, refs in dataset:
            valuesAttribute = getattr(values, name)
            if isinstance(refs, list):
                # Lengths are verified before anything is written for this
                # connection, so zip never silently drops a value.
                if len(refs) != len(valuesAttribute):
                    raise ValueError(f"There must be a object to put for every Dataset ref in {name}")
                for value, ref in zip(valuesAttribute, refs):
                    self._put(value, ref)
            else:
                self._put(valuesAttribute, refs)
    elif isinstance(dataset, list):
        if len(dataset) != len(values):
            raise ValueError("There must be a common number of references and values to put")
        for value, ref in zip(values, dataset):
            self._put(value, ref)
    elif isinstance(dataset, DatasetRef):
        self._put(values, dataset)
    else:
        raise TypeError("Dataset argument is not a type that can be used to put")
184 
def _checkMembership(self, ref: typing.Union[typing.List[DatasetRef], DatasetRef], inout: set):
    """Verify that every given `DatasetRef` belongs to the supplied set

    Guards get/put operations: an exception is raised as soon as a
    reference is found whose ``(datasetType, dataId)`` pair is absent from
    ``inout``.

    Parameters
    ----------
    ref : `list` of `DatasetRef` or `DatasetRef`
        One reference, or a list of references, to validate
    inout : `set`
        Set of ``(datasetType, dataId)`` pairs to test against, i.e. either
        the inputs or the outputs. Passing only the relevant set avoids
        checking both for every operation.

    Raises
    ------
    ValueError
        If any reference is not contained in ``inout``
    """
    # Normalise a single reference to a one-element list
    candidates = ref if isinstance(ref, list) else [ref]
    if any((candidate.datasetType, candidate.dataId) not in inout for candidate in candidates):
        raise ValueError("DatasetRef is not part of the Quantum being processed")
daf::base::PropertySet * set
Definition: fits.cc:902