from __future__ import annotations

"""Module defining Pipeline class and related methods.
"""

__all__ = ["Pipeline", "TaskDef", "TaskDatasetTypes", "PipelineDatasetTypes"]
 
import copy

from dataclasses import dataclass
from types import MappingProxyType
from typing import Mapping, Union, Generator, TYPE_CHECKING

from lsst.daf.butler import DatasetType, NamedValueSet, Registry, SkyPixDimension
from lsst.utils import doImport

from .configOverrides import ConfigOverrides
from .connections import iterConnections
from .pipelineTask import PipelineTask

from . import pipelineIR
from . import pipeTools

if TYPE_CHECKING:
    # Only needed for type annotations; may be circular at runtime.
    from lsst.obs.base.instrument import Instrument
 

class TaskDef:
    """TaskDef is a collection of information about a task needed by Pipeline.

    The information includes task name, configuration object and optional
    task class. This class is just a collection of attributes and it exposes
    all of them so that attributes could potentially be modified in place
    (e.g. if configuration needs extra overrides).

    Attributes
    ----------
    taskName : `str`
        `PipelineTask` class name; currently it is not specified whether this
        is a fully-qualified name or a partial name (e.g. ``module.TaskClass``).
        The framework should be prepared to handle all cases.
    config : `lsst.pex.config.Config`
        Instance of the configuration class corresponding to this task class,
        usually with all overrides applied.
    taskClass : `type` or ``None``
        `PipelineTask` class object; can be ``None``. If ``None``, the
        framework will have to locate and load the class.
    label : `str`, optional
        Task label, usually a short string unique in a pipeline.
    """
    def __init__(self, taskName, config, taskClass=None, label=""):
        self.taskName = taskName
        self.config = config
        self.taskClass = taskClass
        self.label = label
        self.connections = config.connections.ConnectionsClass(config=config)
 
    @property
    def configDatasetName(self):
        """Name of a dataset type for configuration of this task (`str`).
        """
        return self.label + "_config"

    @property
    def metadataDatasetName(self):
        """Name of a dataset type for metadata of this task, `None` if
        metadata is not to be saved (`str`).
        """
        if self.config.saveMetadata:
            return self.label + "_metadata"
        else:
            return None

    def __str__(self):
        rep = "TaskDef(" + self.taskName
        if self.label:
            rep += ", label=" + self.label
        rep += ")"
        return rep
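
    # A hedged illustration, not part of the original source: with the naming
    # convention above, a TaskDef labelled "demoTask" (an assumed label) has
    #
    #     taskDef.configDatasetName    -> "demoTask_config"
    #     taskDef.metadataDatasetName  -> "demoTask_metadata"
    #                                     (when config.saveMetadata is True)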


class Pipeline:
    """A `Pipeline` is a representation of a series of tasks to run, and the
    configuration for those tasks.

    Parameters
    ----------
    description : `str`
        A description of what this pipeline does.
    """
    def __init__(self, description: str):
        pipeline_dict = {"description": description, "tasks": {}}
        self._pipelineIR = pipelineIR.PipelineIR(pipeline_dict)
 
    @classmethod
    def fromFile(cls, filename: str) -> Pipeline:
        """Load a pipeline defined in a pipeline yaml file.

        Parameters
        ----------
        filename : `str`
            A path that points to a pipeline defined in yaml format.
        """
        pipeline = cls.fromIR(pipelineIR.PipelineIR.from_file(filename))
        return pipeline
 
    @classmethod
    def fromString(cls, pipeline_string: str) -> Pipeline:
        """Create a pipeline from a string formatted as a pipeline document.

        Parameters
        ----------
        pipeline_string : `str`
            A string that is formatted according to the pipeline document
            format.
        """
        pipeline = cls.fromIR(pipelineIR.PipelineIR.from_string(pipeline_string))
        return pipeline
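
    # A hedged usage sketch, not part of the original source.  It assumes the
    # standard pipeline YAML layout (a description plus a mapping of task
    # labels to classes); the label "demoTask" and the class
    # "mypackage.demo.DemoTask" are illustrative only.
    #
    #     pipeline = Pipeline.fromString(
    #         "description: A demo pipeline\n"
    #         "tasks:\n"
    #         "  demoTask:\n"
    #         "    class: mypackage.demo.DemoTask\n"
    #     )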
 
    @classmethod
    def fromIR(cls, deserialized_pipeline: pipelineIR.PipelineIR) -> Pipeline:
        """Create a pipeline from an already created `PipelineIR` object.

        Parameters
        ----------
        deserialized_pipeline : `PipelineIR`
            An already created pipeline intermediate representation object.
        """
        pipeline = cls.__new__(cls)
        pipeline._pipelineIR = deserialized_pipeline
        return pipeline
 
    @classmethod
    def fromPipeline(cls, pipeline: Pipeline) -> Pipeline:
        """Create a new pipeline by copying an already existing `Pipeline`.

        Parameters
        ----------
        pipeline : `Pipeline`
            An already created `Pipeline` object to copy.
        """
        return cls.fromIR(copy.deepcopy(pipeline._pipelineIR))
 
    def addInstrument(self, instrument: Union[Instrument, str]):
        """Add an instrument to the pipeline, or replace an instrument that is
        already defined.

        Parameters
        ----------
        instrument : `~lsst.obs.base.instrument.Instrument` or `str`
            Either a subclass of `~lsst.obs.base.instrument.Instrument` or a
            string corresponding to a fully qualified `Instrument` class name.
        """
        if isinstance(instrument, str):
            pass
        else:
            # Convert the class into its fully qualified name.
            instrument = f"{instrument.__module__}.{instrument.__qualname__}"
        self._pipelineIR.instrument = instrument

    def addTask(self, task: Union[PipelineTask, str], label: str):
        """Add a new task to the pipeline, or replace a task that is already
        associated with the supplied label.

        Parameters
        ----------
        task : `PipelineTask` or `str`
            Either a derived class object of a `PipelineTask` or a string
            corresponding to a fully qualified `PipelineTask` name.
        label : `str`
            A label that is used to identify the `PipelineTask` being added.
        """
        if isinstance(task, str):
            taskName = task
        elif issubclass(task, PipelineTask):
            taskName = f"{task.__module__}.{task.__qualname__}"
        else:
            raise ValueError("task must be either a child class of PipelineTask or a string containing"
                             " a fully qualified name to one")
 
        if not label:
            # Fall back to the task's _DefaultName when no label is supplied;
            # a string specification must be imported first so the class
            # attribute can be read.
            if isinstance(task, str):
                task = doImport(task)
            label = task._DefaultName
        # Record the task in the pipeline IR (TaskIR taking the label and the
        # fully qualified class name is an assumption of this reconstruction).
        self._pipelineIR.tasks[label] = pipelineIR.TaskIR(label, taskName)
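
    # A hedged usage sketch, not part of the original source.  The instrument
    # and task names below ("mypackage.demo.DemoInstrument",
    # "mypackage.demo.DemoTask") and the label "demoTask" are illustrative
    # assumptions.
    #
    #     pipeline = Pipeline("A demo pipeline")
    #     pipeline.addInstrument("mypackage.demo.DemoInstrument")
    #     pipeline.addTask("mypackage.demo.DemoTask", "demoTask")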
 
    def removeTask(self, label: str):
        """Remove a task from the pipeline.

        Parameters
        ----------
        label : `str`
            The label used to identify the task that is to be removed.

        Raises
        ------
        KeyError
            If no task with that label exists in the pipeline.
        """
        self._pipelineIR.tasks.pop(label)

    def addConfigOverride(self, label: str, key: str, value: object):
        """Apply single config override.

        Parameters
        ----------
        label : `str`
            Label of the task the override applies to.
        key : `str`
            Fully-qualified field name.
        value : object
            Value to be given to a field.
        """
        self._addConfigImpl(label, pipelineIR.ConfigIR(rest={key: value}))

    def addConfigFile(self, label: str, filename: str):
        """Add overrides from a specified file.

        Parameters
        ----------
        label : `str`
            The label used to identify the task associated with config to
            modify.
        filename : `str`
            Path to the override file.
        """
        self._addConfigImpl(label, pipelineIR.ConfigIR(file=[filename]))

    def addConfigPython(self, label: str, pythonString: str):
        """Add overrides by running a snippet of python code against a config.

        Parameters
        ----------
        label : `str`
            The label used to identify the task associated with config to
            modify.
        pythonString : `str`
            A string which is valid python code to be executed. This is done
            with config as the only local accessible value.
        """
        self._addConfigImpl(label, pipelineIR.ConfigIR(python=pythonString))

    def _addConfigImpl(self, label: str, newConfig: pipelineIR.ConfigIR):
        if label not in self._pipelineIR.tasks:
            raise LookupError(f"There are no tasks labeled '{label}' in the pipeline")
        self._pipelineIR.tasks[label].add_or_update_config(newConfig)
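
    # A hedged usage sketch, not part of the original source.  The label
    # "demoTask", the config field "doWrite", and the file name "overrides.py"
    # are illustrative assumptions.
    #
    #     pipeline.addConfigOverride("demoTask", "doWrite", False)
    #     pipeline.addConfigFile("demoTask", "overrides.py")
    #     pipeline.addConfigPython("demoTask", "config.doWrite = False")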
 
    def toExpandedPipeline(self) -> Generator[TaskDef, None, None]:
        """Returns a generator of TaskDefs which can be used to create quantum
        graphs.

        Returns
        -------
        generator : generator of `TaskDef`
            The generator returned will be the sorted iterator of tasks which
            are to be used in constructing a quantum graph.

        Raises
        ------
        NotImplementedError
            If a dataId is supplied in a config block; this is in place for
            future use.
        """
        taskDefs = []
        for label, taskIR in self._pipelineIR.tasks.items():
            # Import the task class named in the IR (storing the fully
            # qualified class name in ``klass`` is an assumption of this
            # reconstruction) and build its default configuration.
            taskClass = doImport(taskIR.klass)
            taskName = taskClass.__qualname__
            config = taskClass.ConfigClass()
            overrides = ConfigOverrides()
            if self._pipelineIR.instrument is not None:
                overrides.addInstrumentOverride(self._pipelineIR.instrument, taskClass._DefaultName)
 
            if taskIR.config is not None:
                for configIR in taskIR.config:
                    if configIR.dataId is not None:
                        raise NotImplementedError("Specializing a config on a partial data id is not yet "
                                                  "supported in Pipeline definition")
                    # Only apply overrides that are not qualified by a dataId.
                    if configIR.dataId is None:
                        if configIR.file:
                            for configFile in configIR.file:
                                overrides.addFileOverride(configFile)
                        if configIR.python is not None:
                            overrides.addPythonOverride(configIR.python)
                        for key, value in configIR.rest.items():
                            overrides.addValueOverride(key, value)
            overrides.applyTo(config)
            taskDefs.append(TaskDef(taskName=taskName, config=config, taskClass=taskClass, label=label))
 
        # Evaluate any contracts defined in the pipeline document against the
        # final task configurations.
        if self._pipelineIR.contracts is not None:
            label_to_config = {x.label: x.config for x in taskDefs}
            for contract in self._pipelineIR.contracts:
                # Evaluate each contract in its own statement so a failure
                # produces a clear error message.
                success = eval(contract.contract, None, label_to_config)
                if not success:
                    extra_info = f": {contract.msg}" if contract.msg is not None else ""
                    raise pipelineIR.ContractError(f"Contract(s) '{contract.contract}' were not "
                                                   f"satisfied{extra_info}")
 
        yield from pipeTools.orderPipeline(taskDefs)
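
    # A hedged usage sketch, not part of the original source: expanding a
    # pipeline yields configured, dependency-ordered TaskDef objects.
    #
    #     for taskDef in pipeline.toExpandedPipeline():
    #         print(taskDef.label, taskDef.taskName)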
 
    def __eq__(self, other: object):
        if not isinstance(other, Pipeline):
            return False
        return self._pipelineIR == other._pipelineIR
 

@dataclass(frozen=True)
class TaskDatasetTypes:
    """An immutable struct that extracts and classifies the dataset types used
    by a `PipelineTask`.
    """

    initInputs: NamedValueSet[DatasetType]
    """Dataset types that are needed as inputs in order to construct this Task.

    Task-level `initInputs` may be classified as either
    `~PipelineDatasetTypes.initInputs` or
    `~PipelineDatasetTypes.initIntermediates` at the Pipeline level.
    """

    initOutputs: NamedValueSet[DatasetType]
    """Dataset types that may be written after constructing this Task.

    Task-level `initOutputs` may be classified as either
    `~PipelineDatasetTypes.initOutputs` or
    `~PipelineDatasetTypes.initIntermediates` at the Pipeline level.
    """

    inputs: NamedValueSet[DatasetType]
    """Dataset types that are regular inputs to this Task.

    If an input dataset needed for a Quantum cannot be found in the input
    collection(s) or produced by another Task in the Pipeline, that Quantum
    (and all dependent Quanta) will not be produced.

    Task-level `inputs` may be classified as either
    `~PipelineDatasetTypes.inputs` or `~PipelineDatasetTypes.intermediates`
    at the Pipeline level.
    """

    prerequisites: NamedValueSet[DatasetType]
    """Dataset types that are prerequisite inputs to this Task.

    Prerequisite inputs must exist in the input collection(s) before the
    pipeline is run, but do not constrain the graph - if a prerequisite is
    missing for a Quantum, `PrerequisiteMissingError` is raised.

    Prerequisite inputs are not resolved until the second stage of
    QuantumGraph generation.
    """

    outputs: NamedValueSet[DatasetType]
    """Dataset types that are produced by this Task.

    Task-level `outputs` may be classified as either
    `~PipelineDatasetTypes.outputs` or `~PipelineDatasetTypes.intermediates`
    at the Pipeline level.
    """

    @classmethod
    def fromTaskDef(cls, taskDef: TaskDef, *, registry: Registry) -> TaskDatasetTypes:
        """Extract and classify the dataset types from a single `PipelineTask`.

        Parameters
        ----------
        taskDef : `TaskDef`
            An instance of a `TaskDef` class for a particular `PipelineTask`.
        registry : `Registry`
            Registry used to construct normalized `DatasetType` objects and
            retrieve those that are incomplete.

        Returns
        -------
        types : `TaskDatasetTypes`
            The dataset types used by this task.
        """
        def makeDatasetTypesSet(connectionType, freeze=True):
            """Constructs a set of true `DatasetType` objects

            Parameters
            ----------
            connectionType : `str`
                Name of the connection type to produce a set for, corresponds
                to an attribute of type `list` on the connection class instance
            freeze : `bool`, optional
                If `True`, call `NamedValueSet.freeze` on the object returned.

            Returns
            -------
            datasetTypes : `NamedValueSet`
                A set of all datasetTypes which correspond to the input
                connection type specified in the connection class of this
                `PipelineTask`

            Notes
            -----
            This function is a closure over the variables ``registry`` and
            ``taskDef``.
            """
            datasetTypes = NamedValueSet()
 
            for c in iterConnections(taskDef.connections, connectionType):
                dimensions = set(getattr(c, 'dimensions', set()))
                if "skypix" in dimensions:
                    try:
                        datasetType = registry.getDatasetType(c.name)
                    except LookupError as err:
                        raise LookupError(
                            f"DatasetType '{c.name}' referenced by "
                            f"{type(taskDef.connections).__name__} uses 'skypix' as a dimension "
                            f"placeholder, but does not already exist in the registry.  "
                            f"Note that reference catalog names are now used as the dataset "
                            f"type name instead of 'ref_cat'."
                        ) from err
                    rest1 = set(registry.dimensions.extract(dimensions - set(["skypix"])).names)
                    rest2 = set(dim.name for dim in datasetType.dimensions
                                if not isinstance(dim, SkyPixDimension))
                    if rest1 != rest2:
                        raise ValueError(f"Non-skypix dimensions for dataset type {c.name} declared in "
                                         f"connections ({rest1}) are inconsistent with those in "
                                         f"registry's version of this dataset ({rest2}).")
 
                else:
                    # The connection does not use the "skypix" placeholder, so
                    # a complete DatasetType can be built from the declared
                    # dimensions and storage class.
                    datasetType = DatasetType(c.name, registry.dimensions.extract(dimensions),
                                              c.storageClass)
                    try:
                        registryDatasetType = registry.getDatasetType(c.name)
                    except LookupError:
                        registryDatasetType = datasetType
                    if datasetType != registryDatasetType:
                        raise ValueError(f"Supplied dataset type ({datasetType}) inconsistent with "
                                         f"registry definition ({registryDatasetType})")
                datasetTypes.add(datasetType)
            if freeze:
                datasetTypes.freeze()
            return datasetTypes
 
        # Leave outputs unfrozen so that the task metadata dataset type can be
        # appended below.
        outputs = makeDatasetTypesSet("outputs", freeze=False)
        if taskDef.metadataDatasetName is not None:
            # Metadata is stored as a PropertyList whose dimensions are those
            # of the task quantum.
            dimensions = registry.dimensions.extract(taskDef.connections.dimensions)
            outputs |= {DatasetType(taskDef.metadataDatasetName, dimensions, "PropertyList")}
        outputs.freeze()
 
        return cls(
            initInputs=makeDatasetTypesSet("initInputs"),
            initOutputs=makeDatasetTypesSet("initOutputs"),
            inputs=makeDatasetTypesSet("inputs"),
            prerequisites=makeDatasetTypesSet("prerequisiteInputs"),
            outputs=outputs,
        )
 

@dataclass(frozen=True)
class PipelineDatasetTypes:
    """An immutable struct that classifies the dataset types used in a
    `Pipeline`.
    """

    initInputs: NamedValueSet[DatasetType]
    """Dataset types that are needed as inputs in order to construct the Tasks
    in this Pipeline.

    This does not include dataset types that are produced when constructing
    other Tasks in the Pipeline (these are classified as `initIntermediates`).
    """

    initOutputs: NamedValueSet[DatasetType]
    """Dataset types that may be written after constructing the Tasks in this
    Pipeline.

    This does not include dataset types that are also used as inputs when
    constructing other Tasks in the Pipeline (these are classified as
    `initIntermediates`).
    """

    initIntermediates: NamedValueSet[DatasetType]
    """Dataset types that are both used when constructing one or more Tasks
    in the Pipeline and produced as a side-effect of constructing another
    Task in the Pipeline.
    """

    inputs: NamedValueSet[DatasetType]
    """Dataset types that are regular inputs for the full pipeline.

    If an input dataset needed for a Quantum cannot be found in the input
    collection(s), that Quantum (and all dependent Quanta) will not be
    produced.
    """

    prerequisites: NamedValueSet[DatasetType]
    """Dataset types that are prerequisite inputs for the full Pipeline.

    Prerequisite inputs must exist in the input collection(s) before the
    pipeline is run, but do not constrain the graph - if a prerequisite is
    missing for a Quantum, `PrerequisiteMissingError` is raised.

    Prerequisite inputs are not resolved until the second stage of
    QuantumGraph generation.
    """

    intermediates: NamedValueSet[DatasetType]
    """Dataset types that are output by one Task in the Pipeline and consumed
    as inputs by one or more other Tasks in the Pipeline.
    """

    outputs: NamedValueSet[DatasetType]
    """Dataset types that are output by a Task in the Pipeline and not consumed
    by any other Task in the Pipeline.
    """

    byTask: Mapping[str, TaskDatasetTypes]
    """Per-Task dataset types, keyed by label in the `Pipeline`.

    This is guaranteed to be zip-iterable with the `Pipeline` itself (assuming
    neither has been modified since the dataset types were extracted, of
    course).
    """

    @classmethod
    def fromPipeline(cls, pipeline, *, registry: Registry) -> PipelineDatasetTypes:
        """Extract and classify the dataset types from all tasks in a
        `Pipeline`.

        Parameters
        ----------
        pipeline : `Pipeline` or iterable of `TaskDef`
            An ordered collection of tasks that can be run together.
        registry : `Registry`
            Registry used to construct normalized `DatasetType` objects and
            retrieve those that are incomplete.

        Returns
        -------
        types : `PipelineDatasetTypes`
            The dataset types used by this `Pipeline`.

        Raises
        ------
        ValueError
            Raised if Tasks are inconsistent about which datasets are marked
            prerequisite.  This indicates that the Tasks cannot be run as part
            of the same `Pipeline`.
        """
        allInputs = NamedValueSet()
        allOutputs = NamedValueSet()
        allInitInputs = NamedValueSet()
        allInitOutputs = NamedValueSet()
        prerequisites = NamedValueSet()
        byTask = dict()

        if isinstance(pipeline, Pipeline):
            pipeline = pipeline.toExpandedPipeline()
        for taskDef in pipeline:
            thisTask = TaskDatasetTypes.fromTaskDef(taskDef, registry=registry)
            allInitInputs |= thisTask.initInputs
            allInitOutputs |= thisTask.initOutputs
            allInputs |= thisTask.inputs
            prerequisites |= thisTask.prerequisites
            allOutputs |= thisTask.outputs
            byTask[taskDef.label] = thisTask
 
        if not prerequisites.isdisjoint(allInputs):
            raise ValueError("{} marked as both prerequisites and regular inputs".format(
                {dt.name for dt in allInputs & prerequisites}
            ))
        if not prerequisites.isdisjoint(allOutputs):
            raise ValueError("{} marked as both prerequisites and outputs".format(
                {dt.name for dt in allOutputs & prerequisites}
            ))
 
        # Inputs that are really components of composite datasets produced
        # elsewhere in the Pipeline are tracked separately so they can be
        # classified as intermediates rather than pure inputs.
        intermediateComponents = NamedValueSet()
        intermediateComposites = NamedValueSet()
        outputNameMapping = {dsType.name: dsType for dsType in allOutputs}
        for dsType in allInputs:
            # Get the composite name and component (if any) of this input.
            name, component = dsType.nameAndComponent()
            if component is not None:
                if name in outputNameMapping:
                    if outputNameMapping[name].dimensions != dsType.dimensions:
                        raise ValueError(f"Component dataset type {dsType.name} has different "
                                         f"dimensions ({dsType.dimensions}) than its parent "
                                         f"({outputNameMapping[name].dimensions}).")
                    composite = DatasetType(name, dsType.dimensions, outputNameMapping[name].storageClass,
                                            universe=registry.dimensions)
                    intermediateComponents.add(dsType)
                    intermediateComposites.add(composite)
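
        # An illustrative note, not from the original source: if one task
        # outputs a composite named "exposure" (an assumed name) and another
        # task reads the component "exposure.wcs", nameAndComponent() splits
        # that input into ("exposure", "wcs"); the component is then recorded
        # in intermediateComponents and its parent in intermediateComposites.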
 
        def checkConsistency(a: NamedValueSet, b: NamedValueSet):
            common = a.names & b.names
            for name in common:
                if a[name] != b[name]:
                    raise ValueError(f"Conflicting definitions for dataset type: {a[name]} != {b[name]}.")

        checkConsistency(allInitInputs, allInitOutputs)
        checkConsistency(allInputs, allOutputs)
        checkConsistency(allInputs, intermediateComposites)
        checkConsistency(allOutputs, intermediateComposites)
 
        def frozen(s: NamedValueSet) -> NamedValueSet:
            s.freeze()
            return s

        return cls(
            initInputs=frozen(allInitInputs - allInitOutputs),
            initIntermediates=frozen(allInitInputs & allInitOutputs),
            initOutputs=frozen(allInitOutputs - allInitInputs),
            inputs=frozen(allInputs - allOutputs - intermediateComponents),
            intermediates=frozen(allInputs & allOutputs | intermediateComponents),
            outputs=frozen(allOutputs - allInputs - intermediateComposites),
            prerequisites=frozen(prerequisites),
            byTask=MappingProxyType(byTask),
        )
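
    # A hedged usage sketch, not part of the original source; ``butler`` is an
    # assumed, already-initialised lsst.daf.butler.Butler.  The set algebra
    # above means, for example, that a dataset type produced by one task and
    # consumed by another ends up in ``intermediates`` rather than in
    # ``inputs`` or ``outputs``.
    #
    #     dsTypes = PipelineDatasetTypes.fromPipeline(pipeline, registry=butler.registry)
    #     print(sorted(dsTypes.intermediates.names))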