LSSTApplications  17.0+124,17.0+14,17.0+73,18.0.0+37,18.0.0+80,18.0.0-4-g68ffd23+4,18.1.0-1-g0001055+12,18.1.0-1-g03d53ef+5,18.1.0-1-g1349e88+55,18.1.0-1-g2505f39+44,18.1.0-1-g5315e5e+4,18.1.0-1-g5e4b7ea+14,18.1.0-1-g7e8fceb+4,18.1.0-1-g85f8cd4+48,18.1.0-1-g8ff0b9f+4,18.1.0-1-ga2c679d+1,18.1.0-1-gd55f500+35,18.1.0-10-gb58edde+2,18.1.0-11-g0997b02+4,18.1.0-13-gfe4edf0b+12,18.1.0-14-g259bd21+21,18.1.0-19-gdb69f3f+2,18.1.0-2-g5f9922c+24,18.1.0-2-gd3b74e5+11,18.1.0-2-gfbf3545+32,18.1.0-26-g728bddb4+5,18.1.0-27-g6ff7ca9+2,18.1.0-3-g52aa583+25,18.1.0-3-g8ea57af+9,18.1.0-3-gb69f684+42,18.1.0-3-gfcaddf3+6,18.1.0-32-gd8786685a,18.1.0-4-gf3f9b77+6,18.1.0-5-g1dd662b+2,18.1.0-5-g6dbcb01+41,18.1.0-6-gae77429+3,18.1.0-7-g9d75d83+9,18.1.0-7-gae09a6d+30,18.1.0-9-gc381ef5+4,w.2019.45
LSSTDataManagementBasePackage
connections.py
Go to the documentation of this file.
1 # This file is part of pipe_base.
2 #
3 # Developed for the LSST Data Management System.
4 # This product includes software developed by the LSST Project
5 # (http://www.lsst.org).
6 # See the COPYRIGHT file at the top-level directory of this distribution
7 # for details of code ownership.
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation, either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the GNU General Public License
20 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 
22 """Module defining connection classes for PipelineTask.
23 """
24 
25 __all__ = ["PipelineTaskConnections", "InputQuantizedConnection", "OutputQuantizedConnection",
26  "DeferredDatasetRef", "iterConnections"]
27 
28 from collections import UserDict, namedtuple
29 from types import SimpleNamespace
30 import typing
31 
32 import itertools
33 import string
34 
35 from . import config as configMod
36 from .connectionTypes import (InitInput, InitOutput, Input, PrerequisiteInput,
37  Output, BaseConnection)
38 from lsst.daf.butler import DatasetRef, Quantum
39 
40 if typing.TYPE_CHECKING:
41  from .config import PipelineTaskConfig
42 
43 
class ScalarError(TypeError):
    """Exception raised when dataset type is configured as scalar
    but there are multiple DataIds in a Quantum for that dataset.

    Parameters
    ----------
    key : `str`
        Name of the configuration field for dataset type.
    numDataIds : `int`
        Actual number of DataIds in a Quantum for this dataset type.
    """
    def __init__(self, key, numDataIds):
        # The second fragment previously lacked the f-prefix, so
        # "{numDataIds}" appeared literally in the message; both fragments
        # are now formatted.  The redundant extra parentheses around the
        # argument are also dropped.
        super().__init__(f"Expected scalar for output dataset field {key}, "
                         f"received {numDataIds} DataIds")
