from __future__ import annotations
23 """Module defining Pipeline class and related methods.
__all__ = ["Pipeline", "TaskDef", "TaskDatasetTypes", "PipelineDatasetTypes"]
import copy
from dataclasses import dataclass
from types import MappingProxyType
from typing import Mapping, Union, Generator, TYPE_CHECKING

from lsst.daf.butler import DatasetType, NamedValueSet, Registry, SkyPixDimension
from .configOverrides import ConfigOverrides
from .connections import iterConnections
from .pipelineTask import PipelineTask
from . import pipelineIR
from . import pipeTools
from lsst.obs.base.instrument import Instrument
61 """TaskDef is a collection of information about task needed by Pipeline.
63 The information includes task name, configuration object and optional
64 task class. This class is just a collection of attributes and it exposes
65 all of them so that attributes could potentially be modified in place
66 (e.g. if configuration needs extra overrides).
71 `PipelineTask` class name, currently it is not specified whether this
72 is a fully-qualified name or partial name (e.g. ``module.TaskClass``).
73 Framework should be prepared to handle all cases.
74 config : `lsst.pex.config.Config`
75 Instance of the configuration class corresponding to this task class,
76 usually with all overrides applied.
77 taskClass : `type` or ``None``
78 `PipelineTask` class object, can be ``None``. If ``None`` then
79 framework will have to locate and load class.
80 label : `str`, optional
81 Task label, usually a short string unique in a pipeline.

def __init__(self, taskName, config, taskClass=None, label=""):
    self.taskName = taskName
    self.config = config
    self.taskClass = taskClass
    self.label = label
    self.connections = config.connections.ConnectionsClass(config=config)
92 """Name of a dataset type for configuration of this task (`str`)
94 return self.
label +
"_config"
98 """Name of a dataset type for metadata of this task, `None` if
99 metadata is not to be saved (`str`)
101 if self.
config.saveMetadata:
102 return self.
label +
"_metadata"

rep += ", label=" + self.label
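
# Illustrative sketch (not part of the original source; ``SomeTask`` and its
# config are hypothetical stand-ins for a concrete PipelineTask subclass):
#
#     config = SomeTask.ConfigClass()
#     taskDef = TaskDef(taskName="mypkg.SomeTask", config=config,
#                       taskClass=SomeTask, label="some")
#     taskDef.metadataDatasetName   # -> "some_metadata" when saveMetadata is set
#     taskDef.label + "_config"     # -> "some_config", per the property above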
115 """A `Pipeline` is a representation of a series of tasks to run, and the
116 configuration for those tasks.
121 A description of that this pipeline does.
pipeline_dict = {"description": description, "tasks": {}}
129 """Load a pipeline defined in a pipeline yaml file.
134 A path that points to a pipeline defined in yaml format
140 pipeline = cls.
fromIR(pipelineIR.PipelineIR.from_file(filename))
145 """Create a pipeline from string formatted as a pipeline document.
149 pipeline_string : `str`
150 A string that is formatted according like a pipeline document
156 pipeline = cls.
fromIR(pipelineIR.PipelineIR.from_string(pipeline_string))
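
# Usage sketch (illustrative, not from the original source).  It assumes the
# classmethods documented above are exposed as ``Pipeline.fromFile`` and
# ``Pipeline.fromString``; the file name and the ``pipeline_yaml_text``
# variable are hypothetical.
#
#     pipeline = Pipeline.fromFile("myPipeline.yaml")
#     # or, from an in-memory pipeline document:
#     pipeline = Pipeline.fromString(pipeline_yaml_text)
#     for taskDef in pipeline.toExpandedPipeline():
#         print(taskDef.label, taskDef.taskName)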

@classmethod
def fromIR(cls, deserialized_pipeline: pipelineIR.PipelineIR) -> Pipeline:
    """Create a pipeline from an already created `PipelineIR` object.

    Parameters
    ----------
    deserialized_pipeline : `PipelineIR`
        An already created pipeline intermediate representation object.
    """
    pipeline = cls.__new__(cls)
    pipeline._pipelineIR = deserialized_pipeline
    return pipeline
178 """Create a new pipeline by copying an already existing `Pipeline`.
183 An already created pipeline intermediate representation object
189 return cls.
fromIR(copy.deep_copy(pipeline._pipelineIR))
195 """Add an instrument to the pipeline, or replace an instrument that is
200 instrument : `~lsst.daf.butler.instrument.Instrument` or `str`
201 Either a derived class object of a `lsst.daf.butler.instrument` or a
202 string corresponding to a fully qualified
203 `lsst.daf.butler.instrument` name.
205 if isinstance(instrument, str):
209 instrument = f
"{instrument.__module__}.{instrument.__qualname__}"

def addTask(self, task: Union[PipelineTask, str], label: str):
    """Add a new task to the pipeline, or replace a task that is already
    associated with the supplied label.

    Parameters
    ----------
    task : `PipelineTask` or `str`
        Either a derived class object of a `PipelineTask` or a string
        corresponding to a fully qualified `PipelineTask` name.
    label : `str`
        A label that is used to identify the `PipelineTask` being added.
    """
    if isinstance(task, str):
        taskName = task
    elif issubclass(task, PipelineTask):
        taskName = f"{task.__module__}.{task.__qualname__}"
    else:
        raise ValueError("task must be either a child class of PipelineTask or a string "
                         "containing a fully qualified name to one")
    if not label:
        if isinstance(task, str):
            raise ValueError("A label must be given when the task is supplied by name")
        label = task._DefaultName
241 """Remove a task from the pipeline.
246 The label used to identify the task that is to be removed
251 If no task with that label exists in the pipeline
257 """Apply single config override.
264 Fully-qualified field name.
266 Value to be given to a field.
271 """Add overrides from a specified file.
276 The label used to identify the task associated with config to
279 Path to the override file.
284 """Add Overrides by running a snippet of python code against a config.
289 The label used to identity the task associated with config to
292 A string which is valid python code to be executed. This is done
293 with config as the only local accessible value.
if label not in self._pipelineIR.tasks:
    raise LookupError(f"There are no tasks labeled '{label}' in the pipeline")
self._pipelineIR.tasks[label].add_or_update_config(newConfig)
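
# Illustrative sketch (not from the original source) of building a pipeline in
# code with the methods documented above.  ``MyTask``, the label, field names
# and file name are hypothetical, and the override-method names
# (``addConfigOverride``, ``addConfigFile``, ``addConfigPython``) are assumed
# from the docstrings above.
#
#     pipeline = Pipeline("demo pipeline")
#     pipeline.addTask(MyTask, label="demo")
#     pipeline.addConfigOverride("demo", "someField", 42)
#     pipeline.addConfigFile("demo", "demoOverrides.py")
#     pipeline.addConfigPython("demo", "config.someField *= 2")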
306 """Returns a generator of TaskDefs which can be used to create quantum
311 generator : generator of `TaskDef`
312 The generator returned will be the sorted iterator of tasks which
313 are to be used in constructing a quantum graph.
318 If a dataId is supplied in a config block. This is in place for
taskDefs = []
for label, taskIR in self._pipelineIR.tasks.items():
    taskName = taskClass.__qualname__
    config = taskClass.ConfigClass()
    overrides = ConfigOverrides()
    overrides.addInstrumentOverride(self._pipelineIR.instrument, taskClass._DefaultName)
    if taskIR.config is not None:
        for configIR in taskIR.config:
            if configIR.dataId is not None:
                raise NotImplementedError("Specializing a config on a partial data id is not yet "
                                          "supported in Pipeline definition")
            if configIR.dataId is None:
                for configFile in configIR.file:
                    overrides.addFileOverride(configFile)
                if configIR.python is not None:
                    overrides.addPythonOverride(configIR.python)
                for key, value in configIR.rest.items():
                    overrides.addValueOverride(key, value)
    overrides.applyTo(config)
    taskDefs.append(TaskDef(taskName=taskName, config=config, taskClass=taskClass, label=label))

# Verify that all contracts declared in the pipeline are satisfied.
if self._pipelineIR.contracts is not None:
    label_to_config = {x.label: x.config for x in taskDefs}
    for contract in self._pipelineIR.contracts:
        success = eval(contract.contract, None, label_to_config)
        if not success:
            extra_info = f": {contract.msg}" if contract.msg is not None else ""
            raise pipelineIR.ContractError(f"Contract(s) '{contract.contract}' were not "
                                           f"satisfied{extra_info}")

yield from pipeTools.orderPipeline(taskDefs)
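
# Note on contracts (illustrative, not from the original source): each
# ``contract.contract`` string is evaluated above with ``eval`` against a
# namespace that maps task labels to their config objects, so a contract is
# simply a boolean Python expression over those configs, e.g. (hypothetical
# labels and fields):
#
#     "calibrate.doWrite == characterize.doWrite"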

if not isinstance(other, Pipeline):

@dataclass(frozen=True)
373 """An immutable struct that extracts and classifies the dataset types used

initInputs: NamedValueSet[DatasetType]
"""Dataset types that are needed as inputs in order to construct this Task.

Task-level `initInputs` may be classified as either
`~PipelineDatasetTypes.initInputs` or
`~PipelineDatasetTypes.initIntermediates` at the Pipeline level.
"""

initOutputs: NamedValueSet[DatasetType]
"""Dataset types that may be written after constructing this Task.

Task-level `initOutputs` may be classified as either
`~PipelineDatasetTypes.initOutputs` or
`~PipelineDatasetTypes.initIntermediates` at the Pipeline level.
"""

inputs: NamedValueSet[DatasetType]
"""Dataset types that are regular inputs to this Task.

If an input dataset needed for a Quantum cannot be found in the input
collection(s) or produced by another Task in the Pipeline, that Quantum
(and all dependent Quanta) will not be produced.

Task-level `inputs` may be classified as either
`~PipelineDatasetTypes.inputs` or `~PipelineDatasetTypes.intermediates`
at the Pipeline level.
"""

prerequisites: NamedValueSet[DatasetType]
"""Dataset types that are prerequisite inputs to this Task.

Prerequisite inputs must exist in the input collection(s) before the
pipeline is run, but do not constrain the graph - if a prerequisite is
missing for a Quantum, `PrerequisiteMissingError` is raised.

Prerequisite inputs are not resolved until the second stage of
QuantumGraph generation.
"""

outputs: NamedValueSet[DatasetType]
"""Dataset types that are produced by this Task.

Task-level `outputs` may be classified as either
`~PipelineDatasetTypes.outputs` or `~PipelineDatasetTypes.intermediates`
at the Pipeline level.
"""

@classmethod
def fromTaskDef(cls, taskDef: TaskDef, *, registry: Registry) -> TaskDatasetTypes:
    """Extract and classify the dataset types from a single `PipelineTask`.

    Parameters
    ----------
    taskDef : `TaskDef`
        An instance of a `TaskDef` class for a particular `PipelineTask`.
    registry : `Registry`
        Registry used to construct normalized `DatasetType` objects and
        retrieve those that are incomplete.

    Returns
    -------
    types : `TaskDatasetTypes`
        The dataset types used by this task.
    """
    def makeDatasetTypesSet(connectionType, freeze=True):
        """Constructs a set of true `DatasetType` objects.

        Parameters
        ----------
        connectionType : `str`
            Name of the connection type to produce a set for; corresponds
            to an attribute of type `list` on the connection class instance.
        freeze : `bool`, optional
            If `True`, call `NamedValueSet.freeze` on the object returned.

        Returns
        -------
        datasetTypes : `NamedValueSet`
            A set of all datasetTypes which correspond to the input
            connection type specified in the connection class of this
            `PipelineTask`.

        Notes
        -----
        This function is a closure over the variables ``registry`` and
        ``taskDef``.
        """
        datasetTypes = NamedValueSet()
        for c in iterConnections(taskDef.connections, connectionType):
            dimensions = set(getattr(c, 'dimensions', set()))
            if "skypix" in dimensions:
                try:
                    datasetType = registry.getDatasetType(c.name)
                except LookupError as err:
                    raise LookupError(
                        f"DatasetType '{c.name}' referenced by "
                        f"{type(taskDef.connections).__name__} uses 'skypix' as a dimension "
                        f"placeholder, but does not already exist in the registry.  "
                        f"Note that reference catalog names are now used as the dataset "
                        f"type name instead of 'ref_cat'."
                    ) from err
                rest1 = set(registry.dimensions.extract(dimensions - set(["skypix"])).names)
                rest2 = set(dim.name for dim in datasetType.dimensions
                            if not isinstance(dim, SkyPixDimension))
                if rest1 != rest2:
                    raise ValueError(f"Non-skypix dimensions for dataset type {c.name} declared in "
                                     f"connections ({rest1}) are inconsistent with those in "
                                     f"registry's version of this dataset ({rest2}).")
            else:
                datasetType = DatasetType(c.name, registry.dimensions.extract(dimensions),
                                          c.storageClass)
                try:
                    registryDatasetType = registry.getDatasetType(c.name)
                except LookupError:
                    registryDatasetType = datasetType
                if datasetType != registryDatasetType:
                    raise ValueError(f"Supplied dataset type ({datasetType}) inconsistent with "
                                     f"registry definition ({registryDatasetType})")
            datasetTypes.add(datasetType)
        if freeze:
            datasetTypes.freeze()
        return datasetTypes
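
    # Illustrative note about the helper above (not from the original source):
    # connections that use the literal "skypix" dimension placeholder
    # (reference catalogs, for example) must name a dataset type that already
    # exists in the registry; the concrete skypix dimension is then taken from
    # the registry record rather than from the connection declaration.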

    outputs = makeDatasetTypesSet("outputs", freeze=False)
    if taskDef.metadataDatasetName is not None:
        # Metadata is supposed to be of the PropertyList type; its dimensions
        # correspond to a task quantum.
        dimensions = registry.dimensions.extract(taskDef.connections.dimensions)
        outputs |= {DatasetType(taskDef.metadataDatasetName, dimensions, "PropertyList")}
    outputs.freeze()

    return cls(
        initInputs=makeDatasetTypesSet("initInputs"),
        initOutputs=makeDatasetTypesSet("initOutputs"),
        inputs=makeDatasetTypesSet("inputs"),
        prerequisites=makeDatasetTypesSet("prerequisiteInputs"),
        outputs=outputs,
    )

@dataclass(frozen=True)
520 """An immutable struct that classifies the dataset types used in a
524 initInputs: NamedValueSet[DatasetType]
525 """Dataset types that are needed as inputs in order to construct the Tasks
528 This does not include dataset types that are produced when constructing
529 other Tasks in the Pipeline (these are classified as `initIntermediates`).
532 initOutputs: NamedValueSet[DatasetType]
533 """Dataset types that may be written after constructing the Tasks in this
536 This does not include dataset types that are also used as inputs when
537 constructing other Tasks in the Pipeline (these are classified as
538 `initIntermediates`).
541 initIntermediates: NamedValueSet[DatasetType]
542 """Dataset types that are both used when constructing one or more Tasks
543 in the Pipeline and produced as a side-effect of constructing another
544 Task in the Pipeline.
547 inputs: NamedValueSet[DatasetType]
548 """Dataset types that are regular inputs for the full pipeline.
550 If an input dataset needed for a Quantum cannot be found in the input
551 collection(s), that Quantum (and all dependent Quanta) will not be
555 prerequisites: NamedValueSet[DatasetType]
556 """Dataset types that are prerequisite inputs for the full Pipeline.
558 Prerequisite inputs must exist in the input collection(s) before the
559 pipeline is run, but do not constrain the graph - if a prerequisite is
560 missing for a Quantum, `PrerequisiteMissingError` is raised.
562 Prerequisite inputs are not resolved until the second stage of
563 QuantumGraph generation.
566 intermediates: NamedValueSet[DatasetType]
567 """Dataset types that are output by one Task in the Pipeline and consumed
568 as inputs by one or more other Tasks in the Pipeline.
571 outputs: NamedValueSet[DatasetType]
572 """Dataset types that are output by a Task in the Pipeline and not consumed
573 by any other Task in the Pipeline.
576 byTask: Mapping[str, TaskDatasetTypes]
577 """Per-Task dataset types, keyed by label in the `Pipeline`.
579 This is guaranteed to be zip-iterable with the `Pipeline` itself (assuming
580 neither has been modified since the dataset types were extracted, of

@classmethod
def fromPipeline(cls, pipeline, *, registry: Registry) -> PipelineDatasetTypes:
    """Extract and classify the dataset types from all tasks in a
    `Pipeline`.

    Parameters
    ----------
    pipeline : `Pipeline`
        An ordered collection of tasks that can be run together.
    registry : `Registry`
        Registry used to construct normalized `DatasetType` objects and
        retrieve those that are incomplete.

    Returns
    -------
    types : `PipelineDatasetTypes`
        The dataset types used by this `Pipeline`.

    Raises
    ------
    ValueError
        Raised if Tasks are inconsistent about which datasets are marked
        prerequisite.  This indicates that the Tasks cannot be run as part
        of the same `Pipeline`.
    """
    allInputs = NamedValueSet()
    allOutputs = NamedValueSet()
    allInitInputs = NamedValueSet()
    allInitOutputs = NamedValueSet()
    prerequisites = NamedValueSet()
    byTask = dict()
    if isinstance(pipeline, Pipeline):
        pipeline = pipeline.toExpandedPipeline()
    for taskDef in pipeline:
        thisTask = TaskDatasetTypes.fromTaskDef(taskDef, registry=registry)
        allInitInputs |= thisTask.initInputs
        allInitOutputs |= thisTask.initOutputs
        allInputs |= thisTask.inputs
        prerequisites |= thisTask.prerequisites
        allOutputs |= thisTask.outputs
        byTask[taskDef.label] = thisTask
    if not prerequisites.isdisjoint(allInputs):
        raise ValueError("{} marked as both prerequisites and regular inputs".format(
            {dt.name for dt in allInputs & prerequisites}))
    if not prerequisites.isdisjoint(allOutputs):
        raise ValueError("{} marked as both prerequisites and outputs".format(
            {dt.name for dt in allOutputs & prerequisites}))

    # Identify components of outputs that are consumed as inputs elsewhere, so
    # that they can be classified as intermediates below.
    intermediateComponents = NamedValueSet()
    intermediateComposites = NamedValueSet()
    outputNameMapping = {dsType.name: dsType for dsType in allOutputs}
    for dsType in allInputs:
        # Get the name of a possible component.
        name, component = dsType.nameAndComponent()
        # If this input is a component of a dataset type that is produced as
        # an output, record both the component and its parent composite.
        if component is not None:
            if name in outputNameMapping:
                if outputNameMapping[name].dimensions != dsType.dimensions:
                    raise ValueError(f"Component dataset type {dsType.name} has different "
                                     f"dimensions ({dsType.dimensions}) than its parent "
                                     f"({outputNameMapping[name].dimensions}).")
                composite = DatasetType(name, dsType.dimensions, outputNameMapping[name].storageClass,
                                        universe=registry.dimensions)
                intermediateComponents.add(dsType)
                intermediateComposites.add(composite)
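
    # Illustrative example of the component handling above (not from the
    # original source; "calexp" is a placeholder name): if one task writes the
    # composite "calexp" and another task reads only the component
    # "calexp.wcs", the loop records the component in intermediateComponents
    # and its parent composite in intermediateComposites, so the set algebra
    # below classifies both as intermediates rather than as overall inputs or
    # outputs.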

    def checkConsistency(a: NamedValueSet, b: NamedValueSet):
        common = a.names & b.names
        for name in common:
            if a[name] != b[name]:
                raise ValueError(f"Conflicting definitions for dataset type: "
                                 f"{a[name]} != {b[name]}.")

    checkConsistency(allInitInputs, allInitOutputs)
    checkConsistency(allInputs, allOutputs)
    checkConsistency(allInputs, intermediateComposites)
    checkConsistency(allOutputs, intermediateComposites)

    def frozen(s: NamedValueSet) -> NamedValueSet:
        s.freeze()
        return s

    return cls(
        initInputs=frozen(allInitInputs - allInitOutputs),
        initIntermediates=frozen(allInitInputs & allInitOutputs),
        initOutputs=frozen(allInitOutputs - allInitInputs),
        inputs=frozen(allInputs - allOutputs - intermediateComponents),
        intermediates=frozen(allInputs & allOutputs | intermediateComponents),
        outputs=frozen(allOutputs - allInputs - intermediateComposites),
        prerequisites=frozen(prerequisites),
        byTask=MappingProxyType(byTask),
    )
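
# Worked example of the classification algebra above (illustrative, not from
# the original source; dataset type names are placeholders, and the component
# corrections via intermediateComponents/intermediateComposites are omitted
# for brevity).  If taskA reads "raw" and writes "calexp", and taskB reads
# "calexp" and writes "coadd", then:
#
#     allInputs  = {"raw", "calexp"}
#     allOutputs = {"calexp", "coadd"}
#     allInputs - allOutputs   # inputs        -> {"raw"}
#     allInputs & allOutputs   # intermediates -> {"calexp"}
#     allOutputs - allInputs   # outputs       -> {"coadd"}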