22 """Module defining connection classes for PipelineTask. 25 __all__ = [
"PipelineTaskConnections",
"InputQuantizedConnection",
"OutputQuantizedConnection",
26 "DeferredDatasetRef",
"iterConnections"]
import itertools
import string
import typing
from collections import UserDict, namedtuple
from types import SimpleNamespace

from lsst.daf.butler import DatasetRef, Quantum

from . import config as configMod
from .connectionTypes import (InitInput, InitOutput, Input, PrerequisiteInput,
                              Output, BaseConnection)

if typing.TYPE_CHECKING:
    from .config import PipelineTaskConfig
class ScalarError(TypeError):
    """Exception raised when dataset type is configured as scalar
    but there are multiple DataIds in a Quantum for that dataset.

    Parameters
    ----------
    key : `str`
        Name of the configuration field for dataset type.
    numDataIds : `int`
        Actual number of DataIds in a Quantum for this dataset type.
    """
    def __init__(self, key, numDataIds):
        # The two adjacent f-strings concatenate into a single message; the
        # redundant extra parentheses around the argument have been dropped.
        super().__init__(f"Expected scalar for output dataset field {key}, "
                         f"received {numDataIds} DataIds")
class PipelineTaskConnectionDict(UserDict):
    """This is a special dict class used by PipelineTaskConnectionMetaclass

    This dict is used in PipelineTaskConnection class creation, as the
    dictionary that is initially used as __dict__. It exists to
    intercept connection fields declared in a PipelineTaskConnection, and
    what name is used to identify them. The names are then added to class
    level list according to the connection type of the class attribute. The
    names are also used as keys in a class level dictionary associated with
    the corresponding class attribute. This information is a duplicate of
    what exists in __dict__, but provides a simple place to lookup and
    iterate on only these variables.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Buckets that track declared connection attribute names, grouped by
        # connection type, plus one mapping holding every connection object.
        self.data['inputs'] = []
        self.data['prerequisiteInputs'] = []
        self.data['outputs'] = []
        self.data['initInputs'] = []
        self.data['initOutputs'] = []
        self.data['allConnections'] = {}

    def __setitem__(self, name, value):
        # Dispatch the declared name into the bucket matching its connection
        # type. The pairs are checked in declaration order and the first
        # match wins, exactly mirroring the original if/elif chain.
        for connectionClass, bucket in ((Input, 'inputs'),
                                        (PrerequisiteInput, 'prerequisiteInputs'),
                                        (Output, 'outputs'),
                                        (InitInput, 'initInputs'),
                                        (InitOutput, 'initOutputs')):
            if isinstance(value, connectionClass):
                self.data[bucket].append(name)
                break
        # Deliberately not part of the chain above: every BaseConnection,
        # whatever its bucket, is stamped with the variable name it was
        # declared under and recorded in allConnections.
        if isinstance(value, BaseConnection):
            object.__setattr__(value, 'varName', name)
            self.data['allConnections'][name] = value
        # Defer to the normal UserDict behavior for actual storage, so the
        # eventual dict(dct) conversion sees every class member.
        super().__setitem__(name, value)
