    """Re-order tasks in pipeline to satisfy data dependencies.

    When possible, the new ordering keeps the original relative order of the
    tasks.

    pipeline : `pipe.base.Pipeline`
        Pipeline description.
    taskFactory : `pipe.base.TaskFactory`, optional
        Instance of an object which knows how to import task classes. It is only
        used if pipeline task definitions do not define task classes.

    Correctly ordered pipeline (`pipe.base.Pipeline` instance).

    `ImportError` is raised when a task class cannot be imported.
    `DuplicateOutputError` is raised when there is more than one producer for a
    `PipelineDataCycleError` is also raised when the pipeline has dependency cycles.
    `MissingTaskFactoryError` is raised when TaskFactory is needed but not
    """

    for idx, taskDef in enumerate(pipeline):

        taskClass = _loadTaskClass(taskDef, taskFactory)

        # Collect the names of this task's output dataset types; a name that
        # an earlier task already produces is an error.
        dsMap = taskClass.getOutputDatasetTypes(taskDef.config)
        for dsTypeDescr in dsMap.values():
            dsType = dsTypeDescr.datasetType
            if dsType.name in allOutputs:
                raise DuplicateOutputError("DatasetType `{}' appears more than "
                                           "once as output".format(dsType.name))
        outputs[idx] = set(dsTypeDescr.datasetType.name for dsTypeDescr in dsMap.values())
        allOutputs.update(outputs[idx])

        # Collect the names of this task's input dataset types.
        dsMap = taskClass.getInputDatasetTypes(taskDef.config)
        inputs[idx] = set(dsTypeDescr.datasetType.name for dsTypeDescr in dsMap.values())
        allInputs.update(inputs[idx])
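
    # Dataset types that are consumed but never produced are treated as the
    # outputs of a pseudo-task with index -1.
    preExisting = allInputs - allOutputs
    outputs[-1] = preExisting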

        thisTaskOutputs = outputs.get(idx, set())
        for taskInputs in inputs.values():
            taskInputs -= thisTaskOutputs

        topNodes = [key for key, value in inputs.items() if not value]

        for idx, inputNames in inputs.items():
            taskName = pipeline[idx].label
            outputNames = outputs[idx]
            edge = " {} -> {} -> {}".format(inputNames, taskName, outputNames)

        raise PipelineDataCycleError("Pipeline has data cycles:\n" + "\n".join(loops))

    return Pipeline(pipeline[idx] for idx in result)
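
The listing above omits several lines, so the following is only a minimal, self-contained sketch of the same idea: an order-preserving topological sort driven by dataset names. It assumes each task is a plain `(label, inputs, outputs)` tuple rather than a real task definition. The names `order_tasks` and `DataCycleError`, and the local `DuplicateOutputError` class, are hypothetical stand-ins and not part of the package.

```python
class DuplicateOutputError(Exception):
    """Hypothetical stand-in: a dataset name has more than one producer."""


class DataCycleError(Exception):
    """Hypothetical stand-in: some tasks can never have their inputs met."""


def order_tasks(tasks):
    """Order (label, inputs, outputs) tuples so producers precede consumers.

    Returns the task labels, keeping the original relative order when the
    dependencies allow it.
    """
    inputs, outputs = {}, {}
    all_inputs, all_outputs = set(), set()
    for idx, (label, task_inputs, task_outputs) in enumerate(tasks):
        duplicated = all_outputs & set(task_outputs)
        if duplicated:
            raise DuplicateOutputError(
                "{} produced more than once".format(sorted(duplicated)))
        inputs[idx] = set(task_inputs)
        outputs[idx] = set(task_outputs)
        all_inputs |= inputs[idx]
        all_outputs |= outputs[idx]

    # Pseudo-node -1 "produces" every dataset that no task creates.
    outputs[-1] = all_inputs - all_outputs

    queue, result = [-1], []
    while queue:
        idx = queue.pop(0)
        if idx >= 0:
            result.append(idx)
        # This node's outputs are now available: drop them from pending inputs.
        for pending in inputs.values():
            pending -= outputs[idx]
        # Tasks with no unmet inputs are ready to be scheduled.
        ready = [key for key, pending in inputs.items() if not pending]
        for key in ready:
            del inputs[key]
        # Sorting by original index keeps the original relative order.
        queue = sorted(queue + ready)

    # Anything left over is in, or downstream of, a dependency cycle.
    if inputs:
        stuck = sorted(tasks[idx][0] for idx in inputs)
        raise DataCycleError("Tasks blocked by a data cycle: {}".format(stuck))
    return [tasks[idx][0] for idx in result]


if __name__ == "__main__":
    demo = [("C", ["b"], ["c"]), ("A", ["raw"], ["a"]), ("B", ["a"], ["b"])]
    print(order_tasks(demo))
```

Sorting the ready queue by original index is what preserves the input order among tasks that do not depend on each other. A plain FIFO queue would still give a valid ordering, but not necessarily the one closest to the original.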