22 """Module defining few methods to manipulate or query pipelines. 
   26 __all__ = [
"isPipelineOrdered", 
"orderPipeline"]
 
   36 from .connections 
import iterConnections
 
   43 def _loadTaskClass(taskDef, taskFactory):
 
   44     """Import task class if necessary. 
   48     `ImportError` is raised when task class cannot be imported. 
   49     `MissingTaskFactoryError` is raised when TaskFactory is needed but not 
   52     taskClass = taskDef.taskClass
 
   56                                           "factory instance is not provided")
 
   57         taskClass = taskFactory.loadTaskClass(taskDef.taskName)
 
   66     """Exception raised when client fails to provide TaskFactory instance. 
   71 class DuplicateOutputError(Exception):
 
   72     """Exception raised when Pipeline has more than one task for the same 
   79     """Exception raised when Pipeline has data dependency cycle. 
   85     """Checks whether tasks in pipeline are correctly ordered. 
   87     Pipeline is correctly ordered if for any DatasetType produced by a task 
   88     in a pipeline all its consumer tasks are located after producer. 
   92     pipeline : `pipe.base.Pipeline` 
   94     taskFactory: `pipe.base.TaskFactory`, optional 
   95         Instance of an object which knows how to import task classes. It is 
   96         only used if pipeline task definitions do not define task classes. 
  100     True for correctly ordered pipeline, False otherwise. 
  104     `ImportError` is raised when task class cannot be imported. 
  105     `DuplicateOutputError` is raised when there is more than one producer for a 
  107     `MissingTaskFactoryError` is raised when TaskFactory is needed but not 
  112     for idx, taskDef 
in enumerate(pipeline):
 
  115             if attr.name 
in producerIndex:
 
  116                 raise DuplicateOutputError(
"DatasetType `{}' appears more than " 
  117                                            "once as output".
format(attr.name))
 
  118             producerIndex[attr.name] = idx
 
  121     for idx, taskDef 
in enumerate(pipeline):
 
  124         inputs = {name: getattr(taskDef.connections, name) 
for name 
in taskDef.connections.inputs}
 
  125         for dsTypeDescr 
in inputs.values():
 
  127             prodIdx = producerIndex.get(dsTypeDescr.name, -1)
 
  136     """Re-order tasks in pipeline to satisfy data dependencies. 
  138     When possible new ordering keeps original relative order of the tasks. 
  142     pipeline : `list` of `pipe.base.TaskDef` 
  143         Pipeline description. 
  147     Correctly ordered pipeline (`list` of `pipe.base.TaskDef` objects). 
  151     `DuplicateOutputError` is raised when there is more than one producer for a 
  153     `PipelineDataCycleError` is also raised when pipeline has dependency 
  154     cycles.  `MissingTaskFactoryError` is raised when TaskFactory is needed but 
  165     for idx, taskDef 
in enumerate(pipeline):
 
  167         dsMap = {name: getattr(taskDef.connections, name) 
for name 
in taskDef.connections.outputs}
 
  168         for dsTypeDescr 
in dsMap.values():
 
  169             if dsTypeDescr.name 
in allOutputs:
 
  171                                            "once as output".
format(dsTypeDescr.name))
 
  172         outputs[idx] = 
set(dsTypeDescr.name 
for dsTypeDescr 
in dsMap.values())
 
  173         allOutputs.update(outputs[idx])
 
  176         connectionInputs = itertools.chain(taskDef.connections.inputs, taskDef.connections.prerequisiteInputs)
 
  177         dsMap = [getattr(taskDef.connections, name).name 
for name 
in connectionInputs]
 
  178         inputs[idx] = 
set(dsMap)
 
  179         allInputs.update(inputs[idx])
 
  183     preExisting = allInputs - allOutputs
 
  184     outputs[-1] = preExisting
 
  197         thisTaskOutputs = outputs.get(idx, 
set())
 
  198         for taskInputs 
in inputs.values():
 
  199             taskInputs -= thisTaskOutputs
 
  202         topNodes = [key 
for key, value 
in inputs.items() 
if not value]
 
  214         for idx, inputNames 
in inputs.items():
 
  215             taskName = pipeline[idx].label
 
  216             outputNames = outputs[idx]
 
  217             edge = 
"   {} -> {} -> {}".
format(inputNames, taskName, outputNames)
 
  221     return [pipeline[idx] 
for idx 
in result]