22 """Module defining few methods to manipulate or query pipelines. 26 __all__ = [
"isPipelineOrdered",
"orderPipeline"]
35 from .pipeline
import Pipeline


def _loadTaskClass(taskDef, taskFactory):
    """Import the task class if necessary.

    Raises
    ------
    `ImportError` is raised when the task class cannot be imported.
    `MissingTaskFactoryError` is raised when a TaskFactory is needed but not
    provided.
    """
    taskClass = taskDef.taskClass
    if not taskClass:
        # Task definition does not carry a class; delegate to the factory.
        if not taskFactory:
            raise MissingTaskFactoryError("Task class is not defined but task "
                                          "factory instance is not provided")
        taskClass = taskFactory.loadTaskClass(taskDef.taskName)
    return taskClass

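
# The factory passed to ``_loadTaskClass`` is only assumed to provide a
# ``loadTaskClass(taskName)`` method that returns the task class.  A minimal
# illustrative stand-in (not part of this package), assuming ``taskName`` is a
# fully qualified "package.module.ClassName" string, might look like:
#
#     import importlib
#
#     class SimpleTaskFactory:
#         def loadTaskClass(self, taskName):
#             moduleName, _, className = taskName.rpartition(".")
#             return getattr(importlib.import_module(moduleName), className)
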
64 """Exception raised when client fails to provide TaskFactory instance. 69 class DuplicateOutputError(Exception):
70 """Exception raised when Pipeline has more than one task for the same 77 """Exception raised when Pipeline has data dependency cycle. 83 """Checks whether tasks in pipeline are correctly ordered. 85 Pipeline is correctly ordered if for any DatasetType produced by a task 86 in a pipeline all its consumer tasks are located after producer. 90 pipeline : `pipe.base.Pipeline` 92 taskFactory: `pipe.base.TaskFactory`, optional 93 Instance of an object which knows how to import task classes. It is only 94 used if pipeline task definitions do not define task classes. 98 True for correctly ordered pipeline, False otherwise. 102 `ImportError` is raised when task class cannot be imported. 103 `DuplicateOutputError` is raised when there is more than one producer for a 105 `MissingTaskFactoryError` is raised when TaskFactory is needed but not 110 for idx, taskDef
in enumerate(pipeline):
113 taskDef.taskClass = _loadTaskClass(taskDef, taskFactory)
116 outputs = taskDef.taskClass.getOutputDatasetTypes(taskDef.config)
117 for dsTypeDescr
in outputs.values():
118 dsType = dsTypeDescr.datasetType
119 if dsType.name
in producerIndex:
120 raise DuplicateOutputError(
"DatasetType `{}' appears more than " 121 "once as" " output".
format(dsType.name))
122 producerIndex[dsType.name] = idx
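
    # Illustration (hypothetical names): after this loop ``producerIndex``
    # could look like {"calexp": 0, "src": 1}, mapping each DatasetType name
    # to the index of the task that produces it.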

    # Check all inputs that are also produced by some task in the pipeline.
    for idx, taskDef in enumerate(pipeline):

        # Input DatasetTypes can only be obtained from the task class.
        inputs = taskDef.taskClass.getInputDatasetTypes(taskDef.config)
        for dsTypeDescr in inputs.values():
            dsType = dsTypeDescr.datasetType
            # Pre-existing datasets have no producer; treat their index as -1.
            prodIdx = producerIndex.get(dsType.name, -1)
            if prodIdx >= idx:
                # The producer is at or after the consumer: not ordered.
                return False

    return True

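# Usage sketch (``pipeline`` and ``factory`` stand for an existing `Pipeline`
# and `TaskFactory`; both are assumed to be built elsewhere; see
# ``orderPipeline`` below):
#
#     if not isPipelineOrdered(pipeline, taskFactory=factory):
#         pipeline = orderPipeline(pipeline, taskFactory=factory)
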
141 """Re-order tasks in pipeline to satisfy data dependencies. 143 When possible new ordering keeps original relative order of the tasks. 147 pipeline : `pipe.base.Pipeline` 148 Pipeline description. 149 taskFactory: `pipe.base.TaskFactory`, optional 150 Instance of an object which knows how to import task classes. It is only 151 used if pipeline task definitions do not define task classes. 155 Correctly ordered pipeline (`pipe.base.Pipeline` instance). 159 `ImportError` is raised when task class cannot be imported. 160 `DuplicateOutputError` is raised when there is more than one producer for a 162 `PipelineDataCycleError` is also raised when pipeline has dependency cycles. 163 `MissingTaskFactoryError` is raised when TaskFactory is needed but not 174 for idx, taskDef
in enumerate(pipeline):
177 taskClass = _loadTaskClass(taskDef, taskFactory)
180 dsMap = taskClass.getOutputDatasetTypes(taskDef.config)
181 for dsTypeDescr
in dsMap.values():
182 dsType = dsTypeDescr.datasetType
183 if dsType.name
in allOutputs:
185 "once as" " output".
format(dsType.name))
186 outputs[idx] =
set(dsTypeDescr.datasetType.name
for dsTypeDescr
in dsMap.values())
187 allOutputs.update(outputs[idx])
190 dsMap = taskClass.getInputDatasetTypes(taskDef.config)
191 inputs[idx] =
set(dsTypeDescr.datasetType.name
for dsTypeDescr
in dsMap.values())
192 allInputs.update(inputs[idx])

    # Add a pseudo-producer with index -1 for all pre-existing inputs, i.e.
    # dataset types that no task in the pipeline produces.
    preExisting = allInputs - allOutputs
    outputs[-1] = preExisting
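
    # Illustration (hypothetical dataset type names): for a two-task pipeline
    # in which task 0 consumes "raw" and produces "calexp" while task 1
    # consumes "calexp" and produces "src", the maps built above are
    #
    #     inputs  = {0: {"raw"}, 1: {"calexp"}}
    #     outputs = {0: {"calexp"}, 1: {"src"}, -1: {"raw"}}
    #
    # so the pseudo-producer -1 satisfies the pre-existing "raw" input.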

    # Topological sort (Kahn-style) that preserves the original relative
    # order: start from the pseudo-producer and repeatedly emit tasks whose
    # inputs have all been satisfied.
    queue = [-1]
    result = []
    while queue:

        # Move the node to the final list, dropping the pseudo-producer.
        idx = queue.pop(0)
        if idx >= 0:
            result.append(idx)

        # Remove this task's outputs from the remaining tasks' inputs.
        thisTaskOutputs = outputs.get(idx, set())
        for taskInputs in inputs.values():
            taskInputs -= thisTaskOutputs

        # Queue every task that now has no unsatisfied inputs.
        topNodes = [key for key, value in inputs.items() if not value]
        queue += topNodes
        for key in topNodes:
            del inputs[key]

    # Anything left in ``inputs`` means the pipeline has a dependency cycle.
    if inputs:
        # Format the offending edges for the error message.
        loops = []
        for idx, inputNames in inputs.items():
            taskName = pipeline[idx].label
            outputNames = outputs[idx]
            edge = "   {} -> {} -> {}".format(inputNames, taskName, outputNames)
            loops.append(edge)
        raise PipelineDataCycleError("Pipeline has data cycles:\n" + "\n".join(loops))

    return Pipeline(pipeline[idx] for idx in result)
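

# End-to-end sketch with stand-in objects.  Every name below is illustrative
# and not part of this package; the only interfaces assumed are the ones used
# above (``taskClass``/``config``/``label`` attributes on task definitions,
# class methods returning descriptors that expose ``datasetType.name``, and a
# `Pipeline` constructible from an iterable, as in the return statement of
# ``orderPipeline``):
#
#     from types import SimpleNamespace
#
#     def _descr(name):
#         return SimpleNamespace(datasetType=SimpleNamespace(name=name))
#
#     class CalibrateStub:
#         @classmethod
#         def getInputDatasetTypes(cls, config):
#             return {"raw": _descr("raw")}
#
#         @classmethod
#         def getOutputDatasetTypes(cls, config):
#             return {"calexp": _descr("calexp")}
#
#     class DetectStub:
#         @classmethod
#         def getInputDatasetTypes(cls, config):
#             return {"calexp": _descr("calexp")}
#
#         @classmethod
#         def getOutputDatasetTypes(cls, config):
#             return {"src": _descr("src")}
#
#     detect = SimpleNamespace(taskClass=DetectStub, config=None, label="detect")
#     calibrate = SimpleNamespace(taskClass=CalibrateStub, config=None,
#                                 label="calibrate")
#
#     ordered = orderPipeline(Pipeline([detect, calibrate]))
#     # ``calibrate`` now precedes ``detect`` because it produces the
#     # "calexp" dataset that ``detect`` consumes.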