LSSTApplications  17.0+124,17.0+14,17.0+73,18.0.0+37,18.0.0+80,18.0.0-4-g68ffd23+4,18.1.0-1-g0001055+12,18.1.0-1-g03d53ef+5,18.1.0-1-g1349e88+55,18.1.0-1-g2505f39+44,18.1.0-1-g5315e5e+4,18.1.0-1-g5e4b7ea+14,18.1.0-1-g7e8fceb+4,18.1.0-1-g85f8cd4+48,18.1.0-1-g8ff0b9f+4,18.1.0-1-ga2c679d+1,18.1.0-1-gd55f500+35,18.1.0-10-gb58edde+2,18.1.0-11-g0997b02+4,18.1.0-13-gfe4edf0b+12,18.1.0-14-g259bd21+21,18.1.0-19-gdb69f3f+2,18.1.0-2-g5f9922c+24,18.1.0-2-gd3b74e5+11,18.1.0-2-gfbf3545+32,18.1.0-26-g728bddb4+5,18.1.0-27-g6ff7ca9+2,18.1.0-3-g52aa583+25,18.1.0-3-g8ea57af+9,18.1.0-3-gb69f684+42,18.1.0-3-gfcaddf3+6,18.1.0-32-gd8786685a,18.1.0-4-gf3f9b77+6,18.1.0-5-g1dd662b+2,18.1.0-5-g6dbcb01+41,18.1.0-6-gae77429+3,18.1.0-7-g9d75d83+9,18.1.0-7-gae09a6d+30,18.1.0-9-gc381ef5+4,w.2019.45
LSSTDataManagementBasePackage
butlerQuantumContext.py
Go to the documentation of this file.
1 # This file is part of pipe_base.
2 #
3 # Developed for the LSST Data Management System.
4 # This product includes software developed by the LSST Project
5 # (http://www.lsst.org).
6 # See the COPYRIGHT file at the top-level directory of this distribution
7 # for details of code ownership.
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation, either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the GNU General Public License
20 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 
22 """Module defining a butler like object specialized to a specific quantum.
23 """
24 
25 __all__ = ("ButlerQuantumContext",)
26 
27 import types
28 import typing
29 
30 from .connections import InputQuantizedConnection, OutputQuantizedConnection, DeferredDatasetRef
31 from .struct import Struct
32 from lsst.daf.butler import DatasetRef, Butler, Quantum
33 
34 
36  """Butler like class specialized for a single quantum
37 
38  A ButlerQuantumContext class wraps a standard butler interface and
39  specializes it to the context of a given quantum. What this means
40  in practice is that the only gets and puts that this class allows
41  are DatasetRefs that are contained in the quantum.
42 
43  In the future this class will also be used to record provenance on
44  what was actually get and put. This is in contrast to what the
45  preflight expects to be get and put by looking at the graph before
46  execution.
47 
48  Parameters
49  ----------
50  butler : `lsst.daf.butler.Butler`
51  Butler object from/to which datasets will be get/put
52  quantum : `lsst.daf.butler.core.Quantum`
53  Quantum object that describes the datasets which will
54  be get/put by a single execution of this node in the
55  pipeline graph.
56  """
57  def __init__(self, butler: Butler, quantum: Quantum):
58  self.quantum = quantum
59  self.registry = butler.registry
60  self.allInputs = set()
61  self.allOutputs = set()
62  for refs in quantum.predictedInputs.values():
63  for ref in refs:
64  self.allInputs.add((ref.datasetType, ref.dataId))
65  for refs in quantum.outputs.values():
66  for ref in refs:
67  self.allOutputs.add((ref.datasetType, ref.dataId))
68 
69  # Create closures over butler to discourage anyone from directly
70  # using a butler reference
71  def _get(self, ref):
72  if isinstance(ref, DeferredDatasetRef):
73  self._checkMembership(ref.datasetRef, self.allInputs)
74  return butler.getDeferred(ref.datasetRef)
75 
76  else:
77  self._checkMembership(ref, self.allInputs)
78  return butler.get(ref)
79 
80  def _put(self, value, ref):
81  self._checkMembership(ref, self.allOutputs)
82  butler.put(value, ref)
83 
84  self._get = types.MethodType(_get, self)
85  self._put = types.MethodType(_put, self)
86 
87  def get(self, dataset: typing.Union[InputQuantizedConnection,
88  typing.List[DatasetRef],
89  DatasetRef]) -> object:
90  """Fetches data from the butler
91 
92  Parameters
93  ----------
94  dataset : `InputQuantizedConnection` or `list` [`~lsst.daf.butler.DatasetRef`]
95  or `~lsst.daf.butler.DatasetRef`
96  This argument may either be an `InputQuantizedConnection` which describes
97  all the inputs of a quantum, a list of `~lsst.daf.butler.DatasetRef`, or
98  a single `~lsst.daf.butler.DatasetRef`. The function will get and return
99  the corresponding datasets from the butler.
100 
101  Returns
102  -------
103  return : `object`
104  This function returns arbitrary objects fetched from the bulter. The
105  structure these objects are returned in depends on the type of the input
106  argument. If the input dataset argument is a InputQuantizedConnection, then
107  the return type will be a dictionary with keys corresponding to the attributes
108  of the `InputQuantizedConnection` (which in turn are the attribute identifiers
109  of the connections). If the input argument is of type `list` of
110  `~lsst.daf.butler.DatasetRef` then the return type will be a list of objects.
111  If the input argument is a single `~lsst.daf.butler.DatasetRef` then a single
112  object will be returned.
113 
114  Raises
115  ------
116  ValueError
117  If a `DatasetRef` is passed to get that is not defined in the quantum object
118  """
119  if isinstance(dataset, InputQuantizedConnection):
120  retVal = {}
121  for name, ref in dataset:
122  if isinstance(ref, list):
123  val = [self._get(r) for r in ref]
124  else:
125  val = self._get(ref)
126  retVal[name] = val
127  return retVal
128  elif isinstance(dataset, list):
129  return [self._get(x) for x in dataset]
130  elif isinstance(dataset, DatasetRef) or isinstance(dataset, DeferredDatasetRef):
131  return self._get(dataset)
132  else:
133  raise TypeError("Dataset argument is not a type that can be used to get")
134 
135  def put(self, values: typing.Union[Struct, typing.List[typing.Any], object],
136  dataset: typing.Union[OutputQuantizedConnection, typing.List[DatasetRef], DatasetRef]):
137  """Puts data into the butler
138 
139  Parameters
140  ----------
141  values : `Struct` or `list` of `object` or `object`
142  The data that should be put with the butler. If the type of the dataset
143  is `OutputQuantizedConnection` then this argument should be a `Struct`
144  with corresponding attribute names. Each attribute should then correspond
145  to either a list of object or a single object depending of the type of the
146  corresponding attribute on dataset. I.e. if dataset.calexp is [datasetRef1,
147  datasetRef2] then values.calexp should be [calexp1, calexp2]. Like wise
148  if there is a single ref, then only a single object need be passed. The same
149  restriction applies if dataset is directly a `list` of `DatasetRef` or a
150  single `DatasetRef`.
151  dataset : `OutputQuantizedConnection` or `list` of `lsst.daf.butler.DatasetRef`
152  or `lsst.daf.butler.DatasetRef`
153  This argument may either be an `InputQuantizedConnection` which describes
154  all the inputs of a quantum, a list of `lsst.daf.butler.DatasetRef`, or
155  a single `lsst.daf.butler.DatasetRef`. The function will get and return
156  the corresponding datasets from the butler.
157 
158  Raises
159  ------
160  ValueError
161  If a `DatasetRef` is passed to put that is not defined in the quantum object, or
162  the type of values does not match what is expected from the type of dataset.
163  """
164  if isinstance(dataset, OutputQuantizedConnection):
165  if not isinstance(values, Struct):
166  raise ValueError("dataset is a OutputQuantizedConnection, a Struct with corresponding"
167  " attributes must be passed as the values to put")
168  for name, refs in dataset:
169  valuesAttribute = getattr(values, name)
170  if isinstance(refs, list):
171  if len(refs) != len(valuesAttribute):
172  raise ValueError(f"There must be a object to put for every Dataset ref in {name}")
173  for i, ref in enumerate(refs):
174  self._put(valuesAttribute[i], ref)
175  else:
176  self._put(valuesAttribute, refs)
177  elif isinstance(dataset, list):
178  if len(dataset) != len(values):
179  raise ValueError("There must be a common number of references and values to put")
180  for i, ref in enumerate(dataset):
181  self._put(values[i], ref)
182  elif isinstance(dataset, DatasetRef):
183  self._put(values, dataset)
184  else:
185  raise TypeError("Dataset argument is not a type that can be used to put")
186 
187  def _checkMembership(self, ref: typing.Union[typing.List[DatasetRef], DatasetRef], inout: set):
188  """Internal function used to check if a DatasetRef is part of the input quantum
189 
190  This function will raise an exception if the ButlerQuantumContext is used to
191  get/put a DatasetRef which is not defined in the quantum.
192 
193  Parameters
194  ----------
195  ref : `list` of `DatasetRef` or `DatasetRef`
196  Either a list or a single `DatasetRef` to check
197  inout : `set`
198  The connection type to check, e.g. either an input or an output. This prevents
199  both types needing to be checked for every operation, which may be important
200  for Quanta with lots of `DatasetRef`s.
201  """
202  if not isinstance(ref, list):
203  ref = [ref]
204  for r in ref:
205  if (r.datasetType, r.dataId) not in inout:
206  raise ValueError("DatasetRef is not part of the Quantum being processed")
daf::base::PropertySet * set
Definition: fits.cc:902