from __future__ import annotations

"""Module defining GraphBuilder class and related methods.
"""

__all__ = ['GraphBuilder']

import itertools
import logging
from collections import ChainMap
from dataclasses import dataclass
from typing import Set, List, Dict, Optional, Iterable

from .pipeline import PipelineDatasetTypes, TaskDatasetTypes, TaskDef, Pipeline
from .graph import QuantumGraph, QuantumGraphTaskNodes
from lsst.daf.butler import (
    DatasetRef,
    DatasetType,
    DimensionGraph,
    DimensionUniverse,
    ExpandedDataCoordinate,
    Quantum,
)
from lsst.daf.butler.core.utils import NamedKeyDict

_LOG = logging.getLogger(__name__.partition(".")[2])
class _DatasetScaffolding:
    """Helper class aggregating information about a `DatasetType`, used when
    constructing a `QuantumGraph`.

    `_DatasetScaffolding` does not hold the `DatasetType` instance itself
    because it is usually used as the value type in `_DatasetScaffoldingDict`,
    which uses `DatasetType` instances as keys.

    See `_PipelineScaffolding` for a top-down description of the full
    scaffolding data structure.

    Parameters
    ----------
    dimensions : `DimensionGraph`
        Dimensions of the `DatasetType`.
    """
    def __init__(self, dimensions: DimensionGraph):
        self.dimensions = dimensions
        self.producer = None
        self.consumers = {}
        self.dataIds = set()
        self.refs = []

    __slots__ = ("dimensions", "producer", "consumers", "dataIds", "refs")

    dimensions: DimensionGraph
    """The dimensions of the dataset type (`DimensionGraph`).

    Set during `_PipelineScaffolding` construction.
    """

    producer: Optional[_TaskScaffolding]
    """The scaffolding objects for the Task that produces this dataset.

    Set during `_PipelineScaffolding` construction.
    """

    consumers: Dict[str, _TaskScaffolding]
    """The scaffolding objects for the Tasks that consume this dataset,
    keyed by their label in the `Pipeline`.

    Set during `_PipelineScaffolding` construction.
    """

    dataIds: Set[ExpandedDataCoordinate]
    """Data IDs for all instances of this dataset type in the graph.

    Populated after construction by `_PipelineScaffolding.fillDataIds`.
    """

    refs: List[DatasetRef]
    """References for all instances of this dataset type in the graph.

    Populated after construction by `_PipelineScaffolding.fillDatasetRefs`.
    """


class _DatasetScaffoldingDict(NamedKeyDict):
    """Custom dictionary that maps `DatasetType` to `_DatasetScaffolding`.

    See `_PipelineScaffolding` for a top-down description of the full
    scaffolding data structure.

    Parameters
    ----------
    *args
        Positional arguments are forwarded to the `dict` constructor.
    universe : `DimensionUniverse`
        Universe of all possible dimensions.
    """
    def __init__(self, *args, universe: DimensionUniverse):
        super().__init__(*args)
        self.universe = universe
    @classmethod
    def fromDatasetTypes(cls, datasetTypes: Iterable[DatasetType], *,
                         universe: DimensionUniverse) -> _DatasetScaffoldingDict:
        """Construct a dictionary from a flat iterable of `DatasetType` keys.

        Parameters
        ----------
        datasetTypes : `iterable` of `DatasetType`
            DatasetTypes to use as keys for the dict.  Values will be
            constructed from the dimensions of the keys.
        universe : `DimensionUniverse`
            Universe of all possible dimensions.

        Returns
        -------
        dictionary : `_DatasetScaffoldingDict`
            A new dictionary instance.
        """
        return cls(((datasetType, _DatasetScaffolding(datasetType.dimensions))
                    for datasetType in datasetTypes),
                   universe=universe)

    @classmethod
    def fromSubset(cls, datasetTypes: Iterable[DatasetType], first: _DatasetScaffoldingDict,
                   *rest) -> _DatasetScaffoldingDict:
        """Return a new dictionary by extracting items corresponding to the
        given keys from one or more existing dictionaries.

        Parameters
        ----------
        datasetTypes : `iterable` of `DatasetType`
            DatasetTypes to use as keys for the dict.  Values will be obtained
            by lookups against ``first`` and ``rest``.
        first : `_DatasetScaffoldingDict`
            Another dictionary from which to extract values.
        *rest
            Additional dictionaries from which to extract values.

        Returns
        -------
        dictionary : `_DatasetScaffoldingDict`
            A new dictionary instance.
        """
        combined = ChainMap(first, *rest)
        return cls(((datasetType, combined[datasetType]) for datasetType in datasetTypes),
                   universe=first.universe)

    @property
    def dimensions(self) -> DimensionGraph:
        """The union of all dimensions used by all dataset types in this
        dictionary, including implied dependencies (`DimensionGraph`).
        """
        base = self.universe.empty
        if len(self) == 0:
            return base
        return base.union(*[scaffolding.dimensions for scaffolding in self.values()])

    def unpackRefs(self) -> NamedKeyDict:
        """Unpack nested single-element `DatasetRef` lists into a new
        dictionary mapping `DatasetType` to `DatasetRef`.

        This method assumes that each `_DatasetScaffolding.refs` list contains
        exactly one `DatasetRef`, as is the case for all "init" datasets.

        Returns
        -------
        dictionary : `NamedKeyDict`
            Dictionary mapping `DatasetType` to `DatasetRef`, with both
            `DatasetType` instances and string names usable as keys.
        """
        return NamedKeyDict((datasetType, scaffolding.refs[0])
                            for datasetType, scaffolding in self.items())
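

# Sketch of the value-sharing behaviour documented above: `fromSubset` copies
# *references* to the `_DatasetScaffolding` values, so a per-task dictionary
# and the pipeline-level dictionaries see (and mutate) the same scaffolding
# objects.  The helper below is illustrative only and not part of the module's
# API; its name and arguments are hypothetical.
def _illustrateFromSubsetSharing(datasetType: DatasetType,
                                 inputs: _DatasetScaffoldingDict,
                                 intermediates: _DatasetScaffoldingDict) -> bool:
    # Build a one-key subset, looking the value up in ``inputs`` first and
    # falling back to ``intermediates``, mirroring how `_TaskScaffolding`
    # builds its per-task ``inputs`` dictionary.
    subset = _DatasetScaffoldingDict.fromSubset([datasetType], inputs, intermediates)
    combined = ChainMap(inputs, intermediates)
    # The subset holds the very same `_DatasetScaffolding` instance, so data
    # IDs and refs filled in later are visible through both dictionaries.
    return subset[datasetType] is combined[datasetType]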
class _TaskScaffolding:
    """Helper class aggregating information about a `PipelineTask`, used when
    constructing a `QuantumGraph`.

    See `_PipelineScaffolding` for a top-down description of the full
    scaffolding data structure.

    Parameters
    ----------
    taskDef : `TaskDef`
        Data structure that identifies the task class and its config.
    parent : `_PipelineScaffolding`
        The parent data structure that will hold the instance being
        constructed.
    datasetTypes : `TaskDatasetTypes`
        Data structure that categorizes the dataset types used by this task.

    Raises
    ------
    GraphBuilderError
        Raised if the task's dimensions are not a subset of the union of the
        pipeline's dataset dimensions.
    """
    def __init__(self, taskDef: TaskDef, parent: _PipelineScaffolding, datasetTypes: TaskDatasetTypes):
        universe = parent.dimensions.universe
        self.taskDef = taskDef
        self.dimensions = DimensionGraph(universe, names=taskDef.connections.dimensions)
        if not self.dimensions.issubset(parent.dimensions):
            raise GraphBuilderError(f"Task with label '{taskDef.label}' has dimensions "
                                    f"{self.dimensions} that are not a subset of "
                                    f"the pipeline dimensions {parent.dimensions}.")
        # Extract the pieces of the pipeline-level dataset dictionaries that
        # correspond to the dataset types used by this task, sharing the
        # _DatasetScaffolding values with the parent.
        self.initInputs = _DatasetScaffoldingDict.fromSubset(datasetTypes.initInputs,
                                                             parent.initInputs, parent.initIntermediates)
        self.initOutputs = _DatasetScaffoldingDict.fromSubset(datasetTypes.initOutputs,
                                                              parent.initIntermediates, parent.initOutputs)
        self.inputs = _DatasetScaffoldingDict.fromSubset(datasetTypes.inputs,
                                                         parent.inputs, parent.intermediates)
        self.outputs = _DatasetScaffoldingDict.fromSubset(datasetTypes.outputs,
                                                          parent.intermediates, parent.outputs)
        self.prerequisites = _DatasetScaffoldingDict.fromSubset(datasetTypes.prerequisites,
                                                                parent.prerequisites)
        # Register this task as a consumer of its inputs and prerequisites,
        # and as the (unique) producer of its outputs.
        for dataset in itertools.chain(self.initInputs.values(), self.inputs.values(),
                                       self.prerequisites.values()):
            dataset.consumers[self.taskDef.label] = self
        for dataset in itertools.chain(self.initOutputs.values(), self.outputs.values()):
            assert dataset.producer is None
            dataset.producer = self
        self.dataIds = set()
        self.quanta = []
    taskDef: TaskDef
    """Data structure that identifies the task class and its config
    (`TaskDef`).
    """

    dimensions: DimensionGraph
    """The dimensions of a single `Quantum` of this task (`DimensionGraph`).
    """

    initInputs: _DatasetScaffoldingDict
    """Dictionary containing information about datasets used to construct this
    task (`_DatasetScaffoldingDict`).
    """

    initOutputs: _DatasetScaffoldingDict
    """Dictionary containing information about datasets produced as a
    side-effect of constructing this task (`_DatasetScaffoldingDict`).
    """

    inputs: _DatasetScaffoldingDict
    """Dictionary containing information about datasets used as regular,
    graph-constraining inputs to this task (`_DatasetScaffoldingDict`).
    """

    outputs: _DatasetScaffoldingDict
    """Dictionary containing information about datasets produced by this task
    (`_DatasetScaffoldingDict`).
    """

    prerequisites: _DatasetScaffoldingDict
    """Dictionary containing information about input datasets that must be
    present in the repository before any Pipeline containing this task is run
    (`_DatasetScaffoldingDict`).
    """

    dataIds: Set[ExpandedDataCoordinate]
    """Data IDs for all quanta for this task in the graph (`set` of
    `ExpandedDataCoordinate`).

    Populated after construction by `_PipelineScaffolding.fillDataIds`.
    """

    quanta: List[Quantum]
    """All quanta for this task in the graph (`list` of `Quantum`).

    Populated after construction by `_PipelineScaffolding.fillQuanta`.
    """

    def addQuantum(self, quantum: Quantum):
        config = self.taskDef.config
        connectionClass = config.connections.ConnectionsClass
        connectionInstance = connectionClass(config=config)
        # This will raise if one of the check conditions is not met, which is
        # the intended behavior.
        result = connectionInstance.adjustQuantum(quantum.predictedInputs)
        quantum._predictedInputs = NamedKeyDict(result)
        self.quanta.append(quantum)
    def makeQuantumGraphTaskNodes(self) -> QuantumGraphTaskNodes:
        """Create a `QuantumGraphTaskNodes` instance from the information in
        ``self``.

        Returns
        -------
        nodes : `QuantumGraphTaskNodes`
            The `QuantumGraph` elements corresponding to this task.
        """
        return QuantumGraphTaskNodes(
            taskDef=self.taskDef,
            quanta=self.quanta,
            initInputs=self.initInputs.unpackRefs(),
            initOutputs=self.initOutputs.unpackRefs(),
        )


class _PipelineScaffolding:
    """A helper data structure that organizes the information involved in
    constructing a `QuantumGraph` for a `Pipeline`.

    Parameters
    ----------
    pipeline : `Pipeline`
        Sequence of tasks from which a graph is to be constructed.  Must
        have nested task classes already imported.
    registry : `lsst.daf.butler.Registry`
        Registry for the data repository; used to look up dataset types
        and dimensions.

    Raises
    ------
    GraphBuilderError
        Raised if a task's dimensions are not a subset of the union of the
        pipeline's dataset dimensions.

    Notes
    -----
    The scaffolding data structure contains nested data structures for both
    tasks (`_TaskScaffolding`) and datasets (`_DatasetScaffolding`), with the
    latter held by `_DatasetScaffoldingDict`.  The dataset data structures are
    shared between the pipeline-level structure (which aggregates all datasets
    and categorizes them from the perspective of the complete pipeline) and
    the individual tasks that use them as inputs and outputs.

    `QuantumGraph` construction proceeds in five steps, with each
    corresponding to a different `_PipelineScaffolding` method:

    1. When `_PipelineScaffolding` is constructed, we extract and categorize
       the DatasetTypes used by the pipeline (delegating to
       `PipelineDatasetTypes.fromPipeline`), then use these to construct the
       nested `_TaskScaffolding` and `_DatasetScaffolding` objects.

    2. In `fillDataIds`, we construct and run the "Big Join Query", which
       returns related tuples of all dimensions used to identify any regular
       input, output, and intermediate datasets (not prerequisites).  We then
       iterate over these tuples of related dimensions, identifying the
       subsets that correspond to distinct data IDs for each task and dataset
       type.

    3. In `fillDatasetRefs`, we run follow-up queries against all of the
       dataset data IDs previously identified, populating the
       `_DatasetScaffolding.refs` lists - except for those for prerequisite
       datasets, which cannot be resolved until distinct quanta are
       identified.

    4. In `fillQuanta`, we extract subsets from the lists of `DatasetRef` into
       the inputs and outputs for each `Quantum` and search for prerequisite
       datasets, populating `_TaskScaffolding.quanta`.

    5. In `makeQuantumGraph`, we construct a `QuantumGraph` from the lists of
       per-task quanta identified in the previous step.
    """
    def __init__(self, pipeline, *, registry):
        # Extract and categorize the DatasetTypes used by the pipeline.
        datasetTypes = PipelineDatasetTypes.fromPipeline(pipeline, registry=registry)
        # Construct dictionaries that map those DatasetTypes to structures
        # that will (later) hold additional information about them.
        for attr in ("initInputs", "initIntermediates", "initOutputs",
                     "inputs", "intermediates", "outputs", "prerequisites"):
            setattr(self, attr, _DatasetScaffoldingDict.fromDatasetTypes(getattr(datasetTypes, attr),
                                                                         universe=registry.dimensions))
        # Aggregate all dimensions for all non-init, non-prerequisite
        # DatasetTypes; these are the ones included in the big join query.
        self.dimensions = self.inputs.dimensions.union(self.intermediates.dimensions,
                                                       self.outputs.dimensions)
        # Construct scaffolding nodes for each Task, adding backreferences
        # from each _DatasetScaffolding node to the tasks that use it.
        if isinstance(pipeline, Pipeline):
            pipeline = pipeline.toExpandedPipeline()
        self.tasks = [_TaskScaffolding(taskDef=taskDef, parent=self, datasetTypes=taskDatasetTypes)
                      for taskDef, taskDatasetTypes in zip(pipeline,
                                                           datasetTypes.byTask.values())]
    tasks: List[_TaskScaffolding]
    """Scaffolding data structures for each task in the pipeline
    (`list` of `_TaskScaffolding`).
    """

    initInputs: _DatasetScaffoldingDict
    """Datasets consumed but not produced when constructing the tasks in this
    pipeline (`_DatasetScaffoldingDict`).
    """

    initIntermediates: _DatasetScaffoldingDict
    """Datasets that are both consumed and produced when constructing the
    tasks in this pipeline (`_DatasetScaffoldingDict`).
    """

    initOutputs: _DatasetScaffoldingDict
    """Datasets produced but not consumed when constructing the tasks in this
    pipeline (`_DatasetScaffoldingDict`).
    """

    inputs: _DatasetScaffoldingDict
    """Datasets that are consumed but not produced when running this pipeline
    (`_DatasetScaffoldingDict`).
    """

    intermediates: _DatasetScaffoldingDict
    """Datasets that are both produced and consumed when running this pipeline
    (`_DatasetScaffoldingDict`).
    """

    outputs: _DatasetScaffoldingDict
    """Datasets produced but not consumed when running this pipeline
    (`_DatasetScaffoldingDict`).
    """

    prerequisites: _DatasetScaffoldingDict
    """Datasets that are consumed when running this pipeline and looked up
    per-Quantum when generating the graph (`_DatasetScaffoldingDict`).
    """

    dimensions: DimensionGraph
    """All dimensions used by any regular input, intermediate, or output
    (not prerequisite) dataset; the set of dimensions used in the "Big Join
    Query" (`DimensionGraph`).

    This is required to be a superset of all task quantum dimensions.
    """

    def fillDataIds(self, registry, inputCollections, userQuery):
        """Query for the data IDs that connect nodes in the `QuantumGraph`.

        This method populates `_TaskScaffolding.dataIds` and
        `_DatasetScaffolding.dataIds` (except for those in `prerequisites`).

        Parameters
        ----------
        registry : `lsst.daf.butler.Registry`
            Registry for the data repository; used for all data ID queries.
        inputCollections : `~collections.abc.Mapping`
            Mapping from dataset type name to an ordered sequence of
            collections to search for that dataset.  A `defaultdict` is
            recommended for the case where the same collections should be
            used for most datasets.
        userQuery : `str`, optional
            User-provided expression to limit the data IDs processed.
        """
        # All "init" datasets are associated with the empty data ID.
        emptyDataId = ExpandedDataCoordinate(registry.dimensions.empty, (), records={})
        for scaffolding in itertools.chain(self.initInputs.values(),
                                           self.initIntermediates.values(),
                                           self.initOutputs.values()):
            scaffolding.dataIds.add(emptyDataId)
        # Run the "Big Join Query" over all dimensions used by the tasks and
        # their regular inputs and outputs, constrained to data IDs for which
        # the regular inputs already exist in the given collections.
        resultIter = registry.queryDimensions(
            self.dimensions,
            datasets={
                datasetType: inputCollections[datasetType.name]
                for datasetType in self.inputs
            },
            where=userQuery,
        )
        # Iterate over the query results, associating the relevant subset of
        # each row's data ID with every task and dataset type that uses those
        # dimensions; the sets deduplicate repeated subsets across rows.
        for commonDataId in resultIter:
            for taskScaffolding in self.tasks:
                taskScaffolding.dataIds.add(commonDataId.subset(taskScaffolding.dimensions))
            for datasetType, scaffolding in itertools.chain(self.inputs.items(),
                                                            self.intermediates.items(),
                                                            self.outputs.items()):
                scaffolding.dataIds.add(commonDataId.subset(scaffolding.dimensions))
    def fillDatasetRefs(self, registry, inputCollections, outputCollection, *,
                        skipExisting=True, clobberExisting=False):
        """Perform follow up queries for each dataset data ID produced in
        `fillDataIds`.

        This method populates `_DatasetScaffolding.refs` (except for those in
        `prerequisites`).

        Parameters
        ----------
        registry : `lsst.daf.butler.Registry`
            Registry for the data repository; used for all data ID queries.
        inputCollections : `~collections.abc.Mapping`
            Mapping from dataset type name to an ordered sequence of
            collections to search for that dataset.  A `defaultdict` is
            recommended for the case where the same collections should be
            used for most datasets.
        outputCollection : `str`
            Collection for all output datasets.
        skipExisting : `bool`, optional
            If `True` (default), a Quantum is not created if all its outputs
            already exist.
        clobberExisting : `bool`, optional
            If `True`, overwrite any outputs that already exist.  Cannot be
            `True` if ``skipExisting`` is.

        Raises
        ------
        ValueError
            Raised if both `skipExisting` and `clobberExisting` are `True`.
        OutputExistsError
            Raised if an output dataset already exists in the output collection
            and both ``skipExisting`` and ``clobberExisting`` are `False`.  The
            case where some but not all of a quantum's outputs are present and
            ``skipExisting`` is `True` cannot be identified at this stage, and
            is handled by `fillQuanta` instead.
        """
        if clobberExisting and skipExisting:
            raise ValueError("clobberExisting and skipExisting cannot both be true.")
        # Look up input and initInput datasets in the input collection(s).
        for datasetType, scaffolding in itertools.chain(self.initInputs.items(), self.inputs.items()):
            for dataId in scaffolding.dataIds:
                refs = list(
                    registry.queryDatasets(
                        datasetType,
                        collections=inputCollections[datasetType.name],
                        dataId=dataId,
                        deduplicate=True,
                    )
                )
                assert len(refs) == 1, "BJQ guarantees exactly one input for each data ID."
                scaffolding.refs.extend(refs)
        # Look up [init]intermediate and [init]output datasets in the output
        # collection, unless clobberExisting is True, in which case we always
        # start with new (unresolved) DatasetRefs.
        for datasetType, scaffolding in itertools.chain(self.initIntermediates.items(),
                                                        self.initOutputs.items(),
                                                        self.intermediates.items(),
                                                        self.outputs.items()):
            for dataId in scaffolding.dataIds:
                if clobberExisting:
                    ref = None
                else:
                    ref = registry.find(collection=outputCollection, datasetType=datasetType, dataId=dataId)
                if ref is None:
                    ref = DatasetRef(datasetType, dataId)
                elif not skipExisting:
                    raise OutputExistsError(f"Output dataset {datasetType.name} already exists in "
                                            f"output collection {outputCollection} with data ID {dataId}.")
                scaffolding.refs.append(ref)
    def fillQuanta(self, registry, inputCollections, *, skipExisting=True):
        """Define quanta for each task by splitting up the datasets associated
        with each task data ID.

        This method populates `_TaskScaffolding.quanta`.

        Parameters
        ----------
        registry : `lsst.daf.butler.Registry`
            Registry for the data repository; used for all data ID queries.
        inputCollections : `~collections.abc.Mapping`
            Mapping from dataset type name to an ordered sequence of
            collections to search for that dataset.  A `defaultdict` is
            recommended for the case where the same collections should be
            used for most datasets.
        skipExisting : `bool`, optional
            If `True` (default), a Quantum is not created if all its outputs
            already exist.
        """
        for task in self.tasks:
            for quantumDataId in task.dataIds:
                # Identify the (regular) inputs that correspond to the Quantum
                # with this data ID; these are the refs whose data IDs have
                # the same values for all dimensions they have in common.
                inputs = NamedKeyDict()
                for datasetType, scaffolding in task.inputs.items():
                    inputs[datasetType] = [ref for ref, dataId in zip(scaffolding.refs, scaffolding.dataIds)
                                           if quantumDataId.matches(dataId)]
                # Do the same for outputs, while also checking whether they
                # already exist.
                outputs = NamedKeyDict()
                allOutputsPresent = True
                for datasetType, scaffolding in task.outputs.items():
                    outputs[datasetType] = []
                    for ref, dataId in zip(scaffolding.refs, scaffolding.dataIds):
                        if quantumDataId.matches(dataId):
                            if ref.id is None:
                                allOutputsPresent = False
                            else:
                                assert skipExisting, "Existing outputs should have already been identified."
                                if not allOutputsPresent:
                                    raise OutputExistsError(
                                        f"Output {datasetType.name} with data ID "
                                        f"{dataId} already exists, but other outputs "
                                        f"for task with label {task.taskDef.label} "
                                        f"and data ID {quantumDataId} do not.")
                            outputs[datasetType].append(ref)
                if allOutputsPresent and skipExisting:
                    # All outputs already exist and we have been asked to skip
                    # such quanta, so don't create one.
                    continue
                # Look up prerequisite datasets in the input collection(s);
                # these may have dimensions beyond those in the original query
                # and hence must be resolved per-Quantum here rather than in
                # fillDatasetRefs.
                for datasetType, scaffolding in task.prerequisites.items():
                    refs = list(
                        registry.queryDatasets(
                            datasetType,
                            collections=inputCollections[datasetType.name],
                            dataId=quantumDataId,
                            deduplicate=True,
                        )
                    )
                    inputs[datasetType] = refs
                task.addQuantum(
                    Quantum(
                        taskName=task.taskDef.taskName,
                        taskClass=task.taskDef.taskClass,
                        dataId=quantumDataId,
                        initInputs=task.initInputs.unpackRefs(),
                        predictedInputs=inputs,
                        outputs=outputs,
                    )
                )
    def makeQuantumGraph(self):
        """Create a `QuantumGraph` from the quanta already present in
        the scaffolding data structure.
        """
        graph = QuantumGraph(task.makeQuantumGraphTaskNodes() for task in self.tasks)
        graph.initInputs = self.initInputs.unpackRefs()
        graph.initOutputs = self.initOutputs.unpackRefs()
        graph.initIntermediates = self.initIntermediates.unpackRefs()
        return graph
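

# The per-quantum dataset selection in `fillQuanta` relies on
# `ExpandedDataCoordinate.matches` to pick out the refs whose data IDs agree
# with the quantum's data ID on the dimensions they have in common.  The
# helper below is an illustrative sketch of that pattern, not part of the
# module's API; its name is hypothetical.
def _selectRefsForQuantum(quantumDataId: ExpandedDataCoordinate,
                          scaffolding: _DatasetScaffolding) -> List[DatasetRef]:
    # ``refs`` is populated in the same iteration order as ``dataIds``, so
    # zipping them pairs each ref with its data ID; keep only the refs whose
    # data ID matches the quantum's data ID.
    return [ref for ref, dataId in zip(scaffolding.refs, scaffolding.dataIds)
            if quantumDataId.matches(dataId)]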
class GraphBuilderError(Exception):
    """Base class for exceptions generated by graph builder.
    """


class OutputExistsError(GraphBuilderError):
    """Exception generated when output datasets already exist.
    """


class PrerequisiteMissingError(GraphBuilderError):
    """Exception generated when a prerequisite dataset does not exist.
    """


class GraphBuilder(object):
    """GraphBuilder class is responsible for building task execution graph
    from a `Pipeline`.

    Parameters
    ----------
    registry : `~lsst.daf.butler.Registry`
        Data butler registry instance.
    skipExisting : `bool`, optional
        If `True` (default), a Quantum is not created if all its outputs
        already exist.
    clobberExisting : `bool`, optional
        If `True`, overwrite any outputs that already exist.  Cannot be
        `True` if ``skipExisting`` is.
    """

    def __init__(self, registry, skipExisting=True, clobberExisting=False):
        self.registry = registry
        self.skipExisting = skipExisting
        self.clobberExisting = clobberExisting
    def makeGraph(self, pipeline, inputCollections, outputCollection, userQuery):
        """Create execution graph for a pipeline.

        Parameters
        ----------
        pipeline : `Pipeline`
            Pipeline definition, task names/classes and their configs.
        inputCollections : `~collections.abc.Mapping`
            Mapping from dataset type name to an ordered sequence of
            collections to search for that dataset.  A `defaultdict` is
            recommended for the case where the same collections should be
            used for most datasets.
        outputCollection : `str`
            Collection for all output datasets.
        userQuery : `str`
            String which defines a user-provided selection for the registry;
            should be empty or `None` if there are no restrictions on data
            selection.

        Returns
        -------
        graph : `QuantumGraph`
            The constructed execution graph.

        Raises
        ------
        UserExpressionError
            Raised when user expression cannot be parsed.
        OutputExistsError
            Raised when output datasets already exist.
        Exception
            Other exception types may be raised by underlying registry
            classes.
        """
        scaffolding = _PipelineScaffolding(pipeline, registry=self.registry)

        scaffolding.fillDataIds(self.registry, inputCollections, userQuery)
        scaffolding.fillDatasetRefs(self.registry, inputCollections, outputCollection,
                                    skipExisting=self.skipExisting,
                                    clobberExisting=self.clobberExisting)
        scaffolding.fillQuanta(self.registry, inputCollections,
                               skipExisting=self.skipExisting)
        return scaffolding.makeQuantumGraph()
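

# Minimal usage sketch for `GraphBuilder`, assuming an existing butler
# `Registry` and an expanded `Pipeline`; the collection names and query string
# below are hypothetical and shown only for illustration.
def _exampleMakeGraph(registry, pipeline):
    from collections import defaultdict
    # ``inputCollections`` maps dataset type name to an ordered sequence of
    # collections; a defaultdict lets one search path apply to most dataset
    # types, as recommended in the docstrings above.
    inputCollections = defaultdict(lambda: ["shared/raw", "calib"])
    builder = GraphBuilder(registry, skipExisting=True)
    return builder.makeGraph(pipeline, inputCollections,
                             outputCollection="u/example/run",
                             userQuery="instrument = 'HSC'")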