from __future__ import annotations

"""Module defining Pipeline class and related methods.
"""

__all__ = ["Pipeline", "TaskDef", "TaskDatasetTypes", "PipelineDatasetTypes"]
import copy

from dataclasses import dataclass
from types import MappingProxyType
from typing import FrozenSet, Mapping, Union, Generator, TYPE_CHECKING

from lsst.daf.butler import DatasetType, Registry, SkyPixDimension
from lsst.utils import doImport

from .configOverrides import ConfigOverrides
from .connections import iterConnections
from .pipelineTask import PipelineTask

from . import pipelineIR
from . import pipeTools
61 """TaskDef is a collection of information about task needed by Pipeline. 63 The information includes task name, configuration object and optional 64 task class. This class is just a collection of attributes and it exposes 65 all of them so that attributes could potentially be modified in place 66 (e.g. if configuration needs extra overrides). 71 `PipelineTask` class name, currently it is not specified whether this 72 is a fully-qualified name or partial name (e.g. ``module.TaskClass``). 73 Framework should be prepared to handle all cases. 74 config : `lsst.pex.config.Config` 75 Instance of the configuration class corresponding to this task class, 76 usually with all overrides applied. 77 taskClass : `type` or ``None`` 78 `PipelineTask` class object, can be ``None``. If ``None`` then 79 framework will have to locate and load class. 80 label : `str`, optional 81 Task label, usually a short string unique in a pipeline. 83 def __init__(self, taskName, config, taskClass=None, label=""):
88 self.
connections = config.connections.ConnectionsClass(config=config)
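    # A hedged usage sketch, not part of the original module: a TaskDef is
    # normally produced by Pipeline.toExpandedPipeline(), but it can also be
    # built directly from a task class and its configuration.  The task class
    # and module names below are hypothetical.
    #
    #     config = SomePipelineTask.ConfigClass()
    #     taskDef = TaskDef(taskName="mypackage.SomePipelineTask", config=config,
    #                       taskClass=SomePipelineTask, label="someTask")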
92 """Name of a dataset type for metadata of this task, `None` if 93 metadata is not to be saved (`str`) 95 if self.
config.saveMetadata:
96 return self.
label +
"_metadata" 103 rep +=
", label=" + self.
label 109 """A `Pipeline` is a representation of a series of tasks to run, and the 110 configuration for those tasks. 115 A description of that this pipeline does. 118 pipeline_dict = {
"description": description,
"tasks": {}}
123 """Load a pipeline defined in a pipeline yaml file. 128 A path that points to a pipeline defined in yaml format 134 pipeline = cls.
fromIR(pipelineIR.PipelineIR.from_file(filename))
139 """Create a pipeline from string formatted as a pipeline document. 143 pipeline_string : `str` 144 A string that is formatted according like a pipeline document 150 pipeline = cls.
fromIR(pipelineIR.PipelineIR.from_string(pipeline_string))
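    # A hedged usage sketch, not part of the original module: assuming a
    # pipeline document exists at "myPipeline.yaml", the two constructors
    # above are typically used as
    #
    #     pipeline = Pipeline.fromFile("myPipeline.yaml")
    #
    # or, when the document is already in memory,
    #
    #     with open("myPipeline.yaml") as f:
    #         pipeline = Pipeline.fromString(f.read())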
    @classmethod
    def fromIR(cls, deserialized_pipeline: pipelineIR.PipelineIR) -> Pipeline:
        """Create a pipeline from an already created `PipelineIR` object.

        Parameters
        ----------
        deserialized_pipeline : `PipelineIR`
            An already created pipeline intermediate representation object.
        """
        pipeline = cls.__new__(cls)
        pipeline._pipelineIR = deserialized_pipeline
        return pipeline
172 """Create a new pipeline by copying an already existing `Pipeline`. 177 An already created pipeline intermediate representation object 183 return cls.
fromIR(copy.deep_copy(pipeline._pipelineIR))
189 """Add an instrument to the pipeline, or replace an instrument that is 194 instrument : `~lsst.daf.butler.instrument.Instrument` or `str` 195 Either a derived class object of a `lsst.daf.butler.instrument` or a 196 string corresponding to a fully qualified 197 `lsst.daf.butler.instrument` name. 199 if isinstance(instrument, str):
203 instrument = f
"{instrument.__module__}.{instrument.__qualname__}" 206 def addTask(self, task: Union[PipelineTask, str], label: str):
207 """Add a new task to the pipeline, or replace a task that is already 208 associated with the supplied label. 212 task: `PipelineTask` or `str` 213 Either a derived class object of a `PipelineTask` or a string 214 corresponding to a fully qualified `PipelineTask` name. 216 A label that is used to identify the `PipelineTask` being added 218 if isinstance(task, str):
220 elif issubclass(task, PipelineTask):
221 taskName = f
"{task.__module__}.{task.__qualname__}" 223 raise ValueError(
"task must be either a child class of PipelineTask or a string containing" 224 " a fully qualified name to one")
229 if isinstance(task, str):
231 label = task._DefaultName
235 """Remove a task from the pipeline. 240 The label used to identify the task that is to be removed 245 If no task with that label exists in the pipeline 251 """Apply single config override. 258 Fully-qualified field name. 260 Value to be given to a field. 265 """Add overrides from a specified file. 270 The label used to identify the task associated with config to 273 Path to the override file. 278 """Add Overrides by running a snippet of python code against a config. 283 The label used to identity the task associated with config to 286 A string which is valid python code to be executed. This is done 287 with config as the only local accessible value. 293 raise LookupError(f
"There are no tasks labeled '{label}' in the pipeline")
294 self.
_pipelineIR.tasks[label].add_or_update_config(newConfig)
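    # A hedged usage sketch, not part of the original module: a pipeline can
    # also be assembled and configured programmatically with the methods
    # above.  The task, label, and field names here are hypothetical.
    #
    #     pipeline = Pipeline("example pipeline")
    #     pipeline.addTask("mypackage.SomePipelineTask", label="someTask")
    #     pipeline.addConfigOverride("someTask", "someField", 42)
    #     pipeline.addConfigFile("someTask", "someTaskOverrides.py")
    #     pipeline.addConfigPython("someTask", "config.someField = 42")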
300 """Returns a generator of TaskDefs which can be used to create quantum 305 generator : generator of `TaskDef` 306 The generator returned will be the sorted iterator of tasks which 307 are to be used in constructing a quantum graph. 312 If a dataId is supplied in a config block. This is in place for 316 for label, taskIR
in self.
_pipelineIR.tasks.items():
318 taskName = taskClass.__qualname__
319 config = taskClass.ConfigClass()
322 overrides.addInstrumentOverride(self.
_pipelineIR.instrument, taskClass._DefaultName)
323 if taskIR.config
is not None:
324 for configIR
in taskIR.config:
325 if configIR.dataId
is not None:
326 raise NotImplementedError(
"Specializing a config on a partial data id is not yet " 327 "supported in Pipeline definition")
329 if configIR.dataId
is None:
331 for configFile
in configIR.file:
332 overrides.addFileOverride(configFile)
333 if configIR.python
is not None:
334 overrides.addPythonOverride(configIR.python)
335 for key, value
in configIR.rest.items():
336 overrides.addValueOverride(key, value)
337 overrides.applyTo(config)
340 taskDefs.append(
TaskDef(taskName=taskName, config=config, taskClass=taskClass, label=label))
344 label_to_config = {x.label: x.config
for x
in taskDefs}
348 success = eval(contract.contract,
None, label_to_config)
350 extra_info = f
": {contract.msg}" if contract.msg
is not None else "" 352 f
"satisfied{extra_info}")
354 yield from pipeTools.orderPipeline(taskDefs)
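    # A hedged usage sketch, not part of the original module: iterating over
    # the expanded pipeline yields TaskDefs in an order suitable for building
    # a quantum graph, with config overrides applied and contracts checked.
    #
    #     for taskDef in pipeline.toExpandedPipeline():
    #         print(taskDef.label, taskDef.taskName)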
    def __eq__(self, other):
        if not isinstance(other, Pipeline):
            return False
        return self._pipelineIR == other._pipelineIR

@dataclass(frozen=True)
class TaskDatasetTypes:
    """An immutable struct that extracts and classifies the dataset types used
    by a `PipelineTask`.
    """

    initInputs: FrozenSet[DatasetType]
372 """Dataset types that are needed as inputs in order to construct this Task. 374 Task-level `initInputs` may be classified as either 375 `~PipelineDatasetTypes.initInputs` or 376 `~PipelineDatasetTypes.initIntermediates` at the Pipeline level. 379 initOutputs: FrozenSet[DatasetType]
380 """Dataset types that may be written after constructing this Task. 382 Task-level `initOutputs` may be classified as either 383 `~PipelineDatasetTypes.initOutputs` or 384 `~PipelineDatasetTypes.initIntermediates` at the Pipeline level. 387 inputs: FrozenSet[DatasetType]
388 """Dataset types that are regular inputs to this Task. 390 If an input dataset needed for a Quantum cannot be found in the input 391 collection(s) or produced by another Task in the Pipeline, that Quantum 392 (and all dependent Quanta) will not be produced. 394 Task-level `inputs` may be classified as either 395 `~PipelineDatasetTypes.inputs` or `~PipelineDatasetTypes.intermediates` 396 at the Pipeline level. 399 prerequisites: FrozenSet[DatasetType]
400 """Dataset types that are prerequisite inputs to this Task. 402 Prerequisite inputs must exist in the input collection(s) before the 403 pipeline is run, but do not constrain the graph - if a prerequisite is 404 missing for a Quantum, `PrerequisiteMissingError` is raised. 406 Prerequisite inputs are not resolved until the second stage of 407 QuantumGraph generation. 410 outputs: FrozenSet[DatasetType]
411 """Dataset types that are produced by this Task. 413 Task-level `outputs` may be classified as either 414 `~PipelineDatasetTypes.outputs` or `~PipelineDatasetTypes.intermediates` 415 at the Pipeline level. 419 def fromTaskDef(cls, taskDef: TaskDef, *, registry: Registry) -> TaskDatasetTypes:
420 """Extract and classify the dataset types from a single `PipelineTask`. 425 An instance of a `TaskDef` class for a particular `PipelineTask`. 427 Registry used to construct normalized `DatasetType` objects and 428 retrieve those that are incomplete. 432 types: `TaskDatasetTypes` 433 The dataset types used by this task. 435 def makeDatasetTypesSet(connectionType):
436 """Constructs a set of true `DatasetType` objects 440 connectionType : `str` 441 Name of the connection type to produce a set for, corresponds 442 to an attribute of type `list` on the connection class instance 446 datasetTypes : `frozenset` 447 A set of all datasetTypes which correspond to the input 448 connection type specified in the connection class of this 453 This function is a closure over the variables ``registry`` and 458 dimensions =
set(getattr(c,
'dimensions',
set()))
459 if "skypix" in dimensions:
461 datasetType = registry.getDatasetType(c.name)
462 except LookupError
as err:
464 f
"DatasetType '{c.name}' referenced by " 465 f
"{type(taskDef.connections).__name__} uses 'skypix' as a dimension " 466 f
"placeholder, but does not already exist in the registry. " 467 f
"Note that reference catalog names are now used as the dataset " 468 f
"type name instead of 'ref_cat'." 470 rest1 =
set(registry.dimensions.extract(dimensions -
set([
"skypix"])).names)
471 rest2 =
set(dim.name
for dim
in datasetType.dimensions
472 if not isinstance(dim, SkyPixDimension))
474 raise ValueError(f
"Non-skypix dimensions for dataset type {c.name} declared in " 475 f
"connections ({rest1}) are inconsistent with those in " 476 f
"registry's version of this dataset ({rest2}).")
478 datasetType = DatasetType(c.name, registry.dimensions.extract(dimensions),
480 datasetTypes.append(datasetType)
481 return frozenset(datasetTypes)
        outputs = makeDatasetTypesSet("outputs")
        if taskDef.metadataDatasetName is not None:
            # Metadata is supposed to be of the PropertyList type, and its
            # dimensions correspond to a task quantum.
            dimensions = registry.dimensions.extract(taskDef.connections.dimensions)
            outputs |= {DatasetType(taskDef.metadataDatasetName, dimensions, "PropertyList")}

        return cls(
            initInputs=makeDatasetTypesSet("initInputs"),
            initOutputs=makeDatasetTypesSet("initOutputs"),
            inputs=makeDatasetTypesSet("inputs"),
            prerequisites=makeDatasetTypesSet("prerequisiteInputs"),
            outputs=outputs,
        )

@dataclass(frozen=True)
class PipelineDatasetTypes:
    """An immutable struct that classifies the dataset types used in a
    `Pipeline`.
    """

    initInputs: FrozenSet[DatasetType]
507 """Dataset types that are needed as inputs in order to construct the Tasks 510 This does not include dataset types that are produced when constructing 511 other Tasks in the Pipeline (these are classified as `initIntermediates`). 514 initOutputs: FrozenSet[DatasetType]
515 """Dataset types that may be written after constructing the Tasks in this 518 This does not include dataset types that are also used as inputs when 519 constructing other Tasks in the Pipeline (these are classified as 520 `initIntermediates`). 523 initIntermediates: FrozenSet[DatasetType]
524 """Dataset types that are both used when constructing one or more Tasks 525 in the Pipeline and produced as a side-effect of constructing another 526 Task in the Pipeline. 529 inputs: FrozenSet[DatasetType]
530 """Dataset types that are regular inputs for the full pipeline. 532 If an input dataset needed for a Quantum cannot be found in the input 533 collection(s), that Quantum (and all dependent Quanta) will not be 537 prerequisites: FrozenSet[DatasetType]
538 """Dataset types that are prerequisite inputs for the full Pipeline. 540 Prerequisite inputs must exist in the input collection(s) before the 541 pipeline is run, but do not constrain the graph - if a prerequisite is 542 missing for a Quantum, `PrerequisiteMissingError` is raised. 544 Prerequisite inputs are not resolved until the second stage of 545 QuantumGraph generation. 548 intermediates: FrozenSet[DatasetType]
549 """Dataset types that are output by one Task in the Pipeline and consumed 550 as inputs by one or more other Tasks in the Pipeline. 553 outputs: FrozenSet[DatasetType]
554 """Dataset types that are output by a Task in the Pipeline and not consumed 555 by any other Task in the Pipeline. 558 byTask: Mapping[str, TaskDatasetTypes]
559 """Per-Task dataset types, keyed by label in the `Pipeline`. 561 This is guaranteed to be zip-iterable with the `Pipeline` itself (assuming 562 neither has been modified since the dataset types were extracted, of 567 def fromPipeline(cls, pipeline, *, registry: Registry) -> PipelineDatasetTypes:
568 """Extract and classify the dataset types from all tasks in a 574 An ordered collection of tasks that can be run together. 576 Registry used to construct normalized `DatasetType` objects and 577 retrieve those that are incomplete. 581 types: `PipelineDatasetTypes` 582 The dataset types used by this `Pipeline`. 587 Raised if Tasks are inconsistent about which datasets are marked 588 prerequisite. This indicates that the Tasks cannot be run as part 589 of the same `Pipeline`. 593 allInitInputs =
set()
594 allInitOutputs =
set()
595 prerequisites =
set()
597 if isinstance(pipeline, Pipeline):
            pipeline = pipeline.toExpandedPipeline()
        for taskDef in pipeline:
            thisTask = TaskDatasetTypes.fromTaskDef(taskDef, registry=registry)
            allInitInputs.update(thisTask.initInputs)
            allInitOutputs.update(thisTask.initOutputs)
            allInputs.update(thisTask.inputs)
            prerequisites.update(thisTask.prerequisites)
            allOutputs.update(thisTask.outputs)
            byTask[taskDef.label] = thisTask
        if not prerequisites.isdisjoint(allInputs):
            raise ValueError("{} marked as both prerequisites and regular inputs".format(
                {dt.name for dt in allInputs & prerequisites}
            ))
        if not prerequisites.isdisjoint(allOutputs):
            raise ValueError("{} marked as both prerequisites and outputs".format(
                {dt.name for dt in allOutputs & prerequisites}
            ))
        # Make sure that components which are marked as inputs get treated as
        # intermediates if there is an output which produces the composite
        # containing the component.
        intermediateComponents = set()
        intermediateComposites = set()
        outputNameMapping = {dsType.name: dsType for dsType in allOutputs}
        for dsType in allInputs:
            # Get the name of a possible component.
            name, component = dsType.nameAndComponent()
            # If there is a component name, this is a component DatasetType;
            # if some output produces the parent of this component, treat this
            # input as an intermediate.
            if component is not None:
                if name in outputNameMapping and outputNameMapping[name].dimensions == dsType.dimensions:
                    composite = DatasetType(name, dsType.dimensions, outputNameMapping[name].storageClass,
                                            universe=registry.dimensions)
                    intermediateComponents.add(dsType)
                    intermediateComposites.add(composite)
        return cls(
            initInputs=frozenset(allInitInputs - allInitOutputs),
            initIntermediates=frozenset(allInitInputs & allInitOutputs),
            initOutputs=frozenset(allInitOutputs - allInitInputs),
            inputs=frozenset(allInputs - allOutputs - intermediateComponents),
            intermediates=frozenset(allInputs & allOutputs | intermediateComponents),
            outputs=frozenset(allOutputs - allInputs - intermediateComposites),
            prerequisites=frozenset(prerequisites),
            byTask=MappingProxyType(byTask),  # MappingProxyType gives a read-only view of the dict
        )
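# A hedged usage sketch, not part of the original module: the set algebra
# above classifies dataset types across the whole pipeline.  For example, if
# task "a" outputs "calexp" and task "b" consumes "calexp", then "calexp" ends
# up in `intermediates` rather than in `inputs` or `outputs`.
#
#     datasetTypes = PipelineDatasetTypes.fromPipeline(pipeline, registry=butler.registry)
#     print(datasetTypes.inputs, datasetTypes.intermediates, datasetTypes.outputs)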