22 """Module defining quantum graph classes and related methods. 24 There could be different representations of the quantum graph depending 25 on the client needs. Presently this module contains graph implementation 26 which is based on requirements of command-line environment. In the future 27 we could add other implementations and methods to convert between those 32 __all__ = [
"QuantumGraph",
"QuantumGraphTaskNodes",
"QuantumIterData"]
37 from itertools
import chain
42 from .pipeline
import Pipeline
43 from .pipeTools
import orderPipeline
44 from lsst.daf.butler
import DataId
56 """Helper class for iterating over quanta in a graph. 58 `QuantumGraph.traverse` method needs to return topologically ordered 59 Quanta together with their dependencies. This class is used as a value 60 for iterator, it contains enumerated Quantum and its dependencies. 65 Index of this Quantum, unique but arbitrary integer. 66 quantum : `~lsst.daf.butler.Quantum` 67 Quantum corresponding to a graph node. 69 Task to be run on this quantum. 70 dependencies : iterable of `int` 71 Possibly empty sequence of indices of dependencies for this Quantum. 72 Prerequisites include other nodes in the graph; they do not reflect 73 data already in butler (there are no graph nodes for those). 76 __slots__ = [
"quantumId",
"quantum",
"taskDef",
"dependencies"]
78 def __init__(self, quantumId, quantum, taskDef, dependencies):
91 """QuantumGraphTaskNodes represents a bunch of nodes in a quantum graph 92 corresponding to a single task. 94 The node in quantum graph is represented by the `PipelineTask` and a 95 single `~lsst.daf.butler.Quantum` instance. One possible representation 96 of the graph is just a list of nodes without edges (edges can be deduced 97 from nodes' quantum inputs and outputs if needed). That representation can 98 be reduced to the list of PipelineTasks (or their corresponding TaskDefs) 99 and the corresponding list of Quanta. This class is used in this reduced 100 representation for a single task, and full `QuantumGraph` is a sequence of 101 instances of this class for one or more tasks. 103 Different frameworks may use different graph representation, this 104 representation was based mostly on requirements of command-line 105 executor which does not need explicit edges information. 110 Task definition for this set of nodes. 111 quanta : `list` of `~lsst.daf.butler.Quantum` 112 List of quanta corresponding to the task. 120 """QuantumGraph is a sequence of `QuantumGraphTaskNodes` objects. 122 Typically the order of the tasks in the list will be the same as the 123 order of tasks in a pipeline (obviously depends on the code which 128 iterable : iterable of `QuantumGraphTaskNodes`, optional 129 Initial sequence of per-task nodes. 132 list.__init__(self, iterable
or [])
139 """Iterator over quanta in a graph. 141 Quanta are returned in unspecified order. 146 Task definition for a Quantum. 147 quantum : `~lsst.daf.butler.Quantum` 150 for taskNodes
in self:
151 taskDef = taskNodes.taskDef
152 for quantum
in taskNodes.quanta:
153 yield taskDef, quantum
156 """Return topologically ordered Quanta and their dependencies. 158 This method iterates over all Quanta in topological order, enumerating 159 them during iteration. Returned `QuantumIterData` object contains 160 Quantum instance, its ``quantumId`` and ``quantumId`` of all its 161 prerequisites (Quanta that produce inputs for this Quantum): 162 - the ``quantumId`` values are generated by an iteration of a 163 QuantumGraph, and are not intrinsic to the QuantumGraph 164 - during iteration, each ID will appear in quantumId before it ever 165 appears in dependencies. 169 quantumData : `QuantumIterData` 172 def orderedTaskNodes(graph):
173 """Return topologically ordered task nodes. 177 nodes : `QuantumGraphTaskNodes` 182 nodesMap = {
id(item.taskDef): item
for item
in graph}
184 for taskDef
in pipeline:
185 yield nodesMap[
id(taskDef)]
189 for nodes
in orderedTaskNodes(self):
190 for quantum
in nodes.quanta:
194 for dataRef
in chain.from_iterable(quantum.predictedInputs.values()):
196 if dataRef.id
is None:
197 key = (dataRef.datasetType.name, DataId(dataRef.dataId))
199 prereq.append(outputs[key])
207 if not (len(self) == 1
and len(self[0].quanta) == 1):
211 for dataRef
in chain.from_iterable(quantum.outputs.values()):
212 key = (dataRef.datasetType.name, DataId(dataRef.dataId))
218 def getDatasetTypes(self, initInputs=True, initOutputs=True, inputs=True, outputs=True):
222 total.add(dsRef.datasetType)
225 total.add(dsRef.datasetType)
def getDatasetTypes(self, initInputs=True, initOutputs=True, inputs=True, outputs=True)
def __init__(self, quantumId, quantum, taskDef, dependencies)
daf::base::PropertySet * set
def __init__(self, iterable=None)
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)
daf::base::PropertyList * list
def __init__(self, taskDef, quanta)