graphBuilder.py
1 # This file is part of pipe_base.
2 #
3 # Developed for the LSST Data Management System.
4 # This product includes software developed by the LSST Project
5 # (http://www.lsst.org).
6 # See the COPYRIGHT file at the top-level directory of this distribution
7 # for details of code ownership.
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation, either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the GNU General Public License
20 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 from __future__ import annotations
22 
23 """Module defining GraphBuilder class and related methods.
24 """
25 
26 __all__ = ['GraphBuilder']
27 
28 # -------------------------------
29 # Imports of standard modules --
30 # -------------------------------
31 import itertools
32 from collections import ChainMap
33 from contextlib import contextmanager
34 from dataclasses import dataclass
35 from typing import Dict, Iterable, Iterator, List, Set
36 import logging
37 
38 
39 # -----------------------------
40 # Imports for other modules --
41 # -----------------------------
42 from .connections import iterConnections
43 from .pipeline import PipelineDatasetTypes, TaskDatasetTypes, TaskDef, Pipeline
44 from .graph import QuantumGraph
45 from lsst.daf.butler import (
46  DataCoordinate,
47  DatasetRef,
48  DatasetType,
49  DimensionGraph,
50  DimensionUniverse,
51  NamedKeyDict,
52  Quantum,
53 )
54 from lsst.utils import doImport
55 
56 # ----------------------------------
57 # Local non-exported definitions --
58 # ----------------------------------
59 
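# The logger name below drops the leading package component of __name__
# (e.g. "lsst.pipe.base.graphBuilder" -> "pipe.base.graphBuilder"); the
# example name here is illustrative, not taken from this file.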
60 _LOG = logging.getLogger(__name__.partition(".")[2])
61 
62 
63 class _DatasetDict(NamedKeyDict[DatasetType, Dict[DataCoordinate, DatasetRef]]):
64  """A custom dictionary that maps `DatasetType` to a nested dictionary of
65  the known `DatasetRef` instances of that type.
66 
67  Parameters
68  ----------
69  args
70  Positional arguments are forwarded to the `dict` constructor.
71  universe : `DimensionUniverse`
72  Universe of all possible dimensions.
73  """
74  def __init__(self, *args, universe: DimensionGraph):
75  super().__init__(*args)
76  self.universe = universe
77 
78  @classmethod
79  def fromDatasetTypes(cls, datasetTypes: Iterable[DatasetType], *,
80  universe: DimensionUniverse) -> _DatasetDict:
81  """Construct a dictionary from a flat iterable of `DatasetType` keys.
82 
83  Parameters
84  ----------
85  datasetTypes : `iterable` of `DatasetType`
86  DatasetTypes to use as keys for the dict. Values will be empty
87  dictionaries.
88  universe : `DimensionUniverse`
89  Universe of all possible dimensions.
90 
91  Returns
92  -------
93  dictionary : `_DatasetDict`
94  A new `_DatasetDict` instance.
95  """
96  return cls({datasetType: {} for datasetType in datasetTypes}, universe=universe)
97 
98  @classmethod
99  def fromSubset(cls, datasetTypes: Iterable[DatasetType], first: _DatasetDict, *rest: _DatasetDict
100  ) -> _DatasetDict:
101  """Return a new dictionary by extracting items corresponding to the
102  given keys from one or more existing dictionaries.
103 
104  Parameters
105  ----------
106  datasetTypes : `iterable` of `DatasetType`
107  DatasetTypes to use as keys for the dict. Values will be obtained
108  by lookups against ``first`` and ``rest``.
109  first : `_DatasetDict`
110  Another dictionary from which to extract values.
111  rest
112  Additional dictionaries from which to extract values.
113 
114  Returns
115  -------
116  dictionary : `_DatasetDict`
117  A new dictionary instance.
118  """
119  combined = ChainMap(first, *rest)
120  return cls({datasetType: combined[datasetType] for datasetType in datasetTypes},
121  universe=first.universe)
122 
123  @property
124  def dimensions(self) -> DimensionGraph:
125  """The union of all dimensions used by all dataset types in this
126  dictionary, including implied dependencies (`DimensionGraph`).
127  """
128  base = self.universe.empty
129  if len(self) == 0:
130  return base
131  return base.union(*[datasetType.dimensions for datasetType in self.keys()])
132 
133  def unpackSingleRefs(self) -> NamedKeyDict[DatasetType, DatasetRef]:
134  """Unpack nested single-element `DatasetRef` dicts into a new
135  mapping with `DatasetType` keys and `DatasetRef` values.
136 
137  This method assumes that each nested dictionary contains exactly one
138  item, as is the case for all "init" datasets.
139 
140  Returns
141  -------
142  dictionary : `NamedKeyDict`
143  Dictionary mapping `DatasetType` to `DatasetRef`, with both
144  `DatasetType` instances and string names usable as keys.
145  """
146  def getOne(refs: Dict[DataCoordinate, DatasetRef]) -> DatasetRef:
147  ref, = refs.values()
148  return ref
149  return NamedKeyDict({datasetType: getOne(refs) for datasetType, refs in self.items()})
150 
151  def unpackMultiRefs(self) -> NamedKeyDict[DatasetType, List[DatasetRef]]:
152  """Unpack nested multi-element `DatasetRef` dicts into a new
153  mapping with `DatasetType` keys and `list` of `DatasetRef` values.
154 
155  Returns
156  -------
157  dictionary : `NamedKeyDict`
158  Dictionary mapping `DatasetType` to a `list` of `DatasetRef`, with both
159  `DatasetType` instances and string names usable as keys.
160  """
161  return NamedKeyDict({datasetType: list(refs.values()) for datasetType, refs in self.items()})
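#  An illustrative sketch of the nesting and the two unpack methods above
#  (``dsType``, ``dataId``, ``ref`` and ``universe`` are hypothetical names,
#  not part of this module):
#
#      d = _DatasetDict.fromDatasetTypes([dsType], universe=universe)
#      d[dsType][dataId] = ref          # nested: DatasetType -> {dataId: ref}
#      d.unpackSingleRefs()[dsType]     # -> ref (exactly one per type)
#      d.unpackMultiRefs()[dsType]      # -> [ref, ...] (all refs of that type)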
162 
163  def extract(self, datasetType: DatasetType, dataIds: Iterable[DataCoordinate]
164  ) -> Iterator[DatasetRef]:
165  """Iterate over the contained `DatasetRef` instances that match the
166  given `DatasetType` and data IDs.
167 
168  Parameters
169  ----------
170  datasetType : `DatasetType`
171  Dataset type to match.
172  dataIds : `Iterable` [ `DataCoordinate` ]
173  Data IDs to match.
174 
175  Returns
176  -------
177  refs : `Iterator` [ `DatasetRef` ]
178  DatasetRef instances for which ``ref.datasetType == datasetType``
179  and ``ref.dataId`` is in ``dataIds``.
180  """
181  refs = self[datasetType]
182  return (refs[dataId] for dataId in dataIds)
183 
184 
185 class _QuantumScaffolding:
186  """Helper class aggregating information about a `Quantum`, used when
187  constructing a `QuantumGraph`.
188 
189  See `_PipelineScaffolding` for a top-down description of the full
190  scaffolding data structure.
191 
192  Parameters
193  ----------
194  task : _TaskScaffolding
195  Back-reference to the helper object for the `PipelineTask` this quantum
196  represents an execution of.
197  dataId : `DataCoordinate`
198  Data ID for this quantum.
199  """
200  def __init__(self, task: _TaskScaffolding, dataId: DataCoordinate):
201  self.task = task
202  self.dataId = dataId
203  self.inputs = _DatasetDict.fromDatasetTypes(task.inputs.keys(), universe=dataId.universe)
204  self.outputs = _DatasetDict.fromDatasetTypes(task.outputs.keys(), universe=dataId.universe)
205  self.prerequisites = _DatasetDict.fromDatasetTypes(task.prerequisites.keys(),
206  universe=dataId.universe)
207 
208  __slots__ = ("task", "dataId", "inputs", "outputs", "prerequisites")
209 
210  def __repr__(self):
211  return f"_QuantumScaffolding(taskDef={self.task.taskDef}, dataId={self.dataId}, ...)"
212 
213  task: _TaskScaffolding
214  """Back-reference to the helper object for the `PipelineTask` this quantum
215  represents an execution of.
216  """
217 
218  dataId: DataCoordinate
219  """Data ID for this quantum.
220  """
221 
222  inputs: _DatasetDict
223  """Nested dictionary containing `DatasetRef` inputs to this quantum.
224 
225  This is initialized to map each `DatasetType` to an empty dictionary at
226  construction. Those nested dictionaries are populated (with data IDs as
227  keys) with unresolved `DatasetRef` instances in
228  `_PipelineScaffolding.connectDataIds`.
229  """
230 
231  outputs: _DatasetDict
232  """Nested dictionary containing `DatasetRef` outputs of this quantum.
233  """
234 
235  prerequisites: _DatasetDict
236  """Nested dictionary containing `DatasetRef` prerequisite inputs to this
237  quantum.
238  """
239 
240  def makeQuantum(self) -> Quantum:
241  """Transform the scaffolding object into a true `Quantum` instance.
242 
243  Returns
244  -------
245  quantum : `Quantum`
246  An actual `Quantum` instance.
247  """
248  allInputs = self.inputs.unpackMultiRefs()
249  allInputs.update(self.prerequisites.unpackMultiRefs())
250  # Give the task's Connections class an opportunity to remove some
251  # inputs, or complain if they are unacceptable.
252  # This will raise if one of the check conditions is not met, which is
253  # the intended behavior
254  allInputs = self.task.taskDef.connections.adjustQuantum(allInputs)
255  return Quantum(
256  taskName=self.task.taskDef.taskName,
257  taskClass=self.task.taskDef.taskClass,
258  dataId=self.dataId,
259  initInputs=self.task.initInputs.unpackSingleRefs(),
260  inputs=allInputs,
261  outputs=self.outputs.unpackMultiRefs(),
262  )
263 
264 
265 @dataclass
266 class _TaskScaffolding:
267  """Helper class aggregating information about a `PipelineTask`, used when
268  constructing a `QuantumGraph`.
269 
270  See `_PipelineScaffolding` for a top-down description of the full
271  scaffolding data structure.
272 
273  Parameters
274  ----------
275  taskDef : `TaskDef`
276  Data structure that identifies the task class and its config.
277  parent : `_PipelineScaffolding`
278  The parent data structure that will hold the instance being
279  constructed.
280  datasetTypes : `TaskDatasetTypes`
281  Data structure that categorizes the dataset types used by this task.
282  """
283  def __init__(self, taskDef: TaskDef, parent: _PipelineScaffolding, datasetTypes: TaskDatasetTypes):
284  universe = parent.dimensions.universe
285  self.taskDef = taskDef
286  self.dimensions = DimensionGraph(universe, names=taskDef.connections.dimensions)
287  assert self.dimensions.issubset(parent.dimensions)
288  # Initialize _DatasetDicts as subsets of the one or two
289  # corresponding dicts in the parent _PipelineScaffolding.
290  self.initInputs = _DatasetDict.fromSubset(datasetTypes.initInputs, parent.initInputs,
291  parent.initIntermediates)
292  self.initOutputs = _DatasetDict.fromSubset(datasetTypes.initOutputs, parent.initIntermediates,
293  parent.initOutputs)
294  self.inputs = _DatasetDict.fromSubset(datasetTypes.inputs, parent.inputs, parent.intermediates)
295  self.outputs = _DatasetDict.fromSubset(datasetTypes.outputs, parent.intermediates, parent.outputs)
296  self.prerequisites = _DatasetDict.fromSubset(datasetTypes.prerequisites, parent.prerequisites)
297  self.dataIds = set()
298  self.quanta = {}
299 
300  def __repr__(self):
301  # Default dataclass-injected __repr__ gets caught in an infinite loop
302  # because of back-references.
303  return f"_TaskScaffolding(taskDef={self.taskDef}, ...)"
304 
305  taskDef: TaskDef
306  """Data structure that identifies the task class and its config
307  (`TaskDef`).
308  """
309 
310  dimensions: DimensionGraph
311  """The dimensions of a single `Quantum` of this task (`DimensionGraph`).
312  """
313 
314  initInputs: _DatasetDict
315  """Dictionary containing information about datasets used to construct this
316  task (`_DatasetDict`).
317  """
318 
319  initOutputs: _DatasetDict
320  """Dictionary containing information about datasets produced as a
321  side-effect of constructing this task (`_DatasetDict`).
322  """
323 
324  inputs: _DatasetDict
325  """Dictionary containing information about datasets used as regular,
326  graph-constraining inputs to this task (`_DatasetDict`).
327  """
328 
329  outputs: _DatasetDict
330  """Dictionary containing information about datasets produced by this task
331  (`_DatasetDict`).
332  """
333 
334  prerequisites: _DatasetDict
335  """Dictionary containing information about input datasets that must be
336  present in the repository before any Pipeline containing this task is run
337  (`_DatasetDict`).
338  """
339 
340  quanta: Dict[DataCoordinate, _QuantumScaffolding]
341  """Dictionary mapping data ID to a scaffolding object for the Quantum of
342  this task with that data ID.
343  """
344 
345  def makeQuantumSet(self) -> Set[Quantum]:
346  """Create a `set` of `Quantum` from the information in ``self``.
347 
348  Returns
349  -------
350  nodes : `set` of `Quantum`
351  The `Quantum` elements corresponding to this task.
352  """
353  return set(q.makeQuantum() for q in self.quanta.values())
354 
355 
356 @dataclass
357 class _PipelineScaffolding:
358  """A helper data structure that organizes the information involved in
359  constructing a `QuantumGraph` for a `Pipeline`.
360 
361  Parameters
362  ----------
363  pipeline : `Pipeline`
364  Sequence of tasks from which a graph is to be constructed. Must
365  have nested task classes already imported.
366  registry : `lsst.daf.butler.Registry`
367  Registry for the data repository.
368 
369  Notes
370  -----
371  The scaffolding data structure contains nested data structures for both
372  tasks (`_TaskScaffolding`) and datasets (`_DatasetDict`). The dataset
373  data structures are shared between the pipeline-level structure (which
374  aggregates all datasets and categorizes them from the perspective of the
375  complete pipeline) and the individual tasks that use them as inputs and
376  outputs.
377 
378  `QuantumGraph` construction proceeds in four steps, with each corresponding
379  to a different `_PipelineScaffolding` method:
380 
381  1. When `_PipelineScaffolding` is constructed, we extract and categorize
382  the DatasetTypes used by the pipeline (delegating to
383  `PipelineDatasetTypes.fromPipeline`), then use these to construct the
384  nested `_TaskScaffolding` and `_DatasetDict` objects.
385 
386  2. In `connectDataIds`, we construct and run the "Big Join Query", which
387  returns related tuples of all dimensions used to identify any regular
388  input, output, and intermediate datasets (not prerequisites). We then
389  iterate over these tuples of related dimensions, identifying the subsets
390  that correspond to distinct data IDs for each task and dataset type,
391  and then create `_QuantumScaffolding` objects.
392 
393  3. In `resolveDatasetRefs`, we run follow-up queries against all of the
394  dataset data IDs previously identified, transforming unresolved
395  DatasetRefs into resolved DatasetRefs where appropriate. We then look
396  up prerequisite datasets for all quanta.
397 
398  4. In `makeQuantumGraph`, we construct a `QuantumGraph` from the lists of
399  per-task `_QuantumScaffolding` objects.
400  """
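#  A minimal sketch of the four-step flow described in the docstring above,
#  mirroring what GraphBuilder.makeGraph does at the bottom of this file
#  (``registry``, ``collections``, ``run``, ``userQuery`` and ``dataId`` are
#  assumed to be supplied by the caller):
#
#      scaffolding = _PipelineScaffolding(pipeline, registry=registry)   # step 1
#      with scaffolding.connectDataIds(registry, collections, userQuery,
#                                      dataId) as commonDataIds:         # step 2
#          scaffolding.resolveDatasetRefs(registry, collections, run,
#                                         commonDataIds)                 # step 3
#      graph = scaffolding.makeQuantumGraph()                            # step 4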
401  def __init__(self, pipeline, *, registry):
402  _LOG.debug("Initializing data structures for QuantumGraph generation.")
403  self.tasks = []
404  # Aggregate and categorize the DatasetTypes in the Pipeline.
405  datasetTypes = PipelineDatasetTypes.fromPipeline(pipeline, registry=registry)
406  # Construct dictionaries that map those DatasetTypes to structures
407  # that will (later) hold additional information about them.
408  for attr in ("initInputs", "initIntermediates", "initOutputs",
409  "inputs", "intermediates", "outputs", "prerequisites"):
410  setattr(self, attr, _DatasetDict.fromDatasetTypes(getattr(datasetTypes, attr),
411  universe=registry.dimensions))
412  # Aggregate all dimensions for all non-init, non-prerequisite
413  # DatasetTypes. These are the ones we'll include in the big join
414  # query.
415  self.dimensions = self.inputs.dimensions.union(self.intermediates.dimensions,
416  self.outputs.dimensions)
417  # Construct scaffolding nodes for each Task, and add backreferences
418  # to the Task from each DatasetScaffolding node.
419  # Note that there's only one scaffolding node for each DatasetType,
420  # shared by _PipelineScaffolding and all _TaskScaffoldings that
421  # reference it.
422  if isinstance(pipeline, Pipeline):
423  pipeline = pipeline.toExpandedPipeline()
424  self.tasks = [_TaskScaffolding(taskDef=taskDef, parent=self, datasetTypes=taskDatasetTypes)
425  for taskDef, taskDatasetTypes in zip(pipeline,
426  datasetTypes.byTask.values())]
427 
428  def __repr__(self):
429  # Default dataclass-injected __repr__ gets caught in an infinite loop
430  # because of back-references.
431  return f"_PipelineScaffolding(tasks={self.tasks}, ...)"
432 
433  tasks: List[_TaskScaffolding]
434  """Scaffolding data structures for each task in the pipeline
435  (`list` of `_TaskScaffolding`).
436  """
437 
438  initInputs: _DatasetDict
439  """Datasets consumed but not produced when constructing the tasks in this
440  pipeline (`_DatasetDict`).
441  """
442 
443  initIntermediates: _DatasetDict
444  """Datasets that are both consumed and produced when constructing the tasks
445  in this pipeline (`_DatasetDict`).
446  """
447 
448  initOutputs: _DatasetDict
449  """Datasets produced but not consumed when constructing the tasks in this
450  pipeline (`_DatasetDict`).
451  """
452 
453  inputs: _DatasetDict
454  """Datasets that are consumed but not produced when running this pipeline
455  (`_DatasetDict`).
456  """
457 
458  intermediates: _DatasetDict
459  """Datasets that are both produced and consumed when running this pipeline
460  (`_DatasetDict`).
461  """
462 
463  outputs: _DatasetDict
464  """Datasets produced but not consumed when running this pipeline
465  (`_DatasetDict`).
466  """
467 
468  prerequisites: _DatasetDict
469  """Datasets that are consumed when running this pipeline and looked up
470  per-Quantum when generating the graph (`_DatasetDict`).
471  """
472 
473  dimensions: DimensionGraph
474  """All dimensions used by any regular input, intermediate, or output
475  (not prerequisite) dataset; the set of dimensions used in the "Big Join
476  Query" (`DimensionGraph`).
477 
478  This is required to be a superset of all task quantum dimensions.
479  """
480 
481  @contextmanager
482  def connectDataIds(self, registry, collections, userQuery, externalDataId):
483  """Query for the data IDs that connect nodes in the `QuantumGraph`.
484 
485  This method populates `_TaskScaffolding.quanta` and the nested
486  `_DatasetDict` dictionaries (except for those in `prerequisites`).
487 
488  Parameters
489  ----------
490  registry : `lsst.daf.butler.Registry`
491  Registry for the data repository; used for all data ID queries.
492  collections
493  Expressions representing the collections to search for input
494  datasets. May be any of the types accepted by
495  `lsst.daf.butler.CollectionSearch.fromExpression`.
496  userQuery : `str` or `None`
497  User-provided expression to limit the data IDs processed.
498  externalDataId : `DataCoordinate`
499  Externally-provided data ID that should be used to restrict the
500  results, just as if these constraints had been included via ``AND``
501  in ``userQuery``. This includes (at least) any instrument named
502  in the pipeline definition.
503 
504  Returns
505  -------
506  commonDataIds : \
507  `lsst.daf.butler.registry.queries.DataCoordinateQueryResults`
508  An interface to a database temporary table containing all data IDs
509  that will appear in this `QuantumGraph`. Returned inside a
510  context manager, which will drop the temporary table at the end of
511  the `with` block in which this method is called.
512  """
513  _LOG.debug("Building query for data IDs.")
514  # Initialization datasets always have empty data IDs.
515  emptyDataId = DataCoordinate.makeEmpty(registry.dimensions)
516  for datasetType, refs in itertools.chain(self.initInputs.items(),
517  self.initIntermediates.items(),
518  self.initOutputs.items()):
519  refs[emptyDataId] = DatasetRef(datasetType, emptyDataId)
520  # Run one big query for the data IDs for task dimensions and regular
521  # inputs and outputs. We limit the query to only dimensions that are
522  # associated with the input dataset types, but don't (yet) try to
523  # obtain the dataset_ids for those inputs.
524  _LOG.debug("Submitting data ID query and materializing results.")
525  with registry.queryDataIds(self.dimensions,
526  datasets=list(self.inputs),
527  collections=collections,
528  where=userQuery,
529  dataId=externalDataId,
530  ).materialize() as commonDataIds:
531  _LOG.debug("Expanding data IDs.")
532  commonDataIds = commonDataIds.expanded()
533  _LOG.debug("Iterating over query results to associate quanta with datasets.")
534  # Iterate over query results, populating data IDs for datasets and
535  # quanta and then connecting them to each other.
536  n = 0
537  for n, commonDataId in enumerate(commonDataIds):
538  # Create DatasetRefs for all DatasetTypes from this result row,
539  # noting that we might have created some already.
540  # We remember both those that already existed and those that we
541  # create now.
542  refsForRow = {}
543  for datasetType, refs in itertools.chain(self.inputs.items(), self.intermediates.items(),
544  self.outputs.items()):
545  datasetDataId = commonDataId.subset(datasetType.dimensions)
546  ref = refs.get(datasetDataId)
547  if ref is None:
548  ref = DatasetRef(datasetType, datasetDataId)
549  refs[datasetDataId] = ref
550  refsForRow[datasetType.name] = ref
551  # Create _QuantumScaffolding objects for all tasks from this
552  # result row, noting that we might have created some already.
553  for task in self.tasks:
554  quantumDataId = commonDataId.subset(task.dimensions)
555  quantum = task.quanta.get(quantumDataId)
556  if quantum is None:
557  quantum = _QuantumScaffolding(task=task, dataId=quantumDataId)
558  task.quanta[quantumDataId] = quantum
559  # Whether this is a new quantum or an existing one, we can
560  # now associate the DatasetRefs for this row with it. The
561  # fact that a Quantum data ID and a dataset data ID both
562  # came from the same result row is what tells us they
563  # should be associated.
564  # Many of these associations will be duplicates (because
565  # another query row that differed from this one only in
566  # irrelevant dimensions already added them), and we use
567  # sets to skip.
568  for datasetType in task.inputs:
569  ref = refsForRow[datasetType.name]
570  quantum.inputs[datasetType.name][ref.dataId] = ref
571  for datasetType in task.outputs:
572  ref = refsForRow[datasetType.name]
573  quantum.outputs[datasetType.name][ref.dataId] = ref
574  _LOG.debug("Finished processing %d rows from data ID query.", n)
575  yield commonDataIds
576 
577  def resolveDatasetRefs(self, registry, collections, run, commonDataIds, *, skipExisting=True):
578  """Perform follow-up queries for each dataset data ID produced in
579  `connectDataIds`.
580 
581  This method resolves the `DatasetRef` entries in the nested
582  `_DatasetDict` dictionaries and looks up prerequisite inputs.
583 
584  Parameters
585  ----------
586  registry : `lsst.daf.butler.Registry`
587  Registry for the data repository; used for all data ID queries.
588  collections
589  Expressions representing the collections to search for input
590  datasets. May be any of the types accepted by
591  `lsst.daf.butler.CollectionSearch.fromExpression`.
592  run : `str`, optional
593  Name of the `~lsst.daf.butler.CollectionType.RUN` collection for
594  output datasets, if it already exists.
595  commonDataIds : \
596  `lsst.daf.butler.registry.queries.DataCoordinateQueryResults`
597  Result of a previous call to `connectDataIds`.
598  skipExisting : `bool`, optional
599  If `True` (default), a Quantum is not created if all its outputs
600  already exist in ``run``. Ignored if ``run`` is `None`.
601 
602  Raises
603  ------
604  OutputExistsError
605  Raised if an output dataset already exists in the output run
606  and ``skipExisting`` is `False`. The case where some but not all
607  of a quantum's outputs are present and ``skipExisting`` is `True`
608  cannot be identified at this stage, and is handled later in this
609  method instead.
610  """
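#  Sketch of a call to this method (hypothetical names; ``commonDataIds`` is
#  the object yielded by connectDataIds above):
#
#      scaffolding.resolveDatasetRefs(registry, collections, run="my_run",
#                                     commonDataIds=commonDataIds,
#                                     skipExisting=True)
#
#  With skipExisting=True, quanta whose outputs all already exist in the run
#  are pruned from the scaffolding; with skipExisting=False an existing
#  output raises OutputExistsError instead.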
611  # Look up [init] intermediate and output datasets in the output
612  # collection, if there is an output collection.
613  if run is not None:
614  for datasetType, refs in itertools.chain(self.initIntermediates.items(),
615  self.initOutputs.items(),
616  self.intermediates.items(),
617  self.outputs.items()):
618  _LOG.debug("Resolving %d datasets for intermediate and/or output dataset %s.",
619  len(refs), datasetType.name)
620  isInit = datasetType in self.initIntermediates or datasetType in self.initOutputs
621  resolvedRefQueryResults = commonDataIds.subset(
622  datasetType.dimensions,
623  unique=True
624  ).findDatasets(
625  datasetType,
626  collections=run,
627  findFirst=True
628  )
629  for resolvedRef in resolvedRefQueryResults:
630  # TODO: we could easily support per-DatasetType
631  # skipExisting and I could imagine that being useful - it's
632  # probably required in order to support writing initOutputs
633  # before QuantumGraph generation.
634  assert resolvedRef.dataId in refs
635  if skipExisting or isInit:
636  refs[resolvedRef.dataId] = resolvedRef
637  else:
638  raise OutputExistsError(f"Output dataset {datasetType.name} already exists in "
639  f"output RUN collection '{run}' with data ID"
640  f" {resolvedRef.dataId}.")
641  # Look up input and initInput datasets in the input collection(s).
642  for datasetType, refs in itertools.chain(self.initInputs.items(), self.inputs.items()):
643  _LOG.debug("Resolving %d datasets for input dataset %s.", len(refs), datasetType.name)
644  resolvedRefQueryResults = commonDataIds.subset(
645  datasetType.dimensions,
646  unique=True
647  ).findDatasets(
648  datasetType,
649  collections=collections,
650  findFirst=True
651  )
652  dataIdsNotFoundYet = set(refs.keys())
653  for resolvedRef in resolvedRefQueryResults:
654  dataIdsNotFoundYet.discard(resolvedRef.dataId)
655  refs[resolvedRef.dataId] = resolvedRef
656  if dataIdsNotFoundYet:
657  raise RuntimeError(
658  f"{len(dataIdsNotFoundYet)} dataset(s) of type "
659  f"'{datasetType.name}' was/were present in a previous "
660  f"query, but could not be found now. "
661  f"This is either a logic bug in QuantumGraph generation "
662  f"or the input collections have been modified since "
663  f"QuantumGraph generation began."
664  )
665  # Copy the resolved DatasetRefs to the _QuantumScaffolding objects,
666  # replacing the unresolved refs there, and then look up prerequisites.
667  for task in self.tasks:
668  _LOG.debug(
669  "Applying resolutions and finding prerequisites for %d quanta of task with label '%s'.",
670  len(task.quanta),
671  task.taskDef.label
672  )
673  lookupFunctions = {
674  c.name: c.lookupFunction
675  for c in iterConnections(task.taskDef.connections, "prerequisiteInputs")
676  if c.lookupFunction is not None
677  }
678  dataIdsToSkip = []
679  for quantum in task.quanta.values():
680  # Process output datasets only if there is a run to look for
681  # outputs in and skipExisting is True. Note that if
682  # skipExisting is False, any output datasets that already exist
683  # would have already caused an exception to be raised.
684  # We never update the DatasetRefs in the quantum because those
685  # should never be resolved.
686  if run is not None and skipExisting:
687  resolvedRefs = []
688  unresolvedRefs = []
689  for datasetType, originalRefs in quantum.outputs.items():
690  for ref in task.outputs.extract(datasetType, originalRefs.keys()):
691  if ref.id is not None:
692  resolvedRefs.append(ref)
693  else:
694  unresolvedRefs.append(ref)
695  if resolvedRefs:
696  if unresolvedRefs:
697  raise OutputExistsError(
698  f"Quantum {quantum.dataId} of task with label "
699  f"'{quantum.task.taskDef.label}' has some outputs that exist "
700  f"({resolvedRefs}) "
701  f"and others that don't ({unresolvedRefs})."
702  )
703  else:
704  # All outputs are already present; skip this
705  # quantum and continue to the next.
706  dataIdsToSkip.append(quantum.dataId)
707  continue
708  # Update the input DatasetRefs to the resolved ones we already
709  # searched for.
710  for datasetType, refs in quantum.inputs.items():
711  for ref in task.inputs.extract(datasetType, refs.keys()):
712  refs[ref.dataId] = ref
713  # Look up prerequisite datasets in the input collection(s).
714  # These may have dimensions that extend beyond those we queried
715  # for originally, because we want to permit those data ID
716  # values to differ across quanta and dataset types.
717  for datasetType in task.prerequisites:
718  lookupFunction = lookupFunctions.get(datasetType.name)
719  if lookupFunction is not None:
720  # PipelineTask has provided its own function to do the
721  # lookup. This always takes precedence.
722  refs = list(
723  lookupFunction(datasetType, registry, quantum.dataId, collections)
724  )
725  elif (datasetType.isCalibration()
726  and datasetType.dimensions <= quantum.dataId.graph
727  and quantum.dataId.graph.temporal):
728  # This is a master calibration lookup, which we have to
729  # handle specially because the query system can't do a
730  # temporal join on a non-dimension-based timespan yet.
731  timespan = quantum.dataId.timespan
732  try:
733  refs = [registry.findDataset(datasetType, quantum.dataId,
734  collections=collections,
735  timespan=timespan)]
736  except KeyError:
737  # This dataset type is not present in the registry,
738  # which just means there are no datasets here.
739  refs = []
740  else:
741  # Most general case.
742  refs = list(registry.queryDatasets(datasetType,
743  collections=collections,
744  dataId=quantum.dataId,
745  findFirst=True).expanded())
746  quantum.prerequisites[datasetType].update({ref.dataId: ref for ref in refs
747  if ref is not None})
748  # Actually remove any quanta that we decided to skip above.
749  if dataIdsToSkip:
750  _LOG.debug("Pruning %d quanta for task with label '%s' because all of their outputs exist.",
751  len(dataIdsToSkip), task.taskDef.label)
752  for dataId in dataIdsToSkip:
753  del task.quanta[dataId]
754 
755  def makeQuantumGraph(self):
756  """Create a `QuantumGraph` from the quanta already present in
757  the scaffolding data structure.
758 
759  Returns
760  -------
761  graph : `QuantumGraph`
762  The full `QuantumGraph`.
763  """
764  graph = QuantumGraph({task.taskDef: task.makeQuantumSet() for task in self.tasks})
765  return graph
766 
767 
768 # ------------------------
769 # Exported definitions --
770 # ------------------------
771 
772 
773 class GraphBuilderError(Exception):
774  """Base class for exceptions generated by the graph builder.
775  """
776  pass
777 
778 
779 class OutputExistsError(GraphBuilderError):
780  """Exception generated when output datasets already exist.
781  """
782  pass
783 
784 
785 class PrerequisiteMissingError(GraphBuilderError):
786  """Exception generated when a prerequisite dataset does not exist.
787  """
788  pass
789 
790 
791 class GraphBuilder(object):
792  """GraphBuilder class is responsible for building a task execution graph from
793  a Pipeline.
794 
795  Parameters
796  ----------
797  registry : `~lsst.daf.butler.Registry`
798  Registry for the data repository.
799  skipExisting : `bool`, optional
800  If `True` (default), a Quantum is not created if all its outputs
801  already exist.
802  """
803 
804  def __init__(self, registry, skipExisting=True):
805  self.registry = registry
806  self.dimensions = registry.dimensions
807  self.skipExisting = skipExisting
808 
809  def makeGraph(self, pipeline, collections, run, userQuery):
810  """Create execution graph for a pipeline.
811 
812  Parameters
813  ----------
814  pipeline : `Pipeline`
815  Pipeline definition, task names/classes and their configs.
816  collections
817  Expressions representing the collections to search for input
818  datasets. May be any of the types accepted by
819  `lsst.daf.butler.CollectionSearch.fromExpression`.
820  run : `str`, optional
821  Name of the `~lsst.daf.butler.CollectionType.RUN` collection for
822  output datasets, if it already exists.
823  userQuery : `str` or `None`
824  User-provided selection expression for the registry; should be
825  empty or `None` if there are no restrictions on data selection.
826 
827  Returns
828  -------
829  graph : `QuantumGraph`
830 
831  Raises
832  ------
833  UserExpressionError
834  Raised when user expression cannot be parsed.
835  OutputExistsError
836  Raised when output datasets already exist.
837  Exception
838  Other exception types may be raised by underlying registry
839  classes.
840  """
841  scaffolding = _PipelineScaffolding(pipeline, registry=self.registry)
842  if not collections and (scaffolding.initInputs or scaffolding.inputs or scaffolding.prerequisites):
843  raise ValueError("Pipeline requires input datasets but no input collections provided.")
844  instrument = pipeline.getInstrument()
845  if isinstance(instrument, str):
846  instrument = doImport(instrument)
847  if instrument is not None:
848  dataId = DataCoordinate.standardize(instrument=instrument.getName(),
849  universe=self.registry.dimensions)
850  else:
851  dataId = DataCoordinate.makeEmpty(self.registry.dimensions)
852  with scaffolding.connectDataIds(self.registry, collections, userQuery, dataId) as commonDataIds:
853  scaffolding.resolveDatasetRefs(self.registry, collections, run, commonDataIds,
854  skipExisting=self.skipExisting)
855  return scaffolding.makeQuantumGraph()
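#  A minimal usage sketch of the public API (the butler/registry, pipeline,
#  collection names, run name and query string below are hypothetical, not
#  taken from this module):
#
#      builder = GraphBuilder(butler.registry, skipExisting=True)
#      qgraph = builder.makeGraph(
#          pipeline,
#          collections=["HSC/defaults"],
#          run="u/someone/processing-run",
#          userQuery="instrument = 'HSC' AND visit = 12345",
#      )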