22 """Module defining connection classes for PipelineTask. 25 __all__ = [
"PipelineTaskConnections",
"InputQuantizedConnection",
"OutputQuantizedConnection",
26 "DeferredDatasetRef",
"iterConnections"]
import itertools
import string
import typing
from collections import UserDict, namedtuple
from types import SimpleNamespace

from lsst.daf.butler import DatasetRef, Quantum

from . import config as configMod
from .connectionTypes import (InitInput, InitOutput, Input, PrerequisiteInput,
                              Output, BaseConnection)

if typing.TYPE_CHECKING:
    from .config import PipelineTaskConfig
45 """Exception raised when dataset type is configured as scalar 46 but there are multiple DataIds in a Quantum for that dataset. 51 Name of the configuration field for dataset type. 53 Actual number of DataIds in a Quantum for this dataset type. 56 super().
__init__((f
"Expected scalar for output dataset field {key}, " 57 "received {numDataIds} DataIds"))
61 """This is a special dict class used by PipelineTaskConnectionMetaclass 63 This dict is used in PipelineTaskConnection class creation, as the 64 dictionary that is initially used as __dict__. It exists to 65 intercept connection fields declared in a PipelineTaskConnection, and 66 what name is used to identify them. The names are then added to class 67 level list according to the connection type of the class attribute. The 68 names are also used as keys in a class level dictionary associated with 69 the corresponding class attribute. This information is a duplicate of 70 what exists in __dict__, but provides a simple place to lookup and 71 iterate on only these variables. 78 self.data[
'inputs'] = []
79 self.data[
'prerequisiteInputs'] = []
80 self.data[
'outputs'] = []
81 self.data[
'initInputs'] = []
82 self.data[
'initOutputs'] = []
83 self.data[
'allConnections'] = {}
86 if isinstance(value, Input):
87 self.data[
'inputs'].
append(name)
88 elif isinstance(value, PrerequisiteInput):
89 self.data[
'prerequisiteInputs'].
append(name)
90 elif isinstance(value, Output):
91 self.data[
'outputs'].
append(name)
92 elif isinstance(value, InitInput):
93 self.data[
'initInputs'].
append(name)
94 elif isinstance(value, InitOutput):
95 self.data[
'initOutputs'].
append(name)
98 if isinstance(value, BaseConnection):
99 object.__setattr__(value,
'varName', name)
100 self.data[
'allConnections'][name] = value
106 """Metaclass used in the declaration of PipelineTaskConnections classes 114 if isinstance(base, PipelineTaskConnectionsMetaclass):
115 for name, value
in base.allConnections.items():
120 dimensionsValueError = TypeError(
"PipelineTaskConnections class must be created with a dimensions " 121 "attribute which is an iterable of dimension names")
123 if name !=
'PipelineTaskConnections':
126 if 'dimensions' not in kwargs:
128 if hasattr(base,
'dimensions'):
129 kwargs[
'dimensions'] = base.dimensions
131 if 'dimensions' not in kwargs:
132 raise dimensionsValueError
134 dct[
'dimensions'] =
set(kwargs[
'dimensions'])
135 except TypeError
as exc:
136 raise dimensionsValueError
from exc
140 stringFormatter = string.Formatter()
142 for obj
in dct[
'allConnections'].values():
145 for param
in stringFormatter.parse(nameValue):
146 if param[1]
is not None:
147 allTemplates.add(param[1])
152 for base
in bases[::-1]:
153 if hasattr(base,
'defaultTemplates'):
154 mergeDict.update(base.defaultTemplates)
155 if 'defaultTemplates' in kwargs:
156 mergeDict.update(kwargs[
'defaultTemplates'])
158 if len(mergeDict) > 0:
159 kwargs[
'defaultTemplates'] = mergeDict
164 if len(allTemplates) > 0
and 'defaultTemplates' not in kwargs:
165 raise TypeError(
"PipelineTaskConnection class contains templated attribute names, but no " 166 "defaut templates were provided, add a dictionary attribute named " 167 "defaultTemplates which contains the mapping between template key and value")
168 if len(allTemplates) > 0:
170 defaultTemplateKeys =
set(kwargs[
'defaultTemplates'].
keys())
171 templateDifference = allTemplates.difference(defaultTemplateKeys)
172 if templateDifference:
173 raise TypeError(f
"Default template keys were not provided for {templateDifference}")
177 nameTemplateIntersection = allTemplates.intersection(
set(dct[
'allConnections'].
keys()))
178 if len(nameTemplateIntersection) > 0:
179 raise TypeError(f
"Template parameters cannot share names with Class attributes")
180 dct[
'defaultTemplates'] = kwargs.get(
'defaultTemplates', {})
184 for connectionName
in (
"inputs",
"prerequisiteInputs",
"outputs",
"initInputs",
"initOutputs"):
185 dct[connectionName] = frozenset(dct[connectionName])
188 return super().
__new__(cls, name, bases, dict(dct))
200 """A Namespace to map defined variable names of connections to their 201 `lsst.daf.buter.DatasetRef`s 203 This class maps the names used to define a connection on a 204 PipelineTaskConnectionsClass to the corresponding 205 `lsst.daf.butler.DatasetRef`s provided by a `lsst.daf.butler.Quantum` 206 instance. This will be a quantum of execution based on the graph created 207 by examining all the connections defined on the 208 `PipelineTaskConnectionsClass`. 213 object.__setattr__(self,
"_attributes",
set())
215 def __setattr__(self, name: str, value: typing.Union[DatasetRef, typing.List[DatasetRef]]):
217 self._attributes.add(name)
221 object.__delattr__(self, name)
222 self._attributes.remove(name)
224 def __iter__(self) -> typing.Generator[typing.Tuple[str, typing.Union[DatasetRef,
225 typing.List[DatasetRef]]], None, None]:
226 """Make an Iterator for this QuantizedConnection 228 Iterating over a QuantizedConnection will yield a tuple with the name 229 of an attribute and the value associated with that name. This is 230 similar to dict.items() but is on the namespace attributes rather than 233 yield from ((name, getattr(self, name))
for name
in self._attributes)
235 def keys(self) -> typing.Generator[str, None, None]:
236 """Returns an iterator over all the attributes added to a 237 QuantizedConnection class 239 yield from self._attributes
class OutputQuantizedConnection(QuantizedConnection):
    """`QuantizedConnection` subclass used for the output connections of a
    quantum (see `PipelineTaskConnections.buildDatasetRefs`, which pairs an
    instance of this class with the ``outputs`` connection names).
    """
251 """Class which denotes that a datasetRef should be treated as deferred when 252 interacting with the butler 256 datasetRef : `lsst.daf.butler.DatasetRef` 257 The `lsst.daf.butler.DatasetRef` that will be eventually used to 264 """PipelineTaskConnections is a class used to declare desired IO when a 265 PipelineTask is run by an activator 269 config : `PipelineTaskConfig` A `PipelineTaskConfig` class instance who's 270 class has been configured to use this `PipelineTaskConnectionsClass` 272 Notes ----- PipelineTaskConnection classes are created by declaring class 273 attributes of types defined in lsst.pipe.base.connectionTypes and are 276 * InitInput - Defines connections in a quantum graph which are used as 277 inputs to the __init__ function of the PipelineTask corresponding to this 279 * InitOuput - Defines connections in a quantum graph which are to be 280 persisted using a butler at the end of the __init__ function of the 281 PipelineTask corresponding to this class. The variable name used to define 282 this connection should be the same as an attribute name on the PipelineTask 283 instance. E.g. if a InitOutput is declared with the name outputSchema in a 284 PipelineTaskConnections class, then a PipelineTask instance should have an 285 attribute self.outputSchema defined. Its value is what will be saved by the 287 * PrerequisiteInput - An input connection type that defines a 288 `lsst.daf.butler.DatasetType` that must be present at execution time, but 289 that will not be used during the course of creating the quantum graph to be 290 executed. These most often are things produced outside the processing 291 pipeline, such as reference catalogs. 292 * Input - Input `lsst.daf.butler.DatasetType`s that will be used in the run 293 method of a PipelineTask. The name used to declare class attribute must 294 match a function argument name in the run method of a PipelineTask. E.g. 
If 295 the PipelineTaskConnections defines an Input with the name calexp, then the 296 corresponding signature should be PipelineTask.run(calexp, ...) 297 * Output - A `lsst.daf.butler.DatasetType` that will be produced by an 298 execution of a PipelineTask. The name used to declare the connection must 299 correspond to an attribute of a `Struct` that is returned by a 300 `PipelineTask` run method. E.g. if an output connection is defined with 301 the name measCat, then the corresponding PipelineTask.run method must 302 return Struct(measCat=X,..) where X matches the storageClass type defined 303 on the output connection. 305 The process of declaring a PipelineTaskConnection class involves parameters 306 passed in the declaration statement. 308 The first parameter is dimensions which is an iterable of strings which 309 defines the unit of processing the run method of a corresponding 310 `PipelineTask` will operate on. These dimensions must match dimensions that 311 exist in the butler registry which will be used in executing the 312 corresponding `PipelineTask`. 314 The second parameter is labeled ``defaultTemplates`` and is conditionally 315 optional. The name attributes of connections can be specified as python 316 format strings, with named format arguments. If any of the name parameters 317 on connections defined in a `PipelineTaskConnections` class contain a 318 template, then a default template value must be specified in the 319 ``defaultTemplates`` argument. This is done by passing a dictionary with 320 keys corresponding to a template identifier, and values corresponding to 321 the value to use as a default when formatting the string. For example if 322 ConnectionClass.calexp.name = '{input}Coadd_calexp' then 323 ``defaultTemplates`` = {'input': 'deep'}. 325 Once a `PipelineTaskConnections` class is created, it is used in the 326 creation of a `PipelineTaskConfig`. This is further documented in the 327 documentation of `PipelineTaskConfig`. 
For the purposes of this 328 documentation, the relevant information is that the config class allows 329 configuration of connection names by users when running a pipeline. 331 Instances of a `PipelineTaskConnections` class are used by the pipeline 332 task execution framework to introspect what a corresponding `PipelineTask` 333 will require, and what it will produce. 337 >>> from lsst.pipe.base import connectionTypes as cT 338 >>> from lsst.pipe.base import PipelineTaskConnections 339 >>> from lsst.pipe.base import PipelineTaskConfig 340 >>> class ExampleConnections(PipelineTaskConnections, 341 ... dimensions=("A", "B"), 342 ... defaultTemplates={"foo": "Example"}): 343 ... inputConnection = cT.Input(doc="Example input", 344 ... dimensions=("A", "B"), 345 ... storageClass=Exposure, 346 ... name="{foo}Dataset") 347 ... outputConnection = cT.Output(doc="Example output", 348 ... dimensions=("A", "B"), 349 ... storageClass=Exposure, 350 ... name="{foo}output") 351 >>> class ExampleConfig(PipelineTaskConfig, 352 ... pipelineConnections=ExampleConnections): 354 >>> config = ExampleConfig() 355 >>> config.connections.foo = Modified 356 >>> config.connections.outputConnection = "TotallyDifferent" 357 >>> connections = ExampleConnections(config=config) 358 >>> assert(connections.inputConnection.name == "ModifiedDataset") 359 >>> assert(connections.outputCOnnection.name == "TotallyDifferent") 362 def __init__(self, *, config:
'PipelineTaskConfig' =
None):
369 if config
is None or not isinstance(config, configMod.PipelineTaskConfig):
370 raise ValueError(
"PipelineTaskConnections must be instantiated with" 371 " a PipelineTaskConfig instance")
376 templateValues = {name: getattr(config.connections, name)
for name
in getattr(self,
377 'defaultTemplates').
keys()}
382 for name
in self.allConnections.
keys()}
390 OutputQuantizedConnection]:
391 """Builds QuantizedConnections corresponding to input Quantum 395 quantum : `lsst.daf.butler.Quantum` 396 Quantum object which defines the inputs and outputs for a given 401 retVal : `tuple` of (`InputQuantizedConnection`, 402 `OutputQuantizedConnection`) Namespaces mapping attribute names 403 (identifiers of connections) to butler references defined in the 404 input `lsst.daf.butler.Quantum` 410 for refs, names
in zip((inputDatasetRefs, outputDatasetRefs),
413 for attributeName
in names:
415 attribute = getattr(self, attributeName)
417 if attribute.name
in quantum.predictedInputs:
419 quantumInputRefs = quantum.predictedInputs[attribute.name]
422 if attribute.deferLoad:
426 if not attribute.multiple:
427 if len(quantumInputRefs) != 1:
428 raise ScalarError(attributeName, len(quantumInputRefs))
429 quantumInputRefs = quantumInputRefs[0]
431 setattr(refs, attributeName, quantumInputRefs)
433 elif attribute.name
in quantum.outputs:
434 value = quantum.outputs[attribute.name]
437 if not attribute.multiple:
440 setattr(refs, attributeName, value)
444 raise ValueError(f
"Attribute with name {attributeName} has no counterpoint " 446 return inputDatasetRefs, outputDatasetRefs
449 """Override to make adjustments to `lsst.daf.butler.DatasetRef`s in the 450 `lsst.daf.butler.core.Quantum` during the graph generation stage of the 455 datasetRefMap : `dict` 456 Mapping with keys of dataset type name to `list` of 457 `lsst.daf.butler.DatasetRef`s 461 datasetRefMap : `dict` 462 Modified mapping of input with possible adjusted 463 `lsst.daf.butler.DatasetRef`s 468 Overrides of this function have the option of raising and Exception 469 if a field in the input does not satisfy a need for a corresponding 470 pipelineTask, i.e. no reference catalogs are found. 475 def iterConnections(connections: PipelineTaskConnections, connectionType: str) -> typing.Generator:
476 """Creates an iterator over the selected connections type which yields 477 all the defined connections of that type. 481 connections: `PipelineTaskConnections` 482 An instance of a `PipelineTaskConnections` object that will be iterated 484 connectionType: `str` 485 The type of connections to iterate over, valid values are inputs, 486 outputs, prerequisiteInputs, initInputs, initOutputs. 490 connection: `BaseConnection` 491 A connection defined on the input connections object of the type 492 supplied. The yielded value Will be an derived type of 495 for name
in getattr(connections, connectionType):
496 yield getattr(connections, name)
def __init__(self, kwargs)
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)
std::vector< SchemaItem< Flag > > * items
def __setitem__(self, name, value)
std::shared_ptr< FrameSet > append(FrameSet const &first, FrameSet const &second)
Construct a FrameSet that performs two transformations in series.
daf::base::PropertySet * set
def __init__(self, key, numDataIds)
def __delattr__(self, name)
def __init__(self, args, kwargs)