from __future__ import annotations

"""Module defining Pipeline class and related methods.
"""

__all__ = ["Pipeline", "TaskDef", "TaskDatasetTypes", "PipelineDatasetTypes", "LabelSpecifier"]

import copy
import os
import re
import urllib.parse
import warnings

from dataclasses import dataclass
from types import MappingProxyType
from typing import Dict, Mapping, Set, Union, Generator, TYPE_CHECKING, Optional, Tuple

from lsst.daf.butler import DatasetType, NamedValueSet, Registry, SkyPixDimension, ButlerURI
from lsst.utils import doImport

from .configOverrides import ConfigOverrides
from .connections import iterConnections
from .pipelineTask import PipelineTask

from . import pipelineIR
from . import pipeTools

if TYPE_CHECKING:  # Imports needed only for type annotations
    from lsst.daf.butler.instrument import Instrument
66 """A structure to specify a subset of labels to load
68 This structure may contain a set of labels to be used in subsetting a
69 pipeline, or a beginning and end point. Beginning or end may be empty,
70 in which case the range will be a half open interval. Unlike python
71 iteration bounds, end bounds are *INCLUDED*. Note that range based
72 selection is not well defined for pipelines that are not linear in nature,
73 and correct behavior is not guaranteed, or may vary from run to run.
75 labels: Optional[Set[str]] =
None
76 begin: Optional[str] =
None
77 end: Optional[str] =
None
80 if self.labels
is not None and (self.begin
or self.end):
81 raise ValueError(
"This struct can only be initialized with a labels set or "
82 "a begin (and/or) end specifier")
86 """TaskDef is a collection of information about task needed by Pipeline.
88 The information includes task name, configuration object and optional
89 task class. This class is just a collection of attributes and it exposes
90 all of them so that attributes could potentially be modified in place
91 (e.g. if configuration needs extra overrides).
96 `PipelineTask` class name, currently it is not specified whether this
97 is a fully-qualified name or partial name (e.g. ``module.TaskClass``).
98 Framework should be prepared to handle all cases.
99 config : `lsst.pex.config.Config`
100 Instance of the configuration class corresponding to this task class,
101 usually with all overrides applied. This config will be frozen.
102 taskClass : `type` or ``None``
103 `PipelineTask` class object, can be ``None``. If ``None`` then
104 framework will have to locate and load class.
105 label : `str`, optional
106 Task label, usually a short string unique in a pipeline.
108 def __init__(self, taskName, config, taskClass=None, label=""):
114 self.
connectionsconnections = config.connections.ConnectionsClass(config=config)
118 """Name of a dataset type for configuration of this task (`str`)
120 return self.
labellabel +
"_config"
124 """Name of a dataset type for metadata of this task, `None` if
125 metadata is not to be saved (`str`)
127 if self.
configconfig.saveMetadata:
128 return self.
labellabel +
"_metadata"
133 rep =
"TaskDef(" + self.
taskNametaskName
135 rep +=
", label=" + self.
labellabel
140 if not isinstance(other, TaskDef):
145 return self.
taskClasstaskClass == other.taskClass
and self.
labellabel == other.label
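
# Sketch: for a TaskDef labeled "isr" (a hypothetical label), the derived
# dataset type names follow the label:
#
#     >>> taskDef.configDatasetName
#     'isr_config'
#     >>> taskDef.metadataDatasetName  # None when config.saveMetadata is False
#     'isr_metadata'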
152 """A `Pipeline` is a representation of a series of tasks to run, and the
153 configuration for those tasks.
158 A description of that this pipeline does.
161 pipeline_dict = {
"description": description,
"tasks": {}}
166 """Load a pipeline defined in a pipeline yaml file.
171 A path that points to a pipeline defined in yaml format. This
172 filename may also supply additional labels to be used in
173 subsetting the loaded Pipeline. These labels are separated from
174 the path by a \\#, and may be specified as a comma separated
175 list, or a range denoted as beginning..end. Beginning or end may
176 be empty, in which case the range will be a half open interval.
177 Unlike python iteration bounds, end bounds are *INCLUDED*. Note
178 that range based selection is not well defined for pipelines that
179 are not linear in nature, and correct behavior is not guaranteed,
180 or may vary from run to run.
185 The pipeline loaded from specified location with appropriate (if
190 This method attempts to prune any contracts that contain labels which
191 are not in the declared subset of labels. This pruning is done using a
192 string based matching due to the nature of contracts and may prune more
195 return cls.
from_urifrom_uri(filename)
    @classmethod
    def from_uri(cls, uri: Union[str, ButlerURI]) -> Pipeline:
        """Load a pipeline defined in a pipeline yaml file at a location
        specified by a URI.

        Parameters
        ----------
        uri : `str` or `ButlerURI`
            If a string is supplied this should be a URI path that points to a
            pipeline defined in yaml format. This uri may also supply
            additional labels to be used in subsetting the loaded Pipeline.
            These labels are separated from the path by a \\#, and may be
            specified as a comma separated list, or a range denoted as
            beginning..end. Beginning or end may be empty, in which case the
            range will be a half open interval. Unlike python iteration
            bounds, end bounds are *INCLUDED*. Note that range based selection
            is not well defined for pipelines that are not linear in nature,
            and correct behavior is not guaranteed, or may vary from run to
            run. The same specifiers can be used with a ButlerURI object, by
            being the sole contents in the fragments attribute.

        Returns
        -------
        pipeline : `Pipeline`
            The pipeline loaded from specified location with appropriate (if
            any) subsetting.

        Notes
        -----
        This method attempts to prune any contracts that contain labels which
        are not in the declared subset of labels. This pruning is done using a
        string based matching due to the nature of contracts and may prune
        more than it should.
        """
        # Split up the uri and any labels that were supplied
        uri, label_specifier = cls._parse_file_specifier(uri)
        pipeline: Pipeline = cls.fromIR(pipelineIR.PipelineIR.from_uri(uri))

        # If there are labels supplied, only keep those
        if label_specifier is not None:
            pipeline = pipeline.subsetFromLabels(label_specifier)
        return pipeline
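
    # Illustrative calls (the file name and labels are hypothetical):
    #
    #     >>> p = Pipeline.from_uri("pipeline.yaml")                 # whole pipeline
    #     >>> p = Pipeline.from_uri("pipeline.yaml#isr,calibrate")   # list of labels
    #     >>> p = Pipeline.from_uri("pipeline.yaml#isr..calibrate")  # inclusive range
    #     >>> p = Pipeline.from_uri("pipeline.yaml#..calibrate")     # open beginning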
241 """Subset a pipeline to contain only labels specified in labelSpecifier
245 labelSpecifier : `labelSpecifier`
246 Object containing labels that describes how to subset a pipeline.
250 pipeline : `Pipeline`
251 A new pipeline object that is a subset of the old pipeline
256 Raised if there is an issue with specified labels
260 This method attempts to prune any contracts that contain labels which
261 are not in the declared subset of labels. This pruning is done using a
262 string based matching due to the nature of contracts and may prune more
266 if labelSpecifier.labels:
267 labelSet = labelSpecifier.labels
278 pipeline = copy.deepcopy(self)
279 pipeline._pipelineIR.contracts = []
280 labels = {taskdef.label:
True for taskdef
in pipeline.toExpandedPipeline()}
283 if labelSpecifier.begin
is not None:
284 if labelSpecifier.begin
not in labels:
285 raise ValueError(f
"Beginning of range subset, {labelSpecifier.begin}, not found in "
286 "pipeline definition")
287 if labelSpecifier.end
is not None:
288 if labelSpecifier.end
not in labels:
289 raise ValueError(f
"End of range subset, {labelSpecifier.end}, not found in pipeline "
294 if labelSpecifier.begin
is not None:
295 if label != labelSpecifier.begin:
298 labelSpecifier.begin =
None
300 if labelSpecifier.end
is not None and label == labelSpecifier.end:
302 return Pipeline.fromIR(self.
_pipelineIR_pipelineIR.subset_from_labels(labelSet))
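
    # Sketch of the two subsetting modes described above (labels hypothetical):
    #
    #     >>> sub = pipeline.subsetFromLabels(LabelSpecifier(labels={"isr"}))
    #     >>> sub = pipeline.subsetFromLabels(LabelSpecifier(begin="isr", end="calibrate"))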
    @staticmethod
    def _parse_file_specifier(uri: Union[str, ButlerURI]
                              ) -> Tuple[ButlerURI, Optional[LabelSpecifier]]:
        """Split apart a uri and any possible label subsets
        """
        if isinstance(uri, str):
            # This is to support legacy pipelines during transition
            uri, num_replace = re.subn("[:](?!\\/\\/)", "#", uri)
            if num_replace:
                warnings.warn(f"The pipeline file {uri} seems to use the legacy : to separate "
                              "labels, this is deprecated and will be removed after June 2021, please use "
                              "# instead.",
                              category=FutureWarning)
            if uri.count("#") > 1:
                raise ValueError("Only one set of labels is allowed when specifying a pipeline to load")
            uri = ButlerURI(uri)
        label_subset = uri.fragment or None

        specifier: Optional[LabelSpecifier]
        if label_subset is not None:
            label_subset = urllib.parse.unquote(label_subset)
            args: Dict[str, Union[Set[str], str, None]]
            # labels supplied as a list
            if ',' in label_subset:
                if '..' in label_subset:
                    raise ValueError("Can only specify a list of labels or a range "
                                     "when loading a Pipeline, not both")
                args = {"labels": set(label_subset.split(","))}
            # labels supplied as a range
            elif '..' in label_subset:
                # Try to de-structure the label_subset; this will fail if
                # more than one range is specified
                begin, end, *rest = label_subset.split("..")
                if rest:
                    raise ValueError("Only one range can be specified when loading a pipeline")
                args = {"begin": begin if begin else None, "end": end if end else None}
            # Assume anything else is a single label
            else:
                args = {"labels": {label_subset}}
            specifier = LabelSpecifier(**args)
        else:
            specifier = None

        return uri, specifier
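
    # What _parse_file_specifier yields for each fragment form described
    # above (file names and labels hypothetical):
    #
    #     "pipe.yaml#a,b"  -> LabelSpecifier(labels={"a", "b"})
    #     "pipe.yaml#a..c" -> LabelSpecifier(begin="a", end="c")
    #     "pipe.yaml#a.."  -> LabelSpecifier(begin="a", end=None)
    #     "pipe.yaml"      -> specifier is None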
352 """Create a pipeline from string formatted as a pipeline document.
356 pipeline_string : `str`
357 A string that is formatted according like a pipeline document
363 pipeline = cls.
fromIRfromIR(pipelineIR.PipelineIR.from_string(pipeline_string))
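
    # A minimal pipeline document, assuming the standard YAML layout with
    # "description" and "tasks" keys (the task class path is hypothetical):
    #
    #     >>> p = Pipeline.fromString(
    #     ...     "description: demo\n"
    #     ...     "tasks:\n"
    #     ...     "  isr: lsst.ip.isr.IsrTask\n"
    #     ... )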
    @classmethod
    def fromIR(cls, deserialized_pipeline: pipelineIR.PipelineIR) -> Pipeline:
        """Create a pipeline from an already created `PipelineIR` object.

        Parameters
        ----------
        deserialized_pipeline : `PipelineIR`
            An already created pipeline intermediate representation object

        Returns
        -------
        pipeline : `Pipeline`
        """
        pipeline = cls.__new__(cls)
        pipeline._pipelineIR = deserialized_pipeline
        return pipeline
385 """Create a new pipeline by copying an already existing `Pipeline`.
390 An already created pipeline intermediate representation object
396 return cls.
fromIRfromIR(copy.deepcopy(pipeline._pipelineIR))
402 """Add an instrument to the pipeline, or replace an instrument that is
407 instrument : `~lsst.daf.butler.instrument.Instrument` or `str`
408 Either a derived class object of a `lsst.daf.butler.instrument` or
409 a string corresponding to a fully qualified
410 `lsst.daf.butler.instrument` name.
412 if isinstance(instrument, str):
417 instrument = f
"{instrument.__module__}.{instrument.__qualname__}"
418 self.
_pipelineIR_pipelineIR.instrument = instrument
421 """Get the instrument from the pipeline.
425 instrument : `~lsst.daf.butler.instrument.Instrument`, `str`, or None
426 A derived class object of a `lsst.daf.butler.instrument`, a string
427 corresponding to a fully qualified `lsst.daf.butler.instrument`
428 name, or None if the pipeline does not have an instrument.
    def addTask(self, task: Union[PipelineTask, str], label: str) -> None:
        """Add a new task to the pipeline, or replace a task that is already
        associated with the supplied label.

        Parameters
        ----------
        task : `PipelineTask` or `str`
            Either a derived class object of a `PipelineTask` or a string
            corresponding to a fully qualified `PipelineTask` name.
        label : `str`
            A label that is used to identify the `PipelineTask` being added
        """
        if isinstance(task, str):
            taskName = task
        elif issubclass(task, PipelineTask):
            taskName = f"{task.__module__}.{task.__qualname__}"
        else:
            raise ValueError("task must be either a child class of PipelineTask or a string containing"
                             " a fully qualified name to one")
        if not label:
            # in some cases (with command line-generated pipeline) tasks can
            # be defined without label which is not acceptable; use the task
            # _DefaultName in that case
            if isinstance(task, str):
                raise ValueError("A label must be supplied when adding a task by name")
            label = task._DefaultName
        self._pipelineIR.tasks[label] = pipelineIR.TaskIR(label, taskName)
461 """Remove a task from the pipeline.
466 The label used to identify the task that is to be removed
471 If no task with that label exists in the pipeline
477 """Apply single config override.
484 Fully-qualified field name.
486 Value to be given to a field.
491 """Add overrides from a specified file.
496 The label used to identify the task associated with config to
499 Path to the override file.
504 """Add Overrides by running a snippet of python code against a config.
509 The label used to identity the task associated with config to
512 A string which is valid python code to be executed. This is done
513 with config as the only local accessible value.
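
    # Putting the config mutators together (the labels, class path, and
    # config fields below are hypothetical):
    #
    #     >>> p = Pipeline("demo")
    #     >>> p.addTask("lsst.ip.isr.IsrTask", label="isr")
    #     >>> p.addConfigOverride("isr", "doDark", False)
    #     >>> p.addConfigFile("isr", "isrOverrides.py")
    #     >>> p.addConfigPython("isr", "config.doFlat = False")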
    def _addConfigImpl(self, label: str, newConfig: pipelineIR.ConfigIR) -> None:
        if label == "parameters":
            if newConfig.rest.keys() - self._pipelineIR.parameters.mapping.keys():
                raise ValueError("Cannot override parameters that are not defined in pipeline")
            self._pipelineIR.parameters.mapping.update(newConfig.rest)
            if newConfig.file:
                raise ValueError("Setting parameters section with config file is not supported")
            if newConfig.python:
                raise ValueError("Setting parameters section using python block is unsupported")
            return
        if label not in self._pipelineIR.tasks:
            raise LookupError(f"There are no tasks labeled '{label}' in the pipeline")
        self._pipelineIR.tasks[label].add_or_update_config(newConfig)
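
    # The reserved "parameters" label routes an override to the pipeline's
    # parameters section instead of a task config; the parameter must already
    # be defined in the pipeline (the name here is hypothetical):
    #
    #     >>> p.addConfigOverride("parameters", "calexpName", "calexp")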
538 """Returns a generator of TaskDefs which can be used to create quantum
543 generator : generator of `TaskDef`
544 The generator returned will be the sorted iterator of tasks which
545 are to be used in constructing a quantum graph.
550 If a dataId is supplied in a config block. This is in place for
554 for label, taskIR
in self.
_pipelineIR_pipelineIR.tasks.items():
556 taskName = taskClass.__qualname__
557 config = taskClass.ConfigClass()
559 if self.
_pipelineIR_pipelineIR.instrument
is not None:
560 overrides.addInstrumentOverride(self.
_pipelineIR_pipelineIR.instrument, taskClass._DefaultName)
561 if taskIR.config
is not None:
562 for configIR
in (configIr.formatted(self.
_pipelineIR_pipelineIR.parameters)
563 for configIr
in taskIR.config):
564 if configIR.dataId
is not None:
565 raise NotImplementedError(
"Specializing a config on a partial data id is not yet "
566 "supported in Pipeline definition")
568 if configIR.dataId
is None:
570 for configFile
in configIR.file:
571 overrides.addFileOverride(os.path.expandvars(configFile))
572 if configIR.python
is not None:
573 overrides.addPythonOverride(configIR.python)
574 for key, value
in configIR.rest.items():
575 overrides.addValueOverride(key, value)
576 overrides.applyTo(config)
579 taskDefs.append(
TaskDef(taskName=taskName, config=config, taskClass=taskClass, label=label))
582 if self.
_pipelineIR_pipelineIR.contracts
is not None:
583 label_to_config = {x.label: x.config
for x
in taskDefs}
584 for contract
in self.
_pipelineIR_pipelineIR.contracts:
587 success = eval(contract.contract,
None, label_to_config)
589 extra_info = f
": {contract.msg}" if contract.msg
is not None else ""
591 f
"satisfied{extra_info}")
593 yield from pipeTools.orderPipeline(taskDefs)
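
    # Iterating the expanded, dependency-ordered pipeline (sketch):
    #
    #     >>> for taskDef in p.toExpandedPipeline():
    #     ...     print(taskDef.label, taskDef.taskName)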
    def __eq__(self, other: object):
        if not isinstance(other, Pipeline):
            return False
        return self._pipelineIR == other._pipelineIR
@dataclass(frozen=True)
class TaskDatasetTypes:
    """An immutable struct that extracts and classifies the dataset types used
    by a `PipelineTask`
    """

    initInputs: NamedValueSet[DatasetType]
    """Dataset types that are needed as inputs in order to construct this Task.

    Task-level `initInputs` may be classified as either
    `~PipelineDatasetTypes.initInputs` or
    `~PipelineDatasetTypes.initIntermediates` at the Pipeline level.
    """

    initOutputs: NamedValueSet[DatasetType]
    """Dataset types that may be written after constructing this Task.

    Task-level `initOutputs` may be classified as either
    `~PipelineDatasetTypes.initOutputs` or
    `~PipelineDatasetTypes.initIntermediates` at the Pipeline level.
    """

    inputs: NamedValueSet[DatasetType]
    """Dataset types that are regular inputs to this Task.

    If an input dataset needed for a Quantum cannot be found in the input
    collection(s) or produced by another Task in the Pipeline, that Quantum
    (and all dependent Quanta) will not be produced.

    Task-level `inputs` may be classified as either
    `~PipelineDatasetTypes.inputs` or `~PipelineDatasetTypes.intermediates`
    at the Pipeline level.
    """

    prerequisites: NamedValueSet[DatasetType]
    """Dataset types that are prerequisite inputs to this Task.

    Prerequisite inputs must exist in the input collection(s) before the
    pipeline is run, but do not constrain the graph - if a prerequisite is
    missing for a Quantum, `PrerequisiteMissingError` is raised.

    Prerequisite inputs are not resolved until the second stage of
    QuantumGraph generation.
    """

    outputs: NamedValueSet[DatasetType]
    """Dataset types that are produced by this Task.

    Task-level `outputs` may be classified as either
    `~PipelineDatasetTypes.outputs` or `~PipelineDatasetTypes.intermediates`
    at the Pipeline level.
    """
    @classmethod
    def fromTaskDef(cls, taskDef: TaskDef, *, registry: Registry) -> TaskDatasetTypes:
        """Extract and classify the dataset types from a single `PipelineTask`.

        Parameters
        ----------
        taskDef : `TaskDef`
            An instance of a `TaskDef` class for a particular `PipelineTask`.
        registry : `Registry`
            Registry used to construct normalized `DatasetType` objects and
            retrieve those that are incomplete.

        Returns
        -------
        types : `TaskDatasetTypes`
            The dataset types used by this task.
        """
        def makeDatasetTypesSet(connectionType: str, freeze: bool = True) -> NamedValueSet[DatasetType]:
            """Constructs a set of true `DatasetType` objects

            Parameters
            ----------
            connectionType : `str`
                Name of the connection type to produce a set for, corresponds
                to an attribute of type `list` on the connection class instance
            freeze : `bool`, optional
                If `True`, call `NamedValueSet.freeze` on the object returned.

            Returns
            -------
            datasetTypes : `NamedValueSet`
                A set of all datasetTypes which correspond to the input
                connection type specified in the connection class of this
                `PipelineTask`

            Notes
            -----
            This function is a closure over the variables ``registry`` and
            ``taskDef``.
            """
            datasetTypes = NamedValueSet()
            for c in iterConnections(taskDef.connections, connectionType):
                dimensions = set(getattr(c, 'dimensions', set()))
                if "skypix" in dimensions:
                    try:
                        datasetType = registry.getDatasetType(c.name)
                    except LookupError as err:
                        raise LookupError(
                            f"DatasetType '{c.name}' referenced by "
                            f"{type(taskDef.connections).__name__} uses 'skypix' as a dimension "
                            f"placeholder, but does not already exist in the registry. "
                            f"Note that reference catalog names are now used as the dataset "
                            f"type name instead of 'ref_cat'."
                        ) from err
                    rest1 = set(registry.dimensions.extract(dimensions - set(["skypix"])).names)
                    rest2 = set(dim.name for dim in datasetType.dimensions
                                if not isinstance(dim, SkyPixDimension))
                    if rest1 != rest2:
                        raise ValueError(f"Non-skypix dimensions for dataset type {c.name} declared in "
                                         f"connections ({rest1}) are inconsistent with those in "
                                         f"registry's version of this dataset ({rest2}).")
                else:
                    # Component dataset types are not explicitly in the
                    # registry, so consistency checks go through the composite
                    # dataset type looked up from the registry when available.
                    registryDatasetType = None
                    try:
                        registryDatasetType = registry.getDatasetType(c.name)
                    except KeyError:
                        compositeName, componentName = DatasetType.splitDatasetTypeName(c.name)
                        parentStorageClass = DatasetType.PlaceholderParentStorageClass \
                            if componentName else None
                        datasetType = c.makeDatasetType(
                            registry.dimensions,
                            parentStorageClass=parentStorageClass
                        )
                        registryDatasetType = datasetType
                    else:
                        datasetType = c.makeDatasetType(
                            registry.dimensions,
                            parentStorageClass=registryDatasetType.parentStorageClass
                        )
                    if registryDatasetType and datasetType != registryDatasetType:
                        raise ValueError(f"Supplied dataset type ({datasetType}) inconsistent with "
                                         f"registry definition ({registryDatasetType}) "
                                         f"for {taskDef.label}.")
                datasetTypes.add(datasetType)
            if freeze:
                datasetTypes.freeze()
            return datasetTypes

        # optionally add an output dataset for metadata
        outputs = makeDatasetTypesSet("outputs", freeze=False)
        if taskDef.metadataDatasetName is not None:
            # Metadata is supposed to be of the PropertySet type; its
            # dimensions correspond to a task quantum
            dimensions = registry.dimensions.extract(taskDef.connections.dimensions)
            outputs |= {DatasetType(taskDef.metadataDatasetName, dimensions, "PropertySet")}
        outputs.freeze()

        return cls(
            initInputs=makeDatasetTypesSet("initInputs"),
            initOutputs=makeDatasetTypesSet("initOutputs"),
            inputs=makeDatasetTypesSet("inputs"),
            prerequisites=makeDatasetTypesSet("prerequisiteInputs"),
            outputs=outputs,
        )
@dataclass(frozen=True)
class PipelineDatasetTypes:
    """An immutable struct that classifies the dataset types used in a
    `Pipeline`.
    """

    initInputs: NamedValueSet[DatasetType]
    """Dataset types that are needed as inputs in order to construct the Tasks
    in this Pipeline.

    This does not include dataset types that are produced when constructing
    other Tasks in the Pipeline (these are classified as `initIntermediates`).
    """

    initOutputs: NamedValueSet[DatasetType]
    """Dataset types that may be written after constructing the Tasks in this
    Pipeline.

    This does not include dataset types that are also used as inputs when
    constructing other Tasks in the Pipeline (these are classified as
    `initIntermediates`).
    """

    initIntermediates: NamedValueSet[DatasetType]
    """Dataset types that are both used when constructing one or more Tasks
    in the Pipeline and produced as a side-effect of constructing another
    Task in the Pipeline.
    """

    inputs: NamedValueSet[DatasetType]
    """Dataset types that are regular inputs for the full pipeline.

    If an input dataset needed for a Quantum cannot be found in the input
    collection(s), that Quantum (and all dependent Quanta) will not be
    produced.
    """

    prerequisites: NamedValueSet[DatasetType]
    """Dataset types that are prerequisite inputs for the full Pipeline.

    Prerequisite inputs must exist in the input collection(s) before the
    pipeline is run, but do not constrain the graph - if a prerequisite is
    missing for a Quantum, `PrerequisiteMissingError` is raised.

    Prerequisite inputs are not resolved until the second stage of
    QuantumGraph generation.
    """

    intermediates: NamedValueSet[DatasetType]
    """Dataset types that are output by one Task in the Pipeline and consumed
    as inputs by one or more other Tasks in the Pipeline.
    """

    outputs: NamedValueSet[DatasetType]
    """Dataset types that are output by a Task in the Pipeline and not
    consumed by any other Task in the Pipeline.
    """

    byTask: Mapping[str, TaskDatasetTypes]
    """Per-Task dataset types, keyed by label in the `Pipeline`.

    This is guaranteed to be zip-iterable with the `Pipeline` itself (assuming
    neither has been modified since the dataset types were extracted, of
    course).
    """
    @classmethod
    def fromPipeline(cls, pipeline, *, registry: Registry) -> PipelineDatasetTypes:
        """Extract and classify the dataset types from all tasks in a
        `Pipeline`.

        Parameters
        ----------
        pipeline : `Pipeline`
            An ordered collection of tasks that can be run together.
        registry : `Registry`
            Registry used to construct normalized `DatasetType` objects and
            retrieve those that are incomplete.

        Returns
        -------
        types : `PipelineDatasetTypes`
            The dataset types used by this `Pipeline`.

        Raises
        ------
        ValueError
            Raised if Tasks are inconsistent about which datasets are marked
            prerequisite. This indicates that the Tasks cannot be run as part
            of the same `Pipeline`.
        """
        allInputs = NamedValueSet()
        allOutputs = NamedValueSet()
        allInitInputs = NamedValueSet()
        allInitOutputs = NamedValueSet()
        prerequisites = NamedValueSet()
        byTask = dict()
        if isinstance(pipeline, Pipeline):
            pipeline = pipeline.toExpandedPipeline()
        for taskDef in pipeline:
            thisTask = TaskDatasetTypes.fromTaskDef(taskDef, registry=registry)
            allInitInputs |= thisTask.initInputs
            allInitOutputs |= thisTask.initOutputs
            allInputs |= thisTask.inputs
            prerequisites |= thisTask.prerequisites
            allOutputs |= thisTask.outputs
            byTask[taskDef.label] = thisTask
        if not prerequisites.isdisjoint(allInputs):
            raise ValueError("{} marked as both prerequisites and regular inputs".format(
                {dt.name for dt in allInputs & prerequisites}
            ))
        if not prerequisites.isdisjoint(allOutputs):
            raise ValueError("{} marked as both prerequisites and outputs".format(
                {dt.name for dt in allOutputs & prerequisites}
            ))
        # Make sure that components which are marked as inputs get treated as
        # intermediates if there is an output which produces the composite
        # containing the component
        intermediateComponents = NamedValueSet()
        intermediateComposites = NamedValueSet()
        outputNameMapping = {dsType.name: dsType for dsType in allOutputs}
        for dsType in allInputs:
            # get the name of a possible component
            name, component = dsType.nameAndComponent()
            # if there is a component name, this is a component DatasetType;
            # if an output produces its parent, treat this input as an
            # intermediate
            if component is not None:
                if name in outputNameMapping:
                    if outputNameMapping[name].dimensions != dsType.dimensions:
                        raise ValueError(f"Component dataset type {dsType.name} has different "
                                         f"dimensions ({dsType.dimensions}) than its parent "
                                         f"({outputNameMapping[name].dimensions}).")
                    composite = DatasetType(name, dsType.dimensions, outputNameMapping[name].storageClass,
                                            universe=registry.dimensions)
                    intermediateComponents.add(dsType)
                    intermediateComposites.add(composite)

        def checkConsistency(a: NamedValueSet, b: NamedValueSet):
            common = a.names & b.names
            for name in common:
                if a[name] != b[name]:
                    raise ValueError(f"Conflicting definitions for dataset type: {a[name]} != {b[name]}.")

        checkConsistency(allInitInputs, allInitOutputs)
        checkConsistency(allInputs, allOutputs)
        checkConsistency(allInputs, intermediateComposites)
        checkConsistency(allOutputs, intermediateComposites)

        def frozen(s: NamedValueSet) -> NamedValueSet:
            s.freeze()
            return s

        return cls(
            initInputs=frozen(allInitInputs - allInitOutputs),
            initIntermediates=frozen(allInitInputs & allInitOutputs),
            initOutputs=frozen(allInitOutputs - allInitInputs),
            inputs=frozen(allInputs - allOutputs - intermediateComponents),
            intermediates=frozen(allInputs & allOutputs | intermediateComponents),
            outputs=frozen(allOutputs - allInputs - intermediateComposites),
            prerequisites=frozen(prerequisites),
            byTask=MappingProxyType(byTask),
        )
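
# Sketch: classifying an entire pipeline's dataset types (the butler
# variable is assumed to exist):
#
#     >>> dsTypes = PipelineDatasetTypes.fromPipeline(p, registry=butler.registry)
#     >>> sorted(dsTypes.inputs.names)         # needed from the input collections
#     >>> sorted(dsTypes.intermediates.names)  # produced and consumed internally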