graphBuilder.py
1 # This file is part of pipe_base.
2 #
3 # Developed for the LSST Data Management System.
4 # This product includes software developed by the LSST Project
5 # (http://www.lsst.org).
6 # See the COPYRIGHT file at the top-level directory of this distribution
7 # for details of code ownership.
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation, either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the GNU General Public License
20 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 from __future__ import annotations
22 
23 """Module defining GraphBuilder class and related methods.
24 """
25 
26 __all__ = ['GraphBuilder']
27 
28 # -------------------------------
29 # Imports of standard modules --
30 # -------------------------------
31 import itertools
32 from collections import ChainMap
33 from dataclasses import dataclass
34 from typing import Set, List, Dict, Optional, Iterable
35 import logging
36 
37 # -----------------------------
38 # Imports for other modules --
39 # -----------------------------
40 from .pipeline import PipelineDatasetTypes, TaskDatasetTypes, TaskDef, Pipeline
41 from .graph import QuantumGraph, QuantumGraphTaskNodes
42 from lsst.daf.butler import (
43  DatasetRef,
44  DatasetType,
45  DimensionGraph,
46  DimensionUniverse,
47  ExpandedDataCoordinate,
48  Quantum,
49 )
50 from lsst.daf.butler.core.utils import NamedKeyDict
51 
52 # ----------------------------------
53 # Local non-exported definitions --
54 # ----------------------------------
55 
56 _LOG = logging.getLogger(__name__.partition(".")[2])
57 
58 
59 @dataclass
60 class _DatasetScaffolding:
61  """Helper class aggregating information about a `DatasetType`, used when
62  constructing a `QuantumGraph`.
63 
64  `_DatasetScaffolding` does not hold the `DatasetType` instance itself
65  because it is usually used as the value type in `_DatasetScaffoldingDict`,
66  which uses `DatasetType` instances as keys.
67 
68  See `_PipelineScaffolding` for a top-down description of the full
69  scaffolding data structure.
70 
71  Parameters
72  ----------
73  dimensions : `DimensionGraph`
74  Dimensions of the `DatasetType`.
75  """
76  def __init__(self, dimensions: DimensionGraph):
77  self.dimensions = dimensions
78  self.producer = None
79  self.consumers = {}
80  self.dataIds = set()
81  self.refs = []
82 
83  __slots__ = ("dimensions", "producer", "consumers", "dataIds", "refs")
84 
85  dimensions: DimensionGraph
86  """The dimensions of the dataset type (`DimensionGraph`).
87 
88  Set during `_PipelineScaffolding` construction.
89  """
90 
91  producer: Optional[_TaskScaffolding]
92  """The scaffolding objects for the Task that produces this dataset.
93 
94  Set during `_PipelineScaffolding` construction.
95  """
96 
97  consumers: Dict[str, _TaskScaffolding]
98  """The scaffolding objects for the Tasks that consume this dataset,
99  keyed by their label in the `Pipeline`.
100 
101  Set during `_PipelineScaffolding` construction.
102  """
103 
104  dataIds: Set[ExpandedDataCoordinate]
105  """Data IDs for all instances of this dataset type in the graph.
106 
107  Populated after construction by `_PipelineScaffolding.fillDataIds`.
108  """
109 
110  refs: List[DatasetRef]
111  """References for all instances of this dataset type in the graph.
112 
113  Populated after construction by `_PipelineScaffolding.fillDatasetRefs`.
114  """
115 
116 
117 class _DatasetScaffoldingDict(NamedKeyDict):
118  """Custom dictionary that maps `DatasetType` to `_DatasetScaffolding`.
119 
120  See `_PipelineScaffolding` for a top-down description of the full
121  scaffolding data structure.
122 
123  Parameters
124  ----------
125  args
126  Positional arguments are forwarded to the `dict` constructor.
127  universe : `DimensionUniverse`
128  Universe of all possible dimensions.
129  """
 130  def __init__(self, *args, universe: DimensionUniverse):
131  super().__init__(*args)
132  self.universe = universe
133 
134  @classmethod
135  def fromDatasetTypes(cls, datasetTypes: Iterable[DatasetType], *,
136  universe: DimensionUniverse) -> _DatasetScaffoldingDict:
 137  """Construct a dictionary from a flat iterable of `DatasetType` keys.
138 
139  Parameters
140  ----------
141  datasetTypes : `iterable` of `DatasetType`
142  DatasetTypes to use as keys for the dict. Values will be
143  constructed from the dimensions of the keys.
144  universe : `DimensionUniverse`
145  Universe of all possible dimensions.
146 
147  Returns
148  -------
149  dictionary : `_DatasetScaffoldingDict`
150  A new dictionary instance.
151  """
152  return cls(((datasetType, _DatasetScaffolding(datasetType.dimensions))
153  for datasetType in datasetTypes),
154  universe=universe)
155 
156  @classmethod
157  def fromSubset(cls, datasetTypes: Iterable[DatasetType], first: _DatasetScaffoldingDict,
158  *rest) -> _DatasetScaffoldingDict:
159  """Return a new dictionary by extracting items corresponding to the
160  given keys from one or more existing dictionaries.
161 
162  Parameters
163  ----------
164  datasetTypes : `iterable` of `DatasetType`
165  DatasetTypes to use as keys for the dict. Values will be obtained
166  by lookups against ``first`` and ``rest``.
167  first : `_DatasetScaffoldingDict`
168  Another dictionary from which to extract values.
169  rest
170  Additional dictionaries from which to extract values.
171 
172  Returns
173  -------
174  dictionary : `_DatasetScaffoldingDict`
175  A new dictionary instance.
176  """
177  combined = ChainMap(first, *rest)
178  return cls(((datasetType, combined[datasetType]) for datasetType in datasetTypes),
179  universe=first.universe)
180 
181  @property
182  def dimensions(self) -> DimensionGraph:
183  """The union of all dimensions used by all dataset types in this
184  dictionary, including implied dependencies (`DimensionGraph`).
185  """
186  base = self.universe.empty
187  if len(self) == 0:
188  return base
189  return base.union(*[scaffolding.dimensions for scaffolding in self.values()])
190 
191  def unpackRefs(self) -> NamedKeyDict:
192  """Unpack nested single-element `DatasetRef` lists into a new
193  dictionary.
194 
195  This method assumes that each `_DatasetScaffolding.refs` list contains
196  exactly one `DatasetRef`, as is the case for all "init" datasets.
197 
198  Returns
199  -------
200  dictionary : `NamedKeyDict`
201  Dictionary mapping `DatasetType` to `DatasetRef`, with both
202  `DatasetType` instances and string names usable as keys.
203  """
204  return NamedKeyDict((datasetType, scaffolding.refs[0]) for datasetType, scaffolding in self.items())
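 # Illustrative example (hypothetical names): if this dictionary holds a
 # single entry {initInputType: scaffolding} with scaffolding.refs == [ref],
 # unpackRefs() returns a NamedKeyDict equivalent to {initInputType: ref},
 # and the result can be looked up either by the DatasetType instance or by
 # its string name.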
205 
206 
207 @dataclass
208 class _TaskScaffolding:
209  """Helper class aggregating information about a `PipelineTask`, used when
210  constructing a `QuantumGraph`.
211 
212  See `_PipelineScaffolding` for a top-down description of the full
213  scaffolding data structure.
214 
215  Parameters
216  ----------
217  taskDef : `TaskDef`
218  Data structure that identifies the task class and its config.
219  parent : `_PipelineScaffolding`
220  The parent data structure that will hold the instance being
221  constructed.
222  datasetTypes : `TaskDatasetTypes`
223  Data structure that categorizes the dataset types used by this task.
224 
225  Raises
226  ------
227  GraphBuilderError
228  Raised if the task's dimensions are not a subset of the union of the
229  pipeline's dataset dimensions.
230  """
231  def __init__(self, taskDef: TaskDef, parent: _PipelineScaffolding, datasetTypes: TaskDatasetTypes):
232  universe = parent.dimensions.universe
233  self.taskDef = taskDef
234  self.dimensions = DimensionGraph(universe, names=taskDef.connections.dimensions)
235  if not self.dimensions.issubset(parent.dimensions):
236  raise GraphBuilderError(f"Task with label '{taskDef.label}' has dimensions "
237  f"{self.dimensions} that are not a subset of "
238  f"the pipeline dimensions {parent.dimensions}.")
239  # Initialize _DatasetScaffoldingDicts as subsets of the one or two
240  # corresponding dicts in the parent _PipelineScaffolding.
241  self.initInputs = _DatasetScaffoldingDict.fromSubset(datasetTypes.initInputs,
242  parent.initInputs, parent.initIntermediates)
243  self.initOutputs = _DatasetScaffoldingDict.fromSubset(datasetTypes.initOutputs,
244  parent.initIntermediates, parent.initOutputs)
245  self.inputs = _DatasetScaffoldingDict.fromSubset(datasetTypes.inputs,
246  parent.inputs, parent.intermediates)
247  self.outputs = _DatasetScaffoldingDict.fromSubset(datasetTypes.outputs,
248  parent.intermediates, parent.outputs)
249  self.prerequisites = _DatasetScaffoldingDict.fromSubset(datasetTypes.prerequisites,
250  parent.prerequisites)
251  # Add backreferences to the _DatasetScaffolding objects that point to
252  # this Task.
253  for dataset in itertools.chain(self.initInputs.values(), self.inputs.values(),
254  self.prerequisites.values()):
255  dataset.consumers[self.taskDef.label] = self
256  for dataset in itertools.chain(self.initOutputs.values(), self.outputs.values()):
257  assert dataset.producer is None
258  dataset.producer = self
259  self.dataIds = set()
260  self.quanta = []
261 
262  taskDef: TaskDef
263  """Data structure that identifies the task class and its config
264  (`TaskDef`).
265  """
266 
267  dimensions: DimensionGraph
268  """The dimensions of a single `Quantum` of this task (`DimensionGraph`).
269  """
270 
271  initInputs: _DatasetScaffoldingDict
272  """Dictionary containing information about datasets used to construct this
273  task (`_DatasetScaffoldingDict`).
274  """
275 
276  initOutputs: _DatasetScaffoldingDict
277  """Dictionary containing information about datasets produced as a
278  side-effect of constructing this task (`_DatasetScaffoldingDict`).
279  """
280 
281  inputs: _DatasetScaffoldingDict
282  """Dictionary containing information about datasets used as regular,
283  graph-constraining inputs to this task (`_DatasetScaffoldingDict`).
284  """
285 
286  outputs: _DatasetScaffoldingDict
287  """Dictionary containing information about datasets produced by this task
288  (`_DatasetScaffoldingDict`).
289  """
290 
291  prerequisites: _DatasetScaffoldingDict
292  """Dictionary containing information about input datasets that must be
293  present in the repository before any Pipeline containing this task is run
294  (`_DatasetScaffoldingDict`).
295  """
296 
297  dataIds: Set[ExpandedDataCoordinate]
298  """Data IDs for all quanta for this task in the graph (`set` of
299  `ExpandedDataCoordinate`).
300 
301  Populated after construction by `_PipelineScaffolding.fillDataIds`.
302  """
303 
304  quanta: List[Quantum]
305  """All quanta for this task in the graph (`list` of `Quantum`).
306 
307  Populated after construction by `_PipelineScaffolding.fillQuanta`.
308  """
309 
310  def addQuantum(self, quantum: Quantum):
311  config = self.taskDef.config
312  connectionClass = config.connections.ConnectionsClass
313  connectionInstance = connectionClass(config=config)
 314  # This will raise if one of the check conditions is not met, which is
 315  # the intended behavior.
316  result = connectionInstance.adjustQuantum(quantum.predictedInputs)
317  quantum._predictedInputs = NamedKeyDict(result)
318 
 319  # If this function has reached this far, add the quantum.
320  self.quanta.append(quantum)
321 
322  def makeQuantumGraphTaskNodes(self) -> QuantumGraphTaskNodes:
323  """Create a `QuantumGraphTaskNodes` instance from the information in
324  ``self``.
325 
326  Returns
327  -------
328  nodes : `QuantumGraphTaskNodes`
329  The `QuantumGraph` elements corresponding to this task.
330  """
331  return QuantumGraphTaskNodes(
332  taskDef=self.taskDef,
333  quanta=self.quanta,
334  initInputs=self.initInputs.unpackRefs(),
335  initOutputs=self.initOutputs.unpackRefs(),
336  )
337 
338 
339 @dataclass
340 class _PipelineScaffolding:
341  """A helper data structure that organizes the information involved in
342  constructing a `QuantumGraph` for a `Pipeline`.
343 
344  Parameters
345  ----------
346  pipeline : `Pipeline`
347  Sequence of tasks from which a graph is to be constructed. Must
348  have nested task classes already imported.
349  universe : `DimensionUniverse`
350  Universe of all possible dimensions.
351 
352  Raises
353  ------
354  GraphBuilderError
 355  Raised if a task's dimensions are not a subset of the union of the
356  pipeline's dataset dimensions.
357 
358  Notes
359  -----
360  The scaffolding data structure contains nested data structures for both
361  tasks (`_TaskScaffolding`) and datasets (`_DatasetScaffolding`), with the
362  latter held by `_DatasetScaffoldingDict`. The dataset data structures are
363  shared between the pipeline-level structure (which aggregates all datasets
364  and categorizes them from the perspective of the complete pipeline) and the
365  individual tasks that use them as inputs and outputs.
366 
367  `QuantumGraph` construction proceeds in five steps, with each corresponding
368  to a different `_PipelineScaffolding` method:
369 
370  1. When `_PipelineScaffolding` is constructed, we extract and categorize
371  the DatasetTypes used by the pipeline (delegating to
372  `PipelineDatasetTypes.fromPipeline`), then use these to construct the
373  nested `_TaskScaffolding` and `_DatasetScaffolding` objects.
374 
375  2. In `fillDataIds`, we construct and run the "Big Join Query", which
376  returns related tuples of all dimensions used to identify any regular
377  input, output, and intermediate datasets (not prerequisites). We then
378  iterate over these tuples of related dimensions, identifying the subsets
379  that correspond to distinct data IDs for each task and dataset type.
380 
381  3. In `fillDatasetRefs`, we run follow-up queries against all of the
382  dataset data IDs previously identified, populating the
383  `_DatasetScaffolding.refs` lists - except for those for prerequisite
384  datasets, which cannot be resolved until distinct quanta are
385  identified.
386 
387  4. In `fillQuanta`, we extract subsets from the lists of `DatasetRef` into
388  the inputs and outputs for each `Quantum` and search for prerequisite
389  datasets, populating `_TaskScaffolding.quanta`.
390 
391  5. In `makeQuantumGraph`, we construct a `QuantumGraph` from the lists of
392  per-task quanta identified in the previous step.
393  """
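 # A minimal sketch of the intended calling sequence for this scaffolding
 # class (it mirrors GraphBuilder.makeGraph below; ``registry``,
 # ``inputCollections``, ``outputCollection``, and ``userQuery`` are assumed
 # to be supplied by the caller):
 #
 #   scaffolding = _PipelineScaffolding(pipeline, registry=registry)
 #   scaffolding.fillDataIds(registry, inputCollections, userQuery)
 #   scaffolding.fillDatasetRefs(registry, inputCollections, outputCollection)
 #   scaffolding.fillQuanta(registry, inputCollections)
 #   graph = scaffolding.makeQuantumGraph()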
394  def __init__(self, pipeline, *, registry):
395  self.tasks = []
396  # Aggregate and categorize the DatasetTypes in the Pipeline.
397  datasetTypes = PipelineDatasetTypes.fromPipeline(pipeline, registry=registry)
398  # Construct dictionaries that map those DatasetTypes to structures
 399  # that will (later) hold additional information about them.
400  for attr in ("initInputs", "initIntermediates", "initOutputs",
401  "inputs", "intermediates", "outputs", "prerequisites"):
402  setattr(self, attr, _DatasetScaffoldingDict.fromDatasetTypes(getattr(datasetTypes, attr),
403  universe=registry.dimensions))
404  # Aggregate all dimensions for all non-init, non-prerequisite
405  # DatasetTypes. These are the ones we'll include in the big join query.
406  self.dimensions = self.inputs.dimensions.union(self.intermediates.dimensions,
407  self.outputs.dimensions)
408  # Construct scaffolding nodes for each Task, and add backreferences
409  # to the Task from each DatasetScaffolding node.
410  # Note that there's only one scaffolding node for each DatasetType, shared by
411  # _PipelineScaffolding and all _TaskScaffoldings that reference it.
412  if isinstance(pipeline, Pipeline):
413  pipeline = pipeline.toExpandedPipeline()
414  self.tasks = [_TaskScaffolding(taskDef=taskDef, parent=self, datasetTypes=taskDatasetTypes)
415  for taskDef, taskDatasetTypes in zip(pipeline,
416  datasetTypes.byTask.values())]
417 
418  tasks: List[_TaskScaffolding]
419  """Scaffolding data structures for each task in the pipeline
420  (`list` of `_TaskScaffolding`).
421  """
422 
423  initInputs: _DatasetScaffoldingDict
424  """Datasets consumed but not produced when constructing the tasks in this
425  pipeline (`_DatasetScaffoldingDict`).
426  """
427 
428  initIntermediates: _DatasetScaffoldingDict
429  """Datasets that are both consumed and produced when constructing the tasks
430  in this pipeline (`_DatasetScaffoldingDict`).
431  """
432 
433  initOutputs: _DatasetScaffoldingDict
434  """Datasets produced but not consumed when constructing the tasks in this
435  pipeline (`_DatasetScaffoldingDict`).
436  """
437 
438  inputs: _DatasetScaffoldingDict
439  """Datasets that are consumed but not produced when running this pipeline
440  (`_DatasetScaffoldingDict`).
441  """
442 
443  intermediates: _DatasetScaffoldingDict
444  """Datasets that are both produced and consumed when running this pipeline
445  (`_DatasetScaffoldingDict`).
446  """
447 
448  outputs: _DatasetScaffoldingDict
 449  """Datasets produced but not consumed when running this pipeline
450  (`_DatasetScaffoldingDict`).
451  """
452 
453  prerequisites: _DatasetScaffoldingDict
454  """Datasets that are consumed when running this pipeline and looked up
455  per-Quantum when generating the graph (`_DatasetScaffoldingDict`).
456  """
457 
458  dimensions: DimensionGraph
459  """All dimensions used by any regular input, intermediate, or output
 460  (not prerequisite) dataset; the set of dimensions used in the "Big Join
461  Query" (`DimensionGraph`).
462 
463  This is required to be a superset of all task quantum dimensions.
464  """
465 
466  def fillDataIds(self, registry, inputCollections, userQuery):
467  """Query for the data IDs that connect nodes in the `QuantumGraph`.
468 
469  This method populates `_TaskScaffolding.dataIds` and
470  `_DatasetScaffolding.dataIds` (except for those in `prerequisites`).
471 
472  Parameters
473  ----------
474  registry : `lsst.daf.butler.Registry`
475  Registry for the data repository; used for all data ID queries.
476  inputCollections : `~collections.abc.Mapping`
477  Mapping from dataset type name to an ordered sequence of
478  collections to search for that dataset. A `defaultdict` is
479  recommended for the case where the same collections should be
480  used for most datasets.
481  userQuery : `str`, optional
482  User-provided expression to limit the data IDs processed.
483  """
484  # Initialization datasets always have empty data IDs.
485  emptyDataId = ExpandedDataCoordinate(registry.dimensions.empty, (), records={})
486  for scaffolding in itertools.chain(self.initInputs.values(),
487  self.initIntermediates.values(),
488  self.initOutputs.values()):
489  scaffolding.dataIds.add(emptyDataId)
490  # Run one big query for the data IDs for task dimensions and regular
491  # inputs and outputs. We limit the query to only dimensions that are
492  # associated with the input dataset types, but don't (yet) try to
493  # obtain the dataset_ids for those inputs.
494  resultIter = registry.queryDimensions(
495  self.dimensions,
496  datasets={
497  datasetType: inputCollections[datasetType.name]
498  for datasetType in self.inputs
499  },
500  where=userQuery,
501  )
502  # Iterate over query results and populate the data IDs in
 503  # _TaskScaffolding.dataIds and _DatasetScaffolding.dataIds, extracting the subsets of the common data
504  # ID from the query corresponding to the dimensions of each. By using
505  # sets, we remove duplicates caused by query rows in which the
506  # dimensions that change are not relevant for that task or dataset
507  # type. For example, if the Big Join Query involves the dimensions
508  # (instrument, visit, detector, skymap, tract, patch), we extract
509  # "calexp" data IDs from the instrument, visit, and detector values
510  # only, and rely on `set.add` to avoid duplications due to result rows
511  # in which only skymap, tract, and patch are varying. The Big Join
512  # Query is defined such that only visit+detector and tract+patch
513  # combinations that represent spatial overlaps are included in the
514  # results.
515  for commonDataId in resultIter:
516  for taskScaffolding in self.tasks:
517  taskScaffolding.dataIds.add(commonDataId.subset(taskScaffolding.dimensions))
518  for datasetType, scaffolding in itertools.chain(self.inputs.items(),
519  self.intermediates.items(),
520  self.outputs.items()):
521  scaffolding.dataIds.add(commonDataId.subset(scaffolding.dimensions))
522 
523  def fillDatasetRefs(self, registry, inputCollections, outputCollection, *,
524  skipExisting=True, clobberExisting=False):
525  """Perform follow up queries for each dataset data ID produced in
526  `fillDataIds`.
527 
528  This method populates `_DatasetScaffolding.refs` (except for those in
529  `prerequisites`).
530 
531  Parameters
532  ----------
533  registry : `lsst.daf.butler.Registry`
534  Registry for the data repository; used for all data ID queries.
535  inputCollections : `~collections.abc.Mapping`
536  Mapping from dataset type name to an ordered sequence of
537  collections to search for that dataset. A `defaultdict` is
538  recommended for the case where the same collections should be
539  used for most datasets.
540  outputCollection : `str`
541  Collection for all output datasets.
542  skipExisting : `bool`, optional
543  If `True` (default), a Quantum is not created if all its outputs
544  already exist.
545  clobberExisting : `bool`, optional
546  If `True`, overwrite any outputs that already exist. Cannot be
547  `True` if ``skipExisting`` is.
548 
549  Raises
550  ------
551  ValueError
552  Raised if both `skipExisting` and `clobberExisting` are `True`.
553  OutputExistsError
554  Raised if an output dataset already exists in the output collection
555  and both ``skipExisting`` and ``clobberExisting`` are `False`. The
556  case where some but not all of a quantum's outputs are present and
557  ``skipExisting`` is `True` cannot be identified at this stage, and
558  is handled by `fillQuanta` instead.
559  """
560  if clobberExisting and skipExisting:
561  raise ValueError("clobberExisting and skipExisting cannot both be true.")
562  # Look up input and initInput datasets in the input collection(s).
563  for datasetType, scaffolding in itertools.chain(self.initInputs.items(), self.inputs.items()):
564  for dataId in scaffolding.dataIds:
565  refs = list(
566  registry.queryDatasets(
567  datasetType,
568  collections=inputCollections[datasetType.name],
569  dataId=dataId,
570  deduplicate=True,
571  expand=True,
572  )
573  )
574  assert len(refs) == 1, "BJQ guarantees exactly one input for each data ID."
575  scaffolding.refs.extend(refs)
576  # Look up [init] intermediate and output datasets in the output collection,
577  # unless clobberExisting is True (in which case we don't care if these
578  # already exist).
579  for datasetType, scaffolding in itertools.chain(self.initIntermediates.items(),
580  self.initOutputs.items(),
581  self.intermediates.items(),
582  self.outputs.items()):
583  for dataId in scaffolding.dataIds:
584  # TODO: we could easily support per-DatasetType clobberExisting
585  # and skipExisting (it might make sense to put them in
586  # originInfo), and I could imagine that being useful - it's
587  # probably required in order to support writing initOutputs
588  # before QuantumGraph generation.
589  if clobberExisting:
590  ref = None
591  else:
592  ref = registry.find(collection=outputCollection, datasetType=datasetType, dataId=dataId)
593  if ref is None:
594  ref = DatasetRef(datasetType, dataId)
595  elif not skipExisting:
596  raise OutputExistsError(f"Output dataset {datasetType.name} already exists in "
597  f"output collection {outputCollection} with data ID {dataId}.")
598  scaffolding.refs.append(ref)
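 # Summary of the branch above for each output data ID:
 #   - clobberExisting=True: a new, unresolved DatasetRef is created without
 #     consulting the registry.
 #   - skipExisting=True: an existing, resolved ref is reused; fillQuanta may
 #     later skip the quantum if all of its outputs already exist.
 #   - both False: an existing output raises OutputExistsError.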
599  # Prerequisite dataset lookups are deferred until fillQuanta.
600 
601  def fillQuanta(self, registry, inputCollections, *, skipExisting=True):
602  """Define quanta for each task by splitting up the datasets associated
603  with each task data ID.
604 
605  This method populates `_TaskScaffolding.quanta`.
606 
607  Parameters
608  ----------
609  registry : `lsst.daf.butler.Registry`
610  Registry for the data repository; used for all data ID queries.
611  inputCollections : `~collections.abc.Mapping`
612  Mapping from dataset type name to an ordered sequence of
613  collections to search for that dataset. A `defaultdict` is
614  recommended for the case where the same collections should be
615  used for most datasets.
616  skipExisting : `bool`, optional
617  If `True` (default), a Quantum is not created if all its outputs
618  already exist.
619  """
620  for task in self.tasks:
621  for quantumDataId in task.dataIds:
622  # Identify the (regular) inputs that correspond to the Quantum
623  # with this data ID. These are those whose data IDs have the
624  # same values for all dimensions they have in common.
 625  # We do this using data IDs expanded to include implied dimensions,
 626  # which is why _DatasetScaffolding.dimensions is expanded
 627  # even though DatasetType.dimensions is not.
628  inputs = NamedKeyDict()
629  for datasetType, scaffolding in task.inputs.items():
630  inputs[datasetType] = [ref for ref, dataId in zip(scaffolding.refs, scaffolding.dataIds)
631  if quantumDataId.matches(dataId)]
632  # Same for outputs.
633  outputs = NamedKeyDict()
634  allOutputsPresent = True
635  for datasetType, scaffolding in task.outputs.items():
636  outputs[datasetType] = []
637  for ref, dataId in zip(scaffolding.refs, scaffolding.dataIds):
638  if quantumDataId.matches(dataId):
639  if ref.id is None:
640  allOutputsPresent = False
641  else:
642  assert skipExisting, "Existing outputs should have already been identified."
643  if not allOutputsPresent:
644  raise OutputExistsError(f"Output {datasetType.name} with data ID "
645  f"{dataId} already exists, but other outputs "
646  f"for task with label {task.taskDef.label} "
647  f"and data ID {quantumDataId} do not.")
648  outputs[datasetType].append(ref)
649  if allOutputsPresent and skipExisting:
650  continue
651 
652  # Look up prerequisite datasets in the input collection(s).
653  # These may have dimensions that extend beyond those we queried
654  # for originally, because we want to permit those data ID
655  # values to differ across quanta and dataset types.
656  # For example, the same quantum may have a flat and bias with
657  # a different calibration_label, or a refcat with a skypix
658  # value that overlaps the quantum's data ID's region, but not
659  # the user expression used for the initial query.
660  for datasetType, scaffolding in task.prerequisites.items():
661  refs = list(
662  registry.queryDatasets(
663  datasetType,
664  collections=inputCollections[datasetType.name],
665  dataId=quantumDataId,
666  deduplicate=True,
667  expand=True,
668  )
669  )
670  inputs[datasetType] = refs
671  task.addQuantum(
672  Quantum(
673  taskName=task.taskDef.taskName,
674  taskClass=task.taskDef.taskClass,
675  dataId=quantumDataId,
676  initInputs=task.initInputs.unpackRefs(),
677  predictedInputs=inputs,
678  outputs=outputs,
679  )
680  )
681 
682  def makeQuantumGraph(self):
683  """Create a `QuantumGraph` from the quanta already present in
684  the scaffolding data structure.
685  """
686  graph = QuantumGraph(task.makeQuantumGraphTaskNodes() for task in self.tasks)
687  graph.initInputs = self.initInputs.unpackRefs()
688  graph.initOutputs = self.initOutputs.unpackRefs()
689  graph.initIntermediates = self.initIntermediates.unpackRefs()
690  return graph
691 
692 
693 # ------------------------
694 # Exported definitions --
695 # ------------------------
696 
697 
698 class GraphBuilderError(Exception):
699  """Base class for exceptions generated by graph builder.
700  """
701  pass
702 
703 
704 class OutputExistsError(GraphBuilderError):
705  """Exception generated when output datasets already exist.
706  """
707  pass
708 
709 
710 class PrerequisiteMissingError(GraphBuilderError):
711  """Exception generated when a prerequisite dataset does not exist.
712  """
713  pass
714 
715 
716 class GraphBuilder(object):
717  """GraphBuilder class is responsible for building a task execution graph
718  from a Pipeline.
719 
720  Parameters
721  ----------
722  registry : `~lsst.daf.butler.Registry`
723  Data butler instance.
724  skipExisting : `bool`, optional
725  If `True` (default), a Quantum is not created if all its outputs
726  already exist.
727  clobberExisting : `bool`, optional
728  If `True`, overwrite any outputs that already exist. Cannot be
729  `True` if ``skipExisting`` is.
730  """
731 
732  def __init__(self, registry, skipExisting=True, clobberExisting=False):
733  self.registry = registry
734  self.dimensions = registry.dimensions
735  self.skipExisting = skipExisting
736  self.clobberExisting = clobberExisting
737 
738  def makeGraph(self, pipeline, inputCollections, outputCollection, userQuery):
739  """Create execution graph for a pipeline.
740 
741  Parameters
742  ----------
743  pipeline : `Pipeline`
744  Pipeline definition, task names/classes and their configs.
745  inputCollections : `~collections.abc.Mapping`
746  Mapping from dataset type name to an ordered sequence of
747  collections to search for that dataset. A `defaultdict` is
748  recommended for the case where the same collections should be
749  used for most datasets.
750  outputCollection : `str`
751  Collection for all output datasets.
752  userQuery : `str`
 753  String that defines the user-provided selection for the registry; should
 754  be empty or `None` if there are no restrictions on data selection.
755 
756  Returns
757  -------
758  graph : `QuantumGraph`
759 
760  Raises
761  ------
762  UserExpressionError
763  Raised when user expression cannot be parsed.
764  OutputExistsError
765  Raised when output datasets already exist.
766  Exception
767  Other exceptions types may be raised by underlying registry
768  classes.
769  """
770  scaffolding = _PipelineScaffolding(pipeline, registry=self.registry)
771 
772  scaffolding.fillDataIds(self.registry, inputCollections, userQuery)
773  scaffolding.fillDatasetRefs(self.registry, inputCollections, outputCollection,
774  skipExisting=self.skipExisting,
775  clobberExisting=self.clobberExisting)
776  scaffolding.fillQuanta(self.registry, inputCollections,
777  skipExisting=self.skipExisting)
778 
779  return scaffolding.makeQuantumGraph()
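# ----------------------------------------------------------------------
# Usage sketch (illustrative only; not part of the original module). The
# repository path, collection names, and data-ID expression below are
# hypothetical, and construction of the Pipeline is elided.
# ----------------------------------------------------------------------
from collections import defaultdict

from lsst.daf.butler import Butler

butler = Butler("/path/to/repo")    # hypothetical Gen3 repository
pipeline = ...                      # a `Pipeline` with its task classes imported
builder = GraphBuilder(butler.registry, skipExisting=True)
# Search the same input collection for every dataset type, using the
# defaultdict pattern recommended in the docstrings above.
inputCollections = defaultdict(lambda: ["shared/example-inputs"])
graph = builder.makeGraph(pipeline, inputCollections,
                          outputCollection="u/example/run1",
                          userQuery="visit = 12345")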