LSSTApplications  18.1.0
LSSTDataManagementBasePackage
graph.py
Go to the documentation of this file.
1 # This file is part of pipe_base.
2 #
3 # Developed for the LSST Data Management System.
4 # This product includes software developed by the LSST Project
5 # (http://www.lsst.org).
6 # See the COPYRIGHT file at the top-level directory of this distribution
7 # for details of code ownership.
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation, either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the GNU General Public License
20 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 
22 """Module defining quantum graph classes and related methods.
23 
24 There could be different representations of the quantum graph depending
25 on the client needs. Presently this module contains graph implementation
26 which is based on requirements of command-line environment. In the future
27 we could add other implementations and methods to convert between those
28 representations.
29 """
30 
31 # "exported" names
32 __all__ = ["QuantumGraph", "QuantumGraphTaskNodes", "QuantumIterData"]
33 
34 # -------------------------------
35 # Imports of standard modules --
36 # -------------------------------
37 from itertools import chain
38 
39 # -----------------------------
40 # Imports for other modules --
41 # -----------------------------
42 from .pipeline import Pipeline
43 from .pipeTools import orderPipeline
44 from lsst.daf.butler import DataId
45 
46 # ----------------------------------
47 # Local non-exported definitions --
48 # ----------------------------------
49 
50 # ------------------------
51 # Exported definitions --
52 # ------------------------
53 
54 
class QuantumIterData:
    """Helper class for iterating over quanta in a graph.

    `QuantumGraph.traverse` method needs to return topologically ordered
    Quanta together with their dependencies. This class is used as a value
    for iterator, it contains enumerated Quantum and its dependencies.

    Parameters
    ----------
    quantumId : `int`
        Index of this Quantum, unique but arbitrary integer.
    quantum : `~lsst.daf.butler.Quantum`
        Quantum corresponding to a graph node.
    taskDef : `TaskDef`
        Task to be run on this quantum.
    dependencies : iterable of `int`
        Possibly empty sequence of indices of dependencies for this Quantum.
        Prerequisites include other nodes in the graph; they do not reflect
        data already in butler (there are no graph nodes for those).
    """

    # Instances carry exactly these four attributes and no per-instance
    # ``__dict__``.
    __slots__ = ["quantumId", "quantum", "taskDef", "dependencies"]

    def __init__(self, quantumId, quantum, taskDef, dependencies):
        self.quantumId = quantumId
        self.quantum = quantum
        self.taskDef = taskDef
        # Stored as a frozenset: duplicates collapse and the collection
        # cannot be modified after construction.
        self.dependencies = frozenset(dependencies)

    def __str__(self):
        # Note that the quantum itself is not part of the string form.
        return "QuantumIterData({}, {}, {})".format(self.quantumId,
                                                    self.taskDef,
                                                    self.dependencies)
88 
89 
class QuantumGraphTaskNodes:
    """QuantumGraphTaskNodes represents a bunch of nodes in a quantum graph
    corresponding to a single task.

    The node in quantum graph is represented by the `PipelineTask` and a
    single `~lsst.daf.butler.Quantum` instance. One possible representation
    of the graph is just a list of nodes without edges (edges can be deduced
    from nodes' quantum inputs and outputs if needed). That representation can
    be reduced to the list of PipelineTasks (or their corresponding TaskDefs)
    and the corresponding list of Quanta. This class is used in this reduced
    representation for a single task, and full `QuantumGraph` is a sequence of
    instances of this class for one or more tasks.

    Different frameworks may use different graph representation, this
    representation was based mostly on requirements of command-line
    executor which does not need explicit edges information.

    Attributes
    ----------
    taskDef : `TaskDef`
        Task definition for this set of nodes.
    quanta : `list` of `~lsst.daf.butler.Quantum`
        List of quanta corresponding to the task.
    """
    def __init__(self, taskDef, quanta):
        self.taskDef = taskDef
        # The caller's object is stored as-is (no copy is made).
        self.quanta = quanta
