from __future__ import annotations

"""Module defining GraphBuilder class and related methods.
"""

__all__ = ['GraphBuilder']

# Standard-library imports.  ``itertools`` and ``logging`` are used below but
# did not survive extraction of the original import block.
import itertools
import logging
from collections import ChainMap
from dataclasses import dataclass
from typing import Set, List, Dict, Optional, Iterable

# Imports from this package and from the butler middleware.  Only
# ``ExpandedDataCoordinate`` survived in the original butler import; the other
# names are re-added because they are referenced later in this module.
from .pipeline import PipelineDatasetTypes, TaskDatasetTypes, TaskDef, Pipeline
from .graph import QuantumGraph, QuantumGraphTaskNodes
from lsst.daf.butler import (
    DatasetRef,
    DatasetType,
    DimensionGraph,
    DimensionUniverse,
    ExpandedDataCoordinate,
    Quantum,
)
from lsst.daf.butler.core.utils import NamedKeyDict

_LOG = logging.getLogger(__name__.partition(".")[2])
61 """Helper class aggregating information about a `DatasetType`, used when 62 constructing a `QuantumGraph`. 64 `_DatasetScaffolding` does not hold the `DatasetType` instance itself 65 because it is usually used as the value type in `_DatasetScaffoldingDict`, 66 which uses `DatasetType` instances as keys. 68 See `_PipelineScaffolding` for a top-down description of the full 69 scaffolding data structure. 73 dimensions : `DimensionGraph` 74 Dimensions of the `DatasetType`. 76 def __init__(self, dimensions: DimensionGraph):
83 __slots__ = (
"dimensions",
"producer",
"consumers",
"dataIds",
"refs")
85 dimensions: DimensionGraph
86 """The dimensions of the dataset type (`DimensionGraph`). 88 Set during `_PipelineScaffolding` construction. 91 producer: Optional[_TaskScaffolding]
92 """The scaffolding objects for the Task that produces this dataset. 94 Set during `_PipelineScaffolding` construction. 97 consumers: Dict[str, _TaskScaffolding]
98 """The scaffolding objects for the Tasks that consume this dataset, 99 keyed by their label in the `Pipeline`. 101 Set during `_PipelineScaffolding` construction. 104 dataIds: Set[ExpandedDataCoordinate]
105 """Data IDs for all instances of this dataset type in the graph. 107 Populated after construction by `_PipelineScaffolding.fillDataIds`. 110 refs: List[DatasetRef]
111 """References for all instances of this dataset type in the graph. 113 Populated after construction by `_PipelineScaffolding.fillDatasetRefs`. 118 """Custom dictionary that maps `DatasetType` to `_DatasetScaffolding`. 120 See `_PipelineScaffolding` for a top-down description of the full 121 scaffolding data structure. 126 Positional arguments are forwarded to the `dict` constructor. 127 universe : `DimensionUniverse` 128 Universe of all possible dimensions. 130 def __init__(self, *args, universe: DimensionGraph):
    @classmethod
    def fromDatasetTypes(cls, datasetTypes: Iterable[DatasetType], *,
                         universe: DimensionUniverse) -> _DatasetScaffoldingDict:
        """Construct a dictionary from a flat iterable of `DatasetType` keys.

        Parameters
        ----------
        datasetTypes : `iterable` of `DatasetType`
            DatasetTypes to use as keys for the dict.  Values will be
            constructed from the dimensions of the keys.
        universe : `DimensionUniverse`
            Universe of all possible dimensions.

        Returns
        -------
        dictionary : `_DatasetScaffoldingDict`
            A new dictionary instance.
        """
        return cls(((datasetType, _DatasetScaffolding(datasetType.dimensions))
                    for datasetType in datasetTypes),
                   universe=universe)

    @classmethod
    def fromSubset(cls, datasetTypes: Iterable[DatasetType], first: _DatasetScaffoldingDict,
                   *rest) -> _DatasetScaffoldingDict:
        """Return a new dictionary by extracting items corresponding to the
        given keys from one or more existing dictionaries.

        Parameters
        ----------
        datasetTypes : `iterable` of `DatasetType`
            DatasetTypes to use as keys for the dict.  Values will be obtained
            by lookups against ``first`` and ``rest``.
        first : `_DatasetScaffoldingDict`
            Another dictionary from which to extract values.
        rest
            Additional dictionaries from which to extract values.

        Returns
        -------
        dictionary : `_DatasetScaffoldingDict`
            A new dictionary instance.
        """
        combined = ChainMap(first, *rest)
        return cls(((datasetType, combined[datasetType]) for datasetType in datasetTypes),
                   universe=first.universe)

    @property
    def dimensions(self) -> DimensionGraph:
        """The union of all dimensions used by all dataset types in this
        dictionary, including implied dependencies (`DimensionGraph`).
        """
        base = self.universe.empty
        if len(self) == 0:
            return base
        return base.union(*[scaffolding.dimensions for scaffolding in self.values()])

    def unpackRefs(self) -> NamedKeyDict:
        """Unpack nested single-element `DatasetRef` lists into a new
        dictionary.

        This method assumes that each `_DatasetScaffolding.refs` list contains
        exactly one `DatasetRef`, as is the case for all "init" datasets.

        Returns
        -------
        dictionary : `NamedKeyDict`
            Dictionary mapping `DatasetType` to `DatasetRef`, with both
            `DatasetType` instances and string names usable as keys.
        """
        return NamedKeyDict((datasetType, scaffolding.refs[0])
                            for datasetType, scaffolding in self.items())
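# Illustrative sketch: ``fromSubset`` copies *references* to the existing
# `_DatasetScaffolding` values rather than making new ones, so the
# pipeline-level dictionaries and the per-task dictionaries built from them
# observe the same mutable state.  The same sharing behaviour can be seen with
# plain dicts standing in for the `DatasetType` keys and scaffolding values
# (names here are hypothetical):
#
#     >>> from collections import ChainMap
#     >>> first = {"calexp": ["ref1"]}
#     >>> rest = {"src": ["ref2"]}
#     >>> combined = ChainMap(first, rest)
#     >>> subset = {key: combined[key] for key in ("calexp",)}
#     >>> subset["calexp"].append("ref3")   # mutate through the subset...
#     >>> first["calexp"]                   # ...and the original sees it
#     ['ref1', 'ref3']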
209 """Helper class aggregating information about a `PipelineTask`, used when 210 constructing a `QuantumGraph`. 212 See `_PipelineScaffolding` for a top-down description of the full 213 scaffolding data structure. 218 Data structure that identifies the task class and its config. 219 parent : `_PipelineScaffolding` 220 The parent data structure that will hold the instance being 222 datasetTypes : `TaskDatasetTypes` 223 Data structure that categorizes the dataset types used by this task. 228 Raised if the task's dimensions are not a subset of the union of the 229 pipeline's dataset dimensions. 231 def __init__(self, taskDef: TaskDef, parent: _PipelineScaffolding, datasetTypes: TaskDatasetTypes):
232 universe = parent.dimensions.universe
234 self.
dimensions = DimensionGraph(universe, names=taskDef.connections.dimensions)
235 if not self.
dimensions.issubset(parent.dimensions):
237 f
"{self.dimensions} that are not a subset of " 238 f
"the pipeline dimensions {parent.dimensions}.")
242 self.
initInputs = _DatasetScaffoldingDict.fromSubset(datasetTypes.initInputs,
243 parent.initInputs, parent.initIntermediates)
244 self.
initOutputs = _DatasetScaffoldingDict.fromSubset(datasetTypes.initOutputs,
245 parent.initIntermediates, parent.initOutputs)
246 self.
inputs = _DatasetScaffoldingDict.fromSubset(datasetTypes.inputs,
247 parent.inputs, parent.intermediates)
248 self.
outputs = _DatasetScaffoldingDict.fromSubset(datasetTypes.outputs,
249 parent.intermediates, parent.outputs)
250 self.
prerequisites = _DatasetScaffoldingDict.fromSubset(datasetTypes.prerequisites,
251 parent.prerequisites)
254 for dataset
in itertools.chain(self.
initInputs.values(), self.
inputs.values(),
256 dataset.consumers[self.
taskDef.label] = self
258 assert dataset.producer
is None 259 dataset.producer = self
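    # Where each per-task dictionary draws its (shared) values from, per the
    # ``fromSubset`` calls above:
    #
    #   task.initInputs     <- parent.initInputs        + parent.initIntermediates
    #   task.initOutputs    <- parent.initIntermediates + parent.initOutputs
    #   task.inputs         <- parent.inputs            + parent.intermediates
    #   task.outputs        <- parent.intermediates     + parent.outputs
    #   task.prerequisites  <- parent.prerequisites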
264 """Data structure that identifies the task class and its config 268 dimensions: DimensionGraph
269 """The dimensions of a single `Quantum` of this task (`DimensionGraph`). 272 initInputs: _DatasetScaffoldingDict
273 """Dictionary containing information about datasets used to construct this 274 task (`_DatasetScaffoldingDict`). 277 initOutputs: _DatasetScaffoldingDict
278 """Dictionary containing information about datasets produced as a 279 side-effect of constructing this task (`_DatasetScaffoldingDict`). 282 inputs: _DatasetScaffoldingDict
283 """Dictionary containing information about datasets used as regular, 284 graph-constraining inputs to this task (`_DatasetScaffoldingDict`). 287 outputs: _DatasetScaffoldingDict
288 """Dictionary containing information about datasets produced by this task 289 (`_DatasetScaffoldingDict`). 292 prerequisites: _DatasetScaffoldingDict
293 """Dictionary containing information about input datasets that must be 294 present in the repository before any Pipeline containing this task is run 295 (`_DatasetScaffoldingDict`). 298 dataIds: Set[ExpandedDataCoordinate]
299 """Data IDs for all quanta for this task in the graph (`set` of 300 `ExpandedDataCoordinate`). 302 Populated after construction by `_PipelineScaffolding.fillDataIds`. 305 quanta: List[Quantum]
306 """All quanta for this task in the graph (`list` of `Quantum`). 308 Populated after construction by `_PipelineScaffolding.fillQuanta`. 313 connectionClass = config.connections.ConnectionsClass
314 connectionInstance = connectionClass(config=config)
317 result = connectionInstance.adjustQuantum(quantum.predictedInputs)
318 quantum._predictedInputs = NamedKeyDict(result)
324 """Create a `QuantumGraphTaskNodes` instance from the information in 329 nodes : `QuantumGraphTaskNodes` 330 The `QuantumGraph` elements corresponding to this task. 342 """A helper data structure that organizes the information involved in 343 constructing a `QuantumGraph` for a `Pipeline`. 347 pipeline : `Pipeline` 348 Sequence of tasks from which a graph is to be constructed. Must 349 have nested task classes already imported. 350 universe : `DimensionUniverse` 351 Universe of all possible dimensions. 356 Raised if the task's dimensions are not a subset of the union of the 357 pipeline's dataset dimensions. 361 The scaffolding data structure contains nested data structures for both 362 tasks (`_TaskScaffolding`) and datasets (`_DatasetScaffolding`), with the 363 latter held by `_DatasetScaffoldingDict`. The dataset data structures are 364 shared between the pipeline-level structure (which aggregates all datasets 365 and categorizes them from the perspective of the complete pipeline) and the 366 individual tasks that use them as inputs and outputs. 368 `QuantumGraph` construction proceeds in five steps, with each corresponding 369 to a different `_PipelineScaffolding` method: 371 1. When `_PipelineScaffolding` is constructed, we extract and categorize 372 the DatasetTypes used by the pipeline (delegating to 373 `PipelineDatasetTypes.fromPipeline`), then use these to construct the 374 nested `_TaskScaffolding` and `_DatasetScaffolding` objects. 376 2. In `fillDataIds`, we construct and run the "Big Join Query", which 377 returns related tuples of all dimensions used to identify any regular 378 input, output, and intermediate datasets (not prerequisites). We then 379 iterate over these tuples of related dimensions, identifying the subsets 380 that correspond to distinct data IDs for each task and dataset type. 382 3. In `fillDatasetRefs`, we run follow-up queries against all of the 383 dataset data IDs previously identified, populating the 384 `_DatasetScaffolding.refs` lists - except for those for prerequisite 385 datasets, which cannot be resolved until distinct quanta are 388 4. In `fillQuanta`, we extract subsets from the lists of `DatasetRef` into 389 the inputs and outputs for each `Quantum` and search for prerequisite 390 datasets, populating `_TaskScaffolding.quanta`. 392 5. In `makeQuantumGraph`, we construct a `QuantumGraph` from the lists of 393 per-task quanta identified in the previous step. 398 datasetTypes = PipelineDatasetTypes.fromPipeline(pipeline, registry=registry)
        # Make a scaffolding dictionary for each category of dataset type used
        # by the pipeline.
        for attr in ("initInputs", "initIntermediates", "initOutputs",
                     "inputs", "intermediates", "outputs", "prerequisites"):
            setattr(self, attr, _DatasetScaffoldingDict.fromDatasetTypes(getattr(datasetTypes, attr),
                                                                         universe=registry.dimensions))
        # Aggregate all dimensions used by regular (non-init, non-prerequisite)
        # dataset types; this defines the Big Join Query.
        self.dimensions = self.inputs.dimensions.union(self.intermediates.dimensions,
                                                       self.outputs.dimensions)
        # Construct per-task scaffolding, pairing each TaskDef with the dataset
        # types it uses.
        if isinstance(pipeline, Pipeline):
            pipeline = pipeline.toExpandedPipeline()
        self.tasks = [_TaskScaffolding(taskDef=taskDef, parent=self, datasetTypes=taskDatasetTypes)
                      for taskDef, taskDatasetTypes in zip(pipeline,
                                                           datasetTypes.byTask.values())]
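    # Category semantics, with a hypothetical two-task pipeline as an example
    # of the attributes documented below: if task A reads "raw" and writes
    # "calexp", and task B reads "calexp" and writes "src", then at the
    # pipeline level
    #
    #   inputs        = {"raw"}      (consumed, never produced)
    #   intermediates = {"calexp"}   (produced by one task, consumed by another)
    #   outputs       = {"src"}      (produced, never consumed)
    #
    # and similarly for the init-dataset categories.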
    tasks: List[_TaskScaffolding]
    """Scaffolding data structures for each task in the pipeline
    (`list` of `_TaskScaffolding`).
    """

    initInputs: _DatasetScaffoldingDict
    """Datasets consumed but not produced when constructing the tasks in this
    pipeline (`_DatasetScaffoldingDict`).
    """

    initIntermediates: _DatasetScaffoldingDict
    """Datasets that are both consumed and produced when constructing the tasks
    in this pipeline (`_DatasetScaffoldingDict`).
    """

    initOutputs: _DatasetScaffoldingDict
    """Datasets produced but not consumed when constructing the tasks in this
    pipeline (`_DatasetScaffoldingDict`).
    """

    inputs: _DatasetScaffoldingDict
    """Datasets that are consumed but not produced when running this pipeline
    (`_DatasetScaffoldingDict`).
    """

    intermediates: _DatasetScaffoldingDict
    """Datasets that are both produced and consumed when running this pipeline
    (`_DatasetScaffoldingDict`).
    """

    outputs: _DatasetScaffoldingDict
    """Datasets produced but not consumed when running this pipeline
    (`_DatasetScaffoldingDict`).
    """

    prerequisites: _DatasetScaffoldingDict
    """Datasets that are consumed when running this pipeline and looked up
    per-Quantum when generating the graph (`_DatasetScaffoldingDict`).
    """

    dimensions: DimensionGraph
    """All dimensions used by any regular input, intermediate, or output
    (not prerequisite) dataset; the set of dimensions used in the "Big Join
    Query" (`DimensionGraph`).

    This is required to be a superset of all task quantum dimensions.
    """

    def fillDataIds(self, registry, inputCollections, userQuery):
        """Query for the data IDs that connect nodes in the `QuantumGraph`.

        This method populates `_TaskScaffolding.dataIds` and
        `_DatasetScaffolding.dataIds` (except for those in `prerequisites`).

        Parameters
        ----------
        registry : `lsst.daf.butler.Registry`
            Registry for the data repository; used for all data ID queries.
        inputCollections : `~collections.abc.Mapping`
            Mapping from dataset type name to an ordered sequence of
            collections to search for that dataset.  A `defaultdict` is
            recommended for the case where the same collections should be
            used for most datasets.
        userQuery : `str`, optional
            User-provided expression to limit the data IDs processed.
        """
        # Initialization datasets always have empty data IDs.
        emptyDataId = ExpandedDataCoordinate(registry.dimensions.empty, (), records={})
        for scaffolding in itertools.chain(self.initInputs.values(),
                                           self.initIntermediates.values(),
                                           self.initOutputs.values()):
            scaffolding.dataIds.add(emptyDataId)
        # Run the "Big Join Query": one query over the full set of pipeline
        # dimensions, constrained to data IDs for which the regular input
        # datasets exist in the input collections and by the user expression.
        resultIter = registry.queryDimensions(
            self.dimensions,
            datasets={
                datasetType: inputCollections[datasetType.name]
                for datasetType in self.inputs
            },
            where=userQuery,
        )
        # Iterate over query results, splitting each common data ID into the
        # subsets appropriate for each task and each regular dataset type.
        for commonDataId in resultIter:
            for taskScaffolding in self.tasks:
                taskScaffolding.dataIds.add(commonDataId.subset(taskScaffolding.dimensions))
            for datasetType, scaffolding in itertools.chain(self.inputs.items(),
                                                            self.intermediates.items(),
                                                            self.outputs.items()):
                scaffolding.dataIds.add(commonDataId.subset(scaffolding.dimensions))
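        # For example (hypothetical dimension names): a common data ID covering
        # {instrument, visit, detector} would be subset to {instrument, visit}
        # for a per-visit task and to {instrument, visit, detector} for a
        # per-detector dataset type, so each scaffolding object collects only
        # the data ID values relevant to it.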
    def fillDatasetRefs(self, registry, inputCollections, outputCollection, *,
                        skipExisting=True, clobberExisting=False):
        """Perform follow-up queries for each dataset data ID produced in
        `fillDataIds`.

        This method populates `_DatasetScaffolding.refs` (except for those in
        `prerequisites`).

        Parameters
        ----------
        registry : `lsst.daf.butler.Registry`
            Registry for the data repository; used for all data ID queries.
        inputCollections : `~collections.abc.Mapping`
            Mapping from dataset type name to an ordered sequence of
            collections to search for that dataset.  A `defaultdict` is
            recommended for the case where the same collections should be
            used for most datasets.
        outputCollection : `str`
            Collection for all output datasets.
        skipExisting : `bool`, optional
            If `True` (default), a Quantum is not created if all its outputs
            already exist.
        clobberExisting : `bool`, optional
            If `True`, overwrite any outputs that already exist.  Cannot be
            `True` if ``skipExisting`` is.

        Raises
        ------
        ValueError
            Raised if both ``skipExisting`` and ``clobberExisting`` are `True`.
        OutputExistsError
            Raised if an output dataset already exists in the output collection
            and both ``skipExisting`` and ``clobberExisting`` are `False`.  The
            case where some but not all of a quantum's outputs are present and
            ``skipExisting`` is `True` cannot be identified at this stage, and
            is handled by `fillQuanta` instead.
        """
        if clobberExisting and skipExisting:
            raise ValueError("clobberExisting and skipExisting cannot both be true.")
        # Look up input and initInput datasets in the input collection(s); the
        # Big Join Query guarantees these already exist.
        for datasetType, scaffolding in itertools.chain(self.initInputs.items(), self.inputs.items()):
            for dataId in scaffolding.dataIds:
                refs = list(
                    registry.queryDatasets(
                        datasetType,
                        collections=inputCollections[datasetType.name],
                        dataId=dataId,
                    )
                )
                assert len(refs) == 1, "BJQ guarantees exactly one input for each data ID."
                scaffolding.refs.extend(refs)
        # Look up intermediate and output datasets in the output collection;
        # when they do not exist there yet (or when clobbering) record new,
        # unresolved refs instead.
        for datasetType, scaffolding in itertools.chain(self.initIntermediates.items(),
                                                        self.initOutputs.items(),
                                                        self.intermediates.items(),
                                                        self.outputs.items()):
            for dataId in scaffolding.dataIds:
                ref = None
                if not clobberExisting:
                    ref = registry.find(collection=outputCollection, datasetType=datasetType, dataId=dataId)
                if ref is None:
                    ref = DatasetRef(datasetType, dataId)
                elif not skipExisting:
                    raise OutputExistsError(f"Output dataset {datasetType.name} already exists in "
                                            f"output collection {outputCollection} with data ID {dataId}.")
                scaffolding.refs.append(ref)
        # Prerequisite dataset refs are deferred to `fillQuanta`, where they
        # can be looked up per-quantum.
    def fillQuanta(self, registry, inputCollections, *, skipExisting=True):
        """Define quanta for each task by splitting up the datasets associated
        with each task data ID.

        This method populates `_TaskScaffolding.quanta`.

        Parameters
        ----------
        registry : `lsst.daf.butler.Registry`
            Registry for the data repository; used for all data ID queries.
        inputCollections : `~collections.abc.Mapping`
            Mapping from dataset type name to an ordered sequence of
            collections to search for that dataset.  A `defaultdict` is
            recommended for the case where the same collections should be
            used for most datasets.
        skipExisting : `bool`, optional
            If `True` (default), a Quantum is not created if all its outputs
            already exist.
        """
        for task in self.tasks:
            for quantumDataId in task.dataIds:
                # Identify the regular inputs that correspond to the quantum
                # with this data ID: those whose data IDs have the same values
                # for all dimensions they have in common with the quantum.
                inputs = NamedKeyDict()
                for datasetType, scaffolding in task.inputs.items():
                    inputs[datasetType] = [ref for ref, dataId in zip(scaffolding.refs, scaffolding.dataIds)
                                           if quantumDataId.matches(dataId)]
                # Same for outputs, also checking whether they already exist
                # (an existing output has a resolved ref, i.e. ``ref.id`` is
                # not `None`).
                outputs = NamedKeyDict()
                allOutputsPresent = True
                for datasetType, scaffolding in task.outputs.items():
                    outputs[datasetType] = []
                    for ref, dataId in zip(scaffolding.refs, scaffolding.dataIds):
                        if quantumDataId.matches(dataId):
                            if ref.id is None:
                                allOutputsPresent = False
                            else:
                                assert skipExisting, "Existing outputs should have already been identified."
                                if not allOutputsPresent:
                                    raise OutputExistsError(f"Output {datasetType.name} with data ID "
                                                            f"{dataId} already exists, but other outputs "
                                                            f"for task with label {task.taskDef.label} "
                                                            f"and data ID {quantumDataId} do not.")
                            outputs[datasetType].append(ref)
                if allOutputsPresent and skipExisting:
                    # All outputs for this quantum already exist; skip it.
                    continue
                # Look up prerequisite datasets in the input collection(s);
                # these may have dimensions beyond those in the Big Join Query,
                # so they are queried per-quantum.
                for datasetType, scaffolding in task.prerequisites.items():
                    refs = list(
                        registry.queryDatasets(
                            datasetType,
                            collections=inputCollections[datasetType.name],
                            dataId=quantumDataId,
                        )
                    )
                    inputs[datasetType] = refs
                task.addQuantum(
                    Quantum(
                        taskName=task.taskDef.taskName,
                        taskClass=task.taskDef.taskClass,
                        dataId=quantumDataId,
                        initInputs=task.initInputs.unpackRefs(),
                        predictedInputs=inputs,
                        outputs=outputs,
                    )
                )

    def makeQuantumGraph(self):
        """Create a `QuantumGraph` from the quanta already present in
        the scaffolding data structure.
        """
        # Assemble the graph from the per-task node bundles (assuming, as in
        # the original module, that `QuantumGraph` accepts an iterable of
        # `QuantumGraphTaskNodes`).
        graph = QuantumGraph(task.makeQuantumGraphTaskNodes() for task in self.tasks)
        graph.initInputs = self.initInputs.unpackRefs()
        graph.initOutputs = self.initOutputs.unpackRefs()
        graph.initIntermediates = self.initIntermediates.unpackRefs()
        return graph
700 """Base class for exceptions generated by graph builder. 705 class OutputExistsError(GraphBuilderError):
706 """Exception generated when output datasets already exist. 712 """Exception generated when a prerequisite dataset does not exist. 718 """GraphBuilder class is responsible for building task execution graph from 723 registry : `~lsst.daf.butler.Registry` 724 Data butler instance. 725 skipExisting : `bool`, optional 726 If `True` (default), a Quantum is not created if all its outputs 728 clobberExisting : `bool`, optional 729 If `True`, overwrite any outputs that already exist. Cannot be 730 `True` if ``skipExisting`` is. 733 def __init__(self, registry, skipExisting=True, clobberExisting=False):
739 def makeGraph(self, pipeline, inputCollections, outputCollection, userQuery):
740 """Create execution graph for a pipeline. 744 pipeline : `Pipeline` 745 Pipeline definition, task names/classes and their configs. 746 inputCollections : `~collections.abc.Mapping` 747 Mapping from dataset type name to an ordered sequence of 748 collections to search for that dataset. A `defaultdict` is 749 recommended for the case where the same collections should be 750 used for most datasets. 751 outputCollection : `str` 752 Collection for all output datasets. 754 String which defunes user-defined selection for registry, should be 755 empty or `None` if there is no restrictions on data selection. 759 graph : `QuantumGraph` 764 Raised when user expression cannot be parsed. 766 Raised when output datasets already exist. 768 Other exceptions types may be raised by underlying registry 773 scaffolding.fillDataIds(self.
registry, inputCollections, userQuery)
774 scaffolding.fillDatasetRefs(self.
registry, inputCollections, outputCollection,
777 scaffolding.fillQuanta(self.
registry, inputCollections,
780 return scaffolding.makeQuantumGraph()
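# Minimal usage sketch (illustrative only; collection and dataset names are
# hypothetical, and construction of ``registry`` and ``pipeline`` is elided):
#
#     from collections import defaultdict
#
#     inputCollections = defaultdict(lambda: ["shared/raw"])
#     builder = GraphBuilder(registry, skipExisting=True)
#     quantumGraph = builder.makeGraph(pipeline, inputCollections,
#                                      outputCollection="u/someone/run1",
#                                      userQuery="visit = 12345")
#
# Internally this drives the five `_PipelineScaffolding` steps described above:
# construction, ``fillDataIds``, ``fillDatasetRefs``, ``fillQuanta``, and
# ``makeQuantumGraph``.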