21 from __future__
import annotations
23 __all__ = (
"QuantumGraph",
"IncompatibleGraphError")
25 from collections
import defaultdict
27 from itertools
import chain, count
30 from networkx.drawing.nx_agraph
import write_dot
34 from typing
import (DefaultDict, Dict, FrozenSet, Iterable, List, Mapping, Set, Generator, Optional, Tuple,
37 from ..connections
import iterConnections
38 from ..pipeline
import TaskDef
39 from lsst.daf.butler
import Quantum, DatasetRef
41 from ._implDetails
import _DatasetTracker, DatasetTypeName
42 from .quantumNode
import QuantumNode, NodeId, BuildId
44 _T = TypeVar(
"_T", bound=
"QuantumGraph")
48 """Exception class to indicate that a lookup by NodeId is impossible due
55 """QuantumGraph is a directed acyclic graph of `QuantumNode` objects
57 This data structure represents a concrete workflow generated from a
62 quanta : Mapping of `TaskDef` to sets of `Quantum`
63 This maps tasks (and their configs) to the sets of data they are to
66 def __init__(self, quanta: Mapping[TaskDef, Set[Quantum]]):
69 def _buildGraphs(self,
70 quanta: Mapping[TaskDef, Set[Quantum]],
72 _quantumToNodeId: Optional[Mapping[Quantum, NodeId]] =
None,
73 _buildId: Optional[BuildId] =
None):
74 """Builds the graph that is used to store the relation between tasks,
75 and the graph that holds the relations between quanta
78 self.
_buildId = _buildId
if _buildId
is not None else BuildId(f
"{time.time()}-{os.getpid()}")
82 self.
_datasetDict = _DatasetTracker[DatasetTypeName, TaskDef]()
85 nodeNumberGenerator = count()
86 self._nodeIdMap: Dict[NodeId, QuantumNode] = {}
89 connections = taskDef.connections
94 for inpt
in iterConnections(connections, (
"inputs",
"prerequisiteInputs",
"initInputs")):
105 self.
_count += len(quantumSet)
106 for quantum
in quantumSet:
108 nodeId = _quantumToNodeId.get(quantum)
110 raise ValueError(
"If _quantuMToNodeNumber is not None, all quanta must have an "
111 "associated value in the mapping")
115 inits = quantum.initInputs.values()
116 inputs = quantum.inputs.values()
118 self._nodeIdMap[nodeId] = value
120 for dsRef
in chain(inits, inputs):
124 if isinstance(dsRef, Iterable):
129 for dsRef
in chain.from_iterable(quantum.outputs.values()):
140 """Return a graph representing the relations between the tasks inside
145 taskGraph : `networkx.Digraph`
146 Internal datastructure that holds relations of `TaskDef` objects
152 """Return a graph representing the relations between all the
153 `QuantumNode` objects. Largely it should be preferred to iterate
154 over, and use methods of this class, but sometimes direct access to
155 the networkx object may be helpful
159 graph : `networkx.Digraph`
160 Internal datastructure that holds relations of `QuantumNode`
167 """Make a `list` of all `QuantumNode` objects that are 'input' nodes
168 to the graph, meaning those nodes to not depend on any other nodes in
173 inputNodes : iterable of `QuantumNode`
174 A list of nodes that are inputs to the graph
180 """Make a `list` of all `QuantumNode` objects that are 'output' nodes
181 to the graph, meaning those nodes have no nodes that depend them in
186 outputNodes : iterable of `QuantumNode`
187 A list of nodes that are outputs of the graph
193 """Return all the `DatasetTypeName` objects that are contained inside
198 tuple of `DatasetTypeName`
199 All the data set type names that are present in the graph
205 """Return True if all of the nodes in the graph are connected, ignores
206 directionality of connections.
211 """Lookup a `QuantumNode` from an id associated with the node.
216 The number associated with a node
221 The node corresponding with input number
226 Raised if the requested nodeId is not in the graph.
227 IncompatibleGraphError
228 Raised if the nodeId was built with a different graph than is not
229 this instance (or a graph instance that produced this instance
230 through and operation such as subset)
234 return self._nodeIdMap[nodeId]
237 """Return all the `Quantum` associated with a `TaskDef`.
242 The `TaskDef` for which `Quantum` are to be queried
246 frozenset of `Quantum`
247 The `set` of `Quantum` that is associated with the specified
250 return frozenset(self.
_quanta[taskDef])
253 """Find all tasks that have the specified dataset type name as an
258 datasetTypeName : `str`
259 A string representing the name of a dataset type to be queried,
260 can also accept a `DatasetTypeName` which is a `NewType` of str for
261 type safety in static type checking.
265 tasks : iterable of `TaskDef`
266 `TaskDef` objects that have the specified `DatasetTypeName` as an
267 input, list will be empty if no tasks use specified
268 `DatasetTypeName` as an input.
273 Raised if the `DatasetTypeName` is not part of the `QuantumGraph`
275 return (c
for c
in self.
_datasetDict.getInputs(datasetTypeName))
278 """Find all tasks that have the specified dataset type name as an
283 datasetTypeName : `str`
284 A string representing the name of a dataset type to be queried,
285 can also accept a `DatasetTypeName` which is a `NewType` of str for
286 type safety in static type checking.
291 `TaskDef` that outputs `DatasetTypeName` as an output or None if
292 none of the tasks produce this `DatasetTypeName`.
297 Raised if the `DatasetTypeName` is not part of the `QuantumGraph`
302 """Find all tasks that are associated with the specified dataset type
307 datasetTypeName : `str`
308 A string representing the name of a dataset type to be queried,
309 can also accept a `DatasetTypeName` which is a `NewType` of str for
310 type safety in static type checking.
314 result : iterable of `TaskDef`
315 `TaskDef` objects that are associated with the specified
321 Raised if the `DatasetTypeName` is not part of the `QuantumGraph`
325 if output
is not None:
326 results = chain(results, (output,))
330 """Determine which `TaskDef` objects in this graph are associated
331 with a `str` representing a task name (looks at the taskName property
332 of `TaskDef` objects).
334 Returns a list of `TaskDef` objects as a `PipelineTask` may appear
335 multiple times in a graph with different labels.
340 Name of a task to search for
344 result : list of `TaskDef`
345 List of the `TaskDef` objects that have the name specified.
346 Multiple values are returned in the case that a task is used
347 multiple times with different labels.
351 split = task.taskName.split(
'.')
352 if split[-1] == taskName:
357 """Determine which `TaskDef` objects in this graph are associated
358 with a `str` representing a tasks label.
363 Name of a task to search for
368 `TaskDef` objects that has the specified label.
371 if label == task.label:
376 """Return all the `Quantum` that contain a specified `DatasetTypeName`.
380 datasetTypeName : `str`
381 The name of the dataset type to search for as a string,
382 can also accept a `DatasetTypeName` which is a `NewType` of str for
383 type safety in static type checking.
387 result : `set` of `QuantumNode` objects
388 A `set` of `QuantumNode`s that contain specified `DatasetTypeName`
393 Raised if the `DatasetTypeName` is not part of the `QuantumGraph`
397 result: Set[Quantum] =
set()
398 result = result.union(*(self.
_quanta[task]
for task
in tasks))
402 """Check if specified quantum appears in the graph as part of a node.
407 The quantum to search for
412 The result of searching for the quantum
414 for qset
in self.
_quanta.values():
420 """Write out the graph as a dot graph.
424 output : str or `io.BufferedIOBase`
425 Either a filesystem path to write to, or a file handle object
429 def subset(self: _T, nodes: Union[QuantumNode, Iterable[QuantumNode]]) -> _T:
430 """Create a new graph object that contains the subset of the nodes
431 specified as input. Node number is preserved.
435 nodes : `QuantumNode` or iterable of `QuantumNode`
439 graph : instance of graph type
440 An instance of the type from which the subset was created
442 if not isinstance(nodes, Iterable):
445 quantumMap = defaultdict(set)
448 for node
in quantumSubgraph:
449 quantumMap[node.taskDef].add(node.quantum)
451 newInst =
type(self)({})
452 newInst._buildGraphs(quantumMap, _quantumToNodeId={n.quantum: n.nodeId
for n
in nodes},
457 """Generate a list of subgraphs where each is connected.
461 result : list of `QuantumGraph`
462 A list of graphs that are each connected
464 return tuple(self.
subset(connectedSet)
468 """Return a set of `QuantumNode` that are direct inputs to a specified
474 The node of the graph for which inputs are to be determined
479 All the nodes that are direct inputs to specified node
484 """Return a set of `QuantumNode` that are direct outputs of a specified
490 The node of the graph for which outputs are to be determined
495 All the nodes that are direct outputs to specified node
500 """Return a graph of `QuantumNode` that are direct inputs and outputs
506 The node of the graph for which connected nodes are to be
511 graph : graph of `QuantumNode`
512 All the nodes that are directly connected to specified node
519 """Return a graph of the specified node and all the ancestor nodes
520 directly reachable by walking edges.
525 The node for which all ansestors are to be determined
529 graph of `QuantumNode`
530 Graph of node and all of its ansestors
533 predecessorNodes.add(node)
534 return self.
subset(predecessorNodes)
536 def findCycle(self) -> List[Tuple[QuantumNode, QuantumNode]]:
537 """Check a graph for the presense of cycles and returns the edges of
538 any cycles found, or an empty list if there is no cycle.
542 result : list of tuple of `QuantumNode`, `QuantumNode`
543 A list of any graph edges that form a cycle, or an empty list if
544 there is no cycle. Empty list to so support if graph.find_cycle()
545 syntax as an empty list is falsy.
549 except nx.NetworkXNoCycle:
553 """Save QuantumGraph to a file.
554 Presently we store QuantumGraph in pickle format, this could
555 potentially change in the future if better format is found.
559 file : `io.BufferedIOBase`
560 File to write pickle data open in binary mode.
562 pickle.dump(self, file)
566 """Read QuantumGraph from a file that was made by `save`.
570 file : `io.BufferedIOBase`
571 File with pickle data open in binary mode.
572 universe: `~lsst.daf.butler.DimensionUniverse`
573 DimensionUniverse instance, not used by the method itself but
574 needed to ensure that registry data structures are initialized.
578 graph : `QuantumGraph`
579 Resulting QuantumGraph instance.
584 Raised if pickle contains instance of a type other than
588 Reading Quanta from pickle requires existence of singleton
589 DimensionUniverse which is usually instantiated during Registry
590 initialization. To make sure that DimensionUniverse exists this method
591 accepts dummy DimensionUniverse argument.
593 qgraph = pickle.load(file)
594 if not isinstance(qgraph, QuantumGraph):
595 raise TypeError(f
"QuantumGraph pickle file has contains unexpected object type: {type(qgraph)}")
599 """Iterate over the `taskGraph` attribute in topological order
604 `TaskDef` objects in topological order
606 yield from nx.topological_sort(self.
taskGraph)
608 def __iter__(self) -> Generator[QuantumNode, None, None]:
618 """Stores a compact form of the graph as a list of graph nodes, and a
619 tuple of task labels and task configs. The full graph can be
620 reconstructed with this information, and it preseves the ordering of
623 return {
"nodesList":
list(self)}
626 """Reconstructs the state of the graph from the information persisted
629 quanta: DefaultDict[TaskDef, Set[Quantum]] = defaultdict(set)
630 quantumToNodeId: Dict[Quantum, NodeId] = {}
631 quantumNode: QuantumNode
632 for quantumNode
in state[
'nodesList']:
633 quanta[quantumNode.taskDef].add(quantumNode.quantum)
634 quantumToNodeId[quantumNode.quantum] = quantumNode.nodeId
635 _buildId = quantumNode.nodeId.buildId
if state[
'nodesList']
else None
636 self.
_buildGraphs(quanta, _quantumToNodeId=quantumToNodeId, _buildId=_buildId)
639 if not isinstance(other, QuantumGraph):
641 if len(self) != len(other):
644 if node
not in other: