LSST Applications  22.0.1,22.0.1+01bcf6a671,22.0.1+046ee49490,22.0.1+05c7de27da,22.0.1+0c6914dbf6,22.0.1+1220d50b50,22.0.1+12fd109e95,22.0.1+1a1dd69893,22.0.1+1c910dc348,22.0.1+1ef34551f5,22.0.1+30170c3d08,22.0.1+39153823fd,22.0.1+611137eacc,22.0.1+771eb1e3e8,22.0.1+94e66cc9ed,22.0.1+9a075d06e2,22.0.1+a5ff6e246e,22.0.1+a7db719c1a,22.0.1+ba0d97e778,22.0.1+bfe1ee9056,22.0.1+c4e1e0358a,22.0.1+cc34b8281e,22.0.1+d640e2c0fa,22.0.1+d72a2e677a,22.0.1+d9a6b571bd,22.0.1+e485e9761b,22.0.1+ebe8d3385e
LSST Data Management Base Package
Public Member Functions | List of all members
lsst.pipe.base.graph.graph.QuantumGraph Class Reference

Public Member Functions

def __init__ (self, Mapping[TaskDef, Set[Quantum]] quanta)
 
nx.DiGraph taskGraph (self)
 
nx.DiGraph graph (self)
 
Iterable[QuantumNodeinputQuanta (self)
 
Iterable[QuantumNodeoutputQuanta (self)
 
Tuple[DatasetTypeName,...] allDatasetTypes (self)
 
bool isConnected (self)
 
QuantumNode getQuantumNodeByNodeId (self, NodeId nodeId)
 
FrozenSet[Quantum] getQuantaForTask (self, TaskDef taskDef)
 
FrozenSet[QuantumNodegetNodesForTask (self, TaskDef taskDef)
 
Iterable[TaskDeffindTasksWithInput (self, DatasetTypeName datasetTypeName)
 
Optional[TaskDeffindTaskWithOutput (self, DatasetTypeName datasetTypeName)
 
Iterable[TaskDeftasksWithDSType (self, DatasetTypeName datasetTypeName)
 
List[TaskDeffindTaskDefByName (self, str taskName)
 
Optional[TaskDeffindTaskDefByLabel (self, str label)
 
Set[Quantum] findQuantaWithDSType (self, DatasetTypeName datasetTypeName)
 
bool checkQuantumInGraph (self, Quantum quantum)
 
def writeDotGraph (self, Union[str, io.BufferedIOBase] output)
 
_T subset (_T self, Union[QuantumNode, Iterable[QuantumNode]] nodes)
 
Tuple[_T,...] subsetToConnected (_T self)
 
Set[QuantumNodedetermineInputsToQuantumNode (self, QuantumNode node)
 
Set[QuantumNodedetermineOutputsOfQuantumNode (self, QuantumNode node)
 
_T determineConnectionsOfQuantumNode (_T self, QuantumNode node)
 
_T determineAncestorsOfQuantumNode (_T self, QuantumNode node)
 
List[Tuple[QuantumNode, QuantumNode]] findCycle (self)
 
def saveUri (self, uri)
 
QuantumGraph loadUri (cls, Union[ButlerURI, str] uri, DimensionUniverse universe, Optional[Iterable[int]] nodes=None, Optional[BuildId] graphID=None)
 
def save (self, io.IO[bytes] file)
 
QuantumGraph load (cls, io.IO[bytes] file, DimensionUniverse universe, Optional[Iterable[int]] nodes=None, Optional[BuildId] graphID=None)
 
Generator[TaskDef, None, None] iterTaskGraph (self)
 
def graphID (self)
 
Generator[QuantumNode, None, None] __iter__ (self)
 
int __len__ (self)
 
bool __contains__ (self, QuantumNode node)
 
dict __getstate__ (self)
 
def __setstate__ (self, dict state)
 
bool __eq__ (self, object other)
 

Detailed Description

QuantumGraph is a directed acyclic graph of `QuantumNode` objects

This data structure represents a concrete workflow generated from a
`Pipeline`.

Parameters
----------
quanta : Mapping of `TaskDef` to sets of `Quantum`
    This maps tasks (and their configs) to the sets of data they are to
    process.

Definition at line 73 of file graph.py.

Constructor & Destructor Documentation

◆ __init__()

def lsst.pipe.base.graph.graph.QuantumGraph.__init__ (   self,
Mapping[TaskDef, Set[Quantum]]  quanta 
)

Definition at line 85 of file graph.py.

85  def __init__(self, quanta: Mapping[TaskDef, Set[Quantum]]):
86  self._buildGraphs(quanta)
87 

Member Function Documentation

◆ __contains__()

bool lsst.pipe.base.graph.graph.QuantumGraph.__contains__ (   self,
QuantumNode  node 
)

Definition at line 843 of file graph.py.

843  def __contains__(self, node: QuantumNode) -> bool:
844  return self._connectedQuanta.has_node(node)
845 

◆ __eq__()

bool lsst.pipe.base.graph.graph.QuantumGraph.__eq__ (   self,
object  other 
)

Definition at line 867 of file graph.py.

867  def __eq__(self, other: object) -> bool:
868  if not isinstance(other, QuantumGraph):
869  return False
870  if len(self) != len(other):
871  return False
872  for node in self:
873  if node not in other:
874  return False
875  if self.determineInputsToQuantumNode(node) != other.determineInputsToQuantumNode(node):
876  return False
877  if self.determineOutputsOfQuantumNode(node) != other.determineOutputsOfQuantumNode(node):
878  return False
879  return list(self.taskGraph) == list(other.taskGraph)
daf::base::PropertyList * list
Definition: fits.cc:913

◆ __getstate__()

dict lsst.pipe.base.graph.graph.QuantumGraph.__getstate__ (   self)
Stores a compact form of the graph as a list of graph nodes, and a
tuple of task labels and task configs. The full graph can be
reconstructed with this information, and it preseves the ordering of
the graph ndoes.

Definition at line 846 of file graph.py.

846  def __getstate__(self) -> dict:
847  """Stores a compact form of the graph as a list of graph nodes, and a
848  tuple of task labels and task configs. The full graph can be
849  reconstructed with this information, and it preseves the ordering of
850  the graph ndoes.
851  """
852  return {"nodesList": list(self)}
853 

◆ __iter__()

Generator[QuantumNode, None, None] lsst.pipe.base.graph.graph.QuantumGraph.__iter__ (   self)

Definition at line 837 of file graph.py.

837  def __iter__(self) -> Generator[QuantumNode, None, None]:
838  yield from nx.topological_sort(self._connectedQuanta)
839 

◆ __len__()

int lsst.pipe.base.graph.graph.QuantumGraph.__len__ (   self)

Definition at line 840 of file graph.py.

840  def __len__(self) -> int:
841  return self._count
842 

◆ __setstate__()

def lsst.pipe.base.graph.graph.QuantumGraph.__setstate__ (   self,
dict  state 
)
Reconstructs the state of the graph from the information persisted
in getstate.

Definition at line 854 of file graph.py.

854  def __setstate__(self, state: dict):
855  """Reconstructs the state of the graph from the information persisted
856  in getstate.
857  """
858  quanta: DefaultDict[TaskDef, Set[Quantum]] = defaultdict(set)
859  quantumToNodeId: Dict[Quantum, NodeId] = {}
860  quantumNode: QuantumNode
861  for quantumNode in state['nodesList']:
862  quanta[quantumNode.taskDef].add(quantumNode.quantum)
863  quantumToNodeId[quantumNode.quantum] = quantumNode.nodeId
864  _buildId = quantumNode.nodeId.buildId if state['nodesList'] else None # type: ignore
865  self._buildGraphs(quanta, _quantumToNodeId=quantumToNodeId, _buildId=_buildId)
866 

◆ allDatasetTypes()

Tuple[DatasetTypeName, ...] lsst.pipe.base.graph.graph.QuantumGraph.allDatasetTypes (   self)
Return all the `DatasetTypeName` objects that are contained inside
the graph.

Returns
-------
tuple of `DatasetTypeName`
    All the data set type names that are present in the graph

Definition at line 213 of file graph.py.

213  def allDatasetTypes(self) -> Tuple[DatasetTypeName, ...]:
214  """Return all the `DatasetTypeName` objects that are contained inside
215  the graph.
216 
217  Returns
218  -------
219  tuple of `DatasetTypeName`
220  All the data set type names that are present in the graph
221  """
222  return tuple(self._datasetDict.keys())
223 

◆ checkQuantumInGraph()

bool lsst.pipe.base.graph.graph.QuantumGraph.checkQuantumInGraph (   self,
Quantum  quantum 
)
Check if specified quantum appears in the graph as part of a node.

Parameters
----------
quantum : `Quantum`
    The quantum to search for

Returns
-------
`bool`
    The result of searching for the quantum

Definition at line 438 of file graph.py.

438  def checkQuantumInGraph(self, quantum: Quantum) -> bool:
439  """Check if specified quantum appears in the graph as part of a node.
440 
441  Parameters
442  ----------
443  quantum : `Quantum`
444  The quantum to search for
445 
446  Returns
447  -------
448  `bool`
449  The result of searching for the quantum
450  """
451  for qset in self._quanta.values():
452  if quantum in qset:
453  return True
454  return False
455 

◆ determineAncestorsOfQuantumNode()

_T lsst.pipe.base.graph.graph.QuantumGraph.determineAncestorsOfQuantumNode ( _T  self,
QuantumNode  node 
)
Return a graph of the specified node and all the ancestor nodes
directly reachable by walking edges.

Parameters
----------
node : `QuantumNode`
    The node for which all ansestors are to be determined

Returns
-------
graph of `QuantumNode`
    Graph of node and all of its ansestors

Definition at line 555 of file graph.py.

555  def determineAncestorsOfQuantumNode(self: _T, node: QuantumNode) -> _T:
556  """Return a graph of the specified node and all the ancestor nodes
557  directly reachable by walking edges.
558 
559  Parameters
560  ----------
561  node : `QuantumNode`
562  The node for which all ansestors are to be determined
563 
564  Returns
565  -------
566  graph of `QuantumNode`
567  Graph of node and all of its ansestors
568  """
569  predecessorNodes = nx.ancestors(self._connectedQuanta, node)
570  predecessorNodes.add(node)
571  return self.subset(predecessorNodes)
572 

◆ determineConnectionsOfQuantumNode()

_T lsst.pipe.base.graph.graph.QuantumGraph.determineConnectionsOfQuantumNode ( _T  self,
QuantumNode  node 
)
Return a graph of `QuantumNode` that are direct inputs and outputs
of a specified node.

Parameters
----------
node : `QuantumNode`
    The node of the graph for which connected nodes are to be
    determined.

Returns
-------
graph : graph of `QuantumNode`
    All the nodes that are directly connected to specified node

Definition at line 536 of file graph.py.

536  def determineConnectionsOfQuantumNode(self: _T, node: QuantumNode) -> _T:
537  """Return a graph of `QuantumNode` that are direct inputs and outputs
538  of a specified node.
539 
540  Parameters
541  ----------
542  node : `QuantumNode`
543  The node of the graph for which connected nodes are to be
544  determined.
545 
546  Returns
547  -------
548  graph : graph of `QuantumNode`
549  All the nodes that are directly connected to specified node
550  """
551  nodes = self.determineInputsToQuantumNode(node).union(self.determineOutputsOfQuantumNode(node))
552  nodes.add(node)
553  return self.subset(nodes)
554 

◆ determineInputsToQuantumNode()

Set[QuantumNode] lsst.pipe.base.graph.graph.QuantumGraph.determineInputsToQuantumNode (   self,
QuantumNode  node 
)
Return a set of `QuantumNode` that are direct inputs to a specified
node.

Parameters
----------
node : `QuantumNode`
    The node of the graph for which inputs are to be determined

Returns
-------
set of `QuantumNode`
    All the nodes that are direct inputs to specified node

Definition at line 504 of file graph.py.

504  def determineInputsToQuantumNode(self, node: QuantumNode) -> Set[QuantumNode]:
505  """Return a set of `QuantumNode` that are direct inputs to a specified
506  node.
507 
508  Parameters
509  ----------
510  node : `QuantumNode`
511  The node of the graph for which inputs are to be determined
512 
513  Returns
514  -------
515  set of `QuantumNode`
516  All the nodes that are direct inputs to specified node
517  """
518  return set(pred for pred in self._connectedQuanta.predecessors(node))
519 
daf::base::PropertySet * set
Definition: fits.cc:912

◆ determineOutputsOfQuantumNode()

Set[QuantumNode] lsst.pipe.base.graph.graph.QuantumGraph.determineOutputsOfQuantumNode (   self,
QuantumNode  node 
)
Return a set of `QuantumNode` that are direct outputs of a specified
node.

Parameters
----------
node : `QuantumNode`
    The node of the graph for which outputs are to be determined

Returns
-------
set of `QuantumNode`
    All the nodes that are direct outputs to specified node

Definition at line 520 of file graph.py.

520  def determineOutputsOfQuantumNode(self, node: QuantumNode) -> Set[QuantumNode]:
521  """Return a set of `QuantumNode` that are direct outputs of a specified
522  node.
523 
524  Parameters
525  ----------
526  node : `QuantumNode`
527  The node of the graph for which outputs are to be determined
528 
529  Returns
530  -------
531  set of `QuantumNode`
532  All the nodes that are direct outputs to specified node
533  """
534  return set(succ for succ in self._connectedQuanta.successors(node))
535 

◆ findCycle()

List[Tuple[QuantumNode, QuantumNode]] lsst.pipe.base.graph.graph.QuantumGraph.findCycle (   self)
Check a graph for the presense of cycles and returns the edges of
any cycles found, or an empty list if there is no cycle.

Returns
-------
result : list of tuple of `QuantumNode`, `QuantumNode`
    A list of any graph edges that form a cycle, or an empty list if
    there is no cycle. Empty list to so support if graph.find_cycle()
    syntax as an empty list is falsy.

Definition at line 573 of file graph.py.

573  def findCycle(self) -> List[Tuple[QuantumNode, QuantumNode]]:
574  """Check a graph for the presense of cycles and returns the edges of
575  any cycles found, or an empty list if there is no cycle.
576 
577  Returns
578  -------
579  result : list of tuple of `QuantumNode`, `QuantumNode`
580  A list of any graph edges that form a cycle, or an empty list if
581  there is no cycle. Empty list to so support if graph.find_cycle()
582  syntax as an empty list is falsy.
583  """
584  try:
585  return nx.find_cycle(self._connectedQuanta)
586  except nx.NetworkXNoCycle:
587  return []
588 

◆ findQuantaWithDSType()

Set[Quantum] lsst.pipe.base.graph.graph.QuantumGraph.findQuantaWithDSType (   self,
DatasetTypeName  datasetTypeName 
)
Return all the `Quantum` that contain a specified `DatasetTypeName`.

Parameters
----------
datasetTypeName : `str`
    The name of the dataset type to search for as a string,
    can also accept a `DatasetTypeName` which is a `NewType` of str for
    type safety in static type checking.

Returns
-------
result : `set` of `QuantumNode` objects
    A `set` of `QuantumNode`s that contain specified `DatasetTypeName`

Raises
------
KeyError
    Raised if the `DatasetTypeName` is not part of the `QuantumGraph`

Definition at line 412 of file graph.py.

412  def findQuantaWithDSType(self, datasetTypeName: DatasetTypeName) -> Set[Quantum]:
413  """Return all the `Quantum` that contain a specified `DatasetTypeName`.
414 
415  Parameters
416  ----------
417  datasetTypeName : `str`
418  The name of the dataset type to search for as a string,
419  can also accept a `DatasetTypeName` which is a `NewType` of str for
420  type safety in static type checking.
421 
422  Returns
423  -------
424  result : `set` of `QuantumNode` objects
425  A `set` of `QuantumNode`s that contain specified `DatasetTypeName`
426 
427  Raises
428  ------
429  KeyError
430  Raised if the `DatasetTypeName` is not part of the `QuantumGraph`
431 
432  """
433  tasks = self._datasetDict.getAll(datasetTypeName)
434  result: Set[Quantum] = set()
435  result = result.union(*(self._quanta[task] for task in tasks))
436  return result
437 

◆ findTaskDefByLabel()

Optional[TaskDef] lsst.pipe.base.graph.graph.QuantumGraph.findTaskDefByLabel (   self,
str  label 
)
Determine which `TaskDef` objects in this graph are associated
with a `str` representing a tasks label.

Parameters
----------
taskName : str
    Name of a task to search for

Returns
-------
result : `TaskDef`
    `TaskDef` objects that has the specified label.

Definition at line 393 of file graph.py.

393  def findTaskDefByLabel(self, label: str) -> Optional[TaskDef]:
394  """Determine which `TaskDef` objects in this graph are associated
395  with a `str` representing a tasks label.
396 
397  Parameters
398  ----------
399  taskName : str
400  Name of a task to search for
401 
402  Returns
403  -------
404  result : `TaskDef`
405  `TaskDef` objects that has the specified label.
406  """
407  for task in self._quanta.keys():
408  if label == task.label:
409  return task
410  return None
411 

◆ findTaskDefByName()

List[TaskDef] lsst.pipe.base.graph.graph.QuantumGraph.findTaskDefByName (   self,
str  taskName 
)
Determine which `TaskDef` objects in this graph are associated
with a `str` representing a task name (looks at the taskName property
of `TaskDef` objects).

Returns a list of `TaskDef` objects as a `PipelineTask` may appear
multiple times in a graph with different labels.

Parameters
----------
taskName : str
    Name of a task to search for

Returns
-------
result : list of `TaskDef`
    List of the `TaskDef` objects that have the name specified.
    Multiple values are returned in the case that a task is used
    multiple times with different labels.

Definition at line 366 of file graph.py.

366  def findTaskDefByName(self, taskName: str) -> List[TaskDef]:
367  """Determine which `TaskDef` objects in this graph are associated
368  with a `str` representing a task name (looks at the taskName property
369  of `TaskDef` objects).
370 
371  Returns a list of `TaskDef` objects as a `PipelineTask` may appear
372  multiple times in a graph with different labels.
373 
374  Parameters
375  ----------
376  taskName : str
377  Name of a task to search for
378 
379  Returns
380  -------
381  result : list of `TaskDef`
382  List of the `TaskDef` objects that have the name specified.
383  Multiple values are returned in the case that a task is used
384  multiple times with different labels.
385  """
386  results = []
387  for task in self._quanta.keys():
388  split = task.taskName.split('.')
389  if split[-1] == taskName:
390  results.append(task)
391  return results
392 

◆ findTasksWithInput()

Iterable[TaskDef] lsst.pipe.base.graph.graph.QuantumGraph.findTasksWithInput (   self,
DatasetTypeName  datasetTypeName 
)
Find all tasks that have the specified dataset type name as an
input.

Parameters
----------
datasetTypeName : `str`
    A string representing the name of a dataset type to be queried,
    can also accept a `DatasetTypeName` which is a `NewType` of str for
    type safety in static type checking.

Returns
-------
tasks : iterable of `TaskDef`
    `TaskDef` objects that have the specified `DatasetTypeName` as an
    input, list will be empty if no tasks use specified
    `DatasetTypeName` as an input.

Raises
------
KeyError
    Raised if the `DatasetTypeName` is not part of the `QuantumGraph`

Definition at line 289 of file graph.py.

289  def findTasksWithInput(self, datasetTypeName: DatasetTypeName) -> Iterable[TaskDef]:
290  """Find all tasks that have the specified dataset type name as an
291  input.
292 
293  Parameters
294  ----------
295  datasetTypeName : `str`
296  A string representing the name of a dataset type to be queried,
297  can also accept a `DatasetTypeName` which is a `NewType` of str for
298  type safety in static type checking.
299 
300  Returns
301  -------
302  tasks : iterable of `TaskDef`
303  `TaskDef` objects that have the specified `DatasetTypeName` as an
304  input, list will be empty if no tasks use specified
305  `DatasetTypeName` as an input.
306 
307  Raises
308  ------
309  KeyError
310  Raised if the `DatasetTypeName` is not part of the `QuantumGraph`
311  """
312  return (c for c in self._datasetDict.getInputs(datasetTypeName))
313 

◆ findTaskWithOutput()

Optional[TaskDef] lsst.pipe.base.graph.graph.QuantumGraph.findTaskWithOutput (   self,
DatasetTypeName  datasetTypeName 
)
Find all tasks that have the specified dataset type name as an
output.

Parameters
----------
datasetTypeName : `str`
    A string representing the name of a dataset type to be queried,
    can also accept a `DatasetTypeName` which is a `NewType` of str for
    type safety in static type checking.

Returns
-------
`TaskDef` or `None`
    `TaskDef` that outputs `DatasetTypeName` as an output or None if
    none of the tasks produce this `DatasetTypeName`.

Raises
------
KeyError
    Raised if the `DatasetTypeName` is not part of the `QuantumGraph`

Definition at line 314 of file graph.py.

314  def findTaskWithOutput(self, datasetTypeName: DatasetTypeName) -> Optional[TaskDef]:
315  """Find all tasks that have the specified dataset type name as an
316  output.
317 
318  Parameters
319  ----------
320  datasetTypeName : `str`
321  A string representing the name of a dataset type to be queried,
322  can also accept a `DatasetTypeName` which is a `NewType` of str for
323  type safety in static type checking.
324 
325  Returns
326  -------
327  `TaskDef` or `None`
328  `TaskDef` that outputs `DatasetTypeName` as an output or None if
329  none of the tasks produce this `DatasetTypeName`.
330 
331  Raises
332  ------
333  KeyError
334  Raised if the `DatasetTypeName` is not part of the `QuantumGraph`
335  """
336  return self._datasetDict.getOutput(datasetTypeName)
337 

◆ getNodesForTask()

FrozenSet[QuantumNode] lsst.pipe.base.graph.graph.QuantumGraph.getNodesForTask (   self,
TaskDef  taskDef 
)
Return all the `QuantumNodes` associated with a `TaskDef`.

Parameters
----------
taskDef : `TaskDef`
    The `TaskDef` for which `Quantum` are to be queried

Returns
-------
frozenset of `QuantumNodes`
    The `frozenset` of `QuantumNodes` that is associated with the
    specified `TaskDef`.

Definition at line 273 of file graph.py.

273  def getNodesForTask(self, taskDef: TaskDef) -> FrozenSet[QuantumNode]:
274  """Return all the `QuantumNodes` associated with a `TaskDef`.
275 
276  Parameters
277  ----------
278  taskDef : `TaskDef`
279  The `TaskDef` for which `Quantum` are to be queried
280 
281  Returns
282  -------
283  frozenset of `QuantumNodes`
284  The `frozenset` of `QuantumNodes` that is associated with the
285  specified `TaskDef`.
286  """
287  return frozenset(self._taskToQuantumNode[taskDef])
288 

◆ getQuantaForTask()

FrozenSet[Quantum] lsst.pipe.base.graph.graph.QuantumGraph.getQuantaForTask (   self,
TaskDef  taskDef 
)
Return all the `Quantum` associated with a `TaskDef`.

Parameters
----------
taskDef : `TaskDef`
    The `TaskDef` for which `Quantum` are to be queried

Returns
-------
frozenset of `Quantum`
    The `set` of `Quantum` that is associated with the specified
    `TaskDef`.

Definition at line 257 of file graph.py.

257  def getQuantaForTask(self, taskDef: TaskDef) -> FrozenSet[Quantum]:
258  """Return all the `Quantum` associated with a `TaskDef`.
259 
260  Parameters
261  ----------
262  taskDef : `TaskDef`
263  The `TaskDef` for which `Quantum` are to be queried
264 
265  Returns
266  -------
267  frozenset of `Quantum`
268  The `set` of `Quantum` that is associated with the specified
269  `TaskDef`.
270  """
271  return frozenset(self._quanta[taskDef])
272 

◆ getQuantumNodeByNodeId()

QuantumNode lsst.pipe.base.graph.graph.QuantumGraph.getQuantumNodeByNodeId (   self,
NodeId  nodeId 
)
Lookup a `QuantumNode` from an id associated with the node.

Parameters
----------
nodeId : `NodeId`
    The number associated with a node

Returns
-------
node : `QuantumNode`
    The node corresponding with input number

Raises
------
IndexError
    Raised if the requested nodeId is not in the graph.
IncompatibleGraphError
    Raised if the nodeId was built with a different graph than is not
    this instance (or a graph instance that produced this instance
    through and operation such as subset)

Definition at line 231 of file graph.py.

231  def getQuantumNodeByNodeId(self, nodeId: NodeId) -> QuantumNode:
232  """Lookup a `QuantumNode` from an id associated with the node.
233 
234  Parameters
235  ----------
236  nodeId : `NodeId`
237  The number associated with a node
238 
239  Returns
240  -------
241  node : `QuantumNode`
242  The node corresponding with input number
243 
244  Raises
245  ------
246  IndexError
247  Raised if the requested nodeId is not in the graph.
248  IncompatibleGraphError
249  Raised if the nodeId was built with a different graph than is not
250  this instance (or a graph instance that produced this instance
251  through and operation such as subset)
252  """
253  if nodeId.buildId != self._buildId:
254  raise IncompatibleGraphError("This node was built from a different, incompatible, graph instance")
255  return self._nodeIdMap[nodeId]
256 

◆ graph()

nx.DiGraph lsst.pipe.base.graph.graph.QuantumGraph.graph (   self)
Return a graph representing the relations between all the
`QuantumNode` objects. Largely it should be preferred to iterate
over, and use methods of this class, but sometimes direct access to
the networkx object may be helpful

Returns
-------
graph : `networkx.Digraph`
    Internal datastructure that holds relations of `QuantumNode`
    objects

Definition at line 172 of file graph.py.

172  def graph(self) -> nx.DiGraph:
173  """Return a graph representing the relations between all the
174  `QuantumNode` objects. Largely it should be preferred to iterate
175  over, and use methods of this class, but sometimes direct access to
176  the networkx object may be helpful
177 
178  Returns
179  -------
180  graph : `networkx.Digraph`
181  Internal datastructure that holds relations of `QuantumNode`
182  objects
183  """
184  return self._connectedQuanta
185 

◆ graphID()

def lsst.pipe.base.graph.graph.QuantumGraph.graphID (   self)
Returns the ID generated by the graph at construction time

Definition at line 832 of file graph.py.

832  def graphID(self):
833  """Returns the ID generated by the graph at construction time
834  """
835  return self._buildId
836 

◆ inputQuanta()

Iterable[QuantumNode] lsst.pipe.base.graph.graph.QuantumGraph.inputQuanta (   self)
Make a `list` of all `QuantumNode` objects that are 'input' nodes
to the graph, meaning those nodes to not depend on any other nodes in
the graph.

Returns
-------
inputNodes : iterable of `QuantumNode`
    A list of nodes that are inputs to the graph

Definition at line 187 of file graph.py.

187  def inputQuanta(self) -> Iterable[QuantumNode]:
188  """Make a `list` of all `QuantumNode` objects that are 'input' nodes
189  to the graph, meaning those nodes to not depend on any other nodes in
190  the graph.
191 
192  Returns
193  -------
194  inputNodes : iterable of `QuantumNode`
195  A list of nodes that are inputs to the graph
196  """
197  return (q for q, n in self._connectedQuanta.in_degree if n == 0)
198 

◆ isConnected()

bool lsst.pipe.base.graph.graph.QuantumGraph.isConnected (   self)
Return True if all of the nodes in the graph are connected, ignores
directionality of connections.

Definition at line 225 of file graph.py.

225  def isConnected(self) -> bool:
226  """Return True if all of the nodes in the graph are connected, ignores
227  directionality of connections.
228  """
229  return nx.is_weakly_connected(self._connectedQuanta)
230 

◆ iterTaskGraph()

Generator[TaskDef, None, None] lsst.pipe.base.graph.graph.QuantumGraph.iterTaskGraph (   self)
Iterate over the `taskGraph` attribute in topological order

Yields
------
taskDef : `TaskDef`
    `TaskDef` objects in topological order

Definition at line 821 of file graph.py.

821  def iterTaskGraph(self) -> Generator[TaskDef, None, None]:
822  """Iterate over the `taskGraph` attribute in topological order
823 
824  Yields
825  ------
826  taskDef : `TaskDef`
827  `TaskDef` objects in topological order
828  """
829  yield from nx.topological_sort(self.taskGraph)
830 

◆ load()

QuantumGraph lsst.pipe.base.graph.graph.QuantumGraph.load (   cls,
io.IO[bytes file,
DimensionUniverse  universe,
Optional[Iterable[int]]   nodes = None,
Optional[BuildId]   graphID = None 
)
Read QuantumGraph from a file that was made by `save`.

Parameters
----------
file : `io.IO` of bytes
    File with pickle data open in binary mode.
universe: `~lsst.daf.butler.DimensionUniverse`
    DimensionUniverse instance, not used by the method itself but
    needed to ensure that registry data structures are initialized.
nodes: iterable of `int` or None
    Numbers that correspond to nodes in the graph. If specified, only
    these nodes will be loaded. Defaults to None, in which case all
    nodes will be loaded.
graphID : `str` or `None`
    If specified this ID is verified against the loaded graph prior to
    loading any Nodes. This defaults to None in which case no
    validation is done.

Returns
-------
graph : `QuantumGraph`
    Resulting QuantumGraph instance.

Raises
------
TypeError
    Raised if pickle contains instance of a type other than
    QuantumGraph.
ValueError
    Raised if one or more of the nodes requested is not in the
    `QuantumGraph` or if graphID parameter does not match the graph
    being loaded or if the supplied uri does not point at a valid
    `QuantumGraph` save file.

Notes
-----
Reading Quanta from pickle requires existence of singleton
DimensionUniverse which is usually instantiated during Registry
initialization. To make sure that DimensionUniverse exists this method
accepts dummy DimensionUniverse argument.

Definition at line 764 of file graph.py.

767  ) -> QuantumGraph:
768  """Read QuantumGraph from a file that was made by `save`.
769 
770  Parameters
771  ----------
772  file : `io.IO` of bytes
773  File with pickle data open in binary mode.
774  universe: `~lsst.daf.butler.DimensionUniverse`
775  DimensionUniverse instance, not used by the method itself but
776  needed to ensure that registry data structures are initialized.
777  nodes: iterable of `int` or None
778  Numbers that correspond to nodes in the graph. If specified, only
779  these nodes will be loaded. Defaults to None, in which case all
780  nodes will be loaded.
781  graphID : `str` or `None`
782  If specified this ID is verified against the loaded graph prior to
783  loading any Nodes. This defaults to None in which case no
784  validation is done.
785 
786  Returns
787  -------
788  graph : `QuantumGraph`
789  Resulting QuantumGraph instance.
790 
791  Raises
792  ------
793  TypeError
794  Raised if pickle contains instance of a type other than
795  QuantumGraph.
796  ValueError
797  Raised if one or more of the nodes requested is not in the
798  `QuantumGraph` or if graphID parameter does not match the graph
799  being loaded or if the supplied uri does not point at a valid
800  `QuantumGraph` save file.
801 
802  Notes
803  -----
804  Reading Quanta from pickle requires existence of singleton
805  DimensionUniverse which is usually instantiated during Registry
806  initialization. To make sure that DimensionUniverse exists this method
807  accepts dummy DimensionUniverse argument.
808  """
809  # Try to see if the file handle contains pickle data, this will be
810  # removed in the future
811  try:
812  qgraph = pickle.load(file)
813  warnings.warn("Pickle graphs are deprecated, please re-save your graph with the save method")
814  except pickle.UnpicklingError:
815  with LoadHelper(file) as loader: # type: ignore # needed because we don't have Protocols yet
816  qgraph = loader.load(nodes, graphID)
817  if not isinstance(qgraph, QuantumGraph):
818  raise TypeError(f"QuantumGraph pickle file has contains unexpected object type: {type(qgraph)}")
819  return qgraph
820 

◆ loadUri()

QuantumGraph lsst.pipe.base.graph.graph.QuantumGraph.loadUri (   cls,
Union[ButlerURI, str]  uri,
DimensionUniverse  universe,
Optional[Iterable[int]]   nodes = None,
Optional[BuildId]   graphID = None 
)
Read `QuantumGraph` from a URI.

Parameters
----------
uri : `ButlerURI` or `str`
    URI from where to load the graph.
universe: `~lsst.daf.butler.DimensionUniverse`
    DimensionUniverse instance, not used by the method itself but
    needed to ensure that registry data structures are initialized.
nodes: iterable of `int` or None
    Numbers that correspond to nodes in the graph. If specified, only
    these nodes will be loaded. Defaults to None, in which case all
    nodes will be loaded.
graphID : `str` or `None`
    If specified this ID is verified against the loaded graph prior to
    loading any Nodes. This defaults to None in which case no
    validation is done.

Returns
-------
graph : `QuantumGraph`
    Resulting QuantumGraph instance.

Raises
------
TypeError
    Raised if pickle contains instance of a type other than
    QuantumGraph.
ValueError
    Raised if one or more of the nodes requested is not in the
    `QuantumGraph` or if graphID parameter does not match the graph
    being loaded or if the supplied uri does not point at a valid
    `QuantumGraph` save file.


Notes
-----
Reading Quanta from pickle requires existence of singleton
DimensionUniverse which is usually instantiated during Registry
initialization. To make sure that DimensionUniverse exists this method
accepts dummy DimensionUniverse argument.

Definition at line 604 of file graph.py.

607  ) -> QuantumGraph:
608  """Read `QuantumGraph` from a URI.
609 
610  Parameters
611  ----------
612  uri : `ButlerURI` or `str`
613  URI from where to load the graph.
614  universe: `~lsst.daf.butler.DimensionUniverse`
615  DimensionUniverse instance, not used by the method itself but
616  needed to ensure that registry data structures are initialized.
617  nodes: iterable of `int` or None
618  Numbers that correspond to nodes in the graph. If specified, only
619  these nodes will be loaded. Defaults to None, in which case all
620  nodes will be loaded.
621  graphID : `str` or `None`
622  If specified this ID is verified against the loaded graph prior to
623  loading any Nodes. This defaults to None in which case no
624  validation is done.
625 
626  Returns
627  -------
628  graph : `QuantumGraph`
629  Resulting QuantumGraph instance.
630 
631  Raises
632  ------
633  TypeError
634  Raised if pickle contains instance of a type other than
635  QuantumGraph.
636  ValueError
637  Raised if one or more of the nodes requested is not in the
638  `QuantumGraph` or if graphID parameter does not match the graph
639  being loaded or if the supplied uri does not point at a valid
640  `QuantumGraph` save file.
641 
642 
643  Notes
644  -----
645  Reading Quanta from pickle requires existence of singleton
646  DimensionUniverse which is usually instantiated during Registry
647  initialization. To make sure that DimensionUniverse exists this method
648  accepts dummy DimensionUniverse argument.
649  """
650  uri = ButlerURI(uri)
651  # With ButlerURI we have the choice of always using a local file
652  # or reading in the bytes directly. Reading in bytes can be more
653  # efficient for reasonably-sized pickle files when the resource
654  # is remote. For now use the local file variant. For a local file
655  # as_local() does nothing.
656 
657  if uri.getExtension() in (".pickle", ".pkl"):
658  with uri.as_local() as local, open(local.ospath, "rb") as fd:
659  warnings.warn("Pickle graphs are deprecated, please re-save your graph with the save method")
660  qgraph = pickle.load(fd)
661  elif uri.getExtension() in ('.qgraph'):
662  with LoadHelper(uri) as loader:
663  qgraph = loader.load(nodes, graphID)
664  else:
665  raise ValueError("Only know how to handle files saved as `pickle`, `pkl`, or `qgraph`")
666  if not isinstance(qgraph, QuantumGraph):
667  raise TypeError(f"QuantumGraph save file contains unexpected object type: {type(qgraph)}")
668  return qgraph
669 

◆ outputQuanta()

Iterable[QuantumNode] lsst.pipe.base.graph.graph.QuantumGraph.outputQuanta (   self)
Make a `list` of all `QuantumNode` objects that are 'output' nodes
to the graph, meaning those nodes have no nodes that depend them in
the graph.

Returns
-------
outputNodes : iterable of `QuantumNode`
    A list of nodes that are outputs of the graph

Definition at line 200 of file graph.py.

200  def outputQuanta(self) -> Iterable[QuantumNode]:
201  """Make a `list` of all `QuantumNode` objects that are 'output' nodes
202  to the graph, meaning those nodes have no nodes that depend them in
203  the graph.
204 
205  Returns
206  -------
207  outputNodes : iterable of `QuantumNode`
208  A list of nodes that are outputs of the graph
209  """
210  return [q for q, n in self._connectedQuanta.out_degree if n == 0]
211 

◆ save()

def lsst.pipe.base.graph.graph.QuantumGraph.save (   self,
io.IO[bytes file 
)
Save QuantumGraph to a file.

Presently we store QuantumGraph in pickle format, this could
potentially change in the future if better format is found.

Parameters
----------
file : `io.BufferedIOBase`
    File to write pickle data open in binary mode.

Definition at line 670 of file graph.py.

670  def save(self, file: io.IO[bytes]):
671  """Save QuantumGraph to a file.
672 
673  Presently we store QuantumGraph in pickle format, this could
674  potentially change in the future if better format is found.
675 
676  Parameters
677  ----------
678  file : `io.BufferedIOBase`
679  File to write pickle data open in binary mode.
680  """
681  buffer = self._buildSaveObject()
682  file.write(buffer) # type: ignore # Ignore because bytearray is safe to use in place of bytes
683 

◆ saveUri()

def lsst.pipe.base.graph.graph.QuantumGraph.saveUri (   self,
  uri 
)
Save `QuantumGraph` to the specified URI.

Parameters
----------
uri : `ButlerURI` or `str`
    URI to where the graph should be saved.

Definition at line 589 of file graph.py.

589  def saveUri(self, uri):
590  """Save `QuantumGraph` to the specified URI.
591 
592  Parameters
593  ----------
594  uri : `ButlerURI` or `str`
595  URI to where the graph should be saved.
596  """
597  buffer = self._buildSaveObject()
598  butlerUri = ButlerURI(uri)
599  if butlerUri.getExtension() not in (".qgraph"):
600  raise TypeError(f"Can currently only save a graph in qgraph format not {uri}")
601  butlerUri.write(buffer) # type: ignore # Ignore because bytearray is safe to use in place of bytes
602 

◆ subset()

_T lsst.pipe.base.graph.graph.QuantumGraph.subset ( _T  self,
Union[QuantumNode, Iterable[QuantumNode]]  nodes 
)
Create a new graph object that contains the subset of the nodes
specified as input. Node number is preserved.

Parameters
----------
nodes : `QuantumNode` or iterable of `QuantumNode`

Returns
-------
graph : instance of graph type
    An instance of the type from which the subset was created

Definition at line 466 of file graph.py.

466  def subset(self: _T, nodes: Union[QuantumNode, Iterable[QuantumNode]]) -> _T:
467  """Create a new graph object that contains the subset of the nodes
468  specified as input. Node number is preserved.
469 
470  Parameters
471  ----------
472  nodes : `QuantumNode` or iterable of `QuantumNode`
473 
474  Returns
475  -------
476  graph : instance of graph type
477  An instance of the type from which the subset was created
478  """
479  if not isinstance(nodes, Iterable):
480  nodes = (nodes, )
481  quantumSubgraph = self._connectedQuanta.subgraph(nodes).nodes
482  quantumMap = defaultdict(set)
483 
484  node: QuantumNode
485  for node in quantumSubgraph:
486  quantumMap[node.taskDef].add(node.quantum)
487  # Create an empty graph, and then populate it with custom mapping
488  newInst = type(self)({})
489  newInst._buildGraphs(quantumMap, _quantumToNodeId={n.quantum: n.nodeId for n in nodes},
490  _buildId=self._buildId)
491  return newInst
492 
table::Key< int > type
Definition: Detector.cc:163

◆ subsetToConnected()

Tuple[_T, ...] lsst.pipe.base.graph.graph.QuantumGraph.subsetToConnected ( _T  self)
Generate a list of subgraphs where each is connected.

Returns
-------
result : list of `QuantumGraph`
    A list of graphs that are each connected

Definition at line 493 of file graph.py.

493  def subsetToConnected(self: _T) -> Tuple[_T, ...]:
494  """Generate a list of subgraphs where each is connected.
495 
496  Returns
497  -------
498  result : list of `QuantumGraph`
499  A list of graphs that are each connected
500  """
501  return tuple(self.subset(connectedSet)
502  for connectedSet in nx.weakly_connected_components(self._connectedQuanta))
503 

◆ taskGraph()

nx.DiGraph lsst.pipe.base.graph.graph.QuantumGraph.taskGraph (   self)
Return a graph representing the relations between the tasks inside
the quantum graph.

Returns
-------
taskGraph : `networkx.Digraph`
    Internal datastructure that holds relations of `TaskDef` objects

Definition at line 160 of file graph.py.

160  def taskGraph(self) -> nx.DiGraph:
161  """Return a graph representing the relations between the tasks inside
162  the quantum graph.
163 
164  Returns
165  -------
166  taskGraph : `networkx.Digraph`
167  Internal datastructure that holds relations of `TaskDef` objects
168  """
169  return self._taskGraph
170 

◆ tasksWithDSType()

Iterable[TaskDef] lsst.pipe.base.graph.graph.QuantumGraph.tasksWithDSType (   self,
DatasetTypeName  datasetTypeName 
)
Find all tasks that are associated with the specified dataset type
name.

Parameters
----------
datasetTypeName : `str`
    A string representing the name of a dataset type to be queried,
    can also accept a `DatasetTypeName` which is a `NewType` of str for
    type safety in static type checking.

Returns
-------
result : iterable of `TaskDef`
    `TaskDef` objects that are associated with the specified
    `DatasetTypeName`

Raises
------
KeyError
    Raised if the `DatasetTypeName` is not part of the `QuantumGraph`

Definition at line 338 of file graph.py.

338  def tasksWithDSType(self, datasetTypeName: DatasetTypeName) -> Iterable[TaskDef]:
339  """Find all tasks that are associated with the specified dataset type
340  name.
341 
342  Parameters
343  ----------
344  datasetTypeName : `str`
345  A string representing the name of a dataset type to be queried,
346  can also accept a `DatasetTypeName` which is a `NewType` of str for
347  type safety in static type checking.
348 
349  Returns
350  -------
351  result : iterable of `TaskDef`
352  `TaskDef` objects that are associated with the specified
353  `DatasetTypeName`
354 
355  Raises
356  ------
357  KeyError
358  Raised if the `DatasetTypeName` is not part of the `QuantumGraph`
359  """
360  results = self.findTasksWithInput(datasetTypeName)
361  output = self.findTaskWithOutput(datasetTypeName)
362  if output is not None:
363  results = chain(results, (output,))
364  return results
365 

◆ writeDotGraph()

def lsst.pipe.base.graph.graph.QuantumGraph.writeDotGraph (   self,
Union[str, io.BufferedIOBase]  output 
)
Write out the graph as a dot graph.

Parameters
----------
output : str or `io.BufferedIOBase`
    Either a filesystem path to write to, or a file handle object

Definition at line 456 of file graph.py.

456  def writeDotGraph(self, output: Union[str, io.BufferedIOBase]):
457  """Write out the graph as a dot graph.
458 
459  Parameters
460  ----------
461  output : str or `io.BufferedIOBase`
462  Either a filesystem path to write to, or a file handle object
463  """
464  write_dot(self._connectedQuanta, output)
465 

The documentation for this class was generated from the following file: