from __future__ import annotations
 
   23 """Module defining GraphBuilder class and related methods. 

__all__ = ['GraphBuilder']
 
import itertools
import logging
from collections import ChainMap
 
from dataclasses import dataclass
 
from typing import Dict, Iterable, Iterator, List
 
from .connections import iterConnections
 
from .pipeline import PipelineDatasetTypes, TaskDatasetTypes, TaskDef, Pipeline
 
from .graph import QuantumGraph, QuantumGraphTaskNodes
 
from lsst.daf.butler import (
    DataCoordinate,
    DatasetRef,
    DatasetType,
    DimensionGraph,
    DimensionUniverse,
    ExpandedDataCoordinate,
    NamedKeyDict,
    Quantum,
)
 
_LOG = logging.getLogger(__name__.partition(".")[2])
 
class _DatasetDict(NamedKeyDict[DatasetType, Dict[DataCoordinate, DatasetRef]]):
 
   62     """A custom dictionary that maps `DatasetType` to a nested dictionary of 
   63     the known `DatasetRef` instances of that type. 
   68         Positional arguments are forwarded to the `dict` constructor. 
   69     universe : `DimensionUniverse` 
   70         Universe of all possible dimensions. 
   72     def __init__(self, *args, universe: DimensionGraph):
 
    @classmethod
    def fromDatasetTypes(cls, datasetTypes: Iterable[DatasetType],
                         universe: DimensionUniverse) -> _DatasetDict:
        """Construct a dictionary from a flat iterable of `DatasetType` keys.

        Parameters
        ----------
        datasetTypes : `iterable` of `DatasetType`
            DatasetTypes to use as keys for the dict.  Values will be empty
            dictionaries.
        universe : `DimensionUniverse`
            Universe of all possible dimensions.

        Returns
        -------
        dictionary : `_DatasetDict`
            A new `_DatasetDict` instance.
        """
        return cls({datasetType: {} for datasetType in datasetTypes}, universe=universe)
 
    @classmethod
    def fromSubset(cls, datasetTypes: Iterable[DatasetType], first: _DatasetDict, *rest: _DatasetDict
                   ) -> _DatasetDict:
        """Return a new dictionary by extracting items corresponding to the
        given keys from one or more existing dictionaries.

        Parameters
        ----------
        datasetTypes : `iterable` of `DatasetType`
            DatasetTypes to use as keys for the dict.  Values will be obtained
            by lookups against ``first`` and ``rest``.
        first : `_DatasetDict`
            Another dictionary from which to extract values.
        *rest
            Additional dictionaries from which to extract values.

        Returns
        -------
        dictionary : `_DatasetDict`
            A new dictionary instance.
        """
        combined = ChainMap(first, *rest)
        return cls({datasetType: combined[datasetType] for datasetType in datasetTypes},
                   universe=first.universe)
 
  123         """The union of all dimensions used by all dataset types in this 
  124         dictionary, including implied dependencies (`DimensionGraph`). 
  129         return base.union(*[datasetType.dimensions 
for datasetType 
in self.keys()])
 
  132         """Unpack nested single-element `DatasetRef` dicts into a new 
  133         mapping with `DatasetType` keys and `DatasetRef` values. 
  135         This method assumes that each nest contains exactly one item, as is the 
  136         case for all "init" datasets. 
  140         dictionary : `NamedKeyDict` 
  141             Dictionary mapping `DatasetType` to `DatasetRef`, with both 
  142             `DatasetType` instances and string names usable as keys. 
  144         def getOne(refs: Dict[DataCoordinate, DatasetRef]) -> DatasetRef:
 
  147         return NamedKeyDict({datasetType: getOne(refs) 
for datasetType, refs 
in self.items()})
 
  150         """Unpack nested multi-element `DatasetRef` dicts into a new 
  151         mapping with `DatasetType` keys and `set` of `DatasetRef` values. 
  155         dictionary : `NamedKeyDict` 
  156             Dictionary mapping `DatasetType` to `DatasetRef`, with both 
  157             `DatasetType` instances and string names usable as keys. 
  159         return NamedKeyDict({datasetType: 
list(refs.values()) 
for datasetType, refs 
in self.items()})
 
    def extract(self, datasetType: DatasetType, dataIds: Iterable[DataCoordinate]
                ) -> Iterator[DatasetRef]:
        """Iterate over the contained `DatasetRef` instances that match the
        given `DatasetType` and data IDs.

        Parameters
        ----------
        datasetType : `DatasetType`
            Dataset type to match.
        dataIds : `Iterable` [ `DataCoordinate` ]
            Data IDs to match.

        Returns
        -------
        refs : `Iterator` [ `DatasetRef` ]
            DatasetRef instances for which ``ref.datasetType == datasetType``
            and ``ref.dataId`` is in ``dataIds``.
        """
        refs = self[datasetType]
        return (refs[dataId] for dataId in dataIds)
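
# A minimal usage sketch of _DatasetDict.  The names here are hypothetical:
# "calexpType" stands for some DatasetType defined elsewhere, "registry" for
# an lsst.daf.butler Registry, and "dataId" for a DataCoordinate:
#
#     datasets = _DatasetDict.fromDatasetTypes([calexpType],
#                                              universe=registry.dimensions)
#     datasets[calexpType][dataId] = DatasetRef(calexpType, dataId)
#     datasets.unpackMultiRefs()  # -> NamedKeyDict {calexpType: [DatasetRef]}
#     datasets["calexp"]          # string keys work too, via NamedKeyDict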
 
  184     """Helper class aggregating information about a `Quantum`, used when 
  185     constructing a `QuantumGraph`. 
  187     See `_PipelineScaffolding` for a top-down description of the full 
  188     scaffolding data structure. 
  192     task : _TaskScaffolding 
  193         Back-reference to the helper object for the `PipelineTask` this quantum 
  194         represents an execution of. 
  195     dataId : `DataCoordinate` 
  196         Data ID for this quantum. 
  198     def __init__(self, task: _TaskScaffolding, dataId: DataCoordinate):
 
  201         self.
inputs = _DatasetDict.fromDatasetTypes(task.inputs.keys(), universe=dataId.universe)
 
  202         self.
outputs = _DatasetDict.fromDatasetTypes(task.outputs.keys(), universe=dataId.universe)
 
  203         self.
prerequisites = _DatasetDict.fromDatasetTypes(task.prerequisites.keys(),
 
  204                                                            universe=dataId.universe)
 
  206     __slots__ = (
"task", 
"dataId", 
"inputs", 
"outputs", 
"prerequisites")
 
  209         return f
"_QuantumScaffolding(taskDef={self.taskDef}, dataId={self.dataId}, ...)" 

    task: _TaskScaffolding
    """Back-reference to the helper object for the `PipelineTask` this quantum
    represents an execution of.
    """

    dataId: DataCoordinate
    """Data ID for this quantum.
    """

    inputs: _DatasetDict
    """Nested dictionary containing `DatasetRef` inputs to this quantum.

    This is initialized to map each `DatasetType` to an empty dictionary at
    construction.  Those nested dictionaries are populated (with data IDs as
    keys) with unresolved `DatasetRef` instances in
    `_PipelineScaffolding.connectDataIds`.
    """

    outputs: _DatasetDict
    """Nested dictionary containing `DatasetRef` outputs of this quantum.
    """

    prerequisites: _DatasetDict
    """Nested dictionary containing `DatasetRef` prerequisite inputs to this
    quantum.
    """
  239         """Transform the scaffolding object into a true `Quantum` instance. 
  244             An actual `Quantum` instance. 
  246         allInputs = self.
inputs.unpackMultiRefs()
 
  250         config = self.
task.taskDef.config
 
  251         connections = config.connections.ConnectionsClass(config=config)
 
  254         allInputs = connections.adjustQuantum(allInputs)
 
  256             taskName=self.
task.taskDef.taskName,
 
  257             taskClass=self.
task.taskDef.taskClass,
 
  259             initInputs=self.
task.initInputs.unpackSingleRefs(),
 
  260             predictedInputs=allInputs,
 
  261             outputs=self.
outputs.unpackMultiRefs(),
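
# Hedged sketch of the adjustQuantum hook exercised in makeQuantum above: a
# task's connections class may override it to trim or validate the predicted
# inputs before the Quantum is built.  The class and method body below are
# hypothetical, not part of this module:
#
#     class ExampleConnections(PipelineTaskConnections,
#                              dimensions=("instrument", "visit")):
#         def adjustQuantum(self, datasetRefMap):
#             datasetRefMap = super().adjustQuantum(datasetRefMap)
#             # e.g. drop refs for an optional input dataset type here
#             return datasetRefMap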
 
  267     """Helper class aggregating information about a `PipelineTask`, used when 
  268     constructing a `QuantumGraph`. 
  270     See `_PipelineScaffolding` for a top-down description of the full 
  271     scaffolding data structure. 
  276         Data structure that identifies the task class and its config. 
  277     parent : `_PipelineScaffolding` 
  278         The parent data structure that will hold the instance being 
  280     datasetTypes : `TaskDatasetTypes` 
  281         Data structure that categorizes the dataset types used by this task. 
  283     def __init__(self, taskDef: TaskDef, parent: _PipelineScaffolding, datasetTypes: TaskDatasetTypes):
 
  284         universe = parent.dimensions.universe
 
  286         self.
dimensions = DimensionGraph(universe, names=taskDef.connections.dimensions)
 
  287         assert self.
dimensions.issubset(parent.dimensions)
 
  290         self.
initInputs = _DatasetDict.fromSubset(datasetTypes.initInputs, parent.initInputs,
 
  291                                                   parent.initIntermediates)
 
  292         self.
initOutputs = _DatasetDict.fromSubset(datasetTypes.initOutputs, parent.initIntermediates,
 
  294         self.
inputs = _DatasetDict.fromSubset(datasetTypes.inputs, parent.inputs, parent.intermediates)
 
  295         self.
outputs = _DatasetDict.fromSubset(datasetTypes.outputs, parent.intermediates, parent.outputs)
 
  296         self.
prerequisites = _DatasetDict.fromSubset(datasetTypes.prerequisites, parent.prerequisites)
 
  303         return f
"_TaskScaffolding(taskDef={self.taskDef}, ...)" 
  306     """Data structure that identifies the task class and its config 
  310     dimensions: DimensionGraph
 
  311     """The dimensions of a single `Quantum` of this task (`DimensionGraph`). 
  314     initInputs: _DatasetDict
 
  315     """Dictionary containing information about datasets used to construct this 
  316     task (`_DatasetDict`). 
  319     initOutputs: _DatasetDict
 
  320     """Dictionary containing information about datasets produced as a 
  321     side-effect of constructing this task (`_DatasetDict`). 
  325     """Dictionary containing information about datasets used as regular, 
  326     graph-constraining inputs to this task (`_DatasetDict`). 
  329     outputs: _DatasetDict
 
  330     """Dictionary containing information about datasets produced by this task 
  334     prerequisites: _DatasetDict
 
  335     """Dictionary containing information about input datasets that must be 
  336     present in the repository before any Pipeline containing this task is run 
  340     quanta: Dict[DataCoordinate, _QuantumScaffolding]
 
  341     """Dictionary mapping data ID to a scaffolding object for the Quantum of 
  342     this task with that data ID. 
  346         """Create a `QuantumGraphTaskNodes` instance from the information in 
  351         nodes : `QuantumGraphTaskNodes` 
  352             The `QuantumGraph` elements corresponding to this task. 
  356             quanta=[q.makeQuantum() 
for q 
in self.
quanta.values()],
 
  357             initInputs=self.
initInputs.unpackSingleRefs(),
 
  364     """A helper data structure that organizes the information involved in 
  365     constructing a `QuantumGraph` for a `Pipeline`. 
  369     pipeline : `Pipeline` 
  370         Sequence of tasks from which a graph is to be constructed.  Must 
  371         have nested task classes already imported. 
  372     universe : `DimensionUniverse` 
  373         Universe of all possible dimensions. 
  377     The scaffolding data structure contains nested data structures for both 
  378     tasks (`_TaskScaffolding`) and datasets (`_DatasetDict`).  The dataset 
  379     data structures are shared between the pipeline-level structure (which 
  380     aggregates all datasets and categorizes them from the perspective of the 
  381     complete pipeline) and the individual tasks that use them as inputs and 
  384     `QuantumGraph` construction proceeds in four steps, with each corresponding 
  385     to a different `_PipelineScaffolding` method: 
  387     1. When `_PipelineScaffolding` is constructed, we extract and categorize 
  388        the DatasetTypes used by the pipeline (delegating to 
  389        `PipelineDatasetTypes.fromPipeline`), then use these to construct the 
  390        nested `_TaskScaffolding` and `_DatasetDict` objects. 
  392     2. In `connectDataIds`, we construct and run the "Big Join Query", which 
  393        returns related tuples of all dimensions used to identify any regular 
  394        input, output, and intermediate datasets (not prerequisites).  We then 
  395        iterate over these tuples of related dimensions, identifying the subsets 
  396        that correspond to distinct data IDs for each task and dataset type, 
  397        and then create `_QuantumScaffolding` objects. 
  399     3. In `resolveDatasetRefs`, we run follow-up queries against all of the 
  400        dataset data IDs previously identified, transforming unresolved 
  401        DatasetRefs into resolved DatasetRefs where appropriate.  We then look 
  402        up prerequisite datasets for all quanta. 
  404     4. In `makeQuantumGraph`, we construct a `QuantumGraph` from the lists of 
  405        per-task `_QuantumScaffolding` objects. 
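
    # A sketch of the four-step flow described above, as driven by
    # GraphBuilder.makeGraph (the registry, collections, run, and userQuery
    # objects are supplied by the caller):
    #
    #     scaffolding = _PipelineScaffolding(pipeline, registry=registry)  # 1
    #     scaffolding.connectDataIds(registry, collections, userQuery)     # 2
    #     scaffolding.resolveDatasetRefs(registry, collections, run,
    #                                    skipExisting=True)                # 3
    #     graph = scaffolding.makeQuantumGraph()                           # 4
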
    def __init__(self, pipeline, *, registry):
        _LOG.debug("Initializing data structures for QuantumGraph generation.")
        # Aggregate and categorize the DatasetTypes in the Pipeline.
        datasetTypes = PipelineDatasetTypes.fromPipeline(pipeline, registry=registry)
        # Construct dictionaries that map those DatasetTypes to structures
        # that will (later) hold additional information about them.
        for attr in ("initInputs", "initIntermediates", "initOutputs",
                     "inputs", "intermediates", "outputs", "prerequisites"):
            setattr(self, attr, _DatasetDict.fromDatasetTypes(getattr(datasetTypes, attr),
                                                              universe=registry.dimensions))
        # Aggregate all dimensions for all non-init, non-prerequisite
        # DatasetTypes.  These are the ones we'll include in the Big Join
        # Query.
        self.dimensions = self.inputs.dimensions.union(self.intermediates.dimensions,
                                                       self.outputs.dimensions)
        # Construct scaffolding nodes for each task.  The dataset dictionaries
        # are shared: each _TaskScaffolding holds subsets of the
        # pipeline-level _DatasetDicts constructed above.
        if isinstance(pipeline, Pipeline):
            pipeline = pipeline.toExpandedPipeline()
        self.tasks = [_TaskScaffolding(taskDef=taskDef, parent=self, datasetTypes=taskDatasetTypes)
                      for taskDef, taskDatasetTypes in zip(pipeline,
                                                           datasetTypes.byTask.values())]
 
    def __repr__(self):
        return f"_PipelineScaffolding(tasks={self.tasks}, ...)"

    tasks: List[_TaskScaffolding]
    """Scaffolding data structures for each task in the pipeline
    (`list` of `_TaskScaffolding`).
    """

    initInputs: _DatasetDict
    """Datasets consumed but not produced when constructing the tasks in this
    pipeline (`_DatasetDict`).
    """

    initIntermediates: _DatasetDict
    """Datasets that are both consumed and produced when constructing the tasks
    in this pipeline (`_DatasetDict`).
    """

    initOutputs: _DatasetDict
    """Datasets produced but not consumed when constructing the tasks in this
    pipeline (`_DatasetDict`).
    """

    inputs: _DatasetDict
    """Datasets that are consumed but not produced when running this pipeline
    (`_DatasetDict`).
    """

    intermediates: _DatasetDict
    """Datasets that are both produced and consumed when running this pipeline
    (`_DatasetDict`).
    """

    outputs: _DatasetDict
    """Datasets produced but not consumed when running this pipeline
    (`_DatasetDict`).
    """

    prerequisites: _DatasetDict
    """Datasets that are consumed when running this pipeline and looked up
    per-Quantum when generating the graph (`_DatasetDict`).
    """

    dimensions: DimensionGraph
    """All dimensions used by any regular input, intermediate, or output
    (not prerequisite) dataset; the set of dimensions used in the "Big Join
    Query" (`DimensionGraph`).

    This is required to be a superset of all task quantum dimensions.
    """
  486         """Query for the data IDs that connect nodes in the `QuantumGraph`. 
  488         This method populates `_TaskScaffolding.dataIds` and 
  489         `_DatasetScaffolding.dataIds` (except for those in `prerequisites`). 
  493         registry : `lsst.daf.butler.Registry` 
  494             Registry for the data repository; used for all data ID queries. 
  495         collections : `lsst.daf.butler.CollectionSearch` 
  496             Object representing the collections to search for input datasets. 
  497         userQuery : `str`, optional 
  498             User-provided expression to limit the data IDs processed. 
        _LOG.debug("Building query for data IDs.")
        # Initialization datasets always have empty data IDs.
        emptyDataId = ExpandedDataCoordinate(registry.dimensions.empty, (), records={})
        for datasetType, refs in itertools.chain(self.initInputs.items(),
                                                 self.initIntermediates.items(),
                                                 self.initOutputs.items()):
            refs[emptyDataId] = DatasetRef(datasetType, emptyDataId)

        # Run one big query for the data IDs for task dimensions and regular
        # inputs and outputs.
        _LOG.debug("Submitting data ID query and processing results.")
        resultIter = registry.queryDimensions(
            self.dimensions,
            datasets=list(self.inputs),
            collections=collections,
            where=userQuery,
        )
        # Iterate over query results, populating data IDs for datasets and
        # quanta and then connecting them to each other.
        for n, commonDataId in enumerate(resultIter):
 
            # Create DatasetRefs for all DatasetTypes from this result row,
            # noting that we might have created some already.
            refsForRow = {}
            for datasetType, refs in itertools.chain(self.inputs.items(), self.intermediates.items(),
                                                     self.outputs.items()):
                datasetDataId = commonDataId.subset(datasetType.dimensions)
                ref = refs.get(datasetDataId)
                if ref is None:
                    ref = DatasetRef(datasetType, datasetDataId)
                    refs[datasetDataId] = ref
                refsForRow[datasetType.name] = ref
 
            # Create _QuantumScaffolding objects for all tasks from this
            # result row, noting that we might have created some already.
            for task in self.tasks:
                quantumDataId = commonDataId.subset(task.dimensions)
                quantum = task.quanta.get(quantumDataId)
                if quantum is None:
                    quantum = _QuantumScaffolding(task=task, dataId=quantumDataId)
                    task.quanta[quantumDataId] = quantum
                # Whether this is a new quantum or an existing one, associate
                # the DatasetRefs for this row with it.  The fact that the
                # quantum data ID and the dataset data IDs come from the same
                # result row is what tells us they should be associated.
                for datasetType in task.inputs:
                    ref = refsForRow[datasetType.name]
                    quantum.inputs[datasetType.name][ref.dataId] = ref
                for datasetType in task.outputs:
                    ref = refsForRow[datasetType.name]
                    quantum.outputs[datasetType.name][ref.dataId] = ref
        _LOG.debug("Finished processing %d rows from data ID query.", n)
 
  559         """Perform follow up queries for each dataset data ID produced in 
  562         This method populates `_DatasetScaffolding.refs` (except for those in 
  567         registry : `lsst.daf.butler.Registry` 
  568             Registry for the data repository; used for all data ID queries. 
  569         collections : `lsst.daf.butler.CollectionSearch` 
  570             Object representing the collections to search for input datasets. 
  571         run : `str`, optional 
  572             Name of the `~lsst.daf.butler.CollectionType.RUN` collection for 
  573             output datasets, if it already exists. 
  574         skipExisting : `bool`, optional 
  575             If `True` (default), a Quantum is not created if all its outputs 
  576             already exist in ``run``.  Ignored if ``run`` is `None`. 
  581             Raised if an output dataset already exists in the output run 
  582             and ``skipExisting`` is `False`.  The case where some but not all 
  583             of a quantum's outputs are present and ``skipExisting`` is `True` 
  584             cannot be identified at this stage, and is handled by `fillQuanta` 
        # Look up [init] intermediate and output datasets in the output
        # collection, if there is an output collection.
        if run is not None:
            for datasetType, refs in itertools.chain(self.initIntermediates.items(),
                                                     self.initOutputs.items(),
                                                     self.intermediates.items(),
                                                     self.outputs.items()):
                _LOG.debug("Resolving %d datasets for intermediate and/or output dataset %s.",
                           len(refs), datasetType.name)
                for dataId, unresolvedRef in refs.items():
                    ref = registry.findDataset(datasetType=datasetType, dataId=dataId, collections=run)
                    if ref is not None:
                        if skipExisting:
                            refs[dataId] = ref
                        else:
                            raise OutputExistsError(f"Output dataset {datasetType.name} already exists in "
                                                    f"output RUN collection '{run}' with data ID {dataId}.")
 
        # Look up input and initInput datasets in the input collection(s).
        for datasetType, refs in itertools.chain(self.initInputs.items(), self.inputs.items()):
            _LOG.debug("Resolving %d datasets for input dataset %s.", len(refs), datasetType.name)
            for dataId in refs:
                refs[dataId] = registry.findDataset(datasetType, dataId=dataId, collections=collections)
            if any(ref is None for ref in refs.values()):
                raise RuntimeError(
                    f"One or more datasets of type '{datasetType.name}' was "
                    f"present in a previous query, but could not be found now. "
                    f"This is either a logic bug in QuantumGraph generation, "
                    f"or the input collections have been modified since "
                    f"QuantumGraph generation began."
                )

        # Copy the resolved DatasetRefs to the _QuantumScaffolding objects,
        # replacing the unresolved refs there, and then look up prerequisites.
        for task in self.tasks:
            _LOG.debug(
                "Applying resolutions and finding prerequisites for %d quanta of task with label '%s'.",
                len(task.quanta), task.taskDef.label
            )
            lookupFunctions = {
                c.name: c.lookupFunction
                for c in iterConnections(task.taskDef.connections, "prerequisiteInputs")
                if c.lookupFunction is not None
            }
            dataIdsToSkip = []
            for quantum in task.quanta.values():
 
                # Process outputs first, so we can skip this quantum if all of
                # its outputs already exist.
                if run is not None and skipExisting:
                    resolvedRefs = []
                    unresolvedRefs = []
                    for datasetType, originalRefs in quantum.outputs.items():
                        for ref in task.outputs.extract(datasetType, originalRefs.keys()):
                            if ref.id is not None:
                                resolvedRefs.append(ref)
                            else:
                                unresolvedRefs.append(ref)
                    if resolvedRefs:
                        if unresolvedRefs:
                            raise OutputExistsError(
                                f"Quantum {quantum.dataId} of task with label "
                                f"'{quantum.task.taskDef.label}' has some outputs that exist "
                                f"({resolvedRefs}) and others that don't ({unresolvedRefs})."
                            )
                        else:
                            # All outputs exist; mark this quantum's data ID
                            # for removal below and move on.
                            dataIdsToSkip.append(quantum.dataId)
                            continue
 
                # Update the input DatasetRefs to the resolved ones we just
                # got from the Registry.
                for datasetType, refs in quantum.inputs.items():
                    for ref in task.inputs.extract(datasetType, refs.keys()):
                        refs[ref.dataId] = ref
 
                # Look up prerequisite datasets in the input collection(s).
                # These may have dimensions that extend beyond those we queried
                # for originally, because we want to permit those data ID
                # values to differ across quanta and dataset types.
                for datasetType in task.prerequisites:
                    lookupFunction = lookupFunctions.get(datasetType.name)
                    if lookupFunction is not None:
                        # The task provided its own function to look up these
                        # datasets; use it.
                        refs = list(
                            lookupFunction(datasetType, registry, quantum.dataId, collections)
                        )
                    else:
                        # Most general case: query the registry directly.
                        refs = list(
                            registry.queryDatasets(
                                datasetType,
                                collections=collections,
                                dataId=quantum.dataId,
                                deduplicate=True,
                            )
                        )
                    quantum.prerequisites[datasetType].update({ref.dataId: ref for ref in refs})
 
            # Actually remove any quanta that we decided to skip above.
            if dataIdsToSkip:
                _LOG.debug("Pruning %d quanta for task with label '%s' because all of their outputs exist.",
                           len(dataIdsToSkip), task.taskDef.label)
                for dataId in dataIdsToSkip:
                    del task.quanta[dataId]
 
  701         """Create a `QuantumGraph` from the quanta already present in 
  702         the scaffolding data structure. 
  706         graph : `QuantumGraph` 
  707             The full `QuantumGraph`. 
  710         graph.initInputs = self.initInputs.unpackSingleRefs()
 
  711         graph.initOutputs = self.initOutputs.unpackSingleRefs()
 
  712         graph.initIntermediates = self.initIntermediates.unpackSingleRefs()
 
  722     """Base class for exceptions generated by graph builder. 
  727 class OutputExistsError(GraphBuilderError):
 
  728     """Exception generated when output datasets already exist. 
  734     """Exception generated when a prerequisite dataset does not exist. 
  740     """GraphBuilder class is responsible for building task execution graph from 
  745     registry : `~lsst.daf.butler.Registry` 
  746         Data butler instance. 
  747     skipExisting : `bool`, optional 
  748         If `True` (default), a Quantum is not created if all its outputs 

    def makeGraph(self, pipeline, collections, run, userQuery):
        """Create execution graph for a pipeline.

        Parameters
        ----------
        pipeline : `Pipeline`
            Pipeline definition, task names/classes and their configs.
        collections : `lsst.daf.butler.CollectionSearch`
            Object representing the collections to search for input datasets.
        run : `str`, optional
            Name of the `~lsst.daf.butler.CollectionType.RUN` collection for
            output datasets, if it already exists.
        userQuery : `str`
            String which defines user-defined selection for registry; should be
            empty or `None` if there are no restrictions on data selection.

        Returns
        -------
        graph : `QuantumGraph`

        Raises
        ------
        UserExpressionError
            Raised when user expression cannot be parsed.
        OutputExistsError
            Raised when output datasets already exist.
        Exception
            Other exception types may be raised by underlying registry
            classes.
        """
        scaffolding = _PipelineScaffolding(pipeline, registry=self.registry)

        scaffolding.connectDataIds(self.registry, collections, userQuery)
        scaffolding.resolveDatasetRefs(self.registry, collections, run,
                                       skipExisting=self.skipExisting)
        return scaffolding.makeQuantumGraph()
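

# A hedged end-to-end sketch of driving this module.  The repository path,
# pipeline object, collection names, and query below are illustrative
# assumptions, not part of this file:
#
#     from lsst.daf.butler import Butler, CollectionSearch
#
#     butler = Butler("/repo/example")
#     builder = GraphBuilder(registry=butler.registry, skipExisting=True)
#     graph = builder.makeGraph(
#         pipeline,                           # a Pipeline, already expanded
#         collections=CollectionSearch.fromExpression("HSC/defaults"),
#         run="u/someone/example-run",
#         userQuery="instrument = 'HSC' AND visit = 12345",
#     )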