from __future__ import annotations

"""Module defining GraphBuilder class and related methods.
"""

__all__ = ['GraphBuilder']

import itertools
import logging

from collections import ChainMap
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Dict, Iterable, Iterator, List, Set

from .connections import iterConnections
from .pipeline import PipelineDatasetTypes, TaskDatasetTypes, TaskDef, Pipeline
from .graph import QuantumGraph
from lsst.daf.butler import (
    DataCoordinate,
    DatasetRef,
    DatasetType,
    DimensionGraph,
    DimensionUniverse,
    NamedKeyDict,
    Quantum,
)
from lsst.utils import doImport

_LOG = logging.getLogger(__name__.partition(".")[2])


class _DatasetDict(NamedKeyDict[DatasetType, Dict[DataCoordinate, DatasetRef]]):
    """A custom dictionary that maps `DatasetType` to a nested dictionary of
    the known `DatasetRef` instances of that type.

    Parameters
    ----------
    *args
        Positional arguments are forwarded to the `dict` constructor.
    universe : `DimensionUniverse`
        Universe of all possible dimensions.
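
    Examples
    --------
    Purely illustrative; ``calexpType``, ``dataId``, and ``universe`` are
    placeholders, not names from this module::

        held = _DatasetDict.fromDatasetTypes([calexpType], universe=universe)
        held[calexpType][dataId] = DatasetRef(calexpType, dataId)
        refs = held["calexp"]  # NamedKeyDict also accepts string names
    """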

    def __init__(self, *args, universe: DimensionUniverse):
        super().__init__(*args)
        self.universe = universe

    @classmethod
    def fromDatasetTypes(cls, datasetTypes: Iterable[DatasetType], *,
                         universe: DimensionUniverse) -> _DatasetDict:
        """Construct a dictionary from a flat iterable of `DatasetType` keys.

        Parameters
        ----------
        datasetTypes : `iterable` of `DatasetType`
            DatasetTypes to use as keys for the dict.  Values will be empty
            dictionaries.
        universe : `DimensionUniverse`
            Universe of all possible dimensions.

        Returns
        -------
        dictionary : `_DatasetDict`
            A new `_DatasetDict` instance.
        """
        return cls({datasetType: {} for datasetType in datasetTypes}, universe=universe)

    @classmethod
    def fromSubset(cls, datasetTypes: Iterable[DatasetType], first: _DatasetDict,
                   *rest: _DatasetDict) -> _DatasetDict:
        """Return a new dictionary by extracting items corresponding to the
        given keys from one or more existing dictionaries.

        Parameters
        ----------
        datasetTypes : `iterable` of `DatasetType`
            DatasetTypes to use as keys for the dict.  Values will be obtained
            by lookups against ``first`` and ``rest``.
        first : `_DatasetDict`
            Another dictionary from which to extract values.
        *rest
            Additional dictionaries from which to extract values.

        Returns
        -------
        dictionary : `_DatasetDict`
            A new dictionary instance.
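
        Examples
        --------
        A minimal sketch mirroring how `_TaskScaffolding` uses this method;
        ``taskInputTypes``, ``allInputs``, and ``allIntermediates`` are
        placeholders::

            # Lookups chain across the dictionaries, so a key found in
            # allInputs shadows the same key in allIntermediates.
            subset = _DatasetDict.fromSubset(taskInputTypes, allInputs,
                                             allIntermediates)

        """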
        combined = ChainMap(first, *rest)
        return cls({datasetType: combined[datasetType] for datasetType in datasetTypes},
                   universe=first.universe)
125 """The union of all dimensions used by all dataset types in this
126 dictionary, including implied dependencies (`DimensionGraph`).
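
        Examples
        --------
        Illustrative only, for hypothetical dataset types ``dt1`` and ``dt2``
        sharing one universe::

            d = _DatasetDict.fromDatasetTypes([dt1, dt2], universe=universe)
            assert d.dimensions == dt1.dimensions.union(dt2.dimensions)

        """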
        base = self.universe.empty
        if len(self) == 0:
            return base
        return base.union(*[datasetType.dimensions for datasetType in self.keys()])
134 """Unpack nested single-element `DatasetRef` dicts into a new
135 mapping with `DatasetType` keys and `DatasetRef` values.
137 This method assumes that each nest contains exactly one item, as is the
138 case for all "init" datasets.
142 dictionary : `NamedKeyDict`
143 Dictionary mapping `DatasetType` to `DatasetRef`, with both
144 `DatasetType` instances and string names usable as keys.
146 def getOne(refs: Dict[DataCoordinate, DatasetRef]) -> DatasetRef:
149 return NamedKeyDict({datasetType: getOne(refs)
for datasetType, refs
in self.items()})
152 """Unpack nested multi-element `DatasetRef` dicts into a new
153 mapping with `DatasetType` keys and `set` of `DatasetRef` values.
157 dictionary : `NamedKeyDict`
158 Dictionary mapping `DatasetType` to `DatasetRef`, with both
159 `DatasetType` instances and string names usable as keys.
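
        Examples
        --------
        Purely illustrative; ``d`` is a hypothetical `_DatasetDict` and
        ``dt`` one of its keys::

            multi = d.unpackMultiRefs()
            multi[dt]       # a list of all DatasetRefs of type dt
            multi[dt.name]  # string keys work as well

        """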
        return NamedKeyDict({datasetType: list(refs.values())
                             for datasetType, refs in self.items()})

    def extract(self, datasetType: DatasetType, dataIds: Iterable[DataCoordinate]
                ) -> Iterator[DatasetRef]:
        """Iterate over the contained `DatasetRef` instances that match the
        given `DatasetType` and data IDs.

        Parameters
        ----------
        datasetType : `DatasetType`
            Dataset type to match.
        dataIds : `Iterable` [ `DataCoordinate` ]
            Data IDs to match.

        Returns
        -------
        refs : `Iterator` [ `DatasetRef` ]
            DatasetRef instances for which ``ref.datasetType == datasetType``
            and ``ref.dataId`` is in ``dataIds``.
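
        Examples
        --------
        An illustrative sketch; ``d``, ``calexpType``, and ``dataIds`` are
        placeholders::

            # Raises KeyError if any data ID has no ref of this type yet.
            for ref in d.extract(calexpType, dataIds):
                print(ref.dataId)

        """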
        refs = self[datasetType]
        return (refs[dataId] for dataId in dataIds)
186 """Helper class aggregating information about a `Quantum`, used when
187 constructing a `QuantumGraph`.
189 See `_PipelineScaffolding` for a top-down description of the full
190 scaffolding data structure.
194 task : _TaskScaffolding
195 Back-reference to the helper object for the `PipelineTask` this quantum
196 represents an execution of.
197 dataId : `DataCoordinate`
198 Data ID for this quantum.

    def __init__(self, task: _TaskScaffolding, dataId: DataCoordinate):
        self.task = task
        self.dataId = dataId
        self.inputs = _DatasetDict.fromDatasetTypes(task.inputs.keys(), universe=dataId.universe)
        self.outputs = _DatasetDict.fromDatasetTypes(task.outputs.keys(), universe=dataId.universe)
        self.prerequisites = _DatasetDict.fromDatasetTypes(task.prerequisites.keys(),
                                                           universe=dataId.universe)

    __slots__ = ("task", "dataId", "inputs", "outputs", "prerequisites")

    def __repr__(self):
        return f"_QuantumScaffolding(taskDef={self.task.taskDef}, dataId={self.dataId}, ...)"

    task: _TaskScaffolding
    """Back-reference to the helper object for the `PipelineTask` this quantum
    represents an execution of.
    """

    dataId: DataCoordinate
    """Data ID for this quantum.
    """

    inputs: _DatasetDict
    """Nested dictionary containing `DatasetRef` inputs to this quantum.

    This is initialized to map each `DatasetType` to an empty dictionary at
    construction.  Those nested dictionaries are populated (with data IDs as
    keys) with unresolved `DatasetRef` instances in
    `_PipelineScaffolding.connectDataIds`.
    """

    outputs: _DatasetDict
    """Nested dictionary containing `DatasetRef` outputs of this quantum.
    """

    prerequisites: _DatasetDict
    """Nested dictionary containing `DatasetRef` prerequisite inputs to this
    quantum.
    """
241 """Transform the scaffolding object into a true `Quantum` instance.
246 An actual `Quantum` instance.
248 allInputs = self.
inputsinputs.unpackMultiRefs()
249 allInputs.update(self.
prerequisitesprerequisites.unpackMultiRefs())
254 allInputs = self.
tasktask.taskDef.connections.adjustQuantum(allInputs)
256 taskName=self.
tasktask.taskDef.taskName,
257 taskClass=self.
tasktask.taskDef.taskClass,
259 initInputs=self.
tasktask.initInputs.unpackSingleRefs(),
261 outputs=self.
outputsoutputs.unpackMultiRefs(),
267 """Helper class aggregating information about a `PipelineTask`, used when
268 constructing a `QuantumGraph`.
270 See `_PipelineScaffolding` for a top-down description of the full
271 scaffolding data structure.
276 Data structure that identifies the task class and its config.
277 parent : `_PipelineScaffolding`
278 The parent data structure that will hold the instance being
280 datasetTypes : `TaskDatasetTypes`
281 Data structure that categorizes the dataset types used by this task.

    def __init__(self, taskDef: TaskDef, parent: _PipelineScaffolding, datasetTypes: TaskDatasetTypes):
        universe = parent.dimensions.universe
        self.taskDef = taskDef
        self.dimensions = DimensionGraph(universe, names=taskDef.connections.dimensions)
        assert self.dimensions.issubset(parent.dimensions)
        # Initialize _DatasetDicts as subsets of the one or two
        # corresponding dicts in the parent _PipelineScaffolding.
        self.initInputs = _DatasetDict.fromSubset(datasetTypes.initInputs, parent.initInputs,
                                                  parent.initIntermediates)
        self.initOutputs = _DatasetDict.fromSubset(datasetTypes.initOutputs, parent.initIntermediates,
                                                   parent.initOutputs)
        self.inputs = _DatasetDict.fromSubset(datasetTypes.inputs, parent.inputs, parent.intermediates)
        self.outputs = _DatasetDict.fromSubset(datasetTypes.outputs, parent.intermediates,
                                               parent.outputs)
        self.prerequisites = _DatasetDict.fromSubset(datasetTypes.prerequisites, parent.prerequisites)
        self.quanta = {}

    def __repr__(self):
        return f"_TaskScaffolding(taskDef={self.taskDef}, ...)"
306 """Data structure that identifies the task class and its config
310 dimensions: DimensionGraph
311 """The dimensions of a single `Quantum` of this task (`DimensionGraph`).
314 initInputs: _DatasetDict
315 """Dictionary containing information about datasets used to construct this
316 task (`_DatasetDict`).
319 initOutputs: _DatasetDict
320 """Dictionary containing information about datasets produced as a
321 side-effect of constructing this task (`_DatasetDict`).
325 """Dictionary containing information about datasets used as regular,
326 graph-constraining inputs to this task (`_DatasetDict`).
329 outputs: _DatasetDict
330 """Dictionary containing information about datasets produced by this task
334 prerequisites: _DatasetDict
335 """Dictionary containing information about input datasets that must be
336 present in the repository before any Pipeline containing this task is run
340 quanta: Dict[DataCoordinate, _QuantumScaffolding]
341 """Dictionary mapping data ID to a scaffolding object for the Quantum of
342 this task with that data ID.
346 """Create a `set` of `Quantum` from the information in ``self``.
350 nodes : `set` of `Quantum
351 The `Quantum` elements corresponding to this task.
353 return set(q.makeQuantum()
for q
in self.
quantaquanta.values())
358 """A helper data structure that organizes the information involved in
359 constructing a `QuantumGraph` for a `Pipeline`.
363 pipeline : `Pipeline`
364 Sequence of tasks from which a graph is to be constructed. Must
365 have nested task classes already imported.
366 universe : `DimensionUniverse`
367 Universe of all possible dimensions.
371 The scaffolding data structure contains nested data structures for both
372 tasks (`_TaskScaffolding`) and datasets (`_DatasetDict`). The dataset
373 data structures are shared between the pipeline-level structure (which
374 aggregates all datasets and categorizes them from the perspective of the
375 complete pipeline) and the individual tasks that use them as inputs and
378 `QuantumGraph` construction proceeds in four steps, with each corresponding
379 to a different `_PipelineScaffolding` method:
381 1. When `_PipelineScaffolding` is constructed, we extract and categorize
382 the DatasetTypes used by the pipeline (delegating to
383 `PipelineDatasetTypes.fromPipeline`), then use these to construct the
384 nested `_TaskScaffolding` and `_DatasetDict` objects.
386 2. In `connectDataIds`, we construct and run the "Big Join Query", which
387 returns related tuples of all dimensions used to identify any regular
388 input, output, and intermediate datasets (not prerequisites). We then
389 iterate over these tuples of related dimensions, identifying the subsets
390 that correspond to distinct data IDs for each task and dataset type,
391 and then create `_QuantumScaffolding` objects.
393 3. In `resolveDatasetRefs`, we run follow-up queries against all of the
394 dataset data IDs previously identified, transforming unresolved
395 DatasetRefs into resolved DatasetRefs where appropriate. We then look
396 up prerequisite datasets for all quanta.
398 4. In `makeQuantumGraph`, we construct a `QuantumGraph` from the lists of
399 per-task `_QuantumScaffolding` objects.
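
    Examples
    --------
    A minimal sketch of how these four steps are driven; it mirrors
    `GraphBuilder.makeGraph`, with ``registry``, ``collections``, ``run``,
    ``userQuery``, and ``dataId`` assumed to be in hand::

        scaffolding = _PipelineScaffolding(pipeline, registry=registry)
        with scaffolding.connectDataIds(registry, collections, userQuery,
                                        dataId) as commonDataIds:
            scaffolding.resolveDatasetRefs(registry, collections, run,
                                           commonDataIds)
        graph = scaffolding.makeQuantumGraph()
    """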

    def __init__(self, pipeline, *, registry):
        _LOG.debug("Initializing data structures for QuantumGraph generation.")
        # Aggregate and categorize the DatasetTypes in the Pipeline.
        datasetTypes = PipelineDatasetTypes.fromPipeline(pipeline, registry=registry)
        # Construct dictionaries that map those DatasetTypes to structures
        # that will (later) hold additional information about them.
        for attr in ("initInputs", "initIntermediates", "initOutputs",
                     "inputs", "intermediates", "outputs", "prerequisites"):
            setattr(self, attr, _DatasetDict.fromDatasetTypes(getattr(datasetTypes, attr),
                                                              universe=registry.dimensions))
        # Aggregate all dimensions for all non-init, non-prerequisite
        # DatasetTypes.  These are the ones we'll include in the big join
        # query.
        self.dimensions = self.inputs.dimensions.union(self.intermediates.dimensions,
                                                       self.outputs.dimensions)
        # Construct scaffolding nodes for each task; each _DatasetDict is
        # shared by this object and the per-task scaffolding objects that
        # reference it.
        if isinstance(pipeline, Pipeline):
            pipeline = pipeline.toExpandedPipeline()
        self.tasks = [_TaskScaffolding(taskDef=taskDef, parent=self, datasetTypes=taskDatasetTypes)
                      for taskDef, taskDatasetTypes in zip(pipeline,
                                                           datasetTypes.byTask.values())]

    def __repr__(self):
        return f"_PipelineScaffolding(tasks={self.tasks}, ...)"

    tasks: List[_TaskScaffolding]
    """Scaffolding data structures for each task in the pipeline
    (`list` of `_TaskScaffolding`).
    """

    initInputs: _DatasetDict
    """Datasets consumed but not produced when constructing the tasks in this
    pipeline (`_DatasetDict`).
    """

    initIntermediates: _DatasetDict
    """Datasets that are both consumed and produced when constructing the
    tasks in this pipeline (`_DatasetDict`).
    """

    initOutputs: _DatasetDict
    """Datasets produced but not consumed when constructing the tasks in this
    pipeline (`_DatasetDict`).
    """

    inputs: _DatasetDict
    """Datasets that are consumed but not produced when running this pipeline
    (`_DatasetDict`).
    """

    intermediates: _DatasetDict
    """Datasets that are both produced and consumed when running this pipeline
    (`_DatasetDict`).
    """

    outputs: _DatasetDict
    """Datasets produced but not consumed when running this pipeline
    (`_DatasetDict`).
    """

    prerequisites: _DatasetDict
    """Datasets that are consumed when running this pipeline and looked up
    per-Quantum when generating the graph (`_DatasetDict`).
    """

    dimensions: DimensionGraph
    """All dimensions used by any regular input, intermediate, or output
    (not prerequisite) dataset; the set of dimensions used in the "Big Join
    Query" (`DimensionGraph`).

    This is required to be a superset of all task quantum dimensions.
    """
483 """Query for the data IDs that connect nodes in the `QuantumGraph`.
485 This method populates `_TaskScaffolding.dataIds` and
486 `_DatasetScaffolding.dataIds` (except for those in `prerequisites`).
490 registry : `lsst.daf.butler.Registry`
491 Registry for the data repository; used for all data ID queries.
493 Expressions representing the collections to search for input
494 datasets. May be any of the types accepted by
495 `lsst.daf.butler.CollectionSearch.fromExpression`.
496 userQuery : `str` or `None`
497 User-provided expression to limit the data IDs processed.
498 externalDataId : `DataCoordinate`
499 Externally-provided data ID that should be used to restrict the
500 results, just as if these constraints had been included via ``AND``
501 in ``userQuery``. This includes (at least) any instrument named
502 in the pipeline definition.
507 `lsst.daf.butler.registry.queries.DataCoordinateQueryResults`
508 An interface to a database temporary table containing all data IDs
509 that will appear in this `QuantumGraph`. Returned inside a
510 context manager, which will drop the temporary table at the end of
511 the `with` block in which this method is called.
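
        Examples
        --------
        Intended usage, sketched; the query results are only valid inside the
        ``with`` block, because the backing temporary table is dropped on
        exit::

            with scaffolding.connectDataIds(registry, collections, userQuery,
                                            dataId) as commonDataIds:
                scaffolding.resolveDatasetRefs(registry, collections, run,
                                               commonDataIds)

        """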
        _LOG.debug("Building query for data IDs.")
        emptyDataId = DataCoordinate.makeEmpty(registry.dimensions)
        for datasetType, refs in itertools.chain(self.initInputs.items(),
                                                 self.initIntermediates.items(),
                                                 self.initOutputs.items()):
            refs[emptyDataId] = DatasetRef(datasetType, emptyDataId)
        # Run one big query for the data IDs for task dimensions and regular
        # inputs, intermediates, and outputs.
        _LOG.debug("Submitting data ID query and materializing results.")
        with registry.queryDataIds(self.dimensions,
                                   datasets=list(self.inputs),
                                   collections=collections,
                                   where=userQuery,
                                   dataId=externalDataId,
                                   ).materialize() as commonDataIds:
            _LOG.debug("Expanding data IDs.")
            commonDataIds = commonDataIds.expanded()
            _LOG.debug("Iterating over query results to associate quanta with datasets.")
            n = -1
            for n, commonDataId in enumerate(commonDataIds):
                # Create DatasetRefs for all DatasetTypes from this result
                # row, reusing those for data IDs we have already seen.
                refsForRow = {}
                for datasetType, refs in itertools.chain(self.inputs.items(),
                                                         self.intermediates.items(),
                                                         self.outputs.items()):
                    datasetDataId = commonDataId.subset(datasetType.dimensions)
                    ref = refs.get(datasetDataId)
                    if ref is None:
                        ref = DatasetRef(datasetType, datasetDataId)
                        refs[datasetDataId] = ref
                    refsForRow[datasetType.name] = ref
                # Create _QuantumScaffolding objects for all tasks from this
                # result row, again reusing those already seen.
                for task in self.tasks:
                    quantumDataId = commonDataId.subset(task.dimensions)
                    quantum = task.quanta.get(quantumDataId)
                    if quantum is None:
                        quantum = _QuantumScaffolding(task=task, dataId=quantumDataId)
                        task.quanta[quantumDataId] = quantum
                    # Associate the DatasetRefs from this row with this
                    # quantum's inputs and outputs.
                    for datasetType in task.inputs:
                        ref = refsForRow[datasetType.name]
                        quantum.inputs[datasetType.name][ref.dataId] = ref
                    for datasetType in task.outputs:
                        ref = refsForRow[datasetType.name]
                        quantum.outputs[datasetType.name][ref.dataId] = ref
            _LOG.debug("Finished processing %d rows from data ID query.", n + 1)
            yield commonDataIds
578 """Perform follow up queries for each dataset data ID produced in
581 This method populates `_DatasetScaffolding.refs` (except for those in
586 registry : `lsst.daf.butler.Registry`
587 Registry for the data repository; used for all data ID queries.
589 Expressions representing the collections to search for input
590 datasets. May be any of the types accepted by
591 `lsst.daf.butler.CollectionSearch.fromExpression`.
592 run : `str`, optional
593 Name of the `~lsst.daf.butler.CollectionType.RUN` collection for
594 output datasets, if it already exists.
596 `lsst.daf.butler.registry.queries.DataCoordinateQueryResults`
597 Result of a previous call to `connectDataIds`.
598 skipExisting : `bool`, optional
599 If `True` (default), a Quantum is not created if all its outputs
600 already exist in ``run``. Ignored if ``run`` is `None`.
605 Raised if an output dataset already exists in the output run
606 and ``skipExisting`` is `False`. The case where some but not all
607 of a quantum's outputs are present and ``skipExisting`` is `True`
608 cannot be identified at this stage, and is handled by `fillQuanta`
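
        Examples
        --------
        A prerequisite connection may bypass the default follow-up query by
        defining a ``lookupFunction``; a hypothetical sketch of one, matching
        the call signature used below::

            def lookupRefCat(datasetType, registry, quantumDataId, collections):
                # Must return an iterable of DatasetRefs for this quantum.
                return registry.queryDatasets(datasetType,
                                              collections=collections,
                                              dataId=quantumDataId)

        """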
        # Look up [init] intermediate and output datasets in the output
        # collection, if there is an output collection.
        if run is not None:
            for datasetType, refs in itertools.chain(self.initIntermediates.items(),
                                                     self.initOutputs.items(),
                                                     self.intermediates.items(),
                                                     self.outputs.items()):
                _LOG.debug("Resolving %d datasets for intermediate and/or output dataset %s.",
                           len(refs), datasetType.name)
                isInit = datasetType in self.initIntermediates or datasetType in self.initOutputs
                resolvedRefQueryResults = commonDataIds.subset(
                    datasetType.dimensions,
                    unique=True
                ).findDatasets(
                    datasetType,
                    collections=run,
                    findFirst=True
                )
                for resolvedRef in resolvedRefQueryResults:
                    # The query results must be a subset of the data IDs we
                    # already know about.
                    assert resolvedRef.dataId in refs
                    if skipExisting or isInit:
                        refs[resolvedRef.dataId] = resolvedRef
                    else:
                        raise OutputExistsError(f"Output dataset {datasetType.name} already exists in "
                                                f"output RUN collection '{run}' with data ID"
                                                f" {resolvedRef.dataId}.")
        # Look up input and initInput datasets in the input collection(s).
        for datasetType, refs in itertools.chain(self.initInputs.items(), self.inputs.items()):
            _LOG.debug("Resolving %d datasets for input dataset %s.", len(refs), datasetType.name)
            resolvedRefQueryResults = commonDataIds.subset(
                datasetType.dimensions,
                unique=True
            ).findDatasets(
                datasetType,
                collections=collections,
                findFirst=True
            )
            dataIdsNotFoundYet = set(refs.keys())
            for resolvedRef in resolvedRefQueryResults:
                dataIdsNotFoundYet.discard(resolvedRef.dataId)
                refs[resolvedRef.dataId] = resolvedRef
            if dataIdsNotFoundYet:
                raise RuntimeError(
                    f"{len(dataIdsNotFoundYet)} dataset(s) of type "
                    f"'{datasetType.name}' was/were present in a previous "
                    f"query, but could not be found now.  "
                    f"This is either a logic bug in QuantumGraph generation "
                    f"or the input collections have been modified since "
                    f"QuantumGraph generation began."
                )
        # Copy the resolved DatasetRefs to the _QuantumScaffolding objects,
        # replacing the unresolved refs there, and then look up prerequisites.
        for task in self.tasks:
            _LOG.debug(
                "Applying resolutions and finding prerequisites for %d quanta of task with label '%s'.",
                len(task.quanta), task.taskDef.label)
            lookupFunctions = {
                c.name: c.lookupFunction
                for c in iterConnections(task.taskDef.connections, "prerequisiteInputs")
                if c.lookupFunction is not None
            }
            dataIdsToSkip = []
            for quantum in task.quanta.values():
                # Process output datasets only if there is a run to look for
                # outputs in and skipExisting is True.  If skipExisting is
                # False, any output datasets that already exist would have
                # already caused an exception to be raised.
                if run is not None and skipExisting:
                    resolvedRefs = []
                    unresolvedRefs = []
                    for datasetType, originalRefs in quantum.outputs.items():
                        for ref in task.outputs.extract(datasetType, originalRefs.keys()):
                            if ref.id is not None:
                                resolvedRefs.append(ref)
                            else:
                                unresolvedRefs.append(ref)
                    if resolvedRefs:
                        if unresolvedRefs:
                            raise OutputExistsError(
                                f"Quantum {quantum.dataId} of task with label "
                                f"'{quantum.task.taskDef.label}' has some outputs that exist "
                                f"({resolvedRefs}) "
                                f"and others that don't ({unresolvedRefs}).")
                        else:
                            # All outputs are already present; skip this
                            # quantum and continue to the next.
                            dataIdsToSkip.append(quantum.dataId)
                            continue
                # Update the input DatasetRefs to the resolved ones we just
                # read from the registry.
                for datasetType, refs in quantum.inputs.items():
                    for ref in task.inputs.extract(datasetType, refs.keys()):
                        refs[ref.dataId] = ref
                # Look up prerequisite datasets in the input collection(s).
                # These may have dimensions that extend beyond those we
                # queried for originally, because we want to permit those data
                # ID values to differ across quanta and dataset types.
                for datasetType in task.prerequisites:
                    lookupFunction = lookupFunctions.get(datasetType.name)
                    if lookupFunction is not None:
                        # The task's connections provide their own lookup
                        # function; this takes precedence.
                        refs = list(lookupFunction(datasetType, registry, quantum.dataId, collections))
                    elif (datasetType.isCalibration()
                            and datasetType.dimensions <= quantum.dataId.graph
                            and quantum.dataId.graph.temporal):
                        # A master calibration lookup, handled specially
                        # because the query system can't do a temporal join on
                        # a non-dimension-based timespan yet.
                        timespan = quantum.dataId.timespan
                        refs = [registry.findDataset(datasetType, quantum.dataId,
                                                     collections=collections,
                                                     timespan=timespan)]
                    else:
                        # Most general case: a follow-up query against the
                        # input collections.
                        refs = list(registry.queryDatasets(datasetType,
                                                           collections=collections,
                                                           dataId=quantum.dataId,
                                                           findFirst=True).expanded())
                    quantum.prerequisites[datasetType].update({ref.dataId: ref for ref in refs
                                                               if ref is not None})
            # Actually remove any quanta that we decided to skip above.
            if dataIdsToSkip:
                _LOG.debug("Pruning %d quanta for task with label '%s' because all of their "
                           "outputs exist.",
                           len(dataIdsToSkip), task.taskDef.label)
                for dataId in dataIdsToSkip:
                    del task.quanta[dataId]
756 """Create a `QuantumGraph` from the quanta already present in
757 the scaffolding data structure.
761 graph : `QuantumGraph`
762 The full `QuantumGraph`.
764 graph =
QuantumGraph({task.taskDef: task.makeQuantumSet()
for task
in self.
taskstasks})
774 """Base class for exceptions generated by graph builder.
779 class OutputExistsError(GraphBuilderError):
780 """Exception generated when output datasets already exist.
786 """Exception generated when a prerequisite dataset does not exist.
792 """GraphBuilder class is responsible for building task execution graph from
797 registry : `~lsst.daf.butler.Registry`
798 Data butler instance.
799 skipExisting : `bool`, optional
800 If `True` (default), a Quantum is not created if all its outputs

    def makeGraph(self, pipeline, collections, run, userQuery):
        """Create execution graph for a pipeline.

        Parameters
        ----------
        pipeline : `Pipeline`
            Pipeline definition, task names/classes and their configs.
        collections
            Expressions representing the collections to search for input
            datasets.  May be any of the types accepted by
            `lsst.daf.butler.CollectionSearch.fromExpression`.
        run : `str`, optional
            Name of the `~lsst.daf.butler.CollectionType.RUN` collection for
            output datasets, if it already exists.
        userQuery : `str`
            String which defines user-defined selection for registry; should
            be empty or `None` if there are no restrictions on data selection.

        Returns
        -------
        graph : `QuantumGraph`
            The constructed `QuantumGraph`.

        Raises
        ------
        UserExpressionError
            Raised when user expression cannot be parsed.
        OutputExistsError
            Raised when output datasets already exist.
        Exception
            Other exception types may be raised by underlying registry
            classes.
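
        Examples
        --------
        Typical usage, sketched with placeholder collection and run names;
        ``butler`` is assumed to be a `~lsst.daf.butler.Butler`::

            builder = GraphBuilder(butler.registry, skipExisting=True)
            graph = builder.makeGraph(pipeline,
                                      collections=["HSC/defaults"],
                                      run="u/someone/run",
                                      userQuery="visit = 903334")

        """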
        scaffolding = _PipelineScaffolding(pipeline, registry=self.registry)
        if not collections and (scaffolding.initInputs or scaffolding.inputs
                                or scaffolding.prerequisites):
            raise ValueError("Pipeline requires input datasets but no input collections provided.")
        instrument = pipeline.getInstrument()
        if isinstance(instrument, str):
            instrument = doImport(instrument)
        if instrument is not None:
            dataId = DataCoordinate.standardize(instrument=instrument.getName(),
                                                universe=self.registry.dimensions)
        else:
            dataId = DataCoordinate.makeEmpty(self.registry.dimensions)
        with scaffolding.connectDataIds(self.registry, collections, userQuery,
                                        dataId) as commonDataIds:
            scaffolding.resolveDatasetRefs(self.registry, collections, run, commonDataIds,
                                           skipExisting=self.skipExisting)
        return scaffolding.makeQuantumGraph()