from __future__ import annotations

"""Module defining GraphBuilder class and related methods.
"""

__all__ = ['GraphBuilder']

import itertools
import logging

from collections import ChainMap
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Dict, Iterable, Iterator, List, Set

from .connections import iterConnections
from .pipeline import PipelineDatasetTypes, TaskDatasetTypes, TaskDef, Pipeline
from .graph import QuantumGraph
from lsst.daf.butler import (
    DataCoordinate,
    DatasetRef,
    DatasetType,
    DimensionGraph,
    DimensionUniverse,
    NamedKeyDict,
    Quantum,
)
from lsst.daf.butler.registry.queries.exprParser import ParseError, ParserYacc, TreeVisitor
from lsst.utils import doImport

_LOG = logging.getLogger(__name__.partition(".")[2])


class _DatasetDict(NamedKeyDict[DatasetType, Dict[DataCoordinate, DatasetRef]]):
    """A custom dictionary that maps `DatasetType` to a nested dictionary of
    the known `DatasetRef` instances of that type.

    Parameters
    ----------
    *args
        Positional arguments are forwarded to the `dict` constructor.
    universe : `DimensionUniverse`
        Universe of all possible dimensions.
    """
    def __init__(self, *args, universe: DimensionUniverse):
        super().__init__(*args)
        self.universe = universe
    @classmethod
    def fromDatasetTypes(cls, datasetTypes: Iterable[DatasetType], *,
                         universe: DimensionUniverse) -> _DatasetDict:
        """Construct a dictionary from a flat iterable of `DatasetType` keys.

        Parameters
        ----------
        datasetTypes : `iterable` of `DatasetType`
            DatasetTypes to use as keys for the dict.  Values will be empty
            dictionaries.
        universe : `DimensionUniverse`
            Universe of all possible dimensions.

        Returns
        -------
        dictionary : `_DatasetDict`
            A new `_DatasetDict` instance.
        """
        return cls({datasetType: {} for datasetType in datasetTypes}, universe=universe)
    @classmethod
    def fromSubset(cls, datasetTypes: Iterable[DatasetType], first: _DatasetDict,
                   *rest: _DatasetDict) -> _DatasetDict:
        """Return a new dictionary by extracting items corresponding to the
        given keys from one or more existing dictionaries.

        Parameters
        ----------
        datasetTypes : `iterable` of `DatasetType`
            DatasetTypes to use as keys for the dict.  Values will be obtained
            by lookups against ``first`` and ``rest``.
        first : `_DatasetDict`
            Another dictionary from which to extract values.
        *rest : `_DatasetDict`
            Additional dictionaries from which to extract values.

        Returns
        -------
        dictionary : `_DatasetDict`
            A new dictionary instance.
        """
        combined = ChainMap(first, *rest)
        return cls({datasetType: combined[datasetType] for datasetType in datasetTypes},
                   universe=first.universe)
126 """The union of all dimensions used by all dataset types in this
127 dictionary, including implied dependencies (`DimensionGraph`).
132 return base.union(*[datasetType.dimensions
for datasetType
in self.keys()])
135 """Unpack nested single-element `DatasetRef` dicts into a new
136 mapping with `DatasetType` keys and `DatasetRef` values.
138 This method assumes that each nest contains exactly one item, as is the
139 case for all "init" datasets.
143 dictionary : `NamedKeyDict`
144 Dictionary mapping `DatasetType` to `DatasetRef`, with both
145 `DatasetType` instances and string names usable as keys.
147 def getOne(refs: Dict[DataCoordinate, DatasetRef]) -> DatasetRef:
150 return NamedKeyDict({datasetType: getOne(refs)
for datasetType, refs
in self.items()})
153 """Unpack nested multi-element `DatasetRef` dicts into a new
154 mapping with `DatasetType` keys and `set` of `DatasetRef` values.
158 dictionary : `NamedKeyDict`
159 Dictionary mapping `DatasetType` to `DatasetRef`, with both
160 `DatasetType` instances and string names usable as keys.
162 return NamedKeyDict({datasetType:
list(refs.values())
for datasetType, refs
in self.items()})
    def extract(self, datasetType: DatasetType, dataIds: Iterable[DataCoordinate]
                ) -> Iterator[DatasetRef]:
        """Iterate over the contained `DatasetRef` instances that match the
        given `DatasetType` and data IDs.

        Parameters
        ----------
        datasetType : `DatasetType`
            Dataset type to match.
        dataIds : `Iterable` [ `DataCoordinate` ]
            Data IDs to match.

        Returns
        -------
        refs : `Iterator` [ `DatasetRef` ]
            DatasetRef instances for which ``ref.datasetType == datasetType``
            and ``ref.dataId`` is in ``dataIds``.
        """
        refs = self[datasetType]
        return (refs[dataId] for dataId in dataIds)
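

# A minimal sketch (illustrative, not part of the public API) of how
# _DatasetDict is used: keys are DatasetTypes, values are nested
# {data ID: DatasetRef} dicts, and ``extract`` iterates over matching refs.
def _exampleDatasetDictUsage(datasetType: DatasetType,
                             dataId: DataCoordinate) -> List[DatasetRef]:
    # Start with an empty nested dict for the given dataset type.
    dd = _DatasetDict.fromDatasetTypes([datasetType], universe=dataId.universe)
    # Populate it with an unresolved ref, as connectDataIds does.
    dd[datasetType][dataId] = DatasetRef(datasetType, dataId)
    # extract yields the refs matching the given data IDs.
    return list(dd.extract(datasetType, [dataId]))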
187 """Helper class aggregating information about a `Quantum`, used when
188 constructing a `QuantumGraph`.
190 See `_PipelineScaffolding` for a top-down description of the full
191 scaffolding data structure.
195 task : _TaskScaffolding
196 Back-reference to the helper object for the `PipelineTask` this quantum
197 represents an execution of.
198 dataId : `DataCoordinate`
199 Data ID for this quantum.
201 def __init__(self, task: _TaskScaffolding, dataId: DataCoordinate):
204 self.
inputs = _DatasetDict.fromDatasetTypes(task.inputs.keys(), universe=dataId.universe)
205 self.
outputs = _DatasetDict.fromDatasetTypes(task.outputs.keys(), universe=dataId.universe)
206 self.
prerequisites = _DatasetDict.fromDatasetTypes(task.prerequisites.keys(),
207 universe=dataId.universe)
209 __slots__ = (
"task",
"dataId",
"inputs",
"outputs",
"prerequisites")
212 return f
"_QuantumScaffolding(taskDef={self.task.taskDef}, dataId={self.dataId}, ...)"
    task: _TaskScaffolding
    """Back-reference to the helper object for the `PipelineTask` this quantum
    represents an execution of.
    """

    dataId: DataCoordinate
    """Data ID for this quantum.
    """

    inputs: _DatasetDict
    """Nested dictionary containing `DatasetRef` inputs to this quantum.

    This is initialized to map each `DatasetType` to an empty dictionary at
    construction.  Those nested dictionaries are populated (with data IDs as
    keys) with unresolved `DatasetRef` instances in
    `_PipelineScaffolding.connectDataIds`.
    """

    outputs: _DatasetDict
    """Nested dictionary containing `DatasetRef` outputs of this quantum.
    """

    prerequisites: _DatasetDict
    """Nested dictionary containing `DatasetRef` prerequisite inputs to this
    quantum.
    """
242 """Transform the scaffolding object into a true `Quantum` instance.
247 An actual `Quantum` instance.
249 allInputs = self.
inputs.unpackMultiRefs()
255 allInputs = self.
task.taskDef.connections.adjustQuantum(allInputs)
257 taskName=self.
task.taskDef.taskName,
258 taskClass=self.
task.taskDef.taskClass,
260 initInputs=self.
task.initInputs.unpackSingleRefs(),
262 outputs=self.
outputs.unpackMultiRefs(),
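

# For context: ``makeQuantum`` above gives the task's connections class a
# chance to veto or trim its inputs via ``adjustQuantum``.  A minimal sketch
# of such an override (hypothetical connection class; assumes the
# single-argument signature used by this version of the connections API):
#
#     class ExampleConnections(PipelineTaskConnections, dimensions=("visit",)):
#         def adjustQuantum(self, datasetRefMap):
#             datasetRefMap = super().adjustQuantum(datasetRefMap)
#             # Filter refs here to trim inputs, or raise to veto the quantum.
#             return datasetRefMap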
268 """Helper class aggregating information about a `PipelineTask`, used when
269 constructing a `QuantumGraph`.
271 See `_PipelineScaffolding` for a top-down description of the full
272 scaffolding data structure.
277 Data structure that identifies the task class and its config.
278 parent : `_PipelineScaffolding`
279 The parent data structure that will hold the instance being
281 datasetTypes : `TaskDatasetTypes`
282 Data structure that categorizes the dataset types used by this task.
284 def __init__(self, taskDef: TaskDef, parent: _PipelineScaffolding, datasetTypes: TaskDatasetTypes):
285 universe = parent.dimensions.universe
287 self.
dimensions = DimensionGraph(universe, names=taskDef.connections.dimensions)
288 assert self.
dimensions.issubset(parent.dimensions)
291 self.
initInputs = _DatasetDict.fromSubset(datasetTypes.initInputs, parent.initInputs,
292 parent.initIntermediates)
293 self.
initOutputs = _DatasetDict.fromSubset(datasetTypes.initOutputs, parent.initIntermediates,
295 self.
inputs = _DatasetDict.fromSubset(datasetTypes.inputs, parent.inputs, parent.intermediates)
296 self.
outputs = _DatasetDict.fromSubset(datasetTypes.outputs, parent.intermediates, parent.outputs)
297 self.
prerequisites = _DatasetDict.fromSubset(datasetTypes.prerequisites, parent.prerequisites)
304 return f
"_TaskScaffolding(taskDef={self.taskDef}, ...)"
307 """Data structure that identifies the task class and its config
311 dimensions: DimensionGraph
312 """The dimensions of a single `Quantum` of this task (`DimensionGraph`).
315 initInputs: _DatasetDict
316 """Dictionary containing information about datasets used to construct this
317 task (`_DatasetDict`).
320 initOutputs: _DatasetDict
321 """Dictionary containing information about datasets produced as a
322 side-effect of constructing this task (`_DatasetDict`).
326 """Dictionary containing information about datasets used as regular,
327 graph-constraining inputs to this task (`_DatasetDict`).
330 outputs: _DatasetDict
331 """Dictionary containing information about datasets produced by this task
335 prerequisites: _DatasetDict
336 """Dictionary containing information about input datasets that must be
337 present in the repository before any Pipeline containing this task is run
341 quanta: Dict[DataCoordinate, _QuantumScaffolding]
342 """Dictionary mapping data ID to a scaffolding object for the Quantum of
343 this task with that data ID.
347 """Create a `set` of `Quantum` from the information in ``self``.
351 nodes : `set` of `Quantum
352 The `Quantum` elements corresponding to this task.
354 return set(q.makeQuantum()
for q
in self.
quanta.values())
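

# Illustrative sketch only: the full set of quanta for a pipeline is just the
# union of each task's quanta (this is essentially what
# _PipelineScaffolding.makeQuantumGraph does below).
def _exampleAllQuanta(tasks: List[_TaskScaffolding]) -> Set[Quantum]:
    result: Set[Quantum] = set()
    for task in tasks:
        # Assumes all scaffolding refs have already been resolved.
        result |= task.makeQuantumSet()
    return result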
359 """A helper data structure that organizes the information involved in
360 constructing a `QuantumGraph` for a `Pipeline`.
364 pipeline : `Pipeline`
365 Sequence of tasks from which a graph is to be constructed. Must
366 have nested task classes already imported.
367 universe : `DimensionUniverse`
368 Universe of all possible dimensions.
372 The scaffolding data structure contains nested data structures for both
373 tasks (`_TaskScaffolding`) and datasets (`_DatasetDict`). The dataset
374 data structures are shared between the pipeline-level structure (which
375 aggregates all datasets and categorizes them from the perspective of the
376 complete pipeline) and the individual tasks that use them as inputs and
379 `QuantumGraph` construction proceeds in four steps, with each corresponding
380 to a different `_PipelineScaffolding` method:
382 1. When `_PipelineScaffolding` is constructed, we extract and categorize
383 the DatasetTypes used by the pipeline (delegating to
384 `PipelineDatasetTypes.fromPipeline`), then use these to construct the
385 nested `_TaskScaffolding` and `_DatasetDict` objects.
387 2. In `connectDataIds`, we construct and run the "Big Join Query", which
388 returns related tuples of all dimensions used to identify any regular
389 input, output, and intermediate datasets (not prerequisites). We then
390 iterate over these tuples of related dimensions, identifying the subsets
391 that correspond to distinct data IDs for each task and dataset type,
392 and then create `_QuantumScaffolding` objects.
394 3. In `resolveDatasetRefs`, we run follow-up queries against all of the
395 dataset data IDs previously identified, transforming unresolved
396 DatasetRefs into resolved DatasetRefs where appropriate. We then look
397 up prerequisite datasets for all quanta.
399 4. In `makeQuantumGraph`, we construct a `QuantumGraph` from the lists of
400 per-task `_QuantumScaffolding` objects.
    def __init__(self, pipeline, *, registry):
        _LOG.debug("Initializing data structures for QuantumGraph generation.")
        # Aggregate and categorize the DatasetTypes in the Pipeline.
        datasetTypes = PipelineDatasetTypes.fromPipeline(pipeline, registry=registry)
        # Construct dictionaries that map those DatasetTypes to structures
        # that will (later) hold additional information about them.
        for attr in ("initInputs", "initIntermediates", "initOutputs",
                     "inputs", "intermediates", "outputs", "prerequisites"):
            setattr(self, attr, _DatasetDict.fromDatasetTypes(getattr(datasetTypes, attr),
                                                              universe=registry.dimensions))
        # Aggregate all dimensions for all non-init, non-prerequisite
        # DatasetTypes.  These are the ones we'll include in the big join
        # query.
        self.dimensions = self.inputs.dimensions.union(self.intermediates.dimensions,
                                                       self.outputs.dimensions)
        # Construct scaffolding nodes for each Task, sharing the _DatasetDict
        # entries with the pipeline-level dictionaries above.
        if isinstance(pipeline, Pipeline):
            pipeline = pipeline.toExpandedPipeline()
        self.tasks = [_TaskScaffolding(taskDef=taskDef, parent=self, datasetTypes=taskDatasetTypes)
                      for taskDef, taskDatasetTypes in zip(pipeline,
                                                           datasetTypes.byTask.values())]
    def __repr__(self):
        return f"_PipelineScaffolding(tasks={self.tasks}, ...)"
    tasks: List[_TaskScaffolding]
    """Scaffolding data structures for each task in the pipeline
    (`list` of `_TaskScaffolding`).
    """

    initInputs: _DatasetDict
    """Datasets consumed but not produced when constructing the tasks in this
    pipeline (`_DatasetDict`).
    """

    initIntermediates: _DatasetDict
    """Datasets that are both consumed and produced when constructing the tasks
    in this pipeline (`_DatasetDict`).
    """

    initOutputs: _DatasetDict
    """Datasets produced but not consumed when constructing the tasks in this
    pipeline (`_DatasetDict`).
    """

    inputs: _DatasetDict
    """Datasets that are consumed but not produced when running this pipeline
    (`_DatasetDict`).
    """

    intermediates: _DatasetDict
    """Datasets that are both produced and consumed when running this pipeline
    (`_DatasetDict`).
    """

    outputs: _DatasetDict
    """Datasets produced but not consumed when running this pipeline
    (`_DatasetDict`).
    """

    prerequisites: _DatasetDict
    """Datasets that are consumed when running this pipeline and looked up
    per-Quantum when generating the graph (`_DatasetDict`).
    """

    dimensions: DimensionGraph
    """All dimensions used by any regular input, intermediate, or output
    (not prerequisite) dataset; the set of dimensions used in the "Big Join
    Query" (`DimensionGraph`).

    This is required to be a superset of all task quantum dimensions.
    """
484 """Query for the data IDs that connect nodes in the `QuantumGraph`.
486 This method populates `_TaskScaffolding.dataIds` and
487 `_DatasetScaffolding.dataIds` (except for those in `prerequisites`).
491 registry : `lsst.daf.butler.Registry`
492 Registry for the data repository; used for all data ID queries.
494 Expressions representing the collections to search for input
495 datasets. May be any of the types accepted by
496 `lsst.daf.butler.CollectionSearch.fromExpression`.
497 userQuery : `str`, optional
498 User-provided expression to limit the data IDs processed.
503 `lsst.daf.butler.registry.queries.DataCoordinateQueryResults`
504 An interface to a database temporary table containing all data IDs
505 that will appear in this `QuantumGraph`. Returned inside a
506 context manager, which will drop the temporary table at the end of
507 the `with` block in which this method is called.
        _LOG.debug("Building query for data IDs.")
        emptyDataId = DataCoordinate.makeEmpty(registry.dimensions)
        for datasetType, refs in itertools.chain(self.initInputs.items(),
                                                 self.initIntermediates.items(),
                                                 self.initOutputs.items()):
            refs[emptyDataId] = DatasetRef(datasetType, emptyDataId)
        # Run one big query for the data IDs for task dimensions and regular
        # inputs and outputs.
        _LOG.debug("Submitting data ID query and materializing results.")
        with registry.queryDataIds(self.dimensions,
                                   datasets=list(self.inputs),
                                   collections=collections,
                                   where=userQuery,
                                   ).materialize() as commonDataIds:
            _LOG.debug("Expanding data IDs.")
            commonDataIds = commonDataIds.expanded()
            _LOG.debug("Iterating over query results to associate quanta with datasets.")
            # Iterate over query results, populating data IDs for datasets
            # and quanta and then connecting them to each other.
            n = -1
            for n, commonDataId in enumerate(commonDataIds):
                # Create DatasetRefs for all DatasetTypes from this result
                # row, noting that we might have created some already.
                refsForRow = {}
                for datasetType, refs in itertools.chain(self.inputs.items(),
                                                         self.intermediates.items(),
                                                         self.outputs.items()):
                    datasetDataId = commonDataId.subset(datasetType.dimensions)
                    ref = refs.get(datasetDataId)
                    if ref is None:
                        ref = DatasetRef(datasetType, datasetDataId)
                        refs[datasetDataId] = ref
                    refsForRow[datasetType.name] = ref
                # Create _QuantumScaffolding objects for all tasks from this
                # result row, noting that we might have created some already.
                for task in self.tasks:
                    quantumDataId = commonDataId.subset(task.dimensions)
                    quantum = task.quanta.get(quantumDataId)
                    if quantum is None:
                        quantum = _QuantumScaffolding(task=task, dataId=quantumDataId)
                        task.quanta[quantumDataId] = quantum
                    # Whether this is a new quantum or an existing one, we can
                    # now associate the DatasetRefs for this row with it.  The
                    # fact that a quantum data ID and a dataset data ID came
                    # from the same result row is what tells us they should be
                    # associated.
                    for datasetType in task.inputs:
                        ref = refsForRow[datasetType.name]
                        quantum.inputs[datasetType.name][ref.dataId] = ref
                    for datasetType in task.outputs:
                        ref = refsForRow[datasetType.name]
                        quantum.outputs[datasetType.name][ref.dataId] = ref
            _LOG.debug("Finished processing %d rows from data ID query.", n + 1)
            yield commonDataIds
573 """Perform follow up queries for each dataset data ID produced in
576 This method populates `_DatasetScaffolding.refs` (except for those in
581 registry : `lsst.daf.butler.Registry`
582 Registry for the data repository; used for all data ID queries.
584 Expressions representing the collections to search for input
585 datasets. May be any of the types accepted by
586 `lsst.daf.butler.CollectionSearch.fromExpression`.
587 run : `str`, optional
588 Name of the `~lsst.daf.butler.CollectionType.RUN` collection for
589 output datasets, if it already exists.
591 `lsst.daf.butler.registry.queries.DataCoordinateQueryResults`
592 Result of a previous call to `connectDataIds`.
593 skipExisting : `bool`, optional
594 If `True` (default), a Quantum is not created if all its outputs
595 already exist in ``run``. Ignored if ``run`` is `None`.
600 Raised if an output dataset already exists in the output run
601 and ``skipExisting`` is `False`. The case where some but not all
602 of a quantum's outputs are present and ``skipExisting`` is `True`
603 cannot be identified at this stage, and is handled by `fillQuanta`
        # Look up [init] intermediate and output datasets in the output
        # collection, if there is an output collection.
        if run is not None:
            for datasetType, refs in itertools.chain(self.initIntermediates.items(),
                                                     self.initOutputs.items(),
                                                     self.intermediates.items(),
                                                     self.outputs.items()):
                _LOG.debug("Resolving %d datasets for intermediate and/or output dataset %s.",
                           len(refs), datasetType.name)
                isInit = datasetType in self.initIntermediates or datasetType in self.initOutputs
                resolvedRefQueryResults = commonDataIds.subset(
                    datasetType.dimensions,
                    unique=True
                ).findDatasets(
                    datasetType,
                    collections=run,
                    findFirst=True
                )
                for resolvedRef in resolvedRefQueryResults:
                    assert resolvedRef.dataId in refs
                    if skipExisting or isInit:
                        refs[resolvedRef.dataId] = resolvedRef
                    else:
                        raise OutputExistsError(f"Output dataset {datasetType.name} already exists in "
                                                f"output RUN collection '{run}' with data ID"
                                                f" {resolvedRef.dataId}.")
        # Look up input and initInput datasets in the input collection(s).
        for datasetType, refs in itertools.chain(self.initInputs.items(), self.inputs.items()):
            _LOG.debug("Resolving %d datasets for input dataset %s.", len(refs), datasetType.name)
            resolvedRefQueryResults = commonDataIds.subset(
                datasetType.dimensions,
                unique=True
            ).findDatasets(
                datasetType,
                collections=collections,
                findFirst=True
            )
            dataIdsNotFoundYet = set(refs.keys())
            for resolvedRef in resolvedRefQueryResults:
                dataIdsNotFoundYet.discard(resolvedRef.dataId)
                refs[resolvedRef.dataId] = resolvedRef
            if dataIdsNotFoundYet:
                raise RuntimeError(
                    f"{len(dataIdsNotFoundYet)} dataset(s) of type "
                    f"'{datasetType.name}' was/were present in a previous "
                    f"query, but could not be found now.  "
                    f"This is either a logic bug in QuantumGraph generation "
                    f"or the input collections have been modified since "
                    f"QuantumGraph generation began."
                )
        # Copy the resolved DatasetRefs to the _QuantumScaffolding objects,
        # replacing the unresolved refs there, and then look up prerequisites.
        for task in self.tasks:
            _LOG.debug(
                "Applying resolutions and finding prerequisites for %d quanta of task with label '%s'.",
                len(task.quanta),
                task.taskDef.label
            )
            lookupFunctions = {
                c.name: c.lookupFunction
                for c in iterConnections(task.taskDef.connections, "prerequisiteInputs")
                if c.lookupFunction is not None
            }
            dataIdsToSkip = []
            for quantum in task.quanta.values():
                # Process outputs first: if we are skipping existing outputs,
                # a quantum whose outputs all exist should be pruned, and one
                # with only some outputs present is an error.
                if run is not None and skipExisting:
                    resolvedRefs = []
                    unresolvedRefs = []
                    for datasetType, originalRefs in quantum.outputs.items():
                        for ref in task.outputs.extract(datasetType, originalRefs.keys()):
                            if ref.id is not None:
                                resolvedRefs.append(ref)
                            else:
                                unresolvedRefs.append(ref)
                    if resolvedRefs:
                        if unresolvedRefs:
                            raise OutputExistsError(
                                f"Quantum {quantum.dataId} of task with label "
                                f"'{quantum.task.taskDef.label}' has some outputs that exist "
                                f"({resolvedRefs}) and others that don't ({unresolvedRefs})."
                            )
                        else:
                            # All outputs are already present; skip this
                            # quantum and continue to the next.
                            dataIdsToSkip.append(quantum.dataId)
                            continue
                # Update the input DatasetRefs to the resolved ones we
                # already have.
                for datasetType, refs in quantum.inputs.items():
                    for ref in task.inputs.extract(datasetType, refs.keys()):
                        refs[ref.dataId] = ref
                # Look up prerequisite datasets in the input collection(s).
                # These may have dimensions that extend beyond those we
                # queried for originally, because we want to permit those
                # data ID values to differ across quanta and dataset types.
                for datasetType in task.prerequisites:
                    lookupFunction = lookupFunctions.get(datasetType.name)
                    if lookupFunction is not None:
                        # The task's connections provide their own lookup
                        # function; it always takes precedence.
                        refs = list(
                            lookupFunction(datasetType, registry, quantum.dataId, collections)
                        )
                    elif (datasetType.isCalibration()
                            and datasetType.dimensions <= quantum.dataId.graph
                            and quantum.dataId.graph.temporal):
                        # This is a master calibration lookup, which we have
                        # to handle specially because the query system can't
                        # do a temporal join on a non-dimension-based
                        # timespan yet.
                        timespan = quantum.dataId.timespan
                        refs = [registry.findDataset(datasetType, quantum.dataId,
                                                     collections=collections,
                                                     timespan=timespan)]
                    else:
                        # Most general case.
                        refs = list(registry.queryDatasets(datasetType,
                                                           collections=collections,
                                                           dataId=quantum.dataId,
                                                           findFirst=True).expanded())
                    quantum.prerequisites[datasetType].update({ref.dataId: ref for ref in refs
                                                               if ref is not None})
            # Actually remove any quanta that we decided to skip above.
            if dataIdsToSkip:
                _LOG.debug("Pruning %d quanta for task with label '%s' because all of their "
                           "outputs exist.",
                           len(dataIdsToSkip), task.taskDef.label)
                for dataId in dataIdsToSkip:
                    del task.quanta[dataId]
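
    # Note: a prerequisite connection's ``lookupFunction`` (when provided) is
    # called as ``lookupFunction(datasetType, registry, quantumDataId,
    # collections)`` and must return an iterable of DatasetRefs.  A minimal
    # hypothetical example:
    #
    #     def lookupGlobalCatalog(datasetType, registry, dataId, collections):
    #         # Ignore the quantum data ID entirely; return one global ref.
    #         return [registry.findDataset(datasetType, {},
    #                                      collections=collections)]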
751 """Create a `QuantumGraph` from the quanta already present in
752 the scaffolding data structure.
756 graph : `QuantumGraph`
757 The full `QuantumGraph`.
759 graph =
QuantumGraph({task.taskDef: task.makeQuantumSet()
for task
in self.
tasks})
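

# A sketch of the four-step construction sequence described in the
# _PipelineScaffolding docstring; GraphBuilder.makeGraph below drives the same
# sequence.  Arguments here are illustrative, not a stable API.
def _exampleBuildQuantumGraph(pipeline, registry, collections, run, userQuery) -> QuantumGraph:
    # Step 1: categorize dataset types and build the scaffolding.
    scaffolding = _PipelineScaffolding(pipeline, registry=registry)
    # Step 2: run the "Big Join Query" inside a context manager that owns a
    # temporary table of data IDs.
    with scaffolding.connectDataIds(registry, collections, userQuery) as commonDataIds:
        # Step 3: resolve dataset refs and look up prerequisites.
        scaffolding.resolveDatasetRefs(registry, collections, run, commonDataIds)
    # Step 4: convert the scaffolding into a QuantumGraph.
    return scaffolding.makeQuantumGraph()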
764 """Implementation of TreeVisitor which looks for instrument name
766 Instrument should be specified as a boolean expression
768 instrument = 'string'
769 'string' = instrument
771 so we only need to find a binary operator where operator is "=",
772 one side is a string literal and other side is an identifier.
773 All visit methods return tuple of (type, value), non-useful nodes
774 return None for both type and value.
785 return (
"str", value)
796 if name.lower() ==
"instrument":
797 return (
"id",
"instrument")
806 if lhs == (
"id",
"instrument")
and rhs[0] ==
"str":
808 elif rhs == (
"id",
"instrument")
and lhs[0] ==
"str":


def _findInstruments(queryStr):
    """Get the names of any instrument named in the query string by searching
    for "instrument = <value>" and similar patterns.

    Parameters
    ----------
    queryStr : `str` or None
        The query string to search, or None if there is no query.

    Returns
    -------
    instruments : `list` [`str`]
        The list of instrument names found in the query.

    Raises
    ------
    ValueError
        If the query expression can not be parsed.
    """
    if not queryStr:
        return []
    parser = ParserYacc()
    finder = _InstrumentFinder()
    try:
        tree = parser.parse(queryStr)
    except ParseError as exc:
        raise ValueError(f"failed to parse query expression: {queryStr}") from exc
    tree.visit(finder)
    return finder.instruments
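

# Illustrative only: for an expression like "instrument = 'HSC' AND
# visit = 42" the visitor above records ['HSC']; the equality may also be
# written with the string literal on the left.
def _exampleFindInstruments() -> List[str]:
    # Expected to evaluate to ['HSC'] given the visitor logic above.
    return _findInstruments("instrument = 'HSC' AND visit = 42")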
858 """Base class for exceptions generated by graph builder.
863 class OutputExistsError(GraphBuilderError):
864 """Exception generated when output datasets already exist.
870 """Exception generated when a prerequisite dataset does not exist.
876 """GraphBuilder class is responsible for building task execution graph from
881 registry : `~lsst.daf.butler.Registry`
882 Data butler instance.
883 skipExisting : `bool`, optional
884 If `True` (default), a Quantum is not created if all its outputs
    def makeGraph(self, pipeline, collections, run, userQuery):
        """Create execution graph for a pipeline.

        Parameters
        ----------
        pipeline : `Pipeline`
            Pipeline definition, task names/classes and their configs.
        collections
            Expressions representing the collections to search for input
            datasets.  May be any of the types accepted by
            `lsst.daf.butler.CollectionSearch.fromExpression`.
        run : `str`, optional
            Name of the `~lsst.daf.butler.CollectionType.RUN` collection for
            output datasets, if it already exists.
        userQuery : `str`
            String which defines user-defined selection for registry; should
            be empty or `None` if there are no restrictions on data selection.

        Returns
        -------
        graph : `QuantumGraph`
            The constructed `QuantumGraph`.

        Raises
        ------
        ValueError
            Raised when user expression cannot be parsed.
        OutputExistsError
            Raised when output datasets already exist.
        Exception
            Other exception types may be raised by underlying registry
            classes.
        """
        scaffolding = _PipelineScaffolding(pipeline, registry=self.registry)

        instrument = pipeline.getInstrument()
        if isinstance(instrument, str):
            instrument = doImport(instrument)
        instrumentName = instrument.getName() if instrument else None
        userQuery = self._verifyInstrumentRestriction(instrumentName, userQuery)

        with scaffolding.connectDataIds(self.registry, collections, userQuery) as commonDataIds:
            scaffolding.resolveDatasetRefs(self.registry, collections, run, commonDataIds,
                                           skipExisting=self.skipExisting)
        return scaffolding.makeQuantumGraph()
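
    # Typical usage (sketch; the butler, collection names, and pipeline file
    # here are hypothetical):
    #
    #     builder = GraphBuilder(butler.registry, skipExisting=True)
    #     qgraph = builder.makeGraph(
    #         Pipeline.fromFile("pipeline.yaml"),
    #         collections=["HSC/defaults"],
    #         run="u/someone/processing-run",
    #         userQuery="instrument = 'HSC' AND visit = 42",
    #     )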
    @staticmethod
    def _verifyInstrumentRestriction(instrumentName, query):
        """Add an instrument restriction to the query if it does not have one,
        and verify that if given an instrument name there are no other
        instrument restrictions in the query.

        Parameters
        ----------
        instrumentName : `str`
            The name of the instrument that should appear in the query.
        query : `str`
            The query string.

        Returns
        -------
        query : `str`
            The query string with the instrument added to it if needed.

        Raises
        ------
        RuntimeError
            If the pipeline names an instrument and the query contains more
            than one instrument or the name of the instrument in the query does
            not match the instrument named by the pipeline.
        """
        if not instrumentName:
            return query
        queryInstruments = _findInstruments(query)
        if len(queryInstruments) > 1:
            raise RuntimeError(f"When the pipeline has an instrument (\"{instrumentName}\") the query must "
                               "have zero instruments or one instrument that matches the pipeline. "
                               f"Found these instruments in the query: {queryInstruments}.")
        if not queryInstruments:
            # There is no instrument in the query; add the pipeline's.
            restriction = f"instrument = '{instrumentName}'"
            _LOG.debug(f"Adding restriction \"{restriction}\" to query.")
            query = f"{restriction} AND ({query})" if query else restriction
        elif queryInstruments[0] != instrumentName:
            # The instrument in the query must match the pipeline's.
            raise RuntimeError(f"The instrument named in the query (\"{queryInstruments[0]}\") does not "
                               f"match the instrument named by the pipeline (\"{instrumentName}\")")
        return query