class PipelineTaskConnectionsMetaclass(type):
    """Metaclass used in the declaration of PipelineTaskConnections classes
    """
    def __prepare__(name, bases, **kwargs):  # noqa: N805
        # Create an instance of our special dict to catch and track all
        # variables that are instances of connectionTypes.BaseConnection,
        # pre-populated with any connections inherited from parent classes.
        dct = PipelineTaskConnectionDict()
        for base in bases:
            if isinstance(base, PipelineTaskConnectionsMetaclass):
                for name, value in base.allConnections.items():
                    dct[name] = value
        return dct

    def __new__(cls, name, bases, dct, **kwargs):
        dimensionsValueError = TypeError("PipelineTaskConnections class must be created with a dimensions "
                                         "attribute which is an iterable of dimension names")

        if name != 'PipelineTaskConnections':
            # Verify that dimensions are passed as a keyword in the class
            # declaration, falling back to a base class value if present.
            if 'dimensions' not in kwargs:
                for base in bases:
                    if hasattr(base, 'dimensions'):
                        kwargs['dimensions'] = base.dimensions
                        break
                if 'dimensions' not in kwargs:
                    raise dimensionsValueError
            try:
                dct['dimensions'] = set(kwargs['dimensions'])
            except TypeError as exc:
                raise dimensionsValueError from exc

            # Look up any python string templates that may have been used in
            # the declaration of the name field of a class connection
            # attribute.
            allTemplates = set()
            stringFormatter = string.Formatter()
            for obj in dct['allConnections'].values():
                # Formatter.parse yields (literal, field_name, spec, conv);
                # a non-None field_name is a template parameter.
                for param in stringFormatter.parse(obj.name):
                    if param[1] is not None:
                        allTemplates.add(param[1])

            # Merge defaultTemplates from all base classes; iterate bases in
            # reverse so that later (more derived) values take precedence,
            # with the class's own kwargs winning last.
            mergeDict = {}
            for base in bases[::-1]:
                if hasattr(base, 'defaultTemplates'):
                    mergeDict.update(base.defaultTemplates)
            if 'defaultTemplates' in kwargs:
                mergeDict.update(kwargs['defaultTemplates'])

            if len(mergeDict) > 0:
                kwargs['defaultTemplates'] = mergeDict

            # Verify that if templated strings were used, defaults were
            # supplied as an argument in the declaration of the connection
            # class. (Fixed "defaut" typo in the original message.)
            if len(allTemplates) > 0 and 'defaultTemplates' not in kwargs:
                raise TypeError("PipelineTaskConnection class contains templated attribute names, but no "
                                "default templates were provided, add a dictionary attribute named "
                                "defaultTemplates which contains the mapping between template key and value")
            if len(allTemplates) > 0:
                # Verify all templates have a default.
                defaultTemplateKeys = set(kwargs['defaultTemplates'].keys())
                templateDifference = allTemplates.difference(defaultTemplateKeys)
                if templateDifference:
                    raise TypeError(f"Default template keys were not provided for {templateDifference}")
                # Templates may not share names with connection attributes,
                # because both are configured through the same config fields.
                # (Original raised an f-string with no placeholders; the
                # offending names are now included in the message.)
                nameTemplateIntersection = allTemplates.intersection(set(dct['allConnections'].keys()))
                if len(nameTemplateIntersection) > 0:
                    raise TypeError("Template parameters cannot share names with Class attributes: "
                                    f"{nameTemplateIntersection}")
            dct['defaultTemplates'] = kwargs.get('defaultTemplates', {})

        # Convert all the connection containers into frozensets so they
        # cannot be modified at the class level.
        for connectionName in ("inputs", "prerequisiteInputs", "outputs", "initInputs", "initOutputs"):
            dct[connectionName] = frozenset(dct[connectionName])
        # Our custom dict type must be turned into an actual dict to be used
        # in type.__new__.
        return super().__new__(cls, name, bases, dict(dct))

    def __init__(cls, name, bases, dct, **kwargs):
        # Consume the class-declaration keyword arguments (dimensions,
        # defaultTemplates) so they are not forwarded to type.__init__,
        # which would reject them.
        super().__init__(name, bases, dct)
class QuantizedConnection(SimpleNamespace):
    """A Namespace to map defined variable names of connections to their
    `lsst.daf.butler.DatasetRef`\\ s

    This class maps the names used to define a connection on a
    PipelineTaskConnectionsClass to the corresponding
    `lsst.daf.butler.DatasetRef`\\ s provided by a `lsst.daf.butler.Quantum`
    instance.  This will be a quantum of execution based on the graph created
    by examining all the connections defined on the
    `PipelineTaskConnectionsClass`.
    """

    def __init__(self, **kwargs):
        # Track every attribute name assigned on this namespace, so the
        # instance can later be iterated like a mapping.  object.__setattr__
        # bypasses this class's own __setattr__, which would otherwise
        # record "_attributes" inside itself.
        object.__setattr__(self, "_attributes", set())

    def __setattr__(self, name: str,
                    value: "typing.Union[DatasetRef, typing.List[DatasetRef]]"):
        # Capture the attribute name as it is added to this object.
        # (Annotations are quoted so they are not evaluated at class-creation
        # time; behavior is otherwise unchanged.)
        self._attributes.add(name)
        super().__setattr__(name, value)

    def __delattr__(self, name: str):
        object.__delattr__(self, name)
        self._attributes.remove(name)

    def __iter__(self) -> ("typing.Generator[typing.Tuple[str, "
                           "typing.Union[DatasetRef, typing.List[DatasetRef]]], None, None]"):
        """Make an Iterator for this QuantizedConnection

        Iterating over a QuantizedConnection will yield a tuple with the name
        of an attribute and the value associated with that name. This is
        similar to dict.items() but is on the namespace attributes rather than
        dict keys.
        """
        yield from ((name, getattr(self, name)) for name in self._attributes)

    def keys(self) -> "typing.Generator[str, None, None]":
        """Returns an iterator over all the attributes added to a
        QuantizedConnection class
        """
        yield from self._attributes


