21 from __future__
import annotations
23 """Module defining quantum graph classes and related methods.
25 There could be different representations of the quantum graph depending
26 on the client needs. Presently this module contains graph implementation
27 which is based on requirements of command-line environment. In the future
28 we could add other implementations and methods to convert between those
33 __all__ = [
"QuantumGraph",
"QuantumGraphTaskNodes",
"QuantumIterData"]
38 from itertools
import chain
39 from dataclasses
import dataclass
41 from typing
import List, FrozenSet, Mapping
46 from .pipeline
import TaskDef
47 from .pipeTools
import orderPipeline
48 from lsst.daf.butler
import DatasetRef, DatasetType, NamedKeyDict, Quantum
61 """Helper class for iterating over quanta in a graph.
63 The `QuantumGraph.traverse` method needs to return topologically ordered
64 Quanta together with their dependencies. This class is used as a value
65 for the iterator, it contains enumerated Quantum and its dependencies.
68 __slots__ = [
"index",
"quantum",
"taskDef",
"dependencies"]
71 """Index of this Quantum, a unique but arbitrary integer."""
74 """Quantum corresponding to a graph node."""
77 """Task class to be run on this quantum, and corresponding label and
81 dependencies: FrozenSet(int)
82 """Possibly empty set of indices of dependencies for this Quantum.
83 Dependencies include other nodes in the graph; they do not reflect data
84 already in butler (there are no graph nodes for those).
90 """QuantumGraphTaskNodes represents a bunch of nodes in an quantum graph
91 corresponding to a single task.
93 The node in quantum graph is represented by the `PipelineTask` and a
94 single `~lsst.daf.butler.Quantum` instance. One possible representation
95 of the graph is just a list of nodes without edges (edges can be deduced
96 from nodes' quantum inputs and outputs if needed). That representation can
97 be reduced to the list of PipelineTasks (or their corresponding TaskDefs)
98 and the corresponding list of Quanta. This class is used in this reduced
99 representation for a single task, and full `QuantumGraph` is a sequence of
100 tinstances of this class for one or more tasks.
102 Different frameworks may use different graph representation, this
103 representation was based mostly on requirements of command-line
104 executor which does not need explicit edges information.
108 """Task defintion for this set of nodes."""
110 quanta: List[Quantum]
111 """List of quanta corresponding to the task."""
113 initInputs: Mapping[DatasetType, DatasetRef]
114 """Datasets that must be loaded or created to construct this task."""
116 initOutputs: Mapping[DatasetType, DatasetRef]
117 """Datasets that may be written after constructing this task."""
121 """QuantumGraph is a sequence of `QuantumGraphTaskNodes` objects.
123 Typically the order of the tasks in the list will be the same as the
124 order of tasks in a pipeline (obviously depends on the code which
129 iterable : iterable of `QuantumGraphTaskNodes`, optional
130 Initial sequence of per-task nodes.
133 list.__init__(self, iterable
or [])
138 initInputs: NamedKeyDict
139 """Datasets that must be provided to construct one or more Tasks in this
140 graph, and must be obtained from the data repository.
142 This is disjoint with both `initIntermediates` and `initOutputs`.
145 initIntermediates: NamedKeyDict
146 """Datasets that must be provided to construct one or more Tasks in this
147 graph, but are also produced after constructing a Task in this graph.
149 This is disjoint with both `initInputs` and `initOutputs`.
152 initOutputs: NamedKeyDict
153 """Datasets that are produced after constructing a Task in this graph,
154 and are not used to construct any other Task in this graph.
156 This is disjoint from both `initInputs` and `initIntermediates`.
161 """Read QuantumGraph from a file that was made by `save`.
165 file : `io.BufferedIOBase`
166 File with pickle data open in binary mode.
167 universe: `~lsst.daf.butler.DimensionUniverse`
168 DimensionUniverse instance, not used by the method itself but
169 needed to ensure that registry data structures are initialized.
173 graph : `QuantumGraph`
174 Resulting QuantumGraph instance.
179 Raised if pickle contains instance of a type other than
184 Reading Quanta from pickle requires existence of singleton
185 DimensionUniverse which is usually instantiated during Registry
186 initializaion. To make sure that DimensionUniverse exists this method
187 accepts dummy DimensionUniverse argument.
189 qgraph = pickle.load(file)
190 if not isinstance(qgraph, QuantumGraph):
191 raise TypeError(f
"QuantumGraph pickle file has contains unexpected object type: {type(qgraph)}")
195 """Save QuantumGraph to a file.
197 Presently we store QuantumGraph in pickle format, this could
198 potentially change in the future if better format is found.
202 file : `io.BufferedIOBase`
203 File to write pickle data open in binary mode.
205 pickle.dump(self, file)
208 """Iterator over quanta in a graph.
210 Quanta are returned in unspecified order.
215 Task definition for a Quantum.
216 quantum : `~lsst.daf.butler.Quantum`
219 for taskNodes
in self:
220 taskDef = taskNodes.taskDef
221 for quantum
in taskNodes.quanta:
222 yield taskDef, quantum
225 """Iterator over quanta in a graph.
227 QuantumGraph containing individual quanta are returned.
231 graph : `QuantumGraph`
233 for taskDef, quantum
in self.
quanta():
235 quantum.initInputs, quantum.outputs)
240 """Return total count of quanta in a graph.
245 Number of quanta in a graph.
247 return sum(len(taskNodes.quanta)
for taskNodes
in self)
250 """Return topologically ordered Quanta and their dependencies.
252 This method iterates over all Quanta in topological order, enumerating
253 them during iteration. Returned `QuantumIterData` object contains
254 Quantum instance, its ``index`` and the ``index`` of all its
255 prerequsites (Quanta that produce inputs for this Quantum):
257 - the ``index`` values are generated by an iteration of a
258 QuantumGraph, and are not intrinsic to the QuantumGraph
259 - during iteration, each ID will appear in index before it ever
260 appears in dependencies.
264 quantumData : `QuantumIterData`
267 def orderedTaskNodes(graph):
268 """Return topologically ordered task nodes.
272 nodes : `QuantumGraphTaskNodes`
277 nodesMap = {
id(item.taskDef): item
for item
in graph}
279 for taskDef
in pipeline:
280 yield nodesMap[
id(taskDef)]
284 for nodes
in orderedTaskNodes(self):
285 for quantum
in nodes.quanta:
289 for dataRef
in chain.from_iterable(quantum.predictedInputs.values()):
291 if dataRef.id
is None:
293 name, component = dataRef.datasetType.nameAndComponent()
294 key = (name, dataRef.dataId)
296 prereq.append(outputs[key])
304 if not (len(self) == 1
and len(self[0].quanta) == 1):
308 for dataRef
in chain.from_iterable(quantum.outputs.values()):
309 key = (dataRef.datasetType.name, dataRef.dataId)
312 yield QuantumIterData(index=index, quantum=quantum, taskDef=nodes.taskDef,
313 dependencies=frozenset(prereq))