LSSTApplications  18.1.0
LSSTDataManagementBasePackage
pipeTools.py
Go to the documentation of this file.
1 # This file is part of pipe_base.
2 #
3 # Developed for the LSST Data Management System.
4 # This product includes software developed by the LSST Project
5 # (http://www.lsst.org).
6 # See the COPYRIGHT file at the top-level directory of this distribution
7 # for details of code ownership.
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation, either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the GNU General Public License
20 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 
"""Module defining a few functions to query or manipulate pipelines.

`isPipelineOrdered` checks that every task in a pipeline comes after the
tasks producing its inputs; `orderPipeline` returns a re-ordered pipeline
that satisfies those data dependencies.
"""

# Public API of this module. The exception classes are listed as well because
# the public functions raise them and callers need to be able to catch them.
__all__ = ["isPipelineOrdered", "orderPipeline",
           "MissingTaskFactoryError", "DuplicateOutputError",
           "PipelineDataCycleError"]
27 
28 # -------------------------------
29 # Imports of standard modules --
30 # -------------------------------
31 
32 # -----------------------------
33 # Imports for other modules --
34 # -----------------------------
35 from .pipeline import Pipeline
36 
37 # ----------------------------------
38 # Local non-exported definitions --
39 # ----------------------------------
40 
41 
42 def _loadTaskClass(taskDef, taskFactory):
43  """Import task class if necessary.
44 
45  Raises
46  ------
47  `ImportError` is raised when task class cannot be imported.
48  `MissingTaskFactoryError` is raised when TaskFactory is needed but not provided.
49  """
50  taskClass = taskDef.taskClass
51  if not taskClass:
52  if not taskFactory:
53  raise MissingTaskFactoryError("Task class is not defined but task "
54  "factory instance is not provided")
55  taskClass = taskFactory.loadTaskClass(taskDef.taskName)
56  return taskClass
57 
58 # ------------------------
59 # Exported definitions --
60 # ------------------------
61 
62 
class MissingTaskFactoryError(Exception):
    """Raised when a task class has to be imported but the caller did not
    supply a TaskFactory instance to import it with.
    """
67 
68 
class DuplicateOutputError(Exception):
    """Raised when the same DatasetType is declared as an output more than
    once within a Pipeline.
    """
74 
75 
class PipelineDataCycleError(Exception):
    """Raised when the data dependencies between the tasks of a Pipeline
    form a cycle.
    """
80 
81 
def isPipelineOrdered(pipeline, taskFactory=None):
    """Check whether the tasks of a pipeline respect their data dependencies.

    A pipeline is correctly ordered when, for every DatasetType produced by a
    task of the pipeline, all tasks consuming it come strictly after the
    producer. DatasetTypes that no task produces are treated as pre-existing
    and never violate the ordering. A task that consumes one of its own
    outputs makes the pipeline unordered.

    Parameters
    ----------
    pipeline : `pipe.base.Pipeline`
        Pipeline description. It is iterated twice.
    taskFactory : `pipe.base.TaskFactory`, optional
        Object that knows how to import task classes. Only consulted for task
        definitions whose ``taskClass`` is not set.

    Returns
    -------
    ordered : `bool`
        `True` if the pipeline is correctly ordered, `False` otherwise.

    Raises
    ------
    ImportError
        If a task class cannot be imported.
    DuplicateOutputError
        If a DatasetType name appears more than once among the outputs.
    MissingTaskFactoryError
        If a task class must be imported but ``taskFactory`` is not provided.

    Notes
    -----
    As a side effect, the resolved task class is stored back into
    ``taskDef.taskClass`` for every task definition in ``pipeline``.
    """
    # First pass: record the pipeline position of each DatasetType's producer.
    # The loaded class is written back to the task definition because the
    # second pass reads ``taskDef.taskClass`` directly.
    producedAt = {}
    for position, taskDef in enumerate(pipeline):
        taskDef.taskClass = _loadTaskClass(taskDef, taskFactory)
        outputMap = taskDef.taskClass.getOutputDatasetTypes(taskDef.config)
        for descr in outputMap.values():
            name = descr.datasetType.name
            if name in producedAt:
                raise DuplicateOutputError("DatasetType `{}' appears more than "
                                           "once as output".format(name))
            producedAt[name] = position

    # Second pass: each input must be pre-existing (effective position -1)
    # or produced by a task strictly upstream of its consumer.
    for position, taskDef in enumerate(pipeline):
        inputMap = taskDef.taskClass.getInputDatasetTypes(taskDef.config)
        if any(producedAt.get(descr.datasetType.name, -1) >= position
               for descr in inputMap.values()):
            return False

    return True
