LSSTApplications  18.1.0
LSSTDataManagementBasePackage
lsst.pipe.base.pipeTools Namespace Reference

Classes

class  DuplicateOutputError
 
class  MissingTaskFactoryError
 
class  PipelineDataCycleError
 

Functions

def isPipelineOrdered (pipeline, taskFactory=None)
 
def orderPipeline (pipeline, taskFactory=None)
 

Function Documentation

◆ isPipelineOrdered()

def lsst.pipe.base.pipeTools.isPipelineOrdered(pipeline, taskFactory=None)
Checks whether the tasks in a pipeline are correctly ordered.

A pipeline is correctly ordered if, for every DatasetType produced by a task
in the pipeline, all tasks consuming that DatasetType are located after its producer.

Parameters
----------
pipeline : `pipe.base.Pipeline`
    Pipeline description.
taskFactory : `pipe.base.TaskFactory`, optional
    An object that knows how to import task classes. It is used only if
    the pipeline's task definitions do not define task classes.

Returns
-------
`True` if the pipeline is correctly ordered, `False` otherwise.

Raises
------
`ImportError` is raised when a task class cannot be imported.
`DuplicateOutputError` is raised when there is more than one producer for a
dataset type.
`MissingTaskFactoryError` is raised when a TaskFactory is needed but not
provided.
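The ordering rule can be exercised without the LSST stack. The sketch below is a simplified model, not the `pipe.base` implementation: each task is reduced to a hypothetical `ToyTask` holding sets of input and output dataset-type names (the real function obtains these from the task class via `getInputDatasetTypes` and `getOutputDatasetTypes`), and a local exception class stands in for `DuplicateOutputError`. As in the source, a dataset with no producer in the pipeline is treated as pre-existing, with an effective producer index of -1.

```python
from dataclasses import dataclass
from typing import Dict, FrozenSet, List


class DuplicateOutputError(Exception):
    """Local stand-in for lsst.pipe.base.pipeTools.DuplicateOutputError."""


@dataclass(frozen=True)
class ToyTask:
    # Hypothetical stand-in for a task definition: a label plus dataset-type names.
    label: str
    inputs: FrozenSet[str] = frozenset()
    outputs: FrozenSet[str] = frozenset()


def is_ordered(tasks: List[ToyTask]) -> bool:
    # Map each dataset-type name to the index of the single task producing it.
    producer_index: Dict[str, int] = {}
    for idx, task in enumerate(tasks):
        for name in task.outputs:
            if name in producer_index:
                raise DuplicateOutputError(f"DatasetType {name!r} appears more than once as output")
            producer_index[name] = idx

    # Each consumer must come strictly after its producer; inputs with no
    # producer are pre-existing and get the effective index -1.
    for idx, task in enumerate(tasks):
        for name in task.inputs:
            if producer_index.get(name, -1) >= idx:
                return False
    return True


# Illustrative dataset names; "raw" has no producer, so it counts as pre-existing.
calib = ToyTask("calib", frozenset({"raw"}), frozenset({"calexp"}))
coadd = ToyTask("coadd", frozenset({"calexp"}), frozenset({"coadd"}))
print(is_ordered([calib, coadd]))  # True
print(is_ordered([coadd, calib]))  # False
```

Because the comparison is `>=` rather than `>`, a task that consumes its own output is also reported as unordered.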

Definition at line 82 of file pipeTools.py.

```python
def isPipelineOrdered(pipeline, taskFactory=None):
    """Checks whether tasks in pipeline are correctly ordered.

    Pipeline is correctly ordered if for any DatasetType produced by a task
    in a pipeline all its consumer tasks are located after producer.

    Parameters
    ----------
    pipeline : `pipe.base.Pipeline`
        Pipeline description.
    taskFactory: `pipe.base.TaskFactory`, optional
        Instance of an object which knows how to import task classes. It is only
        used if pipeline task definitions do not define task classes.

    Returns
    -------
    True for correctly ordered pipeline, False otherwise.

    Raises
    ------
    `ImportError` is raised when task class cannot be imported.
    `DuplicateOutputError` is raised when there is more than one producer for a
    dataset type.
    `MissingTaskFactoryError` is raised when TaskFactory is needed but not
    provided.
    """
    # Build a map of DatasetType name to producer's index in a pipeline
    producerIndex = {}
    for idx, taskDef in enumerate(pipeline):

        # we will need task class for next operations, make sure it is loaded
        taskDef.taskClass = _loadTaskClass(taskDef, taskFactory)

        # get task output DatasetTypes, this can only be done via class method
        outputs = taskDef.taskClass.getOutputDatasetTypes(taskDef.config)
        for dsTypeDescr in outputs.values():
            dsType = dsTypeDescr.datasetType
            if dsType.name in producerIndex:
                raise DuplicateOutputError("DatasetType `{}' appears more than "
                                           "once as" " output".format(dsType.name))
            producerIndex[dsType.name] = idx

    # check all inputs that are also someone's outputs
    for idx, taskDef in enumerate(pipeline):

        # get task input DatasetTypes, this can only be done via class method
        inputs = taskDef.taskClass.getInputDatasetTypes(taskDef.config)
        for dsTypeDescr in inputs.values():
            dsType = dsTypeDescr.datasetType
            # all pre-existing datasets have effective index -1
            prodIdx = producerIndex.get(dsType.name, -1)
            if prodIdx >= idx:
                # not good, producer is downstream
                return False

    return True
```
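The listing above calls a private helper, `_loadTaskClass`, whose body is not shown on this page. The following hypothetical sketch models only the documented behaviour: use the class already attached to a task definition, consult the factory otherwise, and raise `MissingTaskFactoryError` when a factory is needed but missing. `ToyTaskDef`, `ToyFactory.load`, and `load_task_class` are invented names for illustration, not the `pipe.base` API; an unknown class name raises `ImportError` to mirror a failed import.

```python
from dataclasses import dataclass
from typing import Dict, Optional


class MissingTaskFactoryError(Exception):
    """Local stand-in for lsst.pipe.base.pipeTools.MissingTaskFactoryError."""


@dataclass
class ToyTaskDef:
    # Hypothetical task definition: a class name and, optionally, the class itself.
    taskName: str
    taskClass: Optional[type] = None


class ToyFactory:
    # Hypothetical factory resolving class names from a registry dict.
    def __init__(self, registry: Dict[str, type]) -> None:
        self._registry = registry

    def load(self, name: str) -> type:
        try:
            return self._registry[name]
        except KeyError:
            raise ImportError(f"cannot import task class {name!r}") from None


def load_task_class(task_def: ToyTaskDef, factory: Optional[ToyFactory] = None) -> type:
    # A class already attached to the definition wins; the factory is not consulted.
    if task_def.taskClass is not None:
        return task_def.taskClass
    # Otherwise a factory is required to resolve the class by name.
    if factory is None:
        raise MissingTaskFactoryError(f"a task factory is needed to load {task_def.taskName!r}")
    return factory.load(task_def.taskName)


class CalibTask:  # illustrative task class
    pass


print(load_task_class(ToyTaskDef("CalibTask", CalibTask)).__name__)  # CalibTask
print(load_task_class(ToyTaskDef("CalibTask"), ToyFactory({"CalibTask": CalibTask})).__name__)  # CalibTask
```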

◆ orderPipeline()

def lsst.pipe.base.pipeTools.orderPipeline(pipeline, taskFactory=None)
Re-orders the tasks in a pipeline to satisfy data dependencies.

When possible, the new ordering keeps the original relative order of the tasks.

Parameters
----------
pipeline : `pipe.base.Pipeline`
    Pipeline description.
taskFactory : `pipe.base.TaskFactory`, optional
    An object that knows how to import task classes. It is used only if
    the pipeline's task definitions do not define task classes.

Returns
-------
A correctly ordered pipeline (a new `pipe.base.Pipeline` instance).

Raises
------
`ImportError` is raised when a task class cannot be imported.
`DuplicateOutputError` is raised when there is more than one producer for a
dataset type.
`PipelineDataCycleError` is raised when the pipeline has dependency cycles.
`MissingTaskFactoryError` is raised when a TaskFactory is needed but not
provided.
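The source comments describe `orderPipeline` as a modified Kahn's algorithm that preserves order: a pseudo-node with index -1 "produces" every pre-existing input, and the queue of ready tasks is kept sorted so that the lowest original index is emitted first. The sketch below reproduces that idea on a simplified model in which each task is an `(inputs, outputs)` pair of dataset-type names; `order_tasks` is a hypothetical helper, and a local exception class stands in for `PipelineDataCycleError`.

```python
from typing import Dict, List, Set, Tuple


class PipelineDataCycleError(Exception):
    """Local stand-in for lsst.pipe.base.pipeTools.PipelineDataCycleError."""


def order_tasks(tasks: List[Tuple[Set[str], Set[str]]]) -> List[int]:
    """Return task indices in dependency order, preferring the original order."""
    # Copy the input sets so the caller's data is not modified.
    pending: Dict[int, Set[str]] = {i: set(ins) for i, (ins, _) in enumerate(tasks)}
    outputs: Dict[int, Set[str]] = {i: set(outs) for i, (_, outs) in enumerate(tasks)}

    # Pseudo-node -1 produces every input that no task produces.
    all_inputs = set().union(*(ins for ins, _ in tasks))
    all_outputs = set().union(*(outs for _, outs in tasks))
    outputs[-1] = all_inputs - all_outputs

    queue = [-1]
    result: List[int] = []
    while queue:
        idx = queue.pop(0)  # smallest ready index, since the queue is kept sorted
        if idx >= 0:
            result.append(idx)
        # This node's outputs are now available to every remaining task.
        for ins in pending.values():
            ins -= outputs[idx]
        ready = [i for i, ins in pending.items() if not ins]
        for i in ready:
            del pending[i]
        queue = sorted(queue + ready)

    # Tasks that never became ready are part of (or depend on) a cycle.
    if pending:
        raise PipelineDataCycleError(f"data cycle among tasks {sorted(pending)}")
    return result


# Task 0 consumes what task 1 produces; task 2 is independent of both.
print(order_tasks([({"calexp"}, {"coadd"}), ({"raw"}, {"calexp"}), ({"raw"}, {"log"})]))  # [1, 0, 2]
```

Task 0 is moved after its producer, but it still precedes task 2, which is how the sorted queue keeps the original relative order wherever the dependencies allow it.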

Definition at line 140 of file pipeTools.py.

```python
def orderPipeline(pipeline, taskFactory=None):
    """Re-order tasks in pipeline to satisfy data dependencies.

    When possible new ordering keeps original relative order of the tasks.

    Parameters
    ----------
    pipeline : `pipe.base.Pipeline`
        Pipeline description.
    taskFactory: `pipe.base.TaskFactory`, optional
        Instance of an object which knows how to import task classes. It is only
        used if pipeline task definitions do not define task classes.

    Returns
    -------
    Correctly ordered pipeline (`pipe.base.Pipeline` instance).

    Raises
    ------
    `ImportError` is raised when task class cannot be imported.
    `DuplicateOutputError` is raised when there is more than one producer for a
    dataset type.
    `PipelineDataCycleError` is also raised when pipeline has dependency cycles.
    `MissingTaskFactoryError` is raised when TaskFactory is needed but not
    provided.
    """

    # This is a modified version of Kahn's algorithm that preserves order

    # build mapping of the tasks to their inputs and outputs
    inputs = {}  # maps task index to its input DatasetType names
    outputs = {}  # maps task index to its output DatasetType names
    allInputs = set()  # all inputs of all tasks
    allOutputs = set()  # all outputs of all tasks
    for idx, taskDef in enumerate(pipeline):

        # we will need task class for next operations, make sure it is loaded
        taskClass = _loadTaskClass(taskDef, taskFactory)

        # task outputs
        dsMap = taskClass.getOutputDatasetTypes(taskDef.config)
        for dsTypeDescr in dsMap.values():
            dsType = dsTypeDescr.datasetType
            if dsType.name in allOutputs:
                raise DuplicateOutputError("DatasetType `{}' appears more than "
                                           "once as" " output".format(dsType.name))
        outputs[idx] = set(dsTypeDescr.datasetType.name for dsTypeDescr in dsMap.values())
        allOutputs.update(outputs[idx])

        # task inputs
        dsMap = taskClass.getInputDatasetTypes(taskDef.config)
        inputs[idx] = set(dsTypeDescr.datasetType.name for dsTypeDescr in dsMap.values())
        allInputs.update(inputs[idx])

    # for simplicity add pseudo-node which is a producer for all pre-existing
    # inputs, its index is -1
    preExisting = allInputs - allOutputs
    outputs[-1] = preExisting

    # Set of nodes with no incoming edges, initially set to pseudo-node
    queue = [-1]
    result = []
    while queue:

        # move to final list, drop -1
        idx = queue.pop(0)
        if idx >= 0:
            result.append(idx)

        # remove task outputs from other tasks inputs
        thisTaskOutputs = outputs.get(idx, set())
        for taskInputs in inputs.values():
            taskInputs -= thisTaskOutputs

        # find all nodes with no incoming edges and move them to the queue
        topNodes = [key for key, value in inputs.items() if not value]
        queue += topNodes
        for key in topNodes:
            del inputs[key]

        # keep queue ordered
        queue.sort()

    # if there is something left it means cycles
    if inputs:
        # format it in usable way
        loops = []
        for idx, inputNames in inputs.items():
            taskName = pipeline[idx].label
            outputNames = outputs[idx]
            edge = " {} -> {} -> {}".format(inputNames, taskName, outputNames)
            loops.append(edge)
        raise PipelineDataCycleError("Pipeline has data cycles:\n" + "\n".join(loops))

    return Pipeline(pipeline[idx] for idx in result)
```
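Re-sorting the whole queue on every iteration, as the listing does, is simple but repeats an O(n log n) sort per step. A common alternative, shown here only as a sketch and not as what `pipe.base` does, keeps ready tasks in a `heapq` min-heap and tracks, for each task, how many of its inputs are still waiting on an in-pipeline producer; pre-existing inputs impose no constraint, which replaces the -1 pseudo-node. On a toy model where each task is an `(inputs, outputs)` pair of dataset-type names, it should emit the same smallest-index-first order. `order_tasks_heap` is a hypothetical name, and plain `ValueError` stands in for the module's exception classes.

```python
import heapq
from collections import defaultdict
from typing import Dict, List, Set, Tuple


def order_tasks_heap(tasks: List[Tuple[Set[str], Set[str]]]) -> List[int]:
    """Smallest-index-first topological order using a min-heap of ready tasks."""
    # Record which task produces each dataset type, rejecting duplicate producers.
    producer: Dict[str, int] = {}
    for idx, (_, outs) in enumerate(tasks):
        for name in outs:
            if name in producer:
                raise ValueError(f"DatasetType {name!r} appears more than once as output")
            producer[name] = idx

    # waiting[i]: inputs of task i not yet produced; consumers[p]: tasks waiting on p.
    waiting = [0] * len(tasks)
    consumers: Dict[int, List[int]] = defaultdict(list)
    for idx, (ins, _) in enumerate(tasks):
        for name in ins:
            if name in producer:
                waiting[idx] += 1
                consumers[producer[name]].append(idx)

    heap = [idx for idx, n in enumerate(waiting) if n == 0]
    heapq.heapify(heap)
    result: List[int] = []
    while heap:
        idx = heapq.heappop(heap)  # smallest ready index
        result.append(idx)
        for consumer in consumers[idx]:
            waiting[consumer] -= 1
            if waiting[consumer] == 0:
                heapq.heappush(heap, consumer)

    # Any task never emitted still has unsatisfied inputs, i.e. a data cycle.
    if len(result) != len(tasks):
        stuck = [idx for idx, n in enumerate(waiting) if n > 0]
        raise ValueError(f"data cycle among tasks {stuck}")
    return result


print(order_tasks_heap([({"calexp"}, {"coadd"}), ({"raw"}, {"calexp"}), ({"raw"}, {"log"})]))  # [1, 0, 2]
```

The trade-off is bookkeeping: the set-subtraction approach in the listing needs no producer map, while the heap version does O(log n) work per emitted task instead of re-sorting.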