# This file is part of pipe_base.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import annotations

"""Module defining GraphBuilder class and related methods.
"""

__all__ = ['GraphBuilder']

# -------------------------------
# Imports of standard modules --
# -------------------------------
import itertools
from collections import ChainMap
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Dict, Iterable, Iterator, List, Set
import logging


# -----------------------------
# Imports for other modules --
# -----------------------------
from .connections import iterConnections
from .pipeline import PipelineDatasetTypes, TaskDatasetTypes, TaskDef, Pipeline
from .graph import QuantumGraph
from lsst.daf.butler import (
    DataCoordinate,
    DatasetRef,
    DatasetType,
    DimensionGraph,
    DimensionUniverse,
    NamedKeyDict,
    Quantum,
)
from lsst.daf.butler.registry.queries.exprParser import ParseError, ParserYacc, TreeVisitor
from lsst.utils import doImport

# ----------------------------------
# Local non-exported definitions --
# ----------------------------------

_LOG = logging.getLogger(__name__.partition(".")[2])


class _DatasetDict(NamedKeyDict[DatasetType, Dict[DataCoordinate, DatasetRef]]):
    """A custom dictionary that maps `DatasetType` to a nested dictionary of
    the known `DatasetRef` instances of that type.

    Parameters
    ----------
    args
        Positional arguments are forwarded to the `dict` constructor.
    universe : `DimensionUniverse`
        Universe of all possible dimensions.
    """
    def __init__(self, *args, universe: DimensionGraph):
        super().__init__(*args)
        self.universe = universe

    @classmethod
    def fromDatasetTypes(cls, datasetTypes: Iterable[DatasetType], *,
                         universe: DimensionUniverse) -> _DatasetDict:
        """Construct a dictionary from a flat iterable of `DatasetType` keys.

        Parameters
        ----------
        datasetTypes : `iterable` of `DatasetType`
            DatasetTypes to use as keys for the dict. Values will be empty
            dictionaries.
        universe : `DimensionUniverse`
            Universe of all possible dimensions.

        Returns
        -------
        dictionary : `_DatasetDict`
            A new `_DatasetDict` instance.
        """
        return cls({datasetType: {} for datasetType in datasetTypes}, universe=universe)

    @classmethod
    def fromSubset(cls, datasetTypes: Iterable[DatasetType], first: _DatasetDict, *rest: _DatasetDict
                   ) -> _DatasetDict:
        """Return a new dictionary by extracting items corresponding to the
        given keys from one or more existing dictionaries.

        Parameters
        ----------
        datasetTypes : `iterable` of `DatasetType`
            DatasetTypes to use as keys for the dict. Values will be obtained
            by lookups against ``first`` and ``rest``.
        first : `_DatasetDict`
            Another dictionary from which to extract values.
        rest
            Additional dictionaries from which to extract values.

        Returns
        -------
        dictionary : `_DatasetDict`
            A new dictionary instance.
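
        Examples
        --------
        The lookup semantics follow `collections.ChainMap`: the first
        dictionary containing a given key wins. A minimal sketch of that
        behavior with plain `dict` stand-ins (not real `DatasetType` keys):

        >>> from collections import ChainMap
        >>> combined = ChainMap({"a": 1}, {"a": 2, "b": 3})
        >>> {key: combined[key] for key in ("a", "b")}
        {'a': 1, 'b': 3}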
        """
        combined = ChainMap(first, *rest)
        return cls({datasetType: combined[datasetType] for datasetType in datasetTypes},
                   universe=first.universe)

    @property
    def dimensions(self) -> DimensionGraph:
        """The union of all dimensions used by all dataset types in this
        dictionary, including implied dependencies (`DimensionGraph`).
        """
        base = self.universe.empty
        if len(self) == 0:
            return base
        return base.union(*[datasetType.dimensions for datasetType in self.keys()])

    def unpackSingleRefs(self) -> NamedKeyDict[DatasetType, DatasetRef]:
        """Unpack nested single-element `DatasetRef` dicts into a new
        mapping with `DatasetType` keys and `DatasetRef` values.

        This method assumes that each nest contains exactly one item, as is
        the case for all "init" datasets.

        Returns
        -------
        dictionary : `NamedKeyDict`
            Dictionary mapping `DatasetType` to `DatasetRef`, with both
            `DatasetType` instances and string names usable as keys.
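
        Examples
        --------
        The implementation relies on the single-item tuple-unpacking idiom,
        sketched here with a plain `dict`; it raises `ValueError` if a nest
        does not hold exactly one item:

        >>> refs = {"dataId": "ref"}
        >>> ref, = refs.values()
        >>> ref
        'ref'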
        """
        def getOne(refs: Dict[DataCoordinate, DatasetRef]) -> DatasetRef:
            ref, = refs.values()
            return ref
        return NamedKeyDict({datasetType: getOne(refs) for datasetType, refs in self.items()})

    def unpackMultiRefs(self) -> NamedKeyDict[DatasetType, List[DatasetRef]]:
        """Unpack nested multi-element `DatasetRef` dicts into a new
        mapping with `DatasetType` keys and `list` of `DatasetRef` values.

        Returns
        -------
        dictionary : `NamedKeyDict`
            Dictionary mapping `DatasetType` to `list` of `DatasetRef`, with
            both `DatasetType` instances and string names usable as keys.
        """
        return NamedKeyDict({datasetType: list(refs.values()) for datasetType, refs in self.items()})

    def extract(self, datasetType: DatasetType, dataIds: Iterable[DataCoordinate]
                ) -> Iterator[DatasetRef]:
        """Iterate over the contained `DatasetRef` instances that match the
        given `DatasetType` and data IDs.

        Parameters
        ----------
        datasetType : `DatasetType`
            Dataset type to match.
        dataIds : `Iterable` [ `DataCoordinate` ]
            Data IDs to match.

        Returns
        -------
        refs : `Iterator` [ `DatasetRef` ]
            DatasetRef instances for which ``ref.datasetType == datasetType``
            and ``ref.dataId`` is in ``dataIds``.
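
        Examples
        --------
        The lookup is a plain generator expression over the nested
        dictionary, sketched here with `str` stand-ins for data IDs and refs:

        >>> refs = {"id1": "ref1", "id2": "ref2"}
        >>> list(refs[dataId] for dataId in ("id2",))
        ['ref2']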
        """
        refs = self[datasetType]
        return (refs[dataId] for dataId in dataIds)


class _QuantumScaffolding:
    """Helper class aggregating information about a `Quantum`, used when
    constructing a `QuantumGraph`.

    See `_PipelineScaffolding` for a top-down description of the full
    scaffolding data structure.

    Parameters
    ----------
    task : `_TaskScaffolding`
        Back-reference to the helper object for the `PipelineTask` this
        quantum represents an execution of.
    dataId : `DataCoordinate`
        Data ID for this quantum.
    """
    def __init__(self, task: _TaskScaffolding, dataId: DataCoordinate):
        self.task = task
        self.dataId = dataId
        self.inputs = _DatasetDict.fromDatasetTypes(task.inputs.keys(), universe=dataId.universe)
        self.outputs = _DatasetDict.fromDatasetTypes(task.outputs.keys(), universe=dataId.universe)
        self.prerequisites = _DatasetDict.fromDatasetTypes(task.prerequisites.keys(),
                                                           universe=dataId.universe)

    __slots__ = ("task", "dataId", "inputs", "outputs", "prerequisites")

    def __repr__(self):
        return f"_QuantumScaffolding(taskDef={self.task.taskDef}, dataId={self.dataId}, ...)"

    task: _TaskScaffolding
    """Back-reference to the helper object for the `PipelineTask` this quantum
    represents an execution of.
    """

    dataId: DataCoordinate
    """Data ID for this quantum.
    """

    inputs: _DatasetDict
    """Nested dictionary containing `DatasetRef` inputs to this quantum.

    This is initialized to map each `DatasetType` to an empty dictionary at
    construction. Those nested dictionaries are populated (with data IDs as
    keys) with unresolved `DatasetRef` instances in
    `_PipelineScaffolding.connectDataIds`.
    """

    outputs: _DatasetDict
    """Nested dictionary containing `DatasetRef` outputs of this quantum.
    """

    prerequisites: _DatasetDict
    """Nested dictionary containing `DatasetRef` prerequisite inputs to this
    quantum.
    """

    def makeQuantum(self) -> Quantum:
        """Transform the scaffolding object into a true `Quantum` instance.

        Returns
        -------
        quantum : `Quantum`
            An actual `Quantum` instance.
        """
        allInputs = self.inputs.unpackMultiRefs()
        allInputs.update(self.prerequisites.unpackMultiRefs())
        # Give the task's Connections class an opportunity to remove some
        # inputs, or complain if they are unacceptable.
        # This will raise if one of the check conditions is not met, which is
        # the intended behavior.
        allInputs = self.task.taskDef.connections.adjustQuantum(allInputs)
        return Quantum(
            taskName=self.task.taskDef.taskName,
            taskClass=self.task.taskDef.taskClass,
            dataId=self.dataId,
            initInputs=self.task.initInputs.unpackSingleRefs(),
            inputs=allInputs,
            outputs=self.outputs.unpackMultiRefs(),
        )


@dataclass
class _TaskScaffolding:
    """Helper class aggregating information about a `PipelineTask`, used when
    constructing a `QuantumGraph`.

    See `_PipelineScaffolding` for a top-down description of the full
    scaffolding data structure.

    Parameters
    ----------
    taskDef : `TaskDef`
        Data structure that identifies the task class and its config.
    parent : `_PipelineScaffolding`
        The parent data structure that will hold the instance being
        constructed.
    datasetTypes : `TaskDatasetTypes`
        Data structure that categorizes the dataset types used by this task.
    """
    def __init__(self, taskDef: TaskDef, parent: _PipelineScaffolding, datasetTypes: TaskDatasetTypes):
        universe = parent.dimensions.universe
        self.taskDef = taskDef
        self.dimensions = DimensionGraph(universe, names=taskDef.connections.dimensions)
        assert self.dimensions.issubset(parent.dimensions)
        # Initialize _DatasetDicts as subsets of the one or two
        # corresponding dicts in the parent _PipelineScaffolding.
        self.initInputs = _DatasetDict.fromSubset(datasetTypes.initInputs, parent.initInputs,
                                                  parent.initIntermediates)
        self.initOutputs = _DatasetDict.fromSubset(datasetTypes.initOutputs, parent.initIntermediates,
                                                   parent.initOutputs)
        self.inputs = _DatasetDict.fromSubset(datasetTypes.inputs, parent.inputs, parent.intermediates)
        self.outputs = _DatasetDict.fromSubset(datasetTypes.outputs, parent.intermediates, parent.outputs)
        self.prerequisites = _DatasetDict.fromSubset(datasetTypes.prerequisites, parent.prerequisites)
        self.dataIds = set()
        self.quanta = {}

    def __repr__(self):
        # Default dataclass-injected __repr__ gets caught in an infinite loop
        # because of back-references.
        return f"_TaskScaffolding(taskDef={self.taskDef}, ...)"

    taskDef: TaskDef
    """Data structure that identifies the task class and its config
    (`TaskDef`).
    """

    dimensions: DimensionGraph
    """The dimensions of a single `Quantum` of this task (`DimensionGraph`).
    """

    initInputs: _DatasetDict
    """Dictionary containing information about datasets used to construct this
    task (`_DatasetDict`).
    """

    initOutputs: _DatasetDict
    """Dictionary containing information about datasets produced as a
    side-effect of constructing this task (`_DatasetDict`).
    """

    inputs: _DatasetDict
    """Dictionary containing information about datasets used as regular,
    graph-constraining inputs to this task (`_DatasetDict`).
    """

    outputs: _DatasetDict
    """Dictionary containing information about datasets produced by this task
    (`_DatasetDict`).
    """

    prerequisites: _DatasetDict
    """Dictionary containing information about input datasets that must be
    present in the repository before any Pipeline containing this task is run
    (`_DatasetDict`).
    """

    quanta: Dict[DataCoordinate, _QuantumScaffolding]
    """Dictionary mapping data ID to a scaffolding object for the Quantum of
    this task with that data ID.
    """

    def makeQuantumSet(self) -> Set[Quantum]:
        """Create a `set` of `Quantum` from the information in ``self``.

        Returns
        -------
        nodes : `set` of `Quantum`
            The `Quantum` elements corresponding to this task.
        """
        return set(q.makeQuantum() for q in self.quanta.values())


@dataclass
class _PipelineScaffolding:
    """A helper data structure that organizes the information involved in
    constructing a `QuantumGraph` for a `Pipeline`.

    Parameters
    ----------
    pipeline : `Pipeline`
        Sequence of tasks from which a graph is to be constructed. Must
        have nested task classes already imported.
    registry : `lsst.daf.butler.Registry`
        Registry for the data repository; used to categorize the pipeline's
        dataset types and to define the dimension universe.

    Notes
    -----
    The scaffolding data structure contains nested data structures for both
    tasks (`_TaskScaffolding`) and datasets (`_DatasetDict`). The dataset
    data structures are shared between the pipeline-level structure (which
    aggregates all datasets and categorizes them from the perspective of the
    complete pipeline) and the individual tasks that use them as inputs and
    outputs.

    `QuantumGraph` construction proceeds in four steps, with each
    corresponding to a different `_PipelineScaffolding` method:

    1. When `_PipelineScaffolding` is constructed, we extract and categorize
       the DatasetTypes used by the pipeline (delegating to
       `PipelineDatasetTypes.fromPipeline`), then use these to construct the
       nested `_TaskScaffolding` and `_DatasetDict` objects.

    2. In `connectDataIds`, we construct and run the "Big Join Query", which
       returns related tuples of all dimensions used to identify any regular
       input, output, and intermediate datasets (not prerequisites). We then
       iterate over these tuples of related dimensions, identifying the
       subsets that correspond to distinct data IDs for each task and dataset
       type, and then create `_QuantumScaffolding` objects.

    3. In `resolveDatasetRefs`, we run follow-up queries against all of the
       dataset data IDs previously identified, transforming unresolved
       DatasetRefs into resolved DatasetRefs where appropriate. We then look
       up prerequisite datasets for all quanta.

    4. In `makeQuantumGraph`, we construct a `QuantumGraph` from the lists of
       per-task `_QuantumScaffolding` objects.
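
    A sketch of the resulting calling sequence, as driven by
    `GraphBuilder.makeGraph` (``pipeline``, ``registry``, ``collections``,
    ``run``, and ``query`` are hypothetical placeholders for real butler
    objects, so the example is skipped under doctest):

    >>> scaffolding = _PipelineScaffolding(pipeline, registry=registry)  # doctest: +SKIP
    >>> with scaffolding.connectDataIds(registry, collections, query) as ids:  # doctest: +SKIP
    ...     scaffolding.resolveDatasetRefs(registry, collections, run, ids)
    >>> graph = scaffolding.makeQuantumGraph()  # doctest: +SKIP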
    """
    def __init__(self, pipeline, *, registry):
        _LOG.debug("Initializing data structures for QuantumGraph generation.")
        self.tasks = []
        # Aggregate and categorize the DatasetTypes in the Pipeline.
        datasetTypes = PipelineDatasetTypes.fromPipeline(pipeline, registry=registry)
        # Construct dictionaries that map those DatasetTypes to structures
        # that will (later) hold additional information about them.
        for attr in ("initInputs", "initIntermediates", "initOutputs",
                     "inputs", "intermediates", "outputs", "prerequisites"):
            setattr(self, attr, _DatasetDict.fromDatasetTypes(getattr(datasetTypes, attr),
                                                              universe=registry.dimensions))
        # Aggregate all dimensions for all non-init, non-prerequisite
        # DatasetTypes. These are the ones we'll include in the big join
        # query.
        self.dimensions = self.inputs.dimensions.union(self.intermediates.dimensions,
                                                       self.outputs.dimensions)
        # Construct scaffolding nodes for each Task, and add backreferences
        # to the Task from each DatasetScaffolding node.
        # Note that there's only one scaffolding node for each DatasetType,
        # shared by _PipelineScaffolding and all _TaskScaffoldings that
        # reference it.
        if isinstance(pipeline, Pipeline):
            pipeline = pipeline.toExpandedPipeline()
        self.tasks = [_TaskScaffolding(taskDef=taskDef, parent=self, datasetTypes=taskDatasetTypes)
                      for taskDef, taskDatasetTypes in zip(pipeline,
                                                           datasetTypes.byTask.values())]

    def __repr__(self):
        # Default dataclass-injected __repr__ gets caught in an infinite loop
        # because of back-references.
        return f"_PipelineScaffolding(tasks={self.tasks}, ...)"

    tasks: List[_TaskScaffolding]
    """Scaffolding data structures for each task in the pipeline
    (`list` of `_TaskScaffolding`).
    """

    initInputs: _DatasetDict
    """Datasets consumed but not produced when constructing the tasks in this
    pipeline (`_DatasetDict`).
    """

    initIntermediates: _DatasetDict
    """Datasets that are both consumed and produced when constructing the
    tasks in this pipeline (`_DatasetDict`).
    """

    initOutputs: _DatasetDict
    """Datasets produced but not consumed when constructing the tasks in this
    pipeline (`_DatasetDict`).
    """

    inputs: _DatasetDict
    """Datasets that are consumed but not produced when running this pipeline
    (`_DatasetDict`).
    """

    intermediates: _DatasetDict
    """Datasets that are both produced and consumed when running this pipeline
    (`_DatasetDict`).
    """

    outputs: _DatasetDict
    """Datasets produced but not consumed when running this pipeline
    (`_DatasetDict`).
    """

    prerequisites: _DatasetDict
    """Datasets that are consumed when running this pipeline and looked up
    per-Quantum when generating the graph (`_DatasetDict`).
    """

    dimensions: DimensionGraph
    """All dimensions used by any regular input, intermediate, or output
    (not prerequisite) dataset; the set of dimensions used in the "Big Join
    Query" (`DimensionGraph`).

    This is required to be a superset of all task quantum dimensions.
    """

    @contextmanager
    def connectDataIds(self, registry, collections, userQuery):
        """Query for the data IDs that connect nodes in the `QuantumGraph`.

        This method populates `_TaskScaffolding.quanta` and the data ID keys
        of the nested dataset dictionaries (except for those in
        `prerequisites`).

        Parameters
        ----------
        registry : `lsst.daf.butler.Registry`
            Registry for the data repository; used for all data ID queries.
        collections
            Expressions representing the collections to search for input
            datasets. May be any of the types accepted by
            `lsst.daf.butler.CollectionSearch.fromExpression`.
        userQuery : `str`, optional
            User-provided expression to limit the data IDs processed.

        Returns
        -------
        commonDataIds : \
                `lsst.daf.butler.registry.queries.DataCoordinateQueryResults`
            An interface to a database temporary table containing all data IDs
            that will appear in this `QuantumGraph`. Returned inside a
            context manager, which will drop the temporary table at the end of
            the `with` block in which this method is called.
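
        Examples
        --------
        Within the query loop, each result row is subset to per-dataset and
        per-quantum data IDs, and repeated subsets are deduplicated by
        reusing existing dictionary entries. A minimal sketch of that
        memoization idiom, using plain `str` stand-ins for data IDs:

        >>> refs = {}
        >>> for dataId in ("a", "b", "a"):
        ...     ref = refs.get(dataId)
        ...     if ref is None:
        ...         refs[dataId] = f"ref-{dataId}"
        >>> sorted(refs)
        ['a', 'b']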
        """
        _LOG.debug("Building query for data IDs.")
        # Initialization datasets always have empty data IDs.
        emptyDataId = DataCoordinate.makeEmpty(registry.dimensions)
        for datasetType, refs in itertools.chain(self.initInputs.items(),
                                                 self.initIntermediates.items(),
                                                 self.initOutputs.items()):
            refs[emptyDataId] = DatasetRef(datasetType, emptyDataId)
        # Run one big query for the data IDs for task dimensions and regular
        # inputs and outputs. We limit the query to only dimensions that are
        # associated with the input dataset types, but don't (yet) try to
        # obtain the dataset_ids for those inputs.
        _LOG.debug("Submitting data ID query and materializing results.")
        with registry.queryDataIds(self.dimensions,
                                   datasets=list(self.inputs),
                                   collections=collections,
                                   where=userQuery,
                                   ).materialize() as commonDataIds:
            _LOG.debug("Expanding data IDs.")
            commonDataIds = commonDataIds.expanded()
            _LOG.debug("Iterating over query results to associate quanta with datasets.")
            # Iterate over query results, populating data IDs for datasets and
            # quanta and then connecting them to each other.
            n = 0
            for n, commonDataId in enumerate(commonDataIds):
                # Create DatasetRefs for all DatasetTypes from this result
                # row, noting that we might have created some already.
                # We remember both those that already existed and those that
                # we create now.
                refsForRow = {}
                for datasetType, refs in itertools.chain(self.inputs.items(),
                                                         self.intermediates.items(),
                                                         self.outputs.items()):
                    datasetDataId = commonDataId.subset(datasetType.dimensions)
                    ref = refs.get(datasetDataId)
                    if ref is None:
                        ref = DatasetRef(datasetType, datasetDataId)
                        refs[datasetDataId] = ref
                    refsForRow[datasetType.name] = ref
                # Create _QuantumScaffolding objects for all tasks from this
                # result row, noting that we might have created some already.
                for task in self.tasks:
                    quantumDataId = commonDataId.subset(task.dimensions)
                    quantum = task.quanta.get(quantumDataId)
                    if quantum is None:
                        quantum = _QuantumScaffolding(task=task, dataId=quantumDataId)
                        task.quanta[quantumDataId] = quantum
                    # Whether this is a new quantum or an existing one, we can
                    # now associate the DatasetRefs for this row with it. The
                    # fact that a Quantum data ID and a dataset data ID both
                    # came from the same result row is what tells us they
                    # should be associated.
                    # Many of these associations will be duplicates (because
                    # another query row that differed from this one only in
                    # irrelevant dimensions already added them), and we rely
                    # on the dictionaries to skip them.
                    for datasetType in task.inputs:
                        ref = refsForRow[datasetType.name]
                        quantum.inputs[datasetType.name][ref.dataId] = ref
                    for datasetType in task.outputs:
                        ref = refsForRow[datasetType.name]
                        quantum.outputs[datasetType.name][ref.dataId] = ref
            _LOG.debug("Finished processing %d rows from data ID query.", n)
            yield commonDataIds

    def resolveDatasetRefs(self, registry, collections, run, commonDataIds, *, skipExisting=True):
        """Perform follow-up queries for each dataset data ID produced in
        `connectDataIds`.

        This method populates the nested dataset dictionaries with resolved
        `DatasetRef` instances, and looks up prerequisite datasets for each
        quantum.

        Parameters
        ----------
        registry : `lsst.daf.butler.Registry`
            Registry for the data repository; used for all data ID queries.
        collections
            Expressions representing the collections to search for input
            datasets. May be any of the types accepted by
            `lsst.daf.butler.CollectionSearch.fromExpression`.
        run : `str`, optional
            Name of the `~lsst.daf.butler.CollectionType.RUN` collection for
            output datasets, if it already exists.
        commonDataIds : \
                `lsst.daf.butler.registry.queries.DataCoordinateQueryResults`
            Result of a previous call to `connectDataIds`.
        skipExisting : `bool`, optional
            If `True` (default), a Quantum is not created if all its outputs
            already exist in ``run``. Ignored if ``run`` is `None`.

        Raises
        ------
        OutputExistsError
            Raised if an output dataset already exists in the output run
            and ``skipExisting`` is `False`. The case where some but not all
            of a quantum's outputs are present and ``skipExisting`` is `True`
            cannot be identified at this stage, and is handled later in this
            method instead.
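
        Examples
        --------
        Input resolution tracks which previously-seen data IDs the follow-up
        query failed to return, using the set-discard pattern sketched below
        with `str` stand-ins for data IDs:

        >>> notFoundYet = {"id1", "id2"}
        >>> for found in ("id1", "id2"):
        ...     notFoundYet.discard(found)
        >>> notFoundYet
        set()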
        """
        # Look up [init] intermediate and output datasets in the output
        # collection, if there is an output collection.
        if run is not None:
            for datasetType, refs in itertools.chain(self.initIntermediates.items(),
                                                     self.initOutputs.items(),
                                                     self.intermediates.items(),
                                                     self.outputs.items()):
                _LOG.debug("Resolving %d datasets for intermediate and/or output dataset %s.",
                           len(refs), datasetType.name)
                isInit = datasetType in self.initIntermediates or datasetType in self.initOutputs
                resolvedRefQueryResults = commonDataIds.subset(
                    datasetType.dimensions,
                    unique=True
                ).findDatasets(
                    datasetType,
                    collections=run,
                    findFirst=True
                )
                for resolvedRef in resolvedRefQueryResults:
                    # TODO: we could easily support per-DatasetType
                    # skipExisting and I could imagine that being useful -
                    # it's probably required in order to support writing
                    # initOutputs before QuantumGraph generation.
                    assert resolvedRef.dataId in refs
                    if skipExisting or isInit:
                        refs[resolvedRef.dataId] = resolvedRef
                    else:
                        raise OutputExistsError(f"Output dataset {datasetType.name} already exists in "
                                                f"output RUN collection '{run}' with data ID"
                                                f" {resolvedRef.dataId}.")
        # Look up input and initInput datasets in the input collection(s).
        for datasetType, refs in itertools.chain(self.initInputs.items(), self.inputs.items()):
            _LOG.debug("Resolving %d datasets for input dataset %s.", len(refs), datasetType.name)
            resolvedRefQueryResults = commonDataIds.subset(
                datasetType.dimensions,
                unique=True
            ).findDatasets(
                datasetType,
                collections=collections,
                findFirst=True
            )
            dataIdsNotFoundYet = set(refs.keys())
            for resolvedRef in resolvedRefQueryResults:
                dataIdsNotFoundYet.discard(resolvedRef.dataId)
                refs[resolvedRef.dataId] = resolvedRef
            if dataIdsNotFoundYet:
                raise RuntimeError(
                    f"{len(dataIdsNotFoundYet)} dataset(s) of type "
                    f"'{datasetType.name}' was/were present in a previous "
                    f"query, but could not be found now. "
                    f"This is either a logic bug in QuantumGraph generation "
                    f"or the input collections have been modified since "
                    f"QuantumGraph generation began."
                )
        # Copy the resolved DatasetRefs to the _QuantumScaffolding objects,
        # replacing the unresolved refs there, and then look up prerequisites.
        for task in self.tasks:
            _LOG.debug(
                "Applying resolutions and finding prerequisites for %d quanta of task with label '%s'.",
                len(task.quanta),
                task.taskDef.label
            )
            lookupFunctions = {
                c.name: c.lookupFunction
                for c in iterConnections(task.taskDef.connections, "prerequisiteInputs")
                if c.lookupFunction is not None
            }
            dataIdsToSkip = []
            for quantum in task.quanta.values():
                # Process output datasets only if there is a run to look for
                # outputs in and skipExisting is True. Note that if
                # skipExisting is False, any output datasets that already
                # exist would have already caused an exception to be raised.
                # We never update the DatasetRefs in the quantum because those
                # should never be resolved.
                if run is not None and skipExisting:
                    resolvedRefs = []
                    unresolvedRefs = []
                    for datasetType, originalRefs in quantum.outputs.items():
                        for ref in task.outputs.extract(datasetType, originalRefs.keys()):
                            if ref.id is not None:
                                resolvedRefs.append(ref)
                            else:
                                unresolvedRefs.append(ref)
                    if resolvedRefs:
                        if unresolvedRefs:
                            raise OutputExistsError(
                                f"Quantum {quantum.dataId} of task with label "
                                f"'{quantum.task.taskDef.label}' has some outputs that exist "
                                f"({resolvedRefs}) "
                                f"and others that don't ({unresolvedRefs})."
                            )
                        else:
                            # All outputs are already present; skip this
                            # quantum and continue to the next.
                            dataIdsToSkip.append(quantum.dataId)
                            continue
                # Update the input DatasetRefs to the resolved ones we already
                # searched for.
                for datasetType, refs in quantum.inputs.items():
                    for ref in task.inputs.extract(datasetType, refs.keys()):
                        refs[ref.dataId] = ref
                # Look up prerequisite datasets in the input collection(s).
                # These may have dimensions that extend beyond those we
                # queried for originally, because we want to permit those data
                # ID values to differ across quanta and dataset types.
                for datasetType in task.prerequisites:
                    lookupFunction = lookupFunctions.get(datasetType.name)
                    if lookupFunction is not None:
                        # PipelineTask has provided its own function to do the
                        # lookup. This always takes precedence.
                        refs = list(
                            lookupFunction(datasetType, registry, quantum.dataId, collections)
                        )
                    elif (datasetType.isCalibration()
                            and datasetType.dimensions <= quantum.dataId.graph
                            and quantum.dataId.graph.temporal):
                        # This is a master calibration lookup, which we have
                        # to handle specially because the query system can't
                        # do a temporal join on a non-dimension-based timespan
                        # yet.
                        timespan = quantum.dataId.timespan
                        try:
                            refs = [registry.findDataset(datasetType, quantum.dataId,
                                                         collections=collections,
                                                         timespan=timespan)]
                        except KeyError:
                            # This dataset type is not present in the
                            # registry, which just means there are no datasets
                            # here.
                            refs = []
                    else:
                        # Most general case.
                        refs = list(registry.queryDatasets(datasetType,
                                                           collections=collections,
                                                           dataId=quantum.dataId,
                                                           findFirst=True).expanded())
                    quantum.prerequisites[datasetType].update({ref.dataId: ref for ref in refs
                                                               if ref is not None})
            # Actually remove any quanta that we decided to skip above.
            if dataIdsToSkip:
                _LOG.debug("Pruning %d quanta for task with label '%s' because all of their "
                           "outputs exist.",
                           len(dataIdsToSkip), task.taskDef.label)
                for dataId in dataIdsToSkip:
                    del task.quanta[dataId]

    def makeQuantumGraph(self):
        """Create a `QuantumGraph` from the quanta already present in
        the scaffolding data structure.

        Returns
        -------
        graph : `QuantumGraph`
            The full `QuantumGraph`.
        """
        graph = QuantumGraph({task.taskDef: task.makeQuantumSet() for task in self.tasks})
        return graph


class _InstrumentFinder(TreeVisitor):
    """Implementation of `TreeVisitor` which looks for instrument names.

    The instrument should be specified as a boolean expression of the form::

        instrument = 'string'
        'string' = instrument

    so we only need to find a binary operator where the operator is "=",
    one side is a string literal, and the other side is an identifier.
    All visit methods return a tuple of (type, value); non-useful nodes
    return `None` for both type and value.
    """
    def __init__(self):
        self.instruments = []

    def visitNumericLiteral(self, value, node):
        # do not care about numbers
        return (None, None)

    def visitStringLiteral(self, value, node):
        # return type and value
        return ("str", value)

    def visitTimeLiteral(self, value, node):
        # do not care about these
        return (None, None)

    def visitRangeLiteral(self, start, stop, stride, node):
        # do not care about these
        return (None, None)

    def visitIdentifier(self, name, node):
        if name.lower() == "instrument":
            return ("id", "instrument")
        return (None, None)

    def visitUnaryOp(self, operator, operand, node):
        # do not care about these
        return (None, None)

    def visitBinaryOp(self, operator, lhs, rhs, node):
        if operator == "=":
            if lhs == ("id", "instrument") and rhs[0] == "str":
                self.instruments.append(rhs[1])
            elif rhs == ("id", "instrument") and lhs[0] == "str":
                self.instruments.append(lhs[1])
        return (None, None)

    def visitIsIn(self, lhs, values, not_in, node):
        # do not care about these
        return (None, None)

    def visitParens(self, expression, node):
        # do not care about these
        return (None, None)


def _findInstruments(queryStr):
    """Get the names of any instruments named in the query string by searching
    for "instrument = <value>" and similar patterns.

    Parameters
    ----------
    queryStr : `str` or `None`
        The query string to search, or `None` if there is no query.

    Returns
    -------
    instruments : `list` [`str`]
        The list of instrument names found in the query.

    Raises
    ------
    ValueError
        If the query expression cannot be parsed.
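
    Examples
    --------
    A minimal illustration (relies on the ``lsst.daf.butler`` expression
    parser being importable):

    >>> _findInstruments("instrument = 'HSC' AND visit = 42")
    ['HSC']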
    """
    if not queryStr:
        return []
    parser = ParserYacc()
    finder = _InstrumentFinder()
    try:
        tree = parser.parse(queryStr)
    except ParseError as exc:
        raise ValueError(f"failed to parse query expression: {queryStr}") from exc
    tree.visit(finder)
    return finder.instruments


# ------------------------
# Exported definitions --
# ------------------------


class GraphBuilderError(Exception):
    """Base class for exceptions generated by the graph builder.
    """
    pass


class OutputExistsError(GraphBuilderError):
    """Exception generated when output datasets already exist.
    """
    pass


class PrerequisiteMissingError(GraphBuilderError):
    """Exception generated when a prerequisite dataset does not exist.
    """
    pass


class GraphBuilder(object):
    """GraphBuilder class is responsible for building a task execution graph
    from a Pipeline.

    Parameters
    ----------
    registry : `~lsst.daf.butler.Registry`
        Registry for the data repository.
    skipExisting : `bool`, optional
        If `True` (default), a Quantum is not created if all its outputs
        already exist.
    """

    def __init__(self, registry, skipExisting=True):
        self.registry = registry
        self.dimensions = registry.dimensions
        self.skipExisting = skipExisting

    def makeGraph(self, pipeline, collections, run, userQuery):
        """Create execution graph for a pipeline.

        Parameters
        ----------
        pipeline : `Pipeline`
            Pipeline definition, task names/classes and their configs.
        collections
            Expressions representing the collections to search for input
            datasets. May be any of the types accepted by
            `lsst.daf.butler.CollectionSearch.fromExpression`.
        run : `str`, optional
            Name of the `~lsst.daf.butler.CollectionType.RUN` collection for
            output datasets, if it already exists.
        userQuery : `str`
            String which defines the user-provided selection for the registry;
            should be empty or `None` if there are no restrictions on data
            selection.

        Returns
        -------
        graph : `QuantumGraph`
            The constructed `QuantumGraph`.

        Raises
        ------
        UserExpressionError
            Raised when the user expression cannot be parsed.
        OutputExistsError
            Raised when output datasets already exist.
        Exception
            Other exception types may be raised by underlying registry
            classes.
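
        Examples
        --------
        A sketch of typical use (``butler``, ``pipeline``, and the collection
        and run names are hypothetical placeholders; a real data repository
        is required, so the example is skipped under doctest):

        >>> builder = GraphBuilder(butler.registry)  # doctest: +SKIP
        >>> graph = builder.makeGraph(pipeline, collections=["HSC/defaults"],
        ...                           run="u/user/run", userQuery="visit = 42")  # doctest: +SKIP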
        """
        scaffolding = _PipelineScaffolding(pipeline, registry=self.registry)

        instrument = pipeline.getInstrument()
        if isinstance(instrument, str):
            instrument = doImport(instrument)
        instrumentName = instrument.getName() if instrument else None
        userQuery = self._verifyInstrumentRestriction(instrumentName, userQuery)

        with scaffolding.connectDataIds(self.registry, collections, userQuery) as commonDataIds:
            scaffolding.resolveDatasetRefs(self.registry, collections, run, commonDataIds,
                                           skipExisting=self.skipExisting)
        return scaffolding.makeQuantumGraph()

    @staticmethod
    def _verifyInstrumentRestriction(instrumentName, query):
        """Add an instrument restriction to the query if it does not have one,
        and verify that, if an instrument name is given, the query contains no
        conflicting instrument restrictions.

        Parameters
        ----------
        instrumentName : `str` or `None`
            The name of the instrument that should appear in the query.
        query : `str`
            The query string.

        Returns
        -------
        query : `str`
            The query string with the instrument added to it if needed.

        Raises
        ------
        RuntimeError
            If the pipeline names an instrument and the query contains more
            than one instrument or the name of the instrument in the query
            does not match the instrument named by the pipeline.
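
        Examples
        --------
        A minimal illustration (relies on the ``lsst.daf.butler`` expression
        parser being importable):

        >>> GraphBuilder._verifyInstrumentRestriction("HSC", "visit = 42")
        "instrument = 'HSC' AND (visit = 42)"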
        """
        if not instrumentName:
            return query
        queryInstruments = _findInstruments(query)
        if len(queryInstruments) > 1:
            raise RuntimeError(f"When the pipeline has an instrument (\"{instrumentName}\") the query "
                               "must have zero instruments or one instrument that matches the pipeline. "
                               f"Found these instruments in the query: {queryInstruments}.")
        if not queryInstruments:
            # There is not an instrument in the query, add it:
            restriction = f"instrument = '{instrumentName}'"
            _LOG.debug(f"Adding restriction \"{restriction}\" to query.")
            query = f"{restriction} AND ({query})" if query else restriction  # (there may not be a query)
        elif queryInstruments[0] != instrumentName:
            # Since there is an instrument in the query, it should match
            # the instrument in the pipeline.
            raise RuntimeError(f"The instrument named in the query (\"{queryInstruments[0]}\") does not "
                               f"match the instrument named by the pipeline (\"{instrumentName}\")")
        return query