117 
118 
class QuantumGraph(list):
    """QuantumGraph is a sequence of `QuantumGraphTaskNodes` objects.

    Typically the order of the tasks in the list will be the same as the
    order of tasks in a pipeline (obviously depends on the code which
    constructs graph).

    Parameters
    ----------
    iterable : iterable of `QuantumGraphTaskNodes`, optional
        Initial sequence of per-task nodes.
    """
    def __init__(self, iterable=None):
        super().__init__([] if iterable is None else iterable)
        # Dataset references for task initialization; each element is
        # expected to expose a ``datasetType`` attribute (see
        # `getDatasetTypes`).
        self.initInputs = []
        self.initOutputs = []
        # Dataset types consumed/produced by the quanta; this class only
        # reads them, they are populated by the code that builds the graph.
        self._inputDatasetTypes = set()
        self._outputDatasetTypes = set()

    def quanta(self):
        """Iterator over quanta in a graph.

        Quanta are returned in unspecified order.

        Yields
        ------
        taskDef : `TaskDef`
            Task definition for a Quantum.
        quantum : `~lsst.daf.butler.Quantum`
            Single quantum.
        """
        for taskNodes in self:
            taskDef = taskNodes.taskDef
            for quantum in taskNodes.quanta:
                yield taskDef, quantum

    def traverse(self):
        """Return topologically ordered Quanta and their dependencies.

        This method iterates over all Quanta in topological order, enumerating
        them during iteration. Returned `QuantumIterData` object contains
        Quantum instance, its ``quantumId`` and ``quantumId`` of all its
        prerequisites (Quanta that produce inputs for this Quantum):

        - the ``quantumId`` values are generated by an iteration of a
          QuantumGraph, and are not intrinsic to the QuantumGraph
        - during iteration, each ID will appear in quantumId before it ever
          appears in dependencies.

        Yields
        ------
        quantumData : `QuantumIterData`

        Raises
        ------
        KeyError
            Raised if a not-yet-existing input dataset (``id`` is `None`) is
            not produced by any earlier Quantum, unless the graph consists
            of exactly one Quantum.
        """

        def orderedTaskNodes(graph):
            """Return topologically ordered task nodes.

            Yields
            ------
            nodes : `QuantumGraphTaskNodes`
            """
            # Tasks in a graph are probably topologically sorted already but there
            # is no guarantee for that. Just re-construct Pipeline and order tasks
            # in a pipeline using existing method.
            nodesMap = {id(item.taskDef): item for item in graph}
            pipeline = orderPipeline(Pipeline(item.taskDef for item in graph))
            for taskDef in pipeline:
                yield nodesMap[id(taskDef)]

        index = 0
        outputs = {}  # maps (DatasetType.name, DataId) to its producing quantum index
        for nodes in orderedTaskNodes(self):
            for quantum in nodes.quanta:

                # Find quantum dependencies (must be in `outputs` already)
                prereq = []
                for dataRef in chain.from_iterable(quantum.predictedInputs.values()):
                    # if data exists in butler then `id` is not None
                    if dataRef.id is None:
                        key = (dataRef.datasetType.name, DataId(dataRef.dataId))
                        try:
                            prereq.append(outputs[key])
                        except KeyError:
                            # The Quantum that makes our inputs is not in the graph,
                            # this could happen if we run on a "split graph" which is
                            # usually just one quantum. Check for number of Quanta
                            # in a graph and ignore error if it's just one.
                            # TODO: This code has to be removed or replaced with
                            # something more generic
                            if not (len(self) == 1 and len(self[0].quanta) == 1):
                                raise

                # Update `outputs` with this quantum outputs
                for dataRef in chain.from_iterable(quantum.outputs.values()):
                    key = (dataRef.datasetType.name, DataId(dataRef.dataId))
                    outputs[key] = index

                yield QuantumIterData(index, quantum, nodes.taskDef, prereq)
                index += 1

    def getDatasetTypes(self, initInputs=True, initOutputs=True, inputs=True, outputs=True):
        """Return the union of dataset types from the selected categories.

        Parameters
        ----------
        initInputs : `bool`, optional
            If `True` include ``datasetType`` of every reference in
            ``self.initInputs``.
        initOutputs : `bool`, optional
            If `True` include ``datasetType`` of every reference in
            ``self.initOutputs``.
        inputs : `bool`, optional
            If `True` include the recorded input dataset types.
        outputs : `bool`, optional
            If `True` include the recorded output dataset types.

        Returns
        -------
        total : `set`
            Newly created set; modifying it does not affect the graph.
        """
        total = set()
        if initInputs:
            total.update(dsRef.datasetType for dsRef in self.initInputs)
        if initOutputs:
            total.update(dsRef.datasetType for dsRef in self.initOutputs)
        if inputs:
            total |= self._inputDatasetTypes
        if outputs:
            total |= self._outputDatasetTypes
        return total
def getDatasetTypes(self, initInputs=True, initOutputs=True, inputs=True, outputs=True)
Definition: graph.py:218
def __init__(self, quantumId, quantum, taskDef, dependencies)
Definition: graph.py:78
table::Key< int > id
Definition: Detector.cc:166
daf::base::PropertySet * set
Definition: fits.cc:884
def __init__(self, iterable=None)
Definition: graph.py:131
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)
Definition: history.py:168
def orderPipeline(pipeline, taskFactory=None)
Definition: pipeTools.py:140
daf::base::PropertyList * list
Definition: fits.cc:885
def __init__(self, taskDef, quanta)
Definition: graph.py:114