class InputQuantizedConnection(QuantizedConnection):
    # Restored: this subclass is exported via __all__ and returned by the
    # dataset-ref builder, but its (trivial) definition was missing from
    # the visible span.
    pass


class OutputQuantizedConnection(QuantizedConnection):
    pass
class DeferredDatasetRef(namedtuple("DeferredDatasetRefBase", "datasetRef")):
    """Class which denotes that a datasetRef should be treated as deferred when
    interacting with the butler

    Parameters
    ----------
    datasetRef : `lsst.daf.butler.DatasetRef`
        The `lsst.daf.butler.DatasetRef` that will be eventually used to
        resolve a dataset
    """
    __slots__ = ()


class PipelineTaskConnections(metaclass=PipelineTaskConnectionsMetaclass):
    """PipelineTaskConnections is a class used to declare desired IO when a
    PipelineTask is run by an activator

    Parameters
    ----------
    config : `PipelineTaskConfig`
        A `PipelineTaskConfig` class instance whose class has been configured
        to use this `PipelineTaskConnections` class

    Notes
    -----
    ``PipelineTaskConnection`` classes are created by declaring class
    attributes of types defined in `lsst.pipe.base.connectionTypes`:

    * ``InitInput`` - connections consumed by the ``__init__`` method of the
      corresponding `PipelineTask`.
    * ``InitOutput`` - connections persisted with a butler at the end of
      ``__init__``; the declared variable name must match an attribute on the
      `PipelineTask` instance whose value is what will be saved.
    * ``PrerequisiteInput`` - dataset types that must already exist at
      execution time but are not produced by the pipeline itself, such as
      reference catalogs.
    * ``Input`` - dataset types consumed by ``PipelineTask.run``; the declared
      variable name must match an argument of the ``run`` method.
    * ``Output`` - dataset types produced by ``PipelineTask.run``; the
      declared variable name must match a field of the returned ``Struct``.

    Class declaration accepts a ``dimensions`` keyword, an iterable of
    dimension names defining the unit of processing, which must match
    dimensions in the butler registry.  When connection names contain python
    format-string templates, a ``defaultTemplates`` dictionary must also be
    supplied, mapping each template key to its default value (e.g. with
    ``name = '{input}Coadd_calexp'``, ``defaultTemplates={'input': 'deep'}``).
    Connection names and template values can later be overridden by users
    through the associated `PipelineTaskConfig` (``config.connections``).
    """

    def __init__(self, *, config: 'PipelineTaskConfig' = None):
        # NOTE(review): copy the class-level connection containers onto the
        # instance so that instance-level mutation cannot alter class state
        # -- these lines are not visible in the garbled span; confirm
        # against upstream.
        self.inputs = set(self.inputs)
        self.prerequisiteInputs = set(self.prerequisiteInputs)
        self.outputs = set(self.outputs)
        self.initInputs = set(self.initInputs)
        self.initOutputs = set(self.initOutputs)

        if config is None or not isinstance(config, configMod.PipelineTaskConfig):
            raise ValueError("PipelineTaskConnections must be instantiated with"
                             " a PipelineTaskConfig instance")
        self.config = config
        # Extract the template values that were set on the config instance,
        # keyed by the template names declared in defaultTemplates at class
        # declaration time.
        templateValues = {name: getattr(config.connections, name)
                          for name in getattr(self, 'defaultTemplates').keys()}
        # For every connection identifier, record the configured dataset type
        # name with any templates expanded; these override connection.name.
        self._nameOverrides = {name: getattr(config.connections, name).format(**templateValues)
                               for name in self.allConnections.keys()}

    def buildDatasetRefs(self, quantum: Quantum) -> typing.Tuple[InputQuantizedConnection,
                                                                 OutputQuantizedConnection]:
        """Builds QuantizedConnections corresponding to input Quantum

        Parameters
        ----------
        quantum : `lsst.daf.butler.Quantum`
            Quantum object which defines the inputs and outputs for a given
            unit of processing

        Returns
        -------
        retVal : `tuple` of (`InputQuantizedConnection`,
            `OutputQuantizedConnection`) Namespaces mapping attribute names
            (identifiers of connections) to butler references defined in the
            input `lsst.daf.butler.Quantum`
        """
        inputDatasetRefs = InputQuantizedConnection()
        outputDatasetRefs = OutputQuantizedConnection()
        # Pair each namespace with the iterable of connection attribute
        # names that feeds it: all input-like names go to the input
        # namespace, output names to the output namespace.
        pairings = zip((inputDatasetRefs, outputDatasetRefs),
                       (itertools.chain(self.inputs, self.prerequisiteInputs),
                        self.outputs))
        for refs, names in pairings:
            for attributeName in names:
                # Fetch the connection object identified by this name.
                attribute = getattr(self, attributeName)
                if attribute.name in quantum.predictedInputs:
                    # This connection's dataset type is an input to the
                    # quantum; fetch its DatasetRefs.
                    quantumInputRefs = quantum.predictedInputs[attribute.name]
                    # If the dataset is marked to load deferred, wrap each
                    # ref in a DeferredDatasetRef.
                    if attribute.deferLoad:
                        quantumInputRefs = [DeferredDatasetRef(datasetRef=ref)
                                            for ref in quantumInputRefs]
                    # Unpack arguments that are not marked multiples (list of
                    # size one); an empty list means nothing to attach.
                    if not attribute.multiple:
                        if len(quantumInputRefs) > 1:
                            raise ScalarError(attributeName, len(quantumInputRefs))
                        if len(quantumInputRefs) == 0:
                            continue
                        quantumInputRefs = quantumInputRefs[0]
                    setattr(refs, attributeName, quantumInputRefs)
                elif attribute.name in quantum.outputs:
                    value = quantum.outputs[attribute.name]
                    # Unpack arguments that are not marked multiples (list of
                    # size one).
                    if not attribute.multiple:
                        value = value[0]
                    setattr(refs, attributeName, value)
                else:
                    # The attribute is in neither inputs nor outputs; we do
                    # not know how to handle it.
                    raise ValueError(f"Attribute with name {attributeName} has no counterpoint "
                                     "in input quantum")
        return inputDatasetRefs, outputDatasetRefs

    def adjustQuantum(self, datasetRefMap: dict):
        """Override to make adjustments to `lsst.daf.butler.DatasetRef` objects
        in the `lsst.daf.butler.core.Quantum` during the graph generation stage
        of the activator.

        Parameters
        ----------
        datasetRefMap : `dict`
            Mapping with keys of dataset type name to `list` of
            `lsst.daf.butler.DatasetRef` objects

        Returns
        -------
        datasetRefMap : `dict`
            Modified mapping of input with possible adjusted
            `lsst.daf.butler.DatasetRef` objects

        Notes
        -----
        Overrides of this function have the option of raising an Exception
        if a field in the input does not satisfy a need for a corresponding
        pipelineTask, i.e. no reference catalogs are found.
        """
        # Default implementation is a no-op passthrough.
        return datasetRefMap
def iterConnections(connections: "PipelineTaskConnections", connectionType: str) -> typing.Generator:
    """Creates an iterator over the selected connections type which yields
    all the defined connections of that type.

    Parameters
    ----------
    connections : `PipelineTaskConnections`
        An instance of a `PipelineTaskConnections` object that will be
        iterated over
    connectionType : `str`
        The type of connections to iterate over, valid values are inputs,
        outputs, prerequisiteInputs, initInputs, initOutputs.

    Yields
    ------
    connection : `BaseConnection`
        A connection defined on the input connections object of the type
        supplied.  The yielded value will be a derived type of
        `BaseConnection`.
    """
    # The container attribute holds connection identifiers; resolve each
    # identifier back to the connection object it names.
    connectionNames = getattr(connections, connectionType)
    for connectionName in connectionNames:
        yield getattr(connections, connectionName)
# (Removed: non-Python cross-reference text accidentally appended during
# documentation extraction; it was not part of this module.)