graphBuilder.py
1 # This file is part of pipe_base.
2 #
3 # Developed for the LSST Data Management System.
4 # This product includes software developed by the LSST Project
5 # (http://www.lsst.org).
6 # See the COPYRIGHT file at the top-level directory of this distribution
7 # for details of code ownership.
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation, either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the GNU General Public License
20 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 from __future__ import annotations
22 
23 """Module defining GraphBuilder class and related methods.
24 """
25 
26 __all__ = ['GraphBuilder']
27 
28 # -------------------------------
29 # Imports of standard modules --
30 # -------------------------------
31 import itertools
32 from collections import ChainMap
33 from dataclasses import dataclass
34 from typing import Set, List, Dict, Optional, Iterable
35 import logging
36 
37 # -----------------------------
38 # Imports for other modules --
39 # -----------------------------
40 from .pipeline import PipelineDatasetTypes, TaskDatasetTypes, TaskDef, Pipeline
41 from .graph import QuantumGraph, QuantumGraphTaskNodes
42 from lsst.daf.butler import (
43  DatasetRef,
44  DatasetType,
45  DimensionGraph,
46  DimensionUniverse,
47  ExpandedDataCoordinate,
48  Quantum,
49 )
50 from lsst.daf.butler.core.utils import NamedKeyDict
51 
52 # ----------------------------------
53 # Local non-exported definitions --
54 # ----------------------------------
55 
56 _LOG = logging.getLogger(__name__.partition(".")[2])
57 
58 
59 @dataclass
60 class _DatasetScaffolding:
61  """Helper class aggregating information about a `DatasetType`, used when
62  constructing a `QuantumGraph`.
63 
64  `_DatasetScaffolding` does not hold the `DatasetType` instance itself
65  because it is usually used as the value type in `_DatasetScaffoldingDict`,
66  which uses `DatasetType` instances as keys.
67 
68  See `_PipelineScaffolding` for a top-down description of the full
69  scaffolding data structure.
70 
71  Parameters
72  ----------
73  dimensions : `DimensionGraph`
74  Dimensions of the `DatasetType`.
75  """
76  def __init__(self, dimensions: DimensionGraph):
77  self.dimensions = dimensions
78  self.producer = None
79  self.consumers = {}
80  self.dataIds = set()
81  self.refs = []
82 
83  __slots__ = ("dimensions", "producer", "consumers", "dataIds", "refs")
84 
85  dimensions: DimensionGraph
86  """The dimensions of the dataset type (`DimensionGraph`).
87 
88  Set during `_PipelineScaffolding` construction.
89  """
90 
91  producer: Optional[_TaskScaffolding]
92  """The scaffolding objects for the Task that produces this dataset.
93 
94  Set during `_PipelineScaffolding` construction.
95  """
96 
97  consumers: Dict[str, _TaskScaffolding]
98  """The scaffolding objects for the Tasks that consume this dataset,
99  keyed by their label in the `Pipeline`.
100 
101  Set during `_PipelineScaffolding` construction.
102  """
103 
104  dataIds: Set[ExpandedDataCoordinate]
105  """Data IDs for all instances of this dataset type in the graph.
106 
107  Populated after construction by `_PipelineScaffolding.fillDataIds`.
108  """
109 
110  refs: List[DatasetRef]
111  """References for all instances of this dataset type in the graph.
112 
113  Populated after construction by `_PipelineScaffolding.fillDatasetRefs`.
114  """
115 
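# For example (task labels here are purely hypothetical): if a "calexp"
# dataset type is produced by a task labelled "calibrate" and consumed by a
# task labelled "makeWarp", the shared scaffolding node described above ends
# up with
#
#   scaffolding.producer   -> the "calibrate" _TaskScaffolding
#   scaffolding.consumers  == {"makeWarp": <the "makeWarp" _TaskScaffolding>}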
116 
117 class _DatasetScaffoldingDict(NamedKeyDict):
118  """Custom dictionary that maps `DatasetType` to `_DatasetScaffolding`.
119 
120  See `_PipelineScaffolding` for a top-down description of the full
121  scaffolding data structure.
122 
123  Parameters
124  ----------
125  args
126  Positional arguments are forwarded to the `dict` constructor.
127  universe : `DimensionUniverse`
128  Universe of all possible dimensions.
129  """
130  def __init__(self, *args, universe: DimensionGraph):
131  super().__init__(*args)
132  self.universe = universe
133 
134  @classmethod
135  def fromDatasetTypes(cls, datasetTypes: Iterable[DatasetType], *,
136  universe: DimensionUniverse) -> _DatasetScaffoldingDict:
137  """Construct a a dictionary from a flat iterable of `DatasetType` keys.
138 
139  Parameters
140  ----------
141  datasetTypes : `iterable` of `DatasetType`
142  DatasetTypes to use as keys for the dict. Values will be
143  constructed from the dimensions of the keys.
144  universe : `DimensionUniverse`
145  Universe of all possible dimensions.
146 
147  Returns
148  -------
149  dictionary : `_DatasetScaffoldingDict`
150  A new dictionary instance.
151  """
152  return cls(((datasetType, _DatasetScaffolding(datasetType.dimensions))
153  for datasetType in datasetTypes),
154  universe=universe)
155 
156  @classmethod
157  def fromSubset(cls, datasetTypes: Iterable[DatasetType], first: _DatasetScaffoldingDict,
158  *rest) -> _DatasetScaffoldingDict:
159  """Return a new dictionary by extracting items corresponding to the
160  given keys from one or more existing dictionaries.
161 
162  Parameters
163  ----------
164  datasetTypes : `iterable` of `DatasetType`
165  DatasetTypes to use as keys for the dict. Values will be obtained
166  by lookups against ``first`` and ``rest``.
167  first : `_DatasetScaffoldingDict`
168  Another dictionary from which to extract values.
169  rest
170  Additional dictionaries from which to extract values.
171 
172  Returns
173  -------
174  dictionary : `_DatasetScaffoldingDict`
175  A new dictionary instance.
176  """
177  combined = ChainMap(first, *rest)
178  return cls(((datasetType, combined[datasetType]) for datasetType in datasetTypes),
179  universe=first.universe)
180 
181  @property
182  def dimensions(self) -> DimensionGraph:
183  """The union of all dimensions used by all dataset types in this
184  dictionary, including implied dependencies (`DimensionGraph`).
185  """
186  base = self.universe.empty
187  if len(self) == 0:
188  return base
189  return base.union(*[scaffolding.dimensions for scaffolding in self.values()])
190 
191  def unpackRefs(self) -> NamedKeyDict:
192  """Unpack nested single-element `DatasetRef` lists into a new
193  dictionary.
194 
195  This method assumes that each `_DatasetScaffolding.refs` list contains
196  exactly one `DatasetRef`, as is the case for all "init" datasets.
197 
198  Returns
199  -------
200  dictionary : `NamedKeyDict`
201  Dictionary mapping `DatasetType` to `DatasetRef`, with both
202  `DatasetType` instances and string names usable as keys.
203  """
204  return NamedKeyDict((datasetType, scaffolding.refs[0]) for datasetType, scaffolding in self.items())
205 
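# A minimal sketch (hypothetical names; assumes a `registry` and a DatasetType
# `calexpType` already exist) of how these dictionaries share their values:
# `fromSubset` looks the keys up in the parent dictionaries, so the
# pipeline-level and task-level dicts reference the very same
# `_DatasetScaffolding` objects:
#
#   pipelineInputs = _DatasetScaffoldingDict.fromDatasetTypes(
#       [calexpType], universe=registry.dimensions)
#   taskInputs = _DatasetScaffoldingDict.fromSubset([calexpType], pipelineInputs)
#   assert taskInputs[calexpType] is pipelineInputs[calexpType]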
206 
207 @dataclass
208 class _TaskScaffolding:
209  """Helper class aggregating information about a `PipelineTask`, used when
210  constructing a `QuantumGraph`.
211 
212  See `_PipelineScaffolding` for a top-down description of the full
213  scaffolding data structure.
214 
215  Parameters
216  ----------
217  taskDef : `TaskDef`
218  Data structure that identifies the task class and its config.
219  parent : `_PipelineScaffolding`
220  The parent data structure that will hold the instance being
221  constructed.
222  datasetTypes : `TaskDatasetTypes`
223  Data structure that categorizes the dataset types used by this task.
224 
225  Raises
226  ------
227  GraphBuilderError
228  Raised if the task's dimensions are not a subset of the union of the
229  pipeline's dataset dimensions.
230  """
231  def __init__(self, taskDef: TaskDef, parent: _PipelineScaffolding, datasetTypes: TaskDatasetTypes):
232  universe = parent.dimensions.universe
233  self.taskDef = taskDef
234  self.dimensions = DimensionGraph(universe, names=taskDef.connections.dimensions)
235  if not self.dimensions.issubset(parent.dimensions):
236  raise GraphBuilderError(f"Task with label '{taskDef.label}' has dimensions "
237  f"{self.dimensions} that are not a subset of "
238  f"the pipeline dimensions {parent.dimensions}.")
239 
240  # Initialize _DatasetScaffoldingDicts as subsets of the one or two
241  # corresponding dicts in the parent _PipelineScaffolding.
242  self.initInputs = _DatasetScaffoldingDict.fromSubset(datasetTypes.initInputs,
243  parent.initInputs, parent.initIntermediates)
244  self.initOutputs = _DatasetScaffoldingDict.fromSubset(datasetTypes.initOutputs,
245  parent.initIntermediates, parent.initOutputs)
246  self.inputs = _DatasetScaffoldingDict.fromSubset(datasetTypes.inputs,
247  parent.inputs, parent.intermediates)
248  self.outputs = _DatasetScaffoldingDict.fromSubset(datasetTypes.outputs,
249  parent.intermediates, parent.outputs)
250  self.prerequisites = _DatasetScaffoldingDict.fromSubset(datasetTypes.prerequisites,
251  parent.prerequisites)
252  # Add backreferences to the _DatasetScaffolding objects that point to
253  # this Task.
254  for dataset in itertools.chain(self.initInputs.values(), self.inputs.values(),
255  self.prerequisites.values()):
256  dataset.consumers[self.taskDef.label] = self
257  for dataset in itertools.chain(self.initOutputs.values(), self.outputs.values()):
258  assert dataset.producer is None
259  dataset.producer = self
260  self.dataIds = set()
261  self.quanta = []
262 
263  taskDef: TaskDef
264  """Data structure that identifies the task class and its config
265  (`TaskDef`).
266  """
267 
268  dimensions: DimensionGraph
269  """The dimensions of a single `Quantum` of this task (`DimensionGraph`).
270  """
271 
272  initInputs: _DatasetScaffoldingDict
273  """Dictionary containing information about datasets used to construct this
274  task (`_DatasetScaffoldingDict`).
275  """
276 
277  initOutputs: _DatasetScaffoldingDict
278  """Dictionary containing information about datasets produced as a
279  side-effect of constructing this task (`_DatasetScaffoldingDict`).
280  """
281 
282  inputs: _DatasetScaffoldingDict
283  """Dictionary containing information about datasets used as regular,
284  graph-constraining inputs to this task (`_DatasetScaffoldingDict`).
285  """
286 
287  outputs: _DatasetScaffoldingDict
288  """Dictionary containing information about datasets produced by this task
289  (`_DatasetScaffoldingDict`).
290  """
291 
292  prerequisites: _DatasetScaffoldingDict
293  """Dictionary containing information about input datasets that must be
294  present in the repository before any Pipeline containing this task is run
295  (`_DatasetScaffoldingDict`).
296  """
297 
298  dataIds: Set[ExpandedDataCoordinate]
299  """Data IDs for all quanta for this task in the graph (`set` of
300  `ExpandedDataCoordinate`).
301 
302  Populated after construction by `_PipelineScaffolding.fillDataIds`.
303  """
304 
305  quanta: List[Quantum]
306  """All quanta for this task in the graph (`list` of `Quantum`).
307 
308  Populated after construction by `_PipelineScaffolding.fillQuanta`.
309  """
310 
311  def addQuantum(self, quantum: Quantum):
312  config = self.taskDef.config
313  connectionClass = config.connections.ConnectionsClass
314  connectionInstance = connectionClass(config=config)
315  # This will raise if one of the check conditions is not met, which is the intended
316  # behavior
317  result = connectionInstance.adjustQuantum(quantum.predictedInputs)
318  quantum._predictedInputs = NamedKeyDict(result)
319 
320  # If execution reaches this point, add the quantum.
321  self.quanta.append(quantum)
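 # For illustration only: a hypothetical connections class could override
 # `adjustQuantum` to trim or validate the predicted inputs; whatever mapping
 # it returns is re-wrapped above, and any exception it raises propagates and
 # rejects the quantum.
 #
 #   class MyConnections(PipelineTaskConnections, dimensions=("visit", "detector")):
 #       def adjustQuantum(self, datasetRefMap):
 #           datasetRefMap = super().adjustQuantum(datasetRefMap)
 #           # ... inspect or prune individual DatasetRefs here ...
 #           return datasetRefMap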
322 
323  def makeQuantumGraphTaskNodes(self) -> QuantumGraphTaskNodes:
324  """Create a `QuantumGraphTaskNodes` instance from the information in
325  ``self``.
326 
327  Returns
328  -------
329  nodes : `QuantumGraphTaskNodes`
330  The `QuantumGraph` elements corresponding to this task.
331  """
332  return QuantumGraphTaskNodes(
333  taskDef=self.taskDef,
334  quanta=self.quanta,
335  initInputs=self.initInputs.unpackRefs(),
336  initOutputs=self.initOutputs.unpackRefs(),
337  )
338 
339 
340 @dataclass
341 class _PipelineScaffolding:
342  """A helper data structure that organizes the information involved in
343  constructing a `QuantumGraph` for a `Pipeline`.
344 
345  Parameters
346  ----------
347  pipeline : `Pipeline`
348  Sequence of tasks from which a graph is to be constructed. Must
349  have nested task classes already imported.
350  universe : `DimensionUniverse`
351  Universe of all possible dimensions.
352 
353  Raises
354  ------
355  GraphBuilderError
356  Raised if the task's dimensions are not a subset of the union of the
357  pipeline's dataset dimensions.
358 
359  Notes
360  -----
361  The scaffolding data structure contains nested data structures for both
362  tasks (`_TaskScaffolding`) and datasets (`_DatasetScaffolding`), with the
363  latter held by `_DatasetScaffoldingDict`. The dataset data structures are
364  shared between the pipeline-level structure (which aggregates all datasets
365  and categorizes them from the perspective of the complete pipeline) and the
366  individual tasks that use them as inputs and outputs.
367 
368  `QuantumGraph` construction proceeds in five steps, with each corresponding
369  to a different `_PipelineScaffolding` method:
370 
371  1. When `_PipelineScaffolding` is constructed, we extract and categorize
372  the DatasetTypes used by the pipeline (delegating to
373  `PipelineDatasetTypes.fromPipeline`), then use these to construct the
374  nested `_TaskScaffolding` and `_DatasetScaffolding` objects.
375 
376  2. In `fillDataIds`, we construct and run the "Big Join Query", which
377  returns related tuples of all dimensions used to identify any regular
378  input, output, and intermediate datasets (not prerequisites). We then
379  iterate over these tuples of related dimensions, identifying the subsets
380  that correspond to distinct data IDs for each task and dataset type.
381 
382  3. In `fillDatasetRefs`, we run follow-up queries against all of the
383  dataset data IDs previously identified, populating the
384  `_DatasetScaffolding.refs` lists - except for those for prerequisite
385  datasets, which cannot be resolved until distinct quanta are
386  identified.
387 
388  4. In `fillQuanta`, we extract subsets from the lists of `DatasetRef` into
389  the inputs and outputs for each `Quantum` and search for prerequisite
390  datasets, populating `_TaskScaffolding.quanta`.
391 
392  5. In `makeQuantumGraph`, we construct a `QuantumGraph` from the lists of
393  per-task quanta identified in the previous step.
394  """
395  def __init__(self, pipeline, *, registry):
396  self.tasks = []
397  # Aggregate and categorize the DatasetTypes in the Pipeline.
398  datasetTypes = PipelineDatasetTypes.fromPipeline(pipeline, registry=registry)
399  # Construct dictionaries that map those DatasetTypes to structures
400  # that will (later) hold additional information about them.
401  for attr in ("initInputs", "initIntermediates", "initOutputs",
402  "inputs", "intermediates", "outputs", "prerequisites"):
403  setattr(self, attr, _DatasetScaffoldingDict.fromDatasetTypes(getattr(datasetTypes, attr),
404  universe=registry.dimensions))
405  # Aggregate all dimensions for all non-init, non-prerequisite
406  # DatasetTypes. These are the ones we'll include in the big join query.
407  self.dimensions = self.inputs.dimensions.union(self.intermediates.dimensions,
408  self.outputs.dimensions)
409  # Construct scaffolding nodes for each Task, and add backreferences
410  # to the Task from each DatasetScaffolding node.
411  # Note that there's only one scaffolding node for each DatasetType, shared by
412  # _PipelineScaffolding and all _TaskScaffoldings that reference it.
413  if isinstance(pipeline, Pipeline):
414  pipeline = pipeline.toExpandedPipeline()
415  self.tasks = [_TaskScaffolding(taskDef=taskDef, parent=self, datasetTypes=taskDatasetTypes)
416  for taskDef, taskDatasetTypes in zip(pipeline,
417  datasetTypes.byTask.values())]
418 
419  tasks: List[_TaskScaffolding]
420  """Scaffolding data structures for each task in the pipeline
421  (`list` of `_TaskScaffolding`).
422  """
423 
424  initInputs: _DatasetScaffoldingDict
425  """Datasets consumed but not produced when constructing the tasks in this
426  pipeline (`_DatasetScaffoldingDict`).
427  """
428 
429  initIntermediates: _DatasetScaffoldingDict
430  """Datasets that are both consumed and produced when constructing the tasks
431  in this pipeline (`_DatasetScaffoldingDict`).
432  """
433 
434  initOutputs: _DatasetScaffoldingDict
435  """Datasets produced but not consumed when constructing the tasks in this
436  pipeline (`_DatasetScaffoldingDict`).
437  """
438 
439  inputs: _DatasetScaffoldingDict
440  """Datasets that are consumed but not produced when running this pipeline
441  (`_DatasetScaffoldingDict`).
442  """
443 
444  intermediates: _DatasetScaffoldingDict
445  """Datasets that are both produced and consumed when running this pipeline
446  (`_DatasetScaffoldingDict`).
447  """
448 
449  outputs: _DatasetScaffoldingDict
450  """Datasets produced but not consumed when when running this pipeline
451  (`_DatasetScaffoldingDict`).
452  """
453 
454  prerequisites: _DatasetScaffoldingDict
455  """Datasets that are consumed when running this pipeline and looked up
456  per-Quantum when generating the graph (`_DatasetScaffoldingDict`).
457  """
458 
459  dimensions: DimensionGraph
460  """All dimensions used by any regular input, intermediate, or output
461  (not prerequisite) dataset; the set of dimensions used in the "Big Join
462  Query" (`DimensionGraph`).
463 
464  This is required to be a superset of all task quantum dimensions.
465  """
466 
467  def fillDataIds(self, registry, inputCollections, userQuery):
468  """Query for the data IDs that connect nodes in the `QuantumGraph`.
469 
470  This method populates `_TaskScaffolding.dataIds` and
471  `_DatasetScaffolding.dataIds` (except for those in `prerequisites`).
472 
473  Parameters
474  ----------
475  registry : `lsst.daf.butler.Registry`
476  Registry for the data repository; used for all data ID queries.
477  inputCollections : `~collections.abc.Mapping`
478  Mapping from dataset type name to an ordered sequence of
479  collections to search for that dataset. A `defaultdict` is
480  recommended for the case where the same collections should be
481  used for most datasets.
482  userQuery : `str`, optional
483  User-provided expression to limit the data IDs processed.
484  """
485  # Initialization datasets always have empty data IDs.
486  emptyDataId = ExpandedDataCoordinate(registry.dimensions.empty, (), records={})
487  for scaffolding in itertools.chain(self.initInputs.values(),
488  self.initIntermediates.values(),
489  self.initOutputs.values()):
490  scaffolding.dataIds.add(emptyDataId)
491  # Run one big query for the data IDs for task dimensions and regular
492  # inputs and outputs. We limit the query to only dimensions that are
493  # associated with the input dataset types, but don't (yet) try to
494  # obtain the dataset_ids for those inputs.
495  resultIter = registry.queryDimensions(
496  self.dimensions,
497  datasets={
498  datasetType: inputCollections[datasetType.name]
499  for datasetType in self.inputs
500  },
501  where=userQuery,
502  )
503  # Iterate over query results and populate the data IDs in
504  # self._TaskScaffolding.refs, extracting the subsets of the common data
505  # ID from the query corresponding to the dimensions of each. By using
506  # sets, we remove duplicates caused by query rows in which the
507  # dimensions that change are not relevant for that task or dataset
508  # type. For example, if the Big Join Query involves the dimensions
509  # (instrument, visit, detector, skymap, tract, patch), we extract
510  # "calexp" data IDs from the instrument, visit, and detector values
511  # only, and rely on `set.add` to avoid duplications due to result rows
512  # in which only skymap, tract, and patch are varying. The Big Join
513  # Query is defined such that only visit+detector and tract+patch
514  # combinations that represent spatial overlaps are included in the
515  # results.
516  for commonDataId in resultIter:
517  for taskScaffolding in self.tasks:
518  taskScaffolding.dataIds.add(commonDataId.subset(taskScaffolding.dimensions))
519  for datasetType, scaffolding in itertools.chain(self.inputs.items(),
520  self.intermediates.items(),
521  self.outputs.items()):
522  scaffolding.dataIds.add(commonDataId.subset(scaffolding.dimensions))
523 
524  def fillDatasetRefs(self, registry, inputCollections, outputCollection, *,
525  skipExisting=True, clobberExisting=False):
526  """Perform follow up queries for each dataset data ID produced in
527  `fillDataIds`.
528 
529  This method populates `_DatasetScaffolding.refs` (except for those in
530  `prerequisites`).
531 
532  Parameters
533  ----------
534  registry : `lsst.daf.butler.Registry`
535  Registry for the data repository; used for all data ID queries.
536  inputCollections : `~collections.abc.Mapping`
537  Mapping from dataset type name to an ordered sequence of
538  collections to search for that dataset. A `defaultdict` is
539  recommended for the case where the same collections should be
540  used for most datasets.
541  outputCollection : `str`
542  Collection for all output datasets.
543  skipExisting : `bool`, optional
544  If `True` (default), a Quantum is not created if all its outputs
545  already exist.
546  clobberExisting : `bool`, optional
547  If `True`, overwrite any outputs that already exist. Cannot be
548  `True` if ``skipExisting`` is.
549 
550  Raises
551  ------
552  ValueError
553  Raised if both `skipExisting` and `clobberExisting` are `True`.
554  OutputExistsError
555  Raised if an output dataset already exists in the output collection
556  and both ``skipExisting`` and ``clobberExisting`` are `False`. The
557  case where some but not all of a quantum's outputs are present and
558  ``skipExisting`` is `True` cannot be identified at this stage, and
559  is handled by `fillQuanta` instead.
560  """
561  if clobberExisting and skipExisting:
562  raise ValueError("clobberExisting and skipExisting cannot both be true.")
563  # Look up input and initInput datasets in the input collection(s).
564  for datasetType, scaffolding in itertools.chain(self.initInputs.items(), self.inputs.items()):
565  for dataId in scaffolding.dataIds:
566  refs = list(
567  registry.queryDatasets(
568  datasetType,
569  collections=inputCollections[datasetType.name],
570  dataId=dataId,
571  deduplicate=True,
572  expand=True,
573  )
574  )
575  assert len(refs) == 1, "BJQ guarantees exactly one input for each data ID."
576  scaffolding.refs.extend(refs)
577  # Look up [init] intermediate and output datasets in the output collection,
578  # unless clobberExisting is True (in which case we don't care if these
579  # already exist).
580  for datasetType, scaffolding in itertools.chain(self.initIntermediates.items(),
581  self.initOutputs.items(),
582  self.intermediates.items(),
583  self.outputs.items()):
584  for dataId in scaffolding.dataIds:
585  # TODO: we could easily support per-DatasetType clobberExisting
586  # and skipExisting (it might make sense to put them in
587  # originInfo), and I could imagine that being useful - it's
588  # probably required in order to support writing initOutputs
589  # before QuantumGraph generation.
590  if clobberExisting:
591  ref = None
592  else:
593  ref = registry.find(collection=outputCollection, datasetType=datasetType, dataId=dataId)
594  if ref is None:
595  ref = DatasetRef(datasetType, dataId)
596  elif not skipExisting:
597  raise OutputExistsError(f"Output dataset {datasetType.name} already exists in "
598  f"output collection {outputCollection} with data ID {dataId}.")
599  scaffolding.refs.append(ref)
600  # Prerequisite dataset lookups are deferred until fillQuanta.
601 
602  def fillQuanta(self, registry, inputCollections, *, skipExisting=True):
603  """Define quanta for each task by splitting up the datasets associated
604  with each task data ID.
605 
606  This method populates `_TaskScaffolding.quanta`.
607 
608  Parameters
609  ----------
610  registry : `lsst.daf.butler.Registry`
611  Registry for the data repository; used for all data ID queries.
612  inputCollections : `~collections.abc.Mapping`
613  Mapping from dataset type name to an ordered sequence of
614  collections to search for that dataset. A `defaultdict` is
615  recommended for the case where the same collections should be
616  used for most datasets.
617  skipExisting : `bool`, optional
618  If `True` (default), a Quantum is not created if all its outputs
619  already exist.
620  """
621  for task in self.tasks:
622  for quantumDataId in task.dataIds:
623  # Identify the (regular) inputs that correspond to the Quantum
624  # with this data ID. These are those whose data IDs have the
625  # same values for all dimensions they have in common.
626  # We do this using data IDs expanded to include implied dimensions,
627  # which is why _DatasetScaffolding.dimensions is expanded in this way
628  # even though DatasetType.dimensions is not.
629  inputs = NamedKeyDict()
630  for datasetType, scaffolding in task.inputs.items():
631  inputs[datasetType] = [ref for ref, dataId in zip(scaffolding.refs, scaffolding.dataIds)
632  if quantumDataId.matches(dataId)]
633  # Same for outputs.
634  outputs = NamedKeyDict()
635  allOutputsPresent = True
636  for datasetType, scaffolding in task.outputs.items():
637  outputs[datasetType] = []
638  for ref, dataId in zip(scaffolding.refs, scaffolding.dataIds):
639  if quantumDataId.matches(dataId):
640  if ref.id is None:
641  allOutputsPresent = False
642  else:
643  assert skipExisting, "Existing outputs should have already been identified."
644  if not allOutputsPresent:
645  raise OutputExistsError(f"Output {datasetType.name} with data ID "
646  f"{dataId} already exists, but other outputs "
647  f"for task with label {task.taskDef.label} "
648  f"and data ID {quantumDataId} do not.")
649  outputs[datasetType].append(ref)
650  if allOutputsPresent and skipExisting:
651  continue
652 
653  # Look up prerequisite datasets in the input collection(s).
654  # These may have dimensions that extend beyond those we queried
655  # for originally, because we want to permit those data ID
656  # values to differ across quanta and dataset types.
657  # For example, the same quantum may have a flat and bias with
658  # a different calibration_label, or a refcat with a skypix
659  # value that overlaps the quantum's data ID's region, but not
660  # the user expression used for the initial query.
661  for datasetType, scaffolding in task.prerequisites.items():
662  refs = list(
663  registry.queryDatasets(
664  datasetType,
665  collections=inputCollections[datasetType.name],
666  dataId=quantumDataId,
667  deduplicate=True,
668  expand=True,
669  )
670  )
671  inputs[datasetType] = refs
672  task.addQuantum(
673  Quantum(
674  taskName=task.taskDef.taskName,
675  taskClass=task.taskDef.taskClass,
676  dataId=quantumDataId,
677  initInputs=task.initInputs.unpackRefs(),
678  predictedInputs=inputs,
679  outputs=outputs,
680  )
681  )
682 
683  def makeQuantumGraph(self):
684  """Create a `QuantumGraph` from the quanta already present in
685  the scaffolding data structure.
686  """
687  graph = QuantumGraph(task.makeQuantumGraphTaskNodes() for task in self.tasks)
688  graph.initInputs = self.initInputs.unpackRefs()
689  graph.initOutputs = self.initOutputs.unpackRefs()
690  graph.initIntermediates = self.initIntermediates.unpackRefs()
691  return graph
692 
693 
694 # ------------------------
695 # Exported definitions --
696 # ------------------------
697 
698 
699 class GraphBuilderError(Exception):
700  """Base class for exceptions generated by graph builder.
701  """
702  pass
703 
704 
705 class OutputExistsError(GraphBuilderError):
706  """Exception generated when output datasets already exist.
707  """
708  pass
709 
710 
711 class PrerequisiteMissingError(GraphBuilderError):
712  """Exception generated when a prerequisite dataset does not exist.
713  """
714  pass
715 
716 
717 class GraphBuilder(object):
718  """GraphBuilder class is responsible for building a task execution graph
719  from a Pipeline.
720 
721  Parameters
722  ----------
723  registry : `~lsst.daf.butler.Registry`
724  Registry for the data repository; used for all data ID queries.
725  skipExisting : `bool`, optional
726  If `True` (default), a Quantum is not created if all its outputs
727  already exist.
728  clobberExisting : `bool`, optional
729  If `True`, overwrite any outputs that already exist. Cannot be
730  `True` if ``skipExisting`` is.
731  """
732 
733  def __init__(self, registry, skipExisting=True, clobberExisting=False):
734  self.registry = registry
735  self.dimensions = registry.dimensions
736  self.skipExisting = skipExisting
737  self.clobberExisting = clobberExisting
738 
739  def makeGraph(self, pipeline, inputCollections, outputCollection, userQuery):
740  """Create execution graph for a pipeline.
741 
742  Parameters
743  ----------
744  pipeline : `Pipeline`
745  Pipeline definition, task names/classes and their configs.
746  inputCollections : `~collections.abc.Mapping`
747  Mapping from dataset type name to an ordered sequence of
748  collections to search for that dataset. A `defaultdict` is
749  recommended for the case where the same collections should be
750  used for most datasets.
751  outputCollection : `str`
752  Collection for all output datasets.
753  userQuery : `str`
754  String that defines the user-provided data selection for the registry;
755  should be empty or `None` if there are no restrictions on data selection.
756 
757  Returns
758  -------
759  graph : `QuantumGraph`
760 
761  Raises
762  ------
763  UserExpressionError
764  Raised when user expression cannot be parsed.
765  OutputExistsError
766  Raised when output datasets already exist.
767  Exception
768  Other exceptions types may be raised by underlying registry
769  classes.
770  """
771  scaffolding = _PipelineScaffolding(pipeline, registry=self.registry)
772 
773  scaffolding.fillDataIds(self.registry, inputCollections, userQuery)
774  scaffolding.fillDatasetRefs(self.registry, inputCollections, outputCollection,
775  skipExisting=self.skipExisting,
776  clobberExisting=self.clobberExisting)
777  scaffolding.fillQuanta(self.registry, inputCollections,
778  skipExisting=self.skipExisting)
779 
780  return scaffolding.makeQuantumGraph()
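# A rough usage sketch (collection names, data ID values, and the `butler`
# object are illustrative assumptions, not part of this module):
#
#   from collections import defaultdict
#
#   inputCollections = defaultdict(lambda: ["shared/calibs", "shared/raw"])
#   builder = GraphBuilder(butler.registry, skipExisting=True)
#   quantumGraph = builder.makeGraph(
#       pipeline,
#       inputCollections=inputCollections,
#       outputCollection="u/someuser/my-run",
#       userQuery="visit = 903334 AND detector = 22",
#   )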