from __future__ import annotations

"""Module defining Pipeline class and related methods.
"""

__all__ = ["Pipeline", "TaskDef", "TaskDatasetTypes", "PipelineDatasetTypes"]

import copy

from dataclasses import dataclass
from types import MappingProxyType
from typing import FrozenSet, Mapping, Union, Generator, TYPE_CHECKING

from lsst.daf.butler import DatasetType, Registry, SkyPixDimension
from lsst.utils import doImport

from .configOverrides import ConfigOverrides
from .connections import PipelineTaskConnections, iterConnections
from .pipelineTask import PipelineTask

from . import pipelineIR
from . import pipeTools
61 """TaskDef is a collection of information about task needed by Pipeline. 63 The information includes task name, configuration object and optional 64 task class. This class is just a collection of attributes and it exposes 65 all of them so that attributes could potentially be modified in place 66 (e.g. if configuration needs extra overrides). 71 `PipelineTask` class name, currently it is not specified whether this 72 is a fully-qualified name or partial name (e.g. ``module.TaskClass``). 73 Framework should be prepared to handle all cases. 74 config : `lsst.pex.config.Config` 75 Instance of the configuration class corresponding to this task class, 76 usually with all overrides applied. 77 taskClass : `type` or ``None`` 78 `PipelineTask` class object, can be ``None``. If ``None`` then 79 framework will have to locate and load class. 80 label : `str`, optional 81 Task label, usually a short string unique in a pipeline. 83 def __init__(self, taskName, config, taskClass=None, label=""):
88 self.
connections = config.connections.ConnectionsClass(config=config)
93 rep +=
", label=" + self.
label 99 """A `Pipeline` is a representation of a series of tasks to run, and the 100 configuration for those tasks. 105 A description of that this pipeline does. 108 pipeline_dict = {
"description": description,
"tasks": {}}
113 """Load a pipeline defined in a pipeline yaml file. 118 A path that points to a pipeline defined in yaml format 124 pipeline = cls.
fromIR(pipelineIR.PipelineIR.from_file(filename))
129 """Create a pipeline from string formatted as a pipeline document. 133 pipeline_string : `str` 134 A string that is formatted according like a pipeline document 140 pipeline = cls.
fromIR(pipelineIR.PipelineIR.from_string(pipeline_string))
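    # Usage sketch (hedged): loading a pipeline from an inline YAML document.
    # The field names below ("description", "tasks", "class") assume the
    # standard pipeline document layout handled by `pipelineIR`; the task
    # class path is illustrative only.
    #
    #     pipeline = Pipeline.fromString(
    #         "description: demo\n"
    #         "tasks:\n"
    #         "  myTask:\n"
    #         "    class: mypackage.tasks.MyTask\n"
    #     )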
    @classmethod
    def fromIR(cls, deserialized_pipeline: pipelineIR.PipelineIR) -> Pipeline:
        """Create a pipeline from an already created `PipelineIR` object.

        Parameters
        ----------
        deserialized_pipeline: `PipelineIR`
            An already created pipeline intermediate representation object

        Returns
        -------
        pipeline: `Pipeline`
        """
        pipeline = cls.__new__(cls)
        pipeline._pipelineIR = deserialized_pipeline
        return pipeline
162 """Create a new pipeline by copying an already existing `Pipeline`. 167 An already created pipeline intermediate representation object 173 return cls.
fromIR(copy.deep_copy(pipeline._pipelineIR))
179 """Add an instrument to the pipeline, or replace an instrument that is 184 instrument : `~lsst.daf.butler.instrument.Instrument` or `str` 185 Either a derived class object of a `lsst.daf.butler.instrument` or a 186 string corresponding to a fully qualified 187 `lsst.daf.butler.instrument` name. 189 if isinstance(instrument, str):
193 instrument = f
"{instrument.__module__}.{instrument.__qualname__}" 196 def addTask(self, task: Union[PipelineTask, str], label: str):
197 """Add a new task to the pipeline, or replace a task that is already 198 associated with the supplied label. 202 task: `PipelineTask` or `str` 203 Either a derived class object of a `PipelineTask` or a string 204 corresponding to a fully qualified `PipelineTask` name. 206 A label that is used to identify the `PipelineTask` being added 208 if isinstance(task, str):
210 elif issubclass(task, PipelineTask):
211 taskName = f
"{task.__module__}.{task.__qualname__}" 213 raise ValueError(
"task must be either a child class of PipelineTask or a string containing" 214 " a fully qualified name to one")
218 """Remove a task from the pipeline. 223 The label used to identify the task that is to be removed 228 If no task with that label exists in the pipeline 234 """Apply single config override. 241 Fully-qualified field name. 243 Value to be given to a field. 248 """Add overrides from a specified file. 253 The label used to identify the task associated with config to 256 Path to the override file. 261 """Add Overrides by running a snippet of python code against a config. 266 The label used to identity the task associated with config to 269 A string which is valid python code to be executed. This is done 270 with config as the only local accessible value. 276 raise LookupError(f
"There are no tasks labeled '{label}' in the pipeline")
277 self.
_pipelineIR.tasks[label].add_or_update_config(newConfig)
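    # Usage sketch (hedged): the three public override methods all funnel
    # through ``_addConfigImpl``, and the label must already exist in the
    # pipeline. File path and field names are illustrative only.
    #
    #     pipeline.addConfigOverride("myTask", "someField", 42)
    #     pipeline.addConfigFile("myTask", "/path/to/overrides.py")
    #     pipeline.addConfigPython("myTask", "config.someField = 42")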
283 """Returns a generator of TaskDefs which can be used to create quantum 288 generator : generator of `TaskDef` 289 The generator returned will be the sorted iterator of tasks which 290 are to be used in constructing a quantum graph. 295 If a dataId is supplied in a config block. This is in place for 299 for label, taskIR
in self.
_pipelineIR.tasks.items():
301 taskName = taskClass.__qualname__
302 config = taskClass.ConfigClass()
305 overrides.addInstrumentOverride(self.
_pipelineIR.instrument, taskClass._DefaultName)
306 if taskIR.config
is not None:
307 for configIR
in taskIR.config:
308 if configIR.dataId
is not None:
309 raise NotImplementedError(
"Specializing a config on a partial data id is not yet " 310 "supported in Pipeline definition")
312 if configIR.dataId
is None:
314 for configFile
in configIR.file:
315 overrides.addFileOverride(configFile)
316 if configIR.python
is not None:
317 overrides.addPythonOverride(configIR.python)
318 for key, value
in configIR.rest.items():
319 overrides.addValueOverride(key, value)
320 overrides.applyTo(config)
323 taskDefs.append(
TaskDef(taskName=taskName, config=config, taskClass=taskClass, label=label))
327 label_to_config = {x.label: x.config
for x
in taskDefs}
331 success = eval(contract.contract,
None, label_to_config)
333 extra_info = f
": {contract.msg}" if contract.msg
is not None else "" 335 f
"satisfied{extra_info}")
337 yield from pipeTools.orderPipeline(taskDefs)
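    # Usage sketch (hedged): iterating the expanded pipeline yields `TaskDef`
    # objects in an order suitable for quantum-graph construction.
    #
    #     for taskDef in pipeline.toExpandedPipeline():
    #         print(taskDef.label, taskDef.taskName)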
    def __eq__(self, other: "Pipeline"):
        if not isinstance(other, Pipeline):
            return False
        return self._pipelineIR == other._pipelineIR
@dataclass(frozen=True)
class TaskDatasetTypes:
    """An immutable struct that extracts and classifies the dataset types used
    by a `PipelineTask`.
    """

    initInputs: FrozenSet[DatasetType]
    """Dataset types that are needed as inputs in order to construct this Task.

    Task-level `initInputs` may be classified as either
    `~PipelineDatasetTypes.initInputs` or
    `~PipelineDatasetTypes.initIntermediates` at the Pipeline level.
    """

    initOutputs: FrozenSet[DatasetType]
    """Dataset types that may be written after constructing this Task.

    Task-level `initOutputs` may be classified as either
    `~PipelineDatasetTypes.initOutputs` or
    `~PipelineDatasetTypes.initIntermediates` at the Pipeline level.
    """

    inputs: FrozenSet[DatasetType]
    """Dataset types that are regular inputs to this Task.

    If an input dataset needed for a Quantum cannot be found in the input
    collection(s) or produced by another Task in the Pipeline, that Quantum
    (and all dependent Quanta) will not be produced.

    Task-level `inputs` may be classified as either
    `~PipelineDatasetTypes.inputs` or `~PipelineDatasetTypes.intermediates`
    at the Pipeline level.
    """

    prerequisites: FrozenSet[DatasetType]
    """Dataset types that are prerequisite inputs to this Task.

    Prerequisite inputs must exist in the input collection(s) before the
    pipeline is run, but do not constrain the graph - if a prerequisite is
    missing for a Quantum, `PrerequisiteMissingError` is raised.

    Prerequisite inputs are not resolved until the second stage of
    QuantumGraph generation.
    """

    outputs: FrozenSet[DatasetType]
    """Dataset types that are produced by this Task.

    Task-level `outputs` may be classified as either
    `~PipelineDatasetTypes.outputs` or `~PipelineDatasetTypes.intermediates`
    at the Pipeline level.
    """

    @classmethod
    def fromConnections(cls, connectionsInstance: PipelineTaskConnections, *,
                        registry: Registry) -> TaskDatasetTypes:
404 """Extract and classify the dataset types from a single `PipelineTask`. 408 connectionsInstance: `PipelineTaskConnections` 409 An instance of a `PipelineTaskConnections` class for a particular 412 Registry used to construct normalized `DatasetType` objects and 413 retrieve those that are incomplete. 417 types: `TaskDatasetTypes` 418 The dataset types used by this task. 420 def makeDatasetTypesSet(connectionType):
421 """Constructs a set of true `DatasetType` objects 425 connectionType : `str` 426 Name of the connection type to produce a set for, corresponds 427 to an attribute of type `list` on the connection class instance 431 datasetTypes : `frozenset` 432 A set of all datasetTypes which correspond to the input 433 connection type specified in the connection class of this 438 This function is a closure over the variables ``registry`` and 439 ``connectionsInstance``. 443 dimensions =
set(getattr(c,
'dimensions',
set()))
444 if "skypix" in dimensions:
446 datasetType = registry.getDatasetType(c.name)
447 except LookupError
as err:
449 f
"DatasetType '{c.name}' referenced by " 450 f
"{type(connectionsInstance).__name__} uses 'skypix' as a dimension " 451 f
"placeholder, but does not already exist in the registry. " 452 f
"Note that reference catalog names are now used as the dataset " 453 f
"type name instead of 'ref_cat'." 455 rest1 =
set(registry.dimensions.extract(dimensions -
set([
"skypix"])).names)
456 rest2 =
set(dim.name
for dim
in datasetType.dimensions
457 if not isinstance(dim, SkyPixDimension))
459 raise ValueError(f
"Non-skypix dimensions for dataset type {c.name} declared in " 460 f
"connections ({rest1}) are inconsistent with those in " 461 f
"registry's version of this dataset ({rest2}).")
463 datasetType = DatasetType(c.name, registry.dimensions.extract(dimensions),
465 datasetTypes.append(datasetType)
466 return frozenset(datasetTypes)
        return cls(
            initInputs=makeDatasetTypesSet("initInputs"),
            initOutputs=makeDatasetTypesSet("initOutputs"),
            inputs=makeDatasetTypesSet("inputs"),
            prerequisites=makeDatasetTypesSet("prerequisiteInputs"),
            outputs=makeDatasetTypesSet("outputs"),
        )
@dataclass(frozen=True)
class PipelineDatasetTypes:
    """An immutable struct that classifies the dataset types used in a
    `Pipeline`.
    """

    initInputs: FrozenSet[DatasetType]
    """Dataset types that are needed as inputs in order to construct the Tasks
    in this Pipeline.

    This does not include dataset types that are produced when constructing
    other Tasks in the Pipeline (these are classified as `initIntermediates`).
    """

    initOutputs: FrozenSet[DatasetType]
    """Dataset types that may be written after constructing the Tasks in this
    Pipeline.

    This does not include dataset types that are also used as inputs when
    constructing other Tasks in the Pipeline (these are classified as
    `initIntermediates`).
    """

    initIntermediates: FrozenSet[DatasetType]
    """Dataset types that are both used when constructing one or more Tasks
    in the Pipeline and produced as a side-effect of constructing another
    Task in the Pipeline.
    """

    inputs: FrozenSet[DatasetType]
    """Dataset types that are regular inputs for the full pipeline.

    If an input dataset needed for a Quantum cannot be found in the input
    collection(s), that Quantum (and all dependent Quanta) will not be
    produced.
    """

    prerequisites: FrozenSet[DatasetType]
    """Dataset types that are prerequisite inputs for the full Pipeline.

    Prerequisite inputs must exist in the input collection(s) before the
    pipeline is run, but do not constrain the graph - if a prerequisite is
    missing for a Quantum, `PrerequisiteMissingError` is raised.

    Prerequisite inputs are not resolved until the second stage of
    QuantumGraph generation.
    """

    intermediates: FrozenSet[DatasetType]
    """Dataset types that are output by one Task in the Pipeline and consumed
    as inputs by one or more other Tasks in the Pipeline.
    """

    outputs: FrozenSet[DatasetType]
    """Dataset types that are output by a Task in the Pipeline and not consumed
    by any other Task in the Pipeline.
    """

    byTask: Mapping[str, TaskDatasetTypes]
    """Per-Task dataset types, keyed by label in the `Pipeline`.

    This is guaranteed to be zip-iterable with the `Pipeline` itself (assuming
    neither has been modified since the dataset types were extracted, of
    course).
    """

    @classmethod
    def fromPipeline(cls, pipeline, *, registry: Registry) -> PipelineDatasetTypes:
545 """Extract and classify the dataset types from all tasks in a 551 An ordered collection of tasks that can be run together. 553 Registry used to construct normalized `DatasetType` objects and 554 retrieve those that are incomplete. 558 types: `PipelineDatasetTypes` 559 The dataset types used by this `Pipeline`. 564 Raised if Tasks are inconsistent about which datasets are marked 565 prerequisite. This indicates that the Tasks cannot be run as part 566 of the same `Pipeline`. 570 allInitInputs =
set()
571 allInitOutputs =
set()
572 prerequisites =
set()
574 if isinstance(pipeline, Pipeline):
575 pipeline = pipeline.toExpandedPipeline()
576 for taskDef
in pipeline:
577 thisTask = TaskDatasetTypes.fromConnections(taskDef.connections, registry=registry)
578 allInitInputs.update(thisTask.initInputs)
579 allInitOutputs.update(thisTask.initOutputs)
580 allInputs.update(thisTask.inputs)
581 prerequisites.update(thisTask.prerequisites)
582 allOutputs.update(thisTask.outputs)
583 byTask[taskDef.label] = thisTask
584 if not prerequisites.isdisjoint(allInputs):
585 raise ValueError(
"{} marked as both prerequisites and regular inputs".
format(
586 {dt.name
for dt
in allInputs & prerequisites}
588 if not prerequisites.isdisjoint(allOutputs):
589 raise ValueError(
"{} marked as both prerequisites and outputs".
format(
590 {dt.name
for dt
in allOutputs & prerequisites}
595 intermediateComponents =
set()
596 intermediateComposites =
set()
597 outputNameMapping = {dsType.name: dsType
for dsType
in allOutputs}
598 for dsType
in allInputs:
600 name, component = dsType.nameAndComponent()
604 if component
is not None:
605 if name
in outputNameMapping
and outputNameMapping[name].dimensions == dsType.dimensions:
606 composite = DatasetType(name, dsType.dimensions, outputNameMapping[name].storageClass,
607 universe=registry.dimensions)
608 intermediateComponents.add(dsType)
609 intermediateComposites.add(composite)
611 initInputs=frozenset(allInitInputs - allInitOutputs),
612 initIntermediates=frozenset(allInitInputs & allInitOutputs),
613 initOutputs=frozenset(allInitOutputs - allInitInputs),
614 inputs=frozenset(allInputs - allOutputs - intermediateComponents),
615 intermediates=frozenset(allInputs & allOutputs | intermediateComponents),
616 outputs=frozenset(allOutputs - allInputs - intermediateComposites),
617 prerequisites=frozenset(prerequisites),
618 byTask=MappingProxyType(byTask),
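    # Usage sketch (hedged): classifying dataset types for a whole pipeline.
    # Either a `Pipeline` or an already-expanded iterable of `TaskDef` objects
    # may be passed; the `butler` object is assumed to come from the calling
    # context as in the sketch above.
    #
    #     datasetTypes = PipelineDatasetTypes.fromPipeline(pipeline,
    #                                                      registry=butler.registry)
    #     print(sorted(dt.name for dt in datasetTypes.intermediates))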