from __future__ import annotations
23 """Module defining Pipeline class and related methods.
__all__ = ["Pipeline", "TaskDef", "TaskDatasetTypes", "PipelineDatasetTypes",
           "LabelSpecifier"]
import copy
import os

from dataclasses import dataclass
from types import MappingProxyType
from typing import Mapping, Set, Union, Generator, TYPE_CHECKING, Optional

from lsst.daf.butler import DatasetType, NamedValueSet, Registry, SkyPixDimension
from lsst.utils import doImport

from .configOverrides import ConfigOverrides
from .connections import iterConnections
from .pipelineTask import PipelineTask

from . import pipelineIR
from . import pipeTools

if TYPE_CHECKING:  # Imports needed only for type annotations.
    from lsst.obs.base.instrument import Instrument
63 """A structure to specify a subset of labels to load
65 This structure may contain a set of labels to be used in subsetting a
66 pipeline, or a beginning and end point. Beginning or end may be empty,
67 in which case the range will be a half open interval. Unlike python
68 iteration bounds, end bounds are *INCLUDED*. Note that range based
69 selection is not well defined for pipelines that are not linear in nature,
70 and correct behavior is not guaranteed, or may vary from run to run.
72 labels: Optional[Set[str]] =
None
73 begin: Optional[str] =
None
74 end: Optional[str] =
None
77 if self.labels
is not None and (self.begin
or self.end):
78 raise ValueError(
"This struct can only be initialized with a labels set or "
79 "a begin (and/or) end specifier")
83 """TaskDef is a collection of information about task needed by Pipeline.
85 The information includes task name, configuration object and optional
86 task class. This class is just a collection of attributes and it exposes
87 all of them so that attributes could potentially be modified in place
88 (e.g. if configuration needs extra overrides).
93 `PipelineTask` class name, currently it is not specified whether this
94 is a fully-qualified name or partial name (e.g. ``module.TaskClass``).
95 Framework should be prepared to handle all cases.
96 config : `lsst.pex.config.Config`
97 Instance of the configuration class corresponding to this task class,
98 usually with all overrides applied. This config will be frozen.
99 taskClass : `type` or ``None``
100 `PipelineTask` class object, can be ``None``. If ``None`` then
101 framework will have to locate and load class.
102 label : `str`, optional
103 Task label, usually a short string unique in a pipeline.
    def __init__(self, taskName, config, taskClass=None, label=""):
        self.taskName = taskName
        self.config = config
        self.taskClass = taskClass
        self.label = label
        self.connections = config.connections.ConnectionsClass(config=config)
115 """Name of a dataset type for configuration of this task (`str`)
117 return self.
label +
"_config"
121 """Name of a dataset type for metadata of this task, `None` if
122 metadata is not to be saved (`str`)
124 if self.
config.saveMetadata:
125 return self.
label +
"_metadata"
    def __str__(self):
        rep = "TaskDef(" + self.taskName
        if self.label:
            rep += ", label=" + self.label
        rep += ")"
        return rep
    def __eq__(self, other):
        if not isinstance(other, TaskDef):
            return False
        return self.config == other.config and \
            self.label == other.label
148 """A `Pipeline` is a representation of a series of tasks to run, and the
149 configuration for those tasks.
154 A description of that this pipeline does.
157 pipeline_dict = {
"description": description,
"tasks": {}}
162 """Load a pipeline defined in a pipeline yaml file.
167 A path that points to a pipeline defined in yaml format. This
168 filename may also supply additional labels to be used in
169 subsetting the loaded Pipeline. These labels are separated from
170 the path by a colon, and may be specified as a comma separated
171 list, or a range denoted as beginning..end. Beginning or end may
172 be empty, in which case the range will be a half open interval.
173 Unlike python iteration bounds, end bounds are *INCLUDED*. Note
174 that range based selection is not well defined for pipelines that
175 are not linear in nature, and correct behavior is not guaranteed,
176 or may vary from run to run.
181 The pipeline loaded from specified location with appropriate (if
186 This method attempts to prune any contracts that contain labels which
187 are not in the declared subset of labels. This pruning is done using a
188 string based matching due to the nature of contracts and may prune more
193 pipeline: Pipeline = cls.
fromIR(pipelineIR.PipelineIR.from_file(filename))
196 if labelSpecifier
is not None:
197 pipeline = pipeline.subsetFromLabels(labelSpecifier)
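    # For illustration (hypothetical file and labels), all of the following
    # are valid arguments to fromFile:
    #
    #     Pipeline.fromFile("pipeline.yaml")                 # whole pipeline
    #     Pipeline.fromFile("pipeline.yaml:isr,calibrate")   # label list
    #     Pipeline.fromFile("pipeline.yaml:isr..calibrate")  # inclusive range
    #     Pipeline.fromFile("pipeline.yaml:..calibrate")     # open begin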
201 """Subset a pipeline to contain only labels specified in labelSpecifier
205 labelSpecifier : `labelSpecifier`
206 Object containing labels that describes how to subset a pipeline.
210 pipeline : `Pipeline`
211 A new pipeline object that is a subset of the old pipeline
216 Raised if there is an issue with specified labels
220 This method attempts to prune any contracts that contain labels which
221 are not in the declared subset of labels. This pruning is done using a
222 string based matching due to the nature of contracts and may prune more
226 if labelSpecifier.labels:
227 labelSet = labelSpecifier.labels
238 pipeline = copy.deepcopy(self)
239 pipeline._pipelineIR.contracts = []
240 labels = {taskdef.label:
True for taskdef
in pipeline.toExpandedPipeline()}
243 if labelSpecifier.begin
is not None:
244 if labelSpecifier.begin
not in labels:
245 raise ValueError(f
"Beginning of range subset, {labelSpecifier.begin}, not found in "
246 "pipeline definition")
247 if labelSpecifier.end
is not None:
248 if labelSpecifier.end
not in labels:
249 raise ValueError(f
"End of range subset, {labelSpecifier.end}, not found in pipeline "
254 if labelSpecifier.begin
is not None:
255 if label != labelSpecifier.begin:
258 labelSpecifier.begin =
None
260 if labelSpecifier.end
is not None and label == labelSpecifier.end:
262 return Pipeline.fromIR(self.
_pipelineIR.subset_from_labels(labelSet))
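    # Hypothetical example: for a linear pipeline ordered
    # "isr" -> "characterizeImage" -> "calibrate", a range subset keeps the
    # inclusive span between the bounds:
    #
    #     sub = pipeline.subsetFromLabels(LabelSpecifier(begin="characterizeImage"))
    #     # sub now contains only "characterizeImage" and "calibrate"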
    @staticmethod
    def _parseFileSpecifier(fileSpecifier):
        """Split apart a filename path from label subsets.
        """
        split = fileSpecifier.split(':')
        # There is only a filename, return just that
        if len(split) == 1:
            return fileSpecifier, None
        # More than one : was specified, this is an error
        elif len(split) > 2:
            raise ValueError("Only one : is allowed when specifying a pipeline to load")
        else:
            filename, labelSubset = split[0], split[1]
            # Labels supplied as a list
            if ',' in labelSubset:
                if '..' in labelSubset:
                    raise ValueError("Can only specify a list of labels or a range "
                                     "when loading a Pipeline, not both")
                labels = set(labelSubset.split(","))
                specifier = LabelSpecifier(labels=labels)
            # Labels supplied as a range
            elif '..' in labelSubset:
                # Try to de-structure the range; this fails if more than one
                # range is specified
                try:
                    begin, end = labelSubset.split("..")
                except ValueError:
                    raise ValueError("Only one range can be specified when loading a pipeline")
                specifier = LabelSpecifier(begin=begin if begin else None, end=end if end else None)
            # Anything else is treated as a single label
            else:
                labels = {labelSubset}
                specifier = LabelSpecifier(labels=labels)
            return filename, specifier
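    # Behavior sketch for _parseFileSpecifier (hypothetical inputs):
    #
    #     "p.yaml"       -> ("p.yaml", None)
    #     "p.yaml:a,b"   -> ("p.yaml", LabelSpecifier(labels={"a", "b"}))
    #     "p.yaml:a..b"  -> ("p.yaml", LabelSpecifier(begin="a", end="b"))
    #     "p.yaml:a:b"   -> ValueError (more than one colon)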
304 """Create a pipeline from string formatted as a pipeline document.
308 pipeline_string : `str`
309 A string that is formatted according like a pipeline document
315 pipeline = cls.
fromIR(pipelineIR.PipelineIR.from_string(pipeline_string))
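    # A minimal pipeline document sketch (the YAML schema assumed here has
    # top-level "description" and "tasks" keys, mirroring the dict built in
    # the Pipeline constructor above; task and package names are
    # hypothetical):
    #
    #     pipeline = Pipeline.fromString(
    #         "description: A hypothetical two-task pipeline\n"
    #         "tasks:\n"
    #         "  taskA: some.package.TaskA\n"
    #         "  taskB: some.package.TaskB\n"
    #     )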
    @classmethod
    def fromIR(cls, deserialized_pipeline: pipelineIR.PipelineIR) -> Pipeline:
        """Create a pipeline from an already created `PipelineIR` object.

        Parameters
        ----------
        deserialized_pipeline : `PipelineIR`
            An already created pipeline intermediate representation object.

        Returns
        -------
        pipeline : `Pipeline`
        """
        pipeline = cls.__new__(cls)
        pipeline._pipelineIR = deserialized_pipeline
        return pipeline
337 """Create a new pipeline by copying an already existing `Pipeline`.
342 An already created pipeline intermediate representation object
348 return cls.
fromIR(copy.deep_copy(pipeline._pipelineIR))
354 """Add an instrument to the pipeline, or replace an instrument that is
359 instrument : `~lsst.daf.butler.instrument.Instrument` or `str`
360 Either a derived class object of a `lsst.daf.butler.instrument` or
361 a string corresponding to a fully qualified
362 `lsst.daf.butler.instrument` name.
364 if isinstance(instrument, str):
369 instrument = f
"{instrument.__module__}.{instrument.__qualname__}"
373 """Get the instrument from the pipeline.
377 instrument : `~lsst.daf.butler.instrument.Instrument`, `str`, or None
378 A derived class object of a `lsst.daf.butler.instrument`, a string
379 corresponding to a fully qualified `lsst.daf.butler.instrument`
380 name, or None if the pipeline does not have an instrument.
    def addTask(self, task: Union[PipelineTask, str], label: str):
        """Add a new task to the pipeline, or replace a task that is already
        associated with the supplied label.

        Parameters
        ----------
        task : `PipelineTask` or `str`
            Either a derived class object of a `PipelineTask` or a string
            corresponding to a fully qualified `PipelineTask` name.
        label : `str`
            A label that is used to identify the `PipelineTask` being added.
        """
        if isinstance(task, str):
            taskName = task
        elif issubclass(task, PipelineTask):
            taskName = f"{task.__module__}.{task.__qualname__}"
        else:
            raise ValueError("task must be either a child class of PipelineTask or a string containing"
                             " a fully qualified name to one")
        if not label:
            # In some cases (with command line-generated pipeline) tasks can
            # be defined without label, which is not acceptable; use the
            # task's _DefaultName in that case
            if isinstance(task, str):
                task = doImport(task)
            label = task._DefaultName
        self._pipelineIR.tasks[label] = pipelineIR.TaskIR(label, taskName)
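    # Usage sketch (task and instrument names are hypothetical):
    #
    #     pipeline = Pipeline("demo")
    #     pipeline.addInstrument("some.obs.package.SomeInstrument")
    #     pipeline.addTask("some.package.TaskA", label="taskA")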
413 """Remove a task from the pipeline.
418 The label used to identify the task that is to be removed
423 If no task with that label exists in the pipeline
429 """Apply single config override.
436 Fully-qualified field name.
438 Value to be given to a field.
443 """Add overrides from a specified file.
448 The label used to identify the task associated with config to
451 Path to the override file.
456 """Add Overrides by running a snippet of python code against a config.
461 The label used to identity the task associated with config to
464 A string which is valid python code to be executed. This is done
465 with config as the only local accessible value.
471 raise LookupError(f
"There are no tasks labeled '{label}' in the pipeline")
472 self.
_pipelineIR.tasks[label].add_or_update_config(newConfig)
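    # The three override flavors map onto the ConfigIR fields consumed in
    # toExpandedPipeline below (field names and values here are hypothetical):
    #
    #     pipeline.addConfigOverride("taskA", "someField", 42)         # rest
    #     pipeline.addConfigFile("taskA", "$HOME/overrides.py")        # file
    #     pipeline.addConfigPython("taskA", "config.doWrite = False")  # python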
478 """Returns a generator of TaskDefs which can be used to create quantum
483 generator : generator of `TaskDef`
484 The generator returned will be the sorted iterator of tasks which
485 are to be used in constructing a quantum graph.
490 If a dataId is supplied in a config block. This is in place for
494 for label, taskIR
in self.
_pipelineIR.tasks.items():
496 taskName = taskClass.__qualname__
497 config = taskClass.ConfigClass()
500 overrides.addInstrumentOverride(self.
_pipelineIR.instrument, taskClass._DefaultName)
501 if taskIR.config
is not None:
502 for configIR
in taskIR.config:
503 if configIR.dataId
is not None:
504 raise NotImplementedError(
"Specializing a config on a partial data id is not yet "
505 "supported in Pipeline definition")
507 if configIR.dataId
is None:
509 for configFile
in configIR.file:
510 overrides.addFileOverride(os.path.expandvars(configFile))
511 if configIR.python
is not None:
512 overrides.addPythonOverride(configIR.python)
513 for key, value
in configIR.rest.items():
514 overrides.addValueOverride(key, value)
515 overrides.applyTo(config)
518 taskDefs.append(
TaskDef(taskName=taskName, config=config, taskClass=taskClass, label=label))
522 label_to_config = {x.label: x.config
for x
in taskDefs}
526 success = eval(contract.contract,
None, label_to_config)
528 extra_info = f
": {contract.msg}" if contract.msg
is not None else ""
530 f
"satisfied{extra_info}")
532 yield from pipeTools.orderPipeline(taskDefs)
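    # Iteration sketch: the generator yields TaskDefs sorted into dependency
    # order, so a consumer can walk the pipeline directly (the labels printed
    # below depend on the loaded pipeline):
    #
    #     for taskDef in pipeline.toExpandedPipeline():
    #         print(taskDef.label, taskDef.configDatasetName)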
    def __eq__(self, other):
        if not isinstance(other, Pipeline):
            return False
        return self._pipelineIR == other._pipelineIR
@dataclass(frozen=True)
class TaskDatasetTypes:
    """An immutable struct that extracts and classifies the dataset types used
    by a `PipelineTask`.
    """

    initInputs: NamedValueSet[DatasetType]
    """Dataset types that are needed as inputs in order to construct this
    Task.

    Task-level `initInputs` may be classified as either
    `~PipelineDatasetTypes.initInputs` or
    `~PipelineDatasetTypes.initIntermediates` at the Pipeline level.
    """

    initOutputs: NamedValueSet[DatasetType]
    """Dataset types that may be written after constructing this Task.

    Task-level `initOutputs` may be classified as either
    `~PipelineDatasetTypes.initOutputs` or
    `~PipelineDatasetTypes.initIntermediates` at the Pipeline level.
    """

    inputs: NamedValueSet[DatasetType]
    """Dataset types that are regular inputs to this Task.

    If an input dataset needed for a Quantum cannot be found in the input
    collection(s) or produced by another Task in the Pipeline, that Quantum
    (and all dependent Quanta) will not be produced.

    Task-level `inputs` may be classified as either
    `~PipelineDatasetTypes.inputs` or `~PipelineDatasetTypes.intermediates`
    at the Pipeline level.
    """

    prerequisites: NamedValueSet[DatasetType]
    """Dataset types that are prerequisite inputs to this Task.

    Prerequisite inputs must exist in the input collection(s) before the
    pipeline is run, but do not constrain the graph - if a prerequisite is
    missing for a Quantum, `PrerequisiteMissingError` is raised.

    Prerequisite inputs are not resolved until the second stage of
    QuantumGraph generation.
    """

    outputs: NamedValueSet[DatasetType]
    """Dataset types that are produced by this Task.

    Task-level `outputs` may be classified as either
    `~PipelineDatasetTypes.outputs` or `~PipelineDatasetTypes.intermediates`
    at the Pipeline level.
    """
    @classmethod
    def fromTaskDef(cls, taskDef: TaskDef, *, registry: Registry) -> TaskDatasetTypes:
        """Extract and classify the dataset types from a single `PipelineTask`.

        Parameters
        ----------
        taskDef : `TaskDef`
            An instance of a `TaskDef` class for a particular `PipelineTask`.
        registry : `Registry`
            Registry used to construct normalized `DatasetType` objects and
            retrieve those that are incomplete.

        Returns
        -------
        types : `TaskDatasetTypes`
            The dataset types used by this task.
        """
        def makeDatasetTypesSet(connectionType, freeze=True):
            """Constructs a set of true `DatasetType` objects.

            Parameters
            ----------
            connectionType : `str`
                Name of the connection type to produce a set for; corresponds
                to an attribute of type `list` on the connection class
                instance.
            freeze : `bool`, optional
                If `True`, call `NamedValueSet.freeze` on the object returned.

            Returns
            -------
            datasetTypes : `NamedValueSet`
                A set of all datasetTypes which correspond to the input
                connection type specified in the connection class of this
                task.

            Notes
            -----
            This function is a closure over the variables ``registry`` and
            ``taskDef``.
            """
            datasetTypes = NamedValueSet()
            for c in iterConnections(taskDef.connections, connectionType):
                dimensions = set(getattr(c, 'dimensions', set()))
                if "skypix" in dimensions:
                    try:
                        datasetType = registry.getDatasetType(c.name)
                    except LookupError as err:
                        raise LookupError(
                            f"DatasetType '{c.name}' referenced by "
                            f"{type(taskDef.connections).__name__} uses 'skypix' as a dimension "
                            f"placeholder, but does not already exist in the registry. "
                            f"Note that reference catalog names are now used as the dataset "
                            f"type name instead of 'ref_cat'."
                        ) from err
                    rest1 = set(registry.dimensions.extract(dimensions - set(["skypix"])).names)
                    rest2 = set(dim.name for dim in datasetType.dimensions
                                if not isinstance(dim, SkyPixDimension))
                    if rest1 != rest2:
                        raise ValueError(f"Non-skypix dimensions for dataset type {c.name} declared in "
                                         f"connections ({rest1}) are inconsistent with those in "
                                         f"registry's version of this dataset ({rest2}).")
                else:
                    # Component dataset types are not explicitly in the
                    # registry, so a lookup may fail even when the composite
                    # exists; fall back to building the DatasetType from the
                    # connection itself.
                    registryDatasetType = None
                    try:
                        registryDatasetType = registry.getDatasetType(c.name)
                    except KeyError:
                        compositeName, componentName = DatasetType.splitDatasetTypeName(c.name)
                        parentStorageClass = DatasetType.PlaceholderParentStorageClass \
                            if componentName else None
                        datasetType = c.makeDatasetType(
                            registry.dimensions,
                            parentStorageClass=parentStorageClass
                        )
                        registryDatasetType = datasetType
                    else:
                        datasetType = c.makeDatasetType(
                            registry.dimensions,
                            parentStorageClass=registryDatasetType.parentStorageClass
                        )
                    if registryDatasetType and datasetType != registryDatasetType:
                        raise ValueError(f"Supplied dataset type ({datasetType}) inconsistent with "
                                         f"registry definition ({registryDatasetType}) "
                                         f"for {taskDef.label}.")
                datasetTypes.add(datasetType)
            if freeze:
                datasetTypes.freeze()
            return datasetTypes
        # Optionally add an output dataset for metadata
        outputs = makeDatasetTypesSet("outputs", freeze=False)
        if taskDef.metadataDatasetName is not None:
            # Metadata is supposed to be of the PropertySet type, and its
            # dimensions correspond to a task quantum
            dimensions = registry.dimensions.extract(taskDef.connections.dimensions)
            outputs |= {DatasetType(taskDef.metadataDatasetName, dimensions, "PropertySet")}
        outputs.freeze()

        return cls(
            initInputs=makeDatasetTypesSet("initInputs"),
            initOutputs=makeDatasetTypesSet("initOutputs"),
            inputs=makeDatasetTypesSet("inputs"),
            prerequisites=makeDatasetTypesSet("prerequisiteInputs"),
            outputs=outputs,
        )
@dataclass(frozen=True)
class PipelineDatasetTypes:
    """An immutable struct that classifies the dataset types used in a
    `Pipeline`.
    """

    initInputs: NamedValueSet[DatasetType]
    """Dataset types that are needed as inputs in order to construct the Tasks
    in this Pipeline.

    This does not include dataset types that are produced when constructing
    other Tasks in the Pipeline (these are classified as `initIntermediates`).
    """

    initOutputs: NamedValueSet[DatasetType]
    """Dataset types that may be written after constructing the Tasks in this
    Pipeline.

    This does not include dataset types that are also used as inputs when
    constructing other Tasks in the Pipeline (these are classified as
    `initIntermediates`).
    """

    initIntermediates: NamedValueSet[DatasetType]
    """Dataset types that are both used when constructing one or more Tasks
    in the Pipeline and produced as a side-effect of constructing another
    Task in the Pipeline.
    """

    inputs: NamedValueSet[DatasetType]
    """Dataset types that are regular inputs for the full pipeline.

    If an input dataset needed for a Quantum cannot be found in the input
    collection(s), that Quantum (and all dependent Quanta) will not be
    produced.
    """

    prerequisites: NamedValueSet[DatasetType]
    """Dataset types that are prerequisite inputs for the full Pipeline.

    Prerequisite inputs must exist in the input collection(s) before the
    pipeline is run, but do not constrain the graph - if a prerequisite is
    missing for a Quantum, `PrerequisiteMissingError` is raised.

    Prerequisite inputs are not resolved until the second stage of
    QuantumGraph generation.
    """

    intermediates: NamedValueSet[DatasetType]
    """Dataset types that are output by one Task in the Pipeline and consumed
    as inputs by one or more other Tasks in the Pipeline.
    """

    outputs: NamedValueSet[DatasetType]
    """Dataset types that are output by a Task in the Pipeline and not
    consumed by any other Task in the Pipeline.
    """

    byTask: Mapping[str, TaskDatasetTypes]
    """Per-Task dataset types, keyed by label in the `Pipeline`.

    This is guaranteed to be zip-iterable with the `Pipeline` itself (assuming
    neither has been modified since the dataset types were extracted, of
    course).
    """
    @classmethod
    def fromPipeline(cls, pipeline, *, registry: Registry) -> PipelineDatasetTypes:
        """Extract and classify the dataset types from all tasks in a
        `Pipeline`.

        Parameters
        ----------
        pipeline : `Pipeline`
            An ordered collection of tasks that can be run together.
        registry : `Registry`
            Registry used to construct normalized `DatasetType` objects and
            retrieve those that are incomplete.

        Returns
        -------
        types : `PipelineDatasetTypes`
            The dataset types used by this `Pipeline`.

        Raises
        ------
        ValueError
            Raised if Tasks are inconsistent about which datasets are marked
            prerequisite. This indicates that the Tasks cannot be run as part
            of the same `Pipeline`.
        """
        allInputs = NamedValueSet()
        allOutputs = NamedValueSet()
        allInitInputs = NamedValueSet()
        allInitOutputs = NamedValueSet()
        prerequisites = NamedValueSet()
        byTask = dict()
        if isinstance(pipeline, Pipeline):
            pipeline = pipeline.toExpandedPipeline()
        for taskDef in pipeline:
            thisTask = TaskDatasetTypes.fromTaskDef(taskDef, registry=registry)
            allInitInputs |= thisTask.initInputs
            allInitOutputs |= thisTask.initOutputs
            allInputs |= thisTask.inputs
            prerequisites |= thisTask.prerequisites
            allOutputs |= thisTask.outputs
            byTask[taskDef.label] = thisTask
        if not prerequisites.isdisjoint(allInputs):
            raise ValueError("{} marked as both prerequisites and regular inputs".format(
                {dt.name for dt in allInputs & prerequisites}
            ))
        if not prerequisites.isdisjoint(allOutputs):
            raise ValueError("{} marked as both prerequisites and outputs".format(
                {dt.name for dt in allOutputs & prerequisites}
            ))
        # Make sure that components which are marked as inputs get treated as
        # intermediates if there is an output which produces the composite
        # containing the component
        intermediateComponents = NamedValueSet()
        intermediateComposites = NamedValueSet()
        outputNameMapping = {dsType.name: dsType for dsType in allOutputs}
        for dsType in allInputs:
            # Get the name of a possible component
            name, component = dsType.nameAndComponent()
            # If there is a component name, this is a component DatasetType;
            # if there is an output which produces the parent of this
            # component, treat this input as an intermediate
            if component is not None:
                if name in outputNameMapping:
                    if outputNameMapping[name].dimensions != dsType.dimensions:
                        raise ValueError(f"Component dataset type {dsType.name} has different "
                                         f"dimensions ({dsType.dimensions}) than its parent "
                                         f"({outputNameMapping[name].dimensions}).")
                    composite = DatasetType(name, dsType.dimensions, outputNameMapping[name].storageClass,
                                            universe=registry.dimensions)
                    intermediateComponents.add(dsType)
                    intermediateComposites.add(composite)
        def checkConsistency(a: NamedValueSet, b: NamedValueSet):
            common = a.names & b.names
            for name in common:
                if a[name] != b[name]:
                    raise ValueError(f"Conflicting definitions for dataset type: "
                                     f"{a[name]} != {b[name]}.")

        checkConsistency(allInitInputs, allInitOutputs)
        checkConsistency(allInputs, allOutputs)
        checkConsistency(allInputs, intermediateComposites)
        checkConsistency(allOutputs, intermediateComposites)
        def frozen(s: NamedValueSet) -> NamedValueSet:
            s.freeze()
            return s

        return cls(
            initInputs=frozen(allInitInputs - allInitOutputs),
            initIntermediates=frozen(allInitInputs & allInitOutputs),
            initOutputs=frozen(allInitOutputs - allInitInputs),
            inputs=frozen(allInputs - allOutputs - intermediateComponents),
            intermediates=frozen(allInputs & allOutputs | intermediateComponents),
            outputs=frozen(allOutputs - allInputs - intermediateComposites),
            prerequisites=frozen(prerequisites),
            byTask=MappingProxyType(byTask),  # guaranteed to be ordered
        )
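        # Worked example of the classification algebra above (hypothetical
        # dataset type names): if task1 reads "raw" and writes "calexp", and
        # task2 reads "calexp" and writes "coadd", then
        #
        #     inputs        = {"raw"}     (consumed but never produced)
        #     intermediates = {"calexp"}  (produced and consumed)
        #     outputs       = {"coadd"}   (produced but never consumed)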