21 from __future__ 
import annotations
 
   23 """Module defining quantum graph classes and related methods. 
   25 There could be different representations of the quantum graph depending 
   26 on the client needs. Presently this module contains a graph implementation
   27 that is based on the requirements of the command-line environment. In the future
   28 we could add other implementations and methods to convert between those 
   33 __all__ = [
"QuantumGraph", 
"QuantumGraphTaskNodes", 
"QuantumIterData"]
 
   38 from itertools 
import chain
 
   39 from dataclasses 
import dataclass
 
   41 from typing 
import List, FrozenSet, Mapping
 
   46 from .pipeline 
import TaskDef
 
   47 from .pipeTools 
import orderPipeline
 
   48 from lsst.daf.butler 
import DatasetRef, DatasetType, NamedKeyDict, Quantum
 
   61     """Helper class for iterating over quanta in a graph. 
   63     The `QuantumGraph.traverse` method needs to return topologically ordered 
   64     Quanta together with their dependencies. This class is used as a value 
   65     for the iterator, it contains enumerated Quantum and its dependencies. 
   68     __slots__ = [
"index", 
"quantum", 
"taskDef", 
"dependencies"]
 
   71     """Index of this Quantum, a unique but arbitrary integer.""" 
   74     """Quantum corresponding to a graph node.""" 
   77     """Task class to be run on this quantum, and corresponding label and 
   81     dependencies: FrozenSet(int)
 
   82     """Possibly empty set of indices of dependencies for this Quantum. 
   83     Dependencies include other nodes in the graph; they do not reflect data 
   84     already in butler (there are no graph nodes for those). 
   90     """QuantumGraphTaskNodes represents a bunch of nodes in a quantum graph
   91     corresponding to a single task. 
   93     The node in quantum graph is represented by the `PipelineTask` and a 
   94     single `~lsst.daf.butler.Quantum` instance. One possible representation 
   95     of the graph is just a list of nodes without edges (edges can be deduced 
   96     from nodes' quantum inputs and outputs if needed). That representation can 
   97     be reduced to the list of PipelineTasks (or their corresponding TaskDefs) 
   98     and the corresponding list of Quanta. This class is used in this reduced 
   99     representation for a single task, and full `QuantumGraph` is a sequence of 
  100     instances of this class for one or more tasks.
  102     Different frameworks may use different graph representations; this
  103     representation was based mostly on requirements of command-line 
  104     executor which does not need explicit edges information. 
  108     """Task definition for this set of nodes."""
  110     quanta: List[Quantum]
 
  111     """List of quanta corresponding to the task.""" 
  113     initInputs: Mapping[DatasetType, DatasetRef]
 
  114     """Datasets that must be loaded or created to construct this task.""" 
  116     initOutputs: Mapping[DatasetType, DatasetRef]
 
  117     """Datasets that may be written after constructing this task.""" 
  121     """QuantumGraph is a sequence of `QuantumGraphTaskNodes` objects. 
  123     Typically the order of the tasks in the list will be the same as the 
  124     order of tasks in a pipeline (obviously depends on the code which 
  129     iterable : iterable of `QuantumGraphTaskNodes`, optional 
  130         Initial sequence of per-task nodes. 
  133         list.__init__(self, iterable 
or [])
 
  138     initInputs: NamedKeyDict
 
  139     """Datasets that must be provided to construct one or more Tasks in this 
  140     graph, and must be obtained from the data repository. 
  142     This is disjoint with both `initIntermediates` and `initOutputs`. 
  145     initIntermediates: NamedKeyDict
 
  146     """Datasets that must be provided to construct one or more Tasks in this 
  147     graph, but are also produced after constructing a Task in this graph. 
  149     This is disjoint with both `initInputs` and `initOutputs`. 
  152     initOutputs: NamedKeyDict
 
  153     """Datasets that are produced after constructing a Task in this graph, 
  154     and are not used to construct any other Task in this graph. 
  156     This is disjoint from both `initInputs` and `initIntermediates`. 
  161         """Read QuantumGraph from a file that was made by `save`. 
  165         file : `io.BufferedIOBase` 
  166             File with pickle data open in binary mode. 
  167         universe : `~lsst.daf.butler.DimensionUniverse`
  168             DimensionUniverse instance, not used by the method itself but 
  169             needed to ensure that registry data structures are initialized. 
  173         graph : `QuantumGraph` 
  174             Resulting QuantumGraph instance. 
  179             Raised if the pickle contains an instance of a type other than
  184         Reading Quanta from pickle requires the existence of a singleton
  185         DimensionUniverse, which is usually instantiated during Registry
  186         initialization. To make sure that the DimensionUniverse exists, this
  187         method accepts a dummy DimensionUniverse argument.
  189         qgraph = pickle.load(file)
 
  190         if not isinstance(qgraph, QuantumGraph):
 
  191             raise TypeError(f
"QuantumGraph pickle file has contains unexpected object type: {type(qgraph)}")
 
  195         """Save QuantumGraph to a file. 
  197         Presently we store QuantumGraph in pickle format; this could
  198         potentially change in the future if a better format is found.
  202         file : `io.BufferedIOBase` 
  203             File to write pickle data to, open in binary mode.
  205         pickle.dump(self, file)
 
  208         """Iterator over quanta in a graph. 
  210         Quanta are returned in unspecified order. 
  215             Task definition for a Quantum. 
  216         quantum : `~lsst.daf.butler.Quantum` 
  219         for taskNodes 
in self:
 
  220             taskDef = taskNodes.taskDef
 
  221             for quantum 
in taskNodes.quanta:
 
  222                 yield taskDef, quantum
 
  225         """Iterator over quanta in a graph. 
  227         QuantumGraphs containing individual quanta are returned.
  231         graph : `QuantumGraph` 
  233         for taskDef, quantum 
in self.
quanta():
 
  235                                          quantum.initInputs, quantum.outputs)
 
  240         """Return total count of quanta in a graph. 
  245             Number of quanta in a graph. 
  247         return sum(len(taskNodes.quanta) 
for taskNodes 
in self)
 
  250         """Return topologically ordered Quanta and their dependencies. 
  252         This method iterates over all Quanta in topological order, enumerating 
  253         them during iteration. The returned `QuantumIterData` object contains
  254         the Quantum instance, its ``index`` and the ``index`` of all its
  255         prerequisites (Quanta that produce inputs for this Quantum):
  257         - the ``index`` values are generated by an iteration of a 
  258           QuantumGraph, and are not intrinsic to the QuantumGraph 
  259         - during iteration, each ID will appear in index before it ever 
  260           appears in dependencies. 
  264         quantumData : `QuantumIterData` 
  267         def orderedTaskNodes(graph):
 
  268             """Return topologically ordered task nodes. 
  272             nodes : `QuantumGraphTaskNodes` 
  277             nodesMap = {
id(item.taskDef): item 
for item 
in graph}
 
  279             for taskDef 
in pipeline:
 
  280                 yield nodesMap[
id(taskDef)]
 
  284         for nodes 
in orderedTaskNodes(self):
 
  285             for quantum 
in nodes.quanta:
 
  289                 for dataRef 
in chain.from_iterable(quantum.predictedInputs.values()):
 
  291                     if dataRef.id 
is None:
 
  293                         name, component = dataRef.datasetType.nameAndComponent()
 
  294                         key = (name, dataRef.dataId)
 
  296                             prereq.append(outputs[key])
 
  304                             if not (len(self) == 1 
and len(self[0].quanta) == 1):
 
  308                 for dataRef 
in chain.from_iterable(quantum.outputs.values()):
 
  309                     key = (dataRef.datasetType.name, dataRef.dataId)
 
  312                 yield QuantumIterData(index=index, quantum=quantum, taskDef=nodes.taskDef,
 
  313                                       dependencies=frozenset(prereq))