58 
59 
class PipelineTaskConnectionDict(UserDict):
    """This is a special dict class used by PipelineTaskConnectionMetaclass

    This dict is used in PipelineTaskConnection class creation, as the
    dictionary that is initially used as __dict__. It exists to
    intercept connection fields declared in a PipelineTaskConnection, and
    what name is used to identify them. The names are then added to class
    level list according to the connection type of the class attribute. The
    names are also used as keys in a class level dictionary associated with
    the corresponding class attribute. This information is a duplicate of
    what exists in __dict__, but provides a simple place to lookup and
    iterate on only these variables.

    NOTE(review): the ``class`` declaration line was missing from the
    extracted source; it is restored here (subclassing `UserDict`, whose
    ``self.data`` backing dict the methods below rely on).
    """
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Initialize class level variables used to track any declared
        # class level variables that are instances of
        # connectionTypes.BaseConnection
        self.data['inputs'] = []
        self.data['prerequisiteInputs'] = []
        self.data['outputs'] = []
        self.data['initInputs'] = []
        self.data['initOutputs'] = []
        self.data['allConnections'] = {}

    def __setitem__(self, name, value):
        # Record the attribute name under the list matching its connection
        # type so the metaclass can later freeze these into class attributes.
        if isinstance(value, Input):
            self.data['inputs'].append(name)
        elif isinstance(value, PrerequisiteInput):
            self.data['prerequisiteInputs'].append(name)
        elif isinstance(value, Output):
            self.data['outputs'].append(name)
        elif isinstance(value, InitInput):
            self.data['initInputs'].append(name)
        elif isinstance(value, InitOutput):
            self.data['initOutputs'].append(name)
        # This should not be an elif, as it needs tested for
        # everything that inherits from BaseConnection
        if isinstance(value, BaseConnection):
            object.__setattr__(value, 'varName', name)
            self.data['allConnections'][name] = value
        # defer to the default behavior
        super().__setitem__(name, value)
103 
104 
class PipelineTaskConnectionsMetaclass(type):
    """Metaclass used in the declaration of PipelineTaskConnections classes

    NOTE(review): the ``class`` declaration line and the creation of the
    ``dct`` namespace in ``__prepare__`` were missing from the extracted
    source; both are restored here (the metaclass name is referenced below
    in ``__prepare__`` and by ``PipelineTaskConnections``).
    """
    def __prepare__(name, bases, **kwargs):  # noqa: N805
        # Create an instance of our special dict to catch and track all
        # variables that are instances of connectionTypes.BaseConnection
        dct = PipelineTaskConnectionDict()
        # Copy any existing connections from a parent class
        for base in bases:
            if isinstance(base, PipelineTaskConnectionsMetaclass):
                for name, value in base.allConnections.items():
                    dct[name] = value
        return dct

    def __new__(cls, name, bases, dct, **kwargs):
        dimensionsValueError = TypeError("PipelineTaskConnections class must be created with a dimensions "
                                         "attribute which is an iterable of dimension names")

        if name != 'PipelineTaskConnections':
            # Verify that dimensions are passed as a keyword in class
            # declaration, falling back to any base class value
            if 'dimensions' not in kwargs:
                for base in bases:
                    if hasattr(base, 'dimensions'):
                        kwargs['dimensions'] = base.dimensions
                        break
                if 'dimensions' not in kwargs:
                    raise dimensionsValueError
            try:
                dct['dimensions'] = set(kwargs['dimensions'])
            except TypeError as exc:
                raise dimensionsValueError from exc
            # Lookup any python string templates that may have been used in
            # the declaration of the name field of a class connection
            # attribute
            allTemplates = set()
            stringFormatter = string.Formatter()
            # Loop over all connections
            for obj in dct['allConnections'].values():
                nameValue = obj.name
                # add all the parameters to the set of templates
                for param in stringFormatter.parse(nameValue):
                    if param[1] is not None:
                        allTemplates.add(param[1])

            # look up any template from base classes and merge them all
            # together
            mergeDict = {}
            for base in bases[::-1]:
                if hasattr(base, 'defaultTemplates'):
                    mergeDict.update(base.defaultTemplates)
            if 'defaultTemplates' in kwargs:
                mergeDict.update(kwargs['defaultTemplates'])

            if len(mergeDict) > 0:
                kwargs['defaultTemplates'] = mergeDict

            # Verify that if templated strings were used, defaults were
            # supplied as an argument in the declaration of the connection
            # class
            if len(allTemplates) > 0 and 'defaultTemplates' not in kwargs:
                raise TypeError("PipelineTaskConnection class contains templated attribute names, but no "
                                "default templates were provided, add a dictionary attribute named "
                                "defaultTemplates which contains the mapping between template key and value")
            if len(allTemplates) > 0:
                # Verify all templates have a default, and throw if they do
                # not
                defaultTemplateKeys = set(kwargs['defaultTemplates'].keys())
                templateDifference = allTemplates.difference(defaultTemplateKeys)
                if templateDifference:
                    raise TypeError(f"Default template keys were not provided for {templateDifference}")
                # Verify that templates do not share names with variable names
                # used for a connection, this is needed because of how
                # templates are specified in an associated config class.
                nameTemplateIntersection = allTemplates.intersection(set(dct['allConnections'].keys()))
                if len(nameTemplateIntersection) > 0:
                    # (was an f-string with no placeholders)
                    raise TypeError("Template parameters cannot share names with Class attributes")
            dct['defaultTemplates'] = kwargs.get('defaultTemplates', {})

        # Convert all the connection containers into frozensets so they cannot
        # be modified at the class scope
        for connectionName in ("inputs", "prerequisiteInputs", "outputs", "initInputs", "initOutputs"):
            dct[connectionName] = frozenset(dct[connectionName])
        # our custom dict type must be turned into an actual dict to be used
        # in type.__new__
        return super().__new__(cls, name, bases, dict(dct))

    def __init__(cls, name, bases, dct, **kwargs):
        # This overrides the default init to drop the kwargs argument. Python
        # metaclasses will have this argument set if any kwargs are passed at
        # class construction time, but should be consumed before calling
        # __init__ on the type metaclass. This is in accordance with python
        # documentation on metaclasses
        super().__init__(name, bases, dct)
198 
class QuantizedConnection(SimpleNamespace):
    """A Namespace to map defined variable names of connections to their
    `lsst.daf.butler.DatasetRef`s

    This class maps the names used to define a connection on a
    PipelineTaskConnectionsClass to the corresponding
    `lsst.daf.butler.DatasetRef`s provided by a `lsst.daf.butler.Quantum`
    instance. This will be a quantum of execution based on the graph created
    by examining all the connections defined on the
    `PipelineTaskConnectionsClass`.
    """
    def __init__(self, **kwargs):
        # Bookkeeping set of every attribute name assigned on this
        # namespace; consulted by __iter__ and keys().  Assigned through
        # object.__setattr__ so it does not register itself in the set.
        object.__setattr__(self, "_attributes", set())

    def __setattr__(self, name: str, value: typing.Union[DatasetRef, typing.List[DatasetRef]]):
        # Record the name before delegating the actual assignment.
        self._attributes.add(name)
        super().__setattr__(name, value)

    def __delattr__(self, name):
        # Remove the attribute first; only drop the bookkeeping entry once
        # deletion has succeeded.
        object.__delattr__(self, name)
        self._attributes.remove(name)

    def __iter__(self) -> typing.Generator[typing.Tuple[str, typing.Union[DatasetRef,
                                           typing.List[DatasetRef]]], None, None]:
        """Make an Iterator for this QuantizedConnection

        Iterating over a QuantizedConnection will yield a tuple with the name
        of an attribute and the value associated with that name. This is
        similar to dict.items() but is on the namespace attributes rather than
        dict keys.
        """
        for attrName in self._attributes:
            yield attrName, getattr(self, attrName)

    def keys(self) -> typing.Generator[str, None, None]:
        """Returns an iterator over all the attributes added to a
        QuantizedConnection class
        """
        for attrName in self._attributes:
            yield attrName
240 
class InputQuantizedConnection(QuantizedConnection):
    """Namespace of input `lsst.daf.butler.DatasetRef`s for a quantum.

    NOTE(review): the ``class`` declaration line was missing from the
    extracted source and is restored here; the name is instantiated by
    ``PipelineTaskConnections.buildDatasetRefs`` and exported in
    ``__all__``, mirroring `OutputQuantizedConnection` below.
    """
    pass
244 
245 
class OutputQuantizedConnection(QuantizedConnection):
    """Namespace of output `lsst.daf.butler.DatasetRef`s for a quantum.

    A distinct subclass of `QuantizedConnection` so that output reference
    namespaces can be distinguished by type from input ones; it adds no
    behavior of its own.
    """
    pass
248 
249 
class DeferredDatasetRef(namedtuple("DeferredDatasetRefBase", "datasetRef")):
    """Class which denotes that a datasetRef should be treated as deferred when
    interacting with the butler

    Parameters
    ----------
    datasetRef : `lsst.daf.butler.DatasetRef`
        The `lsst.daf.butler.DatasetRef` that will be eventually used to
        resolve a dataset
    """
    # Empty __slots__ keeps instances as lightweight immutable tuples with no
    # per-instance __dict__, carrying only the single ``datasetRef`` field.
    __slots__ = ()
261 
262 
class PipelineTaskConnections(metaclass=PipelineTaskConnectionsMetaclass):
    """PipelineTaskConnections is a class used to declare desired IO when a
    PipelineTask is run by an activator

    Parameters
    ----------
    config : `PipelineTaskConfig`
        A `PipelineTaskConfig` class instance whose class has been configured
        to use this `PipelineTaskConnectionsClass`

    Notes
    -----
    PipelineTaskConnection classes are created by declaring class
    attributes of types defined in lsst.pipe.base.connectionTypes and are
    listed as follows:

    * InitInput - Defines connections in a quantum graph which are used as
      inputs to the __init__ function of the PipelineTask corresponding to
      this class
    * InitOutput - Defines connections in a quantum graph which are to be
      persisted using a butler at the end of the __init__ function of the
      PipelineTask corresponding to this class. The variable name used to
      define this connection should be the same as an attribute name on the
      PipelineTask instance. E.g. if an InitOutput is declared with the name
      outputSchema in a PipelineTaskConnections class, then a PipelineTask
      instance should have an attribute self.outputSchema defined. Its value
      is what will be saved by the activator framework.
    * PrerequisiteInput - An input connection type that defines a
      `lsst.daf.butler.DatasetType` that must be present at execution time,
      but that will not be used during the course of creating the quantum
      graph to be executed. These most often are things produced outside the
      processing pipeline, such as reference catalogs.
    * Input - Input `lsst.daf.butler.DatasetType`s that will be used in the
      run method of a PipelineTask. The name used to declare class attribute
      must match a function argument name in the run method of a
      PipelineTask. E.g. If the PipelineTaskConnections defines an Input with
      the name calexp, then the corresponding signature should be
      PipelineTask.run(calexp, ...)
    * Output - A `lsst.daf.butler.DatasetType` that will be produced by an
      execution of a PipelineTask. The name used to declare the connection
      must correspond to an attribute of a `Struct` that is returned by a
      `PipelineTask` run method. E.g. if an output connection is defined with
      the name measCat, then the corresponding PipelineTask.run method must
      return Struct(measCat=X,..) where X matches the storageClass type
      defined on the output connection.

    The process of declaring a PipelineTaskConnection class involves
    parameters passed in the declaration statement.

    The first parameter is dimensions which is an iterable of strings which
    defines the unit of processing the run method of a corresponding
    `PipelineTask` will operate on. These dimensions must match dimensions
    that exist in the butler registry which will be used in executing the
    corresponding `PipelineTask`.

    The second parameter is labeled ``defaultTemplates`` and is conditionally
    optional. The name attributes of connections can be specified as python
    format strings, with named format arguments. If any of the name
    parameters on connections defined in a `PipelineTaskConnections` class
    contain a template, then a default template value must be specified in
    the ``defaultTemplates`` argument. This is done by passing a dictionary
    with keys corresponding to a template identifier, and values
    corresponding to the value to use as a default when formatting the
    string. For example if ConnectionClass.calexp.name = '{input}Coadd_calexp'
    then ``defaultTemplates`` = {'input': 'deep'}.

    Once a `PipelineTaskConnections` class is created, it is used in the
    creation of a `PipelineTaskConfig`. This is further documented in the
    documentation of `PipelineTaskConfig`. For the purposes of this
    documentation, the relevant information is that the config class allows
    configuration of connection names by users when running a pipeline.

    Instances of a `PipelineTaskConnections` class are used by the pipeline
    task execution framework to introspect what a corresponding
    `PipelineTask` will require, and what it will produce.

    Examples
    --------
    >>> from lsst.pipe.base import connectionTypes as cT
    >>> from lsst.pipe.base import PipelineTaskConnections
    >>> from lsst.pipe.base import PipelineTaskConfig
    >>> class ExampleConnections(PipelineTaskConnections,
    ...                          dimensions=("A", "B"),
    ...                          defaultTemplates={"foo": "Example"}):
    ...     inputConnection = cT.Input(doc="Example input",
    ...                                dimensions=("A", "B"),
    ...                                storageClass=Exposure,
    ...                                name="{foo}Dataset")
    ...     outputConnection = cT.Output(doc="Example output",
    ...                                  dimensions=("A", "B"),
    ...                                  storageClass=Exposure,
    ...                                  name="{foo}output")
    >>> class ExampleConfig(PipelineTaskConfig,
    ...                     pipelineConnections=ExampleConnections):
    ...     pass
    >>> config = ExampleConfig()
    >>> config.connections.foo = "Modified"
    >>> config.connections.outputConnection = "TotallyDifferent"
    >>> connections = ExampleConnections(config=config)
    >>> assert(connections.inputConnection.name == "ModifiedDataset")
    >>> assert(connections.outputConnection.name == "TotallyDifferent")
    """

    def __init__(self, *, config: 'PipelineTaskConfig' = None):
        # Convert the class-level frozensets into per-instance mutable sets.
        # NOTE(review): the extracted source was missing the
        # prerequisiteInputs and initOutputs conversions; they are restored
        # here for symmetry — buildDatasetRefs below iterates
        # self.prerequisiteInputs alongside self.inputs.
        self.inputs = set(self.inputs)
        self.prerequisiteInputs = set(self.prerequisiteInputs)
        self.outputs = set(self.outputs)
        self.initInputs = set(self.initInputs)
        self.initOutputs = set(self.initOutputs)

        if config is None or not isinstance(config, configMod.PipelineTaskConfig):
            raise ValueError("PipelineTaskConnections must be instantiated with"
                             " a PipelineTaskConfig instance")
        self.config = config
        # Extract the template names that were defined in the config instance
        # by looping over the keys of the defaultTemplates dict specified at
        # class declaration time
        templateValues = {name: getattr(config.connections, name)
                          for name in getattr(self, 'defaultTemplates').keys()}
        # Extract the configured value corresponding to each connection
        # variable. I.e. for each connection identifier, populate an override
        # for the connection.name attribute
        self._nameOverrides = {name: getattr(config.connections, name).format(**templateValues)
                               for name in self.allConnections.keys()}

        # connections.name corresponds to a dataset type name, create a
        # reverse mapping that goes from dataset type name to attribute
        # identifier name (variable name) on the connection class
        self._typeNameToVarName = {v: k for k, v in self._nameOverrides.items()}

    def buildDatasetRefs(self, quantum: Quantum) -> typing.Tuple[InputQuantizedConnection,
                                                                 OutputQuantizedConnection]:
        """Builds QuantizedConnections corresponding to input Quantum

        Parameters
        ----------
        quantum : `lsst.daf.butler.Quantum`
            Quantum object which defines the inputs and outputs for a given
            unit of processing

        Returns
        -------
        retVal : `tuple` of (`InputQuantizedConnection`,
            `OutputQuantizedConnection`) Namespaces mapping attribute names
            (identifiers of connections) to butler references defined in the
            input `lsst.daf.butler.Quantum`

        Raises
        ------
        ScalarError
            Raised if a connection is not marked ``multiple`` but the quantum
            supplies a number of DataIds other than one.
        ValueError
            Raised if a connection name appears in neither the predicted
            inputs nor the outputs of the quantum.
        """
        inputDatasetRefs = InputQuantizedConnection()
        outputDatasetRefs = OutputQuantizedConnection()
        # operate on a reference object and an iterable of names of class
        # connection attributes
        for refs, names in zip((inputDatasetRefs, outputDatasetRefs),
                               (itertools.chain(self.inputs, self.prerequisiteInputs), self.outputs)):
            # get a name of a class connection attribute
            for attributeName in names:
                # get the attribute identified by name
                attribute = getattr(self, attributeName)
                # Branch if the attribute dataset type is an input
                if attribute.name in quantum.predictedInputs:
                    # Get the DatasetRefs
                    quantumInputRefs = quantum.predictedInputs[attribute.name]
                    # if the dataset is marked to load deferred, wrap it in a
                    # DeferredDatasetRef
                    if attribute.deferLoad:
                        quantumInputRefs = [DeferredDatasetRef(datasetRef=ref)
                                            for ref in quantumInputRefs]
                    # Unpack arguments that are not marked multiples (list of
                    # length one)
                    if not attribute.multiple:
                        if len(quantumInputRefs) != 1:
                            raise ScalarError(attributeName, len(quantumInputRefs))
                        quantumInputRefs = quantumInputRefs[0]
                    # Add to the QuantizedConnection identifier
                    setattr(refs, attributeName, quantumInputRefs)
                # Branch if the attribute dataset type is an output
                elif attribute.name in quantum.outputs:
                    value = quantum.outputs[attribute.name]
                    # Unpack arguments that are not marked multiples (list of
                    # length one).  A length check is added here to match the
                    # input branch above: the original silently took the first
                    # element (or raised IndexError on an empty list).
                    if not attribute.multiple:
                        if len(value) != 1:
                            raise ScalarError(attributeName, len(value))
                        value = value[0]
                    # Add to the QuantizedConnection identifier
                    setattr(refs, attributeName, value)
                # Specified attribute is not in inputs or outputs, don't know
                # how to handle, throw
                else:
                    raise ValueError(f"Attribute with name {attributeName} has no counterpart "
                                     "in input quantum")
        return inputDatasetRefs, outputDatasetRefs

    def adjustQuantum(self, datasetRefMap: InputQuantizedConnection):
        """Override to make adjustments to `lsst.daf.butler.DatasetRef`s in
        the `lsst.daf.butler.core.Quantum` during the graph generation stage
        of the activator.

        Parameters
        ----------
        datasetRefMap : `dict`
            Mapping with keys of dataset type name to `list` of
            `lsst.daf.butler.DatasetRef`s

        Returns
        -------
        datasetRefMap : `dict`
            Modified mapping of input with possibly adjusted
            `lsst.daf.butler.DatasetRef`s

        Raises
        ------
        Exception
            Overrides of this function have the option of raising an
            Exception if a field in the input does not satisfy a need for a
            corresponding pipelineTask, i.e. no reference catalogs are found.
        """
        # Default implementation: pass the mapping through unchanged.
        return datasetRefMap
473 
474 
def iterConnections(connections: PipelineTaskConnections, connectionType: str) -> typing.Generator:
    """Creates an iterator over the selected connections type which yields
    all the defined connections of that type.

    Parameters
    ----------
    connections: `PipelineTaskConnections`
        An instance of a `PipelineTaskConnections` object that will be
        iterated over.
    connectionType: `str`
        The type of connections to iterate over, valid values are inputs,
        outputs, prerequisiteInputs, initInputs, initOutputs.

    Yields
    -------
    connection: `BaseConnection`
        A connection defined on the input connections object of the type
        supplied.  The yielded value will be a derived type of
        `BaseConnection`.
    """
    attributeNames = getattr(connections, connectionType)
    yield from (getattr(connections, attrName) for attrName in attributeNames)
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)
Definition: history.py:174
std::vector< SchemaItem< Flag > > * items
std::shared_ptr< FrameSet > append(FrameSet const &first, FrameSet const &second)
Construct a FrameSet that performs two transformations in series.
Definition: functional.cc:33
daf::base::PropertySet * set
Definition: fits.cc:902
table::Key< int > type
Definition: Detector.cc:163
def __init__(self, key, numDataIds)
Definition: connections.py:55