22 """Module defining a few functions to manipulate or query pipelines.
# Public API of this module: the names exported by ``from <module> import *``.
__all__ = [
    "isPipelineOrdered",
    "orderPipeline",
]
36 from .connections
import iterConnections
43 def _loadTaskClass(taskDef, taskFactory):
44 """Import task class if necessary.
48 `ImportError` is raised when task class cannot be imported.
49 `MissingTaskFactoryError` is raised when TaskFactory is needed but not
52 taskClass = taskDef.taskClass
56 "factory instance is not provided")
57 taskClass = taskFactory.loadTaskClass(taskDef.taskName)
66 """Exception raised when client fails to provide TaskFactory instance.
71 class DuplicateOutputError(Exception):
72 """Exception raised when Pipeline has more than one task for the same
79 """Exception raised when Pipeline has data dependency cycle.
85 """Checks whether tasks in pipeline are correctly ordered.
87 Pipeline is correctly ordered if for any DatasetType produced by a task
88 in a pipeline all its consumer tasks are located after producer.
92 pipeline : `pipe.base.Pipeline`
94 taskFactory: `pipe.base.TaskFactory`, optional
95 Instance of an object which knows how to import task classes. It is
96 only used if pipeline task definitions do not define task classes.
100 True for correctly ordered pipeline, False otherwise.
104 `ImportError` is raised when task class cannot be imported.
105 `DuplicateOutputError` is raised when there is more than one producer for a
107 `MissingTaskFactoryError` is raised when TaskFactory is needed but not
112 for idx, taskDef
in enumerate(pipeline):
115 if attr.name
in producerIndex:
116 raise DuplicateOutputError(
"DatasetType `{}' appears more than "
117 "once as output".
format(attr.name))
118 producerIndex[attr.name] = idx
121 for idx, taskDef
in enumerate(pipeline):
124 inputs = {name: getattr(taskDef.connections, name)
for name
in taskDef.connections.inputs}
125 for dsTypeDescr
in inputs.values():
127 prodIdx = producerIndex.get(dsTypeDescr.name, -1)
136 """Re-order tasks in pipeline to satisfy data dependencies.
138 When possible new ordering keeps original relative order of the tasks.
142 pipeline : `list` of `pipe.base.TaskDef`
143 Pipeline description.
147 Correctly ordered pipeline (`list` of `pipe.base.TaskDef` objects).
151 `DuplicateOutputError` is raised when there is more than one producer for a
153 `PipelineDataCycleError` is also raised when pipeline has dependency
154 cycles. `MissingTaskFactoryError` is raised when TaskFactory is needed but
165 for idx, taskDef
in enumerate(pipeline):
167 dsMap = {name: getattr(taskDef.connections, name)
for name
in taskDef.connections.outputs}
168 for dsTypeDescr
in dsMap.values():
169 if dsTypeDescr.name
in allOutputs:
171 "once as output".
format(dsTypeDescr.name))
172 outputs[idx] =
set(dsTypeDescr.name
for dsTypeDescr
in dsMap.values())
173 allOutputs.update(outputs[idx])
176 connectionInputs = itertools.chain(taskDef.connections.inputs, taskDef.connections.prerequisiteInputs)
177 dsMap = [getattr(taskDef.connections, name).name
for name
in connectionInputs]
178 inputs[idx] =
set(dsMap)
179 allInputs.update(inputs[idx])
183 preExisting = allInputs - allOutputs
184 outputs[-1] = preExisting
197 thisTaskOutputs = outputs.get(idx,
set())
198 for taskInputs
in inputs.values():
199 taskInputs -= thisTaskOutputs
202 topNodes = [key
for key, value
in inputs.items()
if not value]
214 for idx, inputNames
in inputs.items():
215 taskName = pipeline[idx].label
216 outputNames = outputs[idx]
217 edge =
" {} -> {} -> {}".
format(inputNames, taskName, outputNames)
221 return [pipeline[idx]
for idx
in result]