LSSTApplications  20.0.0
LSSTDataManagementBasePackage
graph.py
Go to the documentation of this file.
1 # This file is part of pipe_base.
2 #
3 # Developed for the LSST Data Management System.
4 # This product includes software developed by the LSST Project
5 # (http://www.lsst.org).
6 # See the COPYRIGHT file at the top-level directory of this distribution
7 # for details of code ownership.
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation, either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the GNU General Public License
20 # along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Module defining quantum graph classes and related methods.

There could be different representations of the quantum graph depending
on the client needs. Presently this module contains a graph implementation
based on the requirements of the command-line environment. In the future
we could add other implementations and methods to convert between those
representations.
"""

# The module docstring must come before this import: a string literal placed
# after ``from __future__ import ...`` is an ordinary expression statement and
# does not become the module ``__doc__``.
from __future__ import annotations

# "exported" names
__all__ = ["QuantumGraph", "QuantumGraphTaskNodes", "QuantumIterData"]
34 
35 # -------------------------------
36 # Imports of standard modules --
37 # -------------------------------
38 from itertools import chain
39 from dataclasses import dataclass
40 import pickle
41 from typing import List, FrozenSet, Mapping
42 
43 # -----------------------------
44 # Imports for other modules --
45 # -----------------------------
46 from .pipeline import TaskDef
47 from .pipeTools import orderPipeline
48 from lsst.daf.butler import DatasetRef, DatasetType, NamedKeyDict, Quantum
49 
50 # ----------------------------------
51 # Local non-exported definitions --
52 # ----------------------------------
53 
54 # ------------------------
55 # Exported definitions --
56 # ------------------------
57 
58 
59 @dataclass
61  """Helper class for iterating over quanta in a graph.
62 
63  The `QuantumGraph.traverse` method needs to return topologically ordered
64  Quanta together with their dependencies. This class is used as a value
65  for the iterator, it contains enumerated Quantum and its dependencies.
66  """
67 
68  __slots__ = ["index", "quantum", "taskDef", "dependencies"]
69 
70  index: int
71  """Index of this Quantum, a unique but arbitrary integer."""
72 
73  quantum: Quantum
74  """Quantum corresponding to a graph node."""
75 
76  taskDef: TaskDef
77  """Task class to be run on this quantum, and corresponding label and
78  config.
79  """
80 
81  dependencies: FrozenSet(int)
82  """Possibly empty set of indices of dependencies for this Quantum.
83  Dependencies include other nodes in the graph; they do not reflect data
84  already in butler (there are no graph nodes for those).
85  """
86 
87 
88 @dataclass
90  """QuantumGraphTaskNodes represents a bunch of nodes in an quantum graph
91  corresponding to a single task.
92 
93  The node in quantum graph is represented by the `PipelineTask` and a
94  single `~lsst.daf.butler.Quantum` instance. One possible representation
95  of the graph is just a list of nodes without edges (edges can be deduced
96  from nodes' quantum inputs and outputs if needed). That representation can
97  be reduced to the list of PipelineTasks (or their corresponding TaskDefs)
98  and the corresponding list of Quanta. This class is used in this reduced
99  representation for a single task, and full `QuantumGraph` is a sequence of
100  tinstances of this class for one or more tasks.
101 
102  Different frameworks may use different graph representation, this
103  representation was based mostly on requirements of command-line
104  executor which does not need explicit edges information.
105  """
106 
107  taskDef: TaskDef
108  """Task defintion for this set of nodes."""
109 
110  quanta: List[Quantum]
111  """List of quanta corresponding to the task."""
112 
113  initInputs: Mapping[DatasetType, DatasetRef]
114  """Datasets that must be loaded or created to construct this task."""
115 
116  initOutputs: Mapping[DatasetType, DatasetRef]
117  """Datasets that may be written after constructing this task."""
118 
119 
121  """QuantumGraph is a sequence of `QuantumGraphTaskNodes` objects.
122 
123  Typically the order of the tasks in the list will be the same as the
124  order of tasks in a pipeline (obviously depends on the code which
125  constructs graph).
126 
127  Parameters
128  ----------
129  iterable : iterable of `QuantumGraphTaskNodes`, optional
130  Initial sequence of per-task nodes.
131  """
132  def __init__(self, iterable=None):
133  list.__init__(self, iterable or [])
134  self.initInputs = NamedKeyDict()
135  self.initIntermediates = NamedKeyDict()
136  self.initOutputs = NamedKeyDict()
137 
138  initInputs: NamedKeyDict
139  """Datasets that must be provided to construct one or more Tasks in this
140  graph, and must be obtained from the data repository.
141 
142  This is disjoint with both `initIntermediates` and `initOutputs`.
143  """
144 
145  initIntermediates: NamedKeyDict
146  """Datasets that must be provided to construct one or more Tasks in this
147  graph, but are also produced after constructing a Task in this graph.
148 
149  This is disjoint with both `initInputs` and `initOutputs`.
150  """
151 
152  initOutputs: NamedKeyDict
153  """Datasets that are produced after constructing a Task in this graph,
154  and are not used to construct any other Task in this graph.
155 
156  This is disjoint from both `initInputs` and `initIntermediates`.
157  """
158 
159  @classmethod
160  def load(cls, file, universe):
161  """Read QuantumGraph from a file that was made by `save`.
162 
163  Parameters
164  ----------
165  file : `io.BufferedIOBase`
166  File with pickle data open in binary mode.
167  universe: `~lsst.daf.butler.DimensionUniverse`
168  DimensionUniverse instance, not used by the method itself but
169  needed to ensure that registry data structures are initialized.
170 
171  Returns
172  -------
173  graph : `QuantumGraph`
174  Resulting QuantumGraph instance.
175 
176  Raises
177  ------
178  TypeError
179  Raised if pickle contains instance of a type other than
180  QuantumGraph.
181 
182  Notes
183  -----
184  Reading Quanta from pickle requires existence of singleton
185  DimensionUniverse which is usually instantiated during Registry
186  initializaion. To make sure that DimensionUniverse exists this method
187  accepts dummy DimensionUniverse argument.
188  """
189  qgraph = pickle.load(file)
190  if not isinstance(qgraph, QuantumGraph):
191  raise TypeError(f"QuantumGraph pickle file has contains unexpected object type: {type(qgraph)}")
192  return qgraph
193 
194  def save(self, file):
195  """Save QuantumGraph to a file.
196 
197  Presently we store QuantumGraph in pickle format, this could
198  potentially change in the future if better format is found.
199 
200  Parameters
201  ----------
202  file : `io.BufferedIOBase`
203  File to write pickle data open in binary mode.
204  """
205  pickle.dump(self, file)
206 
207  def quanta(self):
208  """Iterator over quanta in a graph.
209 
210  Quanta are returned in unspecified order.
211 
212  Yields
213  ------
214  taskDef : `TaskDef`
215  Task definition for a Quantum.
216  quantum : `~lsst.daf.butler.Quantum`
217  Single quantum.
218  """
219  for taskNodes in self:
220  taskDef = taskNodes.taskDef
221  for quantum in taskNodes.quanta:
222  yield taskDef, quantum
223 
224  def quantaAsQgraph(self):
225  """Iterator over quanta in a graph.
226 
227  QuantumGraph containing individual quanta are returned.
228 
229  Yields
230  ------
231  graph : `QuantumGraph`
232  """
233  for taskDef, quantum in self.quanta():
234  node = QuantumGraphTaskNodes(taskDef, [quantum],
235  quantum.initInputs, quantum.outputs)
236  graph = QuantumGraph([node])
237  yield graph
238 
239  def countQuanta(self):
240  """Return total count of quanta in a graph.
241 
242  Returns
243  -------
244  count : `int`
245  Number of quanta in a graph.
246  """
247  return sum(len(taskNodes.quanta) for taskNodes in self)
248 
249  def traverse(self):
250  """Return topologically ordered Quanta and their dependencies.
251 
252  This method iterates over all Quanta in topological order, enumerating
253  them during iteration. Returned `QuantumIterData` object contains
254  Quantum instance, its ``index`` and the ``index`` of all its
255  prerequsites (Quanta that produce inputs for this Quantum):
256 
257  - the ``index`` values are generated by an iteration of a
258  QuantumGraph, and are not intrinsic to the QuantumGraph
259  - during iteration, each ID will appear in index before it ever
260  appears in dependencies.
261 
262  Yields
263  ------
264  quantumData : `QuantumIterData`
265  """
266 
267  def orderedTaskNodes(graph):
268  """Return topologically ordered task nodes.
269 
270  Yields
271  ------
272  nodes : `QuantumGraphTaskNodes`
273  """
274  # Tasks in a graph are probably topologically sorted already but there
275  # is no guarantee for that. Just re-construct Pipeline and order tasks
276  # in a pipeline using existing method.
277  nodesMap = {id(item.taskDef): item for item in graph}
278  pipeline = orderPipeline([item.taskDef for item in graph])
279  for taskDef in pipeline:
280  yield nodesMap[id(taskDef)]
281 
282  index = 0
283  outputs = {} # maps (DatasetType.name, dataId) to its producing quantum index
284  for nodes in orderedTaskNodes(self):
285  for quantum in nodes.quanta:
286 
287  # Find quantum dependencies (must be in `outputs` already)
288  prereq = []
289  for dataRef in chain.from_iterable(quantum.predictedInputs.values()):
290  # if data exists in butler then `id` is not None
291  if dataRef.id is None:
292  # Get the base name if this is a component
293  name, component = dataRef.datasetType.nameAndComponent()
294  key = (name, dataRef.dataId)
295  try:
296  prereq.append(outputs[key])
297  except KeyError:
298  # The Quantum that makes our inputs is not in the graph,
299  # this could happen if we run on a "split graph" which is
300  # usually just one quantum. Check for number of Quanta
301  # in a graph and ignore error if it's just one.
302  # TODO: This code has to be removed or replaced with
303  # something more generic
304  if not (len(self) == 1 and len(self[0].quanta) == 1):
305  raise
306 
307  # Update `outputs` with this quantum outputs
308  for dataRef in chain.from_iterable(quantum.outputs.values()):
309  key = (dataRef.datasetType.name, dataRef.dataId)
310  outputs[key] = index
311 
312  yield QuantumIterData(index=index, quantum=quantum, taskDef=nodes.taskDef,
313  dependencies=frozenset(prereq))
314  index += 1
lsst.pipe.base.graph.QuantumGraph
Definition: graph.py:120
lsst.pipe.base.graph.QuantumGraph.initInputs
initInputs
Definition: graph.py:134
lsst.pipe.base.graph.QuantumGraphTaskNodes
Definition: graph.py:89
lsst.pipe.base.graph.QuantumGraph.traverse
def traverse(self)
Definition: graph.py:249
id
table::Key< int > id
Definition: Detector.cc:162
lsst.pipe.base.graph.QuantumGraph.countQuanta
def countQuanta(self)
Definition: graph.py:239
lsst.pipe.base.graph.QuantumGraph.__init__
def __init__(self, iterable=None)
Definition: graph.py:132
lsst.pipe.base.graph.QuantumGraph.save
def save(self, file)
Definition: graph.py:194
lsst.pipe.base.graph.QuantumGraph.initIntermediates
initIntermediates
Definition: graph.py:135
lsst.pipe.base.graph.QuantumGraph.initOutputs
initOutputs
Definition: graph.py:136
lsst.pipe.base.pipeTools.orderPipeline
def orderPipeline(pipeline)
Definition: pipeTools.py:135
list
daf::base::PropertyList * list
Definition: fits.cc:913
lsst.pipe.base.graph.QuantumGraph.quanta
def quanta(self)
Definition: graph.py:207
lsst.pipe.base.graph.QuantumGraph.load
def load(cls, file, universe)
Definition: graph.py:160
lsst.pipe.base.graph.QuantumGraph.quantaAsQgraph
def quantaAsQgraph(self)
Definition: graph.py:224
lsst.pipe.base.graph.QuantumIterData
Definition: graph.py:60