138 
139 
def orderPipeline(pipeline, taskFactory=None):
    """Return a new pipeline whose tasks are sorted by data dependency.

    Every task is placed after the producers of all of its inputs. Whenever
    several tasks are ready at the same time, the one with the smallest
    original index goes first, so the original relative order is kept where
    the dependencies allow it.

    Parameters
    ----------
    pipeline : `pipe.base.Pipeline`
        Pipeline description. It is not reordered in place.
    taskFactory : `pipe.base.TaskFactory`, optional
        Object that knows how to import task classes. Only consulted for task
        definitions whose ``taskClass`` is not set.

    Returns
    -------
    ordered : `pipe.base.Pipeline`
        New pipeline holding the same task definitions in dependency order.

    Raises
    ------
    ImportError
        If a task class cannot be imported.
    DuplicateOutputError
        If a DatasetType is an output of more than one task.
    PipelineDataCycleError
        If the data dependencies contain a cycle. The message lists every task
        that could not be scheduled as
        ``<unsatisfied inputs> -> <label> -> <outputs>``.
    MissingTaskFactoryError
        If a task class must be imported but ``taskFactory`` is not provided.
    """
    # Order-preserving variant of Kahn's topological sort.
    unsatisfied = {}    # task index -> input names not yet available
    producedBy = {}     # task index -> output names
    everyInput = set()
    everyOutput = set()

    for position, taskDef in enumerate(pipeline):
        klass = _loadTaskClass(taskDef, taskFactory)

        # A name already produced by an earlier task is a duplicate.
        # NOTE(review): repeats within one task's own outputs are collapsed
        # by the set below rather than reported.
        outMap = klass.getOutputDatasetTypes(taskDef.config)
        for descr in outMap.values():
            name = descr.datasetType.name
            if name in everyOutput:
                raise DuplicateOutputError("DatasetType `{}' appears more than "
                                           "once as output".format(name))
        producedBy[position] = set(descr.datasetType.name for descr in outMap.values())
        everyOutput.update(producedBy[position])

        inMap = klass.getInputDatasetTypes(taskDef.config)
        unsatisfied[position] = set(descr.datasetType.name for descr in inMap.values())
        everyInput.update(unsatisfied[position])

    # Index -1 is a pseudo-task that "produces" every input no real task
    # produces, i.e. datasets that must already exist.
    producedBy[-1] = everyInput - everyOutput

    ready = [-1]        # indices whose inputs are all available
    ordering = []
    while ready:
        # Always take the lowest original index to preserve relative order.
        current = min(ready)
        ready.remove(current)
        if current != -1:
            ordering.append(current)

        # Everything `current` produces is now available to the others.
        available = producedBy.get(current, set())
        for waiting in unsatisfied.values():
            waiting -= available

        newlyReady = [key for key, waiting in unsatisfied.items() if not waiting]
        for key in newlyReady:
            del unsatisfied[key]
        ready.extend(newlyReady)

    # Every input is either pre-existing or produced by some task, so any task
    # still waiting is part of, or downstream of, a dependency cycle.
    if unsatisfied:
        report = [" {} -> {} -> {}".format(waiting, pipeline[key].label, producedBy[key])
                  for key, waiting in unsatisfied.items()]
        raise PipelineDataCycleError("Pipeline has data cycles:\n" + "\n".join(report))

    return Pipeline(pipeline[key] for key in ordering)
daf::base::PropertySet * set
Definition: fits.cc:884
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)
Definition: history.py:168
def isPipelineOrdered(pipeline, taskFactory=None)
Definition: pipeTools.py:82
def orderPipeline(pipeline, taskFactory=None)
Definition: pipeTools.py:140