graphBuilder.py
# This file is part of pipe_base.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""Module defining GraphBuilder class and related methods.
"""

__all__ = ['GraphBuilder']

# -------------------------------
#  Imports of standard modules --
# -------------------------------
import copy
from collections import namedtuple
from itertools import chain
import logging

# -----------------------------
#  Imports for other modules --
# -----------------------------
from .graph import QuantumGraphTaskNodes, QuantumGraph
from lsst.daf.butler import Quantum, DatasetRef, DimensionSet

# ----------------------------------
#  Local non-exported definitions --
# ----------------------------------

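# The logger name below drops the leading package component of
# ``__name__`` (e.g. "lsst."), so messages appear under a name like
# "pipe.base.graphBuilder" rather than the full module path.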
_LOG = logging.getLogger(__name__.partition(".")[2])

# Tuple containing TaskDef, its input dataset types and output dataset types
#
# Attributes
# ----------
# taskDef : `TaskDef`
# inputs : `set` of `DatasetType`
# outputs : `set` of `DatasetType`
# initInputs : `set` of `DatasetType`
# initOutputs : `set` of `DatasetType`
# perDatasetTypeDimensions : `~lsst.daf.butler.DimensionSet`
# prerequisite : `set` of `DatasetType`
_TaskDatasetTypes = namedtuple("_TaskDatasetTypes", ("taskDef", "inputs", "outputs",
                                                     "initInputs", "initOutputs",
                                                     "perDatasetTypeDimensions", "prerequisite"))


class GraphBuilderError(Exception):
    """Base class for exceptions generated by graph builder.
    """
    pass


class OutputExistsError(GraphBuilderError):
    """Exception generated when output datasets already exist.
    """

    def __init__(self, taskName, refs):
        refs = ', '.join(str(ref) for ref in refs)
        msg = "Output datasets already exist for task {}: {}".format(taskName, refs)
        GraphBuilderError.__init__(self, msg)


class PrerequisiteMissingError(GraphBuilderError):
    """Exception generated when a prerequisite dataset does not exist.
    """
    pass


class GraphBuilder(object):
    """GraphBuilder is responsible for building a task execution graph
    from a Pipeline.

    Parameters
    ----------
    taskFactory : `TaskFactory`
        Factory object used to load/instantiate PipelineTasks.
    registry : `~lsst.daf.butler.Registry`
        Data butler registry instance.
    skipExisting : `bool`, optional
        If `True` (default), a Quantum is not created if all its outputs
        already exist; otherwise pre-existing outputs cause an exception
        to be raised.
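
    Examples
    --------
    A minimal usage sketch; the ``taskFactory``, ``registry``, ``pipeline``
    and ``originInfo`` objects are stand-ins whose construction depends on
    the runtime environment::

        builder = GraphBuilder(taskFactory, registry, skipExisting=True)
        # an empty query places no extra restriction on data selection
        quantumGraph = builder.makeGraph(pipeline, originInfo, userQuery="")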
    """

    def __init__(self, taskFactory, registry, skipExisting=True):
        self.taskFactory = taskFactory
        self.registry = registry
        self.dimensions = registry.dimensions
        self.skipExisting = skipExisting

    def _loadTaskClass(self, taskDef):
        """Make sure task class is loaded.

        Load the task class and update the task name to make sure it is
        fully-qualified; the original taskDef in the Pipeline is not
        modified.

        Parameters
        ----------
        taskDef : `TaskDef`
            Task definition whose class may need to be loaded.

        Returns
        -------
        taskDef : `TaskDef`
            May be the same instance as the parameter if the task class is
            already loaded.
        """
        if taskDef.taskClass is None:
            tClass, tName = self.taskFactory.loadTaskClass(taskDef.taskName)
            taskDef = copy.copy(taskDef)
            taskDef.taskClass = tClass
            taskDef.taskName = tName
        return taskDef

    def makeGraph(self, pipeline, originInfo, userQuery):
        """Create execution graph for a pipeline.

        Parameters
        ----------
        pipeline : `Pipeline`
            Pipeline definition, task names/classes and their configs.
        originInfo : `~lsst.daf.butler.DatasetOriginInfo`
            Object which provides names of the input/output collections.
        userQuery : `str`
            String which defines user-defined selection for registry;
            should be empty or `None` if there are no restrictions on data
            selection.

        Returns
        -------
        graph : `QuantumGraph`

        Raises
        ------
        UserExpressionError
            Raised when user expression cannot be parsed.
        OutputExistsError
            Raised when output datasets already exist.
        Exception
            Other exception types may be raised by underlying registry
            classes.
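
        Examples
        --------
        Per the ``userQuery`` description above, passing `None` (or an
        empty string) places no additional restriction on data selection::

            graph = builder.makeGraph(pipeline, originInfo, userQuery=None)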
        """

        # make sure all task classes are loaded
        taskList = [self._loadTaskClass(taskDef) for taskDef in pipeline]

        # collect inputs/outputs from each task
        taskDatasets = []
        for taskDef in taskList:
            taskClass = taskDef.taskClass
            inputs = {k: v.datasetType for k, v in taskClass.getInputDatasetTypes(taskDef.config).items()}
            prerequisite = set(inputs[k] for k in taskClass.getPrerequisiteDatasetTypes(taskDef.config))
            taskIo = [set(inputs.values())]
            for attr in ("Output", "InitInput", "InitOutput"):
                getter = getattr(taskClass, f"get{attr}DatasetTypes")
                ioObject = getter(taskDef.config) or {}
                taskIo.append(set(dsTypeDescr.datasetType for dsTypeDescr in ioObject.values()))
            perDatasetTypeDimensions = DimensionSet(self.registry.dimensions,
                                                    taskClass.getPerDatasetTypeDimensions(taskDef.config))
            taskDatasets.append(_TaskDatasetTypes(taskDef, *taskIo, prerequisite=prerequisite,
                                                  perDatasetTypeDimensions=perDatasetTypeDimensions))

        perDatasetTypeDimensions = self._extractPerDatasetTypeDimensions(taskDatasets)

        # categorize dataset types for the full Pipeline
        required, optional, prerequisite, initInputs, initOutputs = self._makeFullIODatasetTypes(taskDatasets)

        # make a graph
        return self._makeGraph(taskDatasets, required, optional, prerequisite, initInputs, initOutputs,
                               originInfo, userQuery, perDatasetTypeDimensions=perDatasetTypeDimensions)

    def _extractPerDatasetTypeDimensions(self, taskDatasets):
        """Return the complete set of all per-DatasetType dimensions
        declared by any task.

        Per-DatasetType dimensions are those that need not have the same
        values for different Datasets within a Quantum.

        Parameters
        ----------
        taskDatasets : sequence of `_TaskDatasetTypes`
            Information for each task in the pipeline.

        Returns
        -------
        perDatasetTypeDimensions : `~lsst.daf.butler.DimensionSet`
            All per-DatasetType dimensions.

        Raises
        ------
        ValueError
            Raised if tasks disagree on whether a dimension is declared
            per-DatasetType.
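
        Notes
        -----
        A schematic sketch of the consistency rule (the dimension name is
        an illustrative assumption, not necessarily a real one)::

            # task A declares "calibration_label" per-DatasetType;
            # task B uses "calibration_label" in its inputs or outputs
            # without declaring it per-DatasetType
            # => this method raises ValueError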
        """
        # Empty dimension set, just used to construct more DimensionSets
        # via union method.
        noDimensions = DimensionSet(self.registry.dimensions, ())
        # Construct pipeline-wide perDatasetTypeDimensions set from union
        # of all Task-level perDatasetTypeDimensions.
        perDatasetTypeDimensions = noDimensions.union(
            *[taskDs.perDatasetTypeDimensions for taskDs in taskDatasets]
        )
        # Check that no tasks want any of these as common (i.e. not
        # per-DatasetType) dimensions.
        for taskDs in taskDatasets:
            allTaskDimensions = noDimensions.union(
                *[datasetType.dimensions for datasetType in chain(taskDs.inputs, taskDs.outputs)]
            )
            commonTaskDimensions = allTaskDimensions - taskDs.perDatasetTypeDimensions
            if not commonTaskDimensions.isdisjoint(perDatasetTypeDimensions):
                overlap = commonTaskDimensions.intersection(perDatasetTypeDimensions)
                raise ValueError(
                    f"Task {taskDs.taskDef.taskName} uses dimensions {overlap} without declaring them "
                    f"per-DatasetType, but they are declared per-DatasetType by another task."
                )
        return perDatasetTypeDimensions

    def _makeFullIODatasetTypes(self, taskDatasets):
        """Return the full set of input and output dataset types for all
        tasks in the pipeline.

        Parameters
        ----------
        taskDatasets : sequence of `_TaskDatasetTypes`
            Tasks with their inputs, outputs, initInputs and initOutputs.

        Returns
        -------
        required : `set` of `~lsst.daf.butler.DatasetType`
            Datasets that must exist in the repository in order to generate
            a QuantumGraph node that consumes them.
        optional : `set` of `~lsst.daf.butler.DatasetType`
            Datasets that will be produced by the graph, but may exist in
            the repository.  If ``self.skipExisting`` is `True` and all
            outputs of a particular node already exist, it will be skipped.
            Otherwise pre-existing datasets of these types will cause
            `OutputExistsError` to be raised.
        prerequisite : `set` of `~lsst.daf.butler.DatasetType`
            Datasets that must exist in the repository, but whose absence
            should cause `PrerequisiteMissingError` to be raised if they
            are needed by any graph node that would otherwise be created.
        initInputs : `set` of `~lsst.daf.butler.DatasetType`
            Datasets used as init method inputs by the pipeline.
        initOutputs : `set` of `~lsst.daf.butler.DatasetType`
            Datasets used as init method outputs by the pipeline.
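
        Notes
        -----
        A schematic example of the categorization (dataset type names are
        purely illustrative): if task A produces ``calexp`` and task B
        consumes ``calexp`` and ``raw``, then::

            required = {raw}       # consumed but never produced
            optional = {calexp}    # produced by the pipeline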
        """
        # to build initial dataset graph we have to collect info about all
        # datasets to be used by this pipeline
        allDatasetTypes = {}
        required = set()
        optional = set()
        prerequisite = set()
        initInputs = set()
        initOutputs = set()
        for taskDs in taskDatasets:
            for ioType, ioSet in zip(("inputs", "outputs", "prerequisite", "initInputs", "initOutputs"),
                                     (required, optional, prerequisite, initInputs, initOutputs)):
                for dsType in getattr(taskDs, ioType):
                    ioSet.add(dsType.name)
                    allDatasetTypes[dsType.name] = dsType

        # Any dataset the pipeline produces can't be required or prerequisite
        required -= optional
        prerequisite -= optional

        # remove initOutputs from initInputs
        initInputs -= initOutputs

        required = set(allDatasetTypes[name] for name in required)
        optional = set(allDatasetTypes[name] for name in optional)
        prerequisite = set(allDatasetTypes[name] for name in prerequisite)
        initInputs = set(allDatasetTypes[name] for name in initInputs)
        initOutputs = set(allDatasetTypes[name] for name in initOutputs)
        return required, optional, prerequisite, initInputs, initOutputs

    def _makeGraph(self, taskDatasets, required, optional, prerequisite,
                   initInputs, initOutputs, originInfo, userQuery,
                   perDatasetTypeDimensions=()):
        """Make QuantumGraph instance.

        Parameters
        ----------
        taskDatasets : sequence of `_TaskDatasetTypes`
            Tasks with their inputs and outputs.
        required : `set` of `~lsst.daf.butler.DatasetType`
            Datasets that must exist in the repository in order to generate
            a QuantumGraph node that consumes them.
        optional : `set` of `~lsst.daf.butler.DatasetType`
            Datasets that will be produced by the graph, but may exist in
            the repository.  If ``self.skipExisting`` and all outputs of a
            particular node already exist, it will be skipped.  Otherwise
            pre-existing datasets of these types will cause
            `OutputExistsError` to be raised.
        prerequisite : `set` of `~lsst.daf.butler.DatasetType`
            Datasets that must exist in the repository, but whose absence
            should cause `PrerequisiteMissingError` to be raised if they
            are needed by any graph node that would otherwise be created.
        initInputs : `set` of `~lsst.daf.butler.DatasetType`
            Datasets which should exist in the input repository and will be
            used in task initialization.
        initOutputs : `set` of `~lsst.daf.butler.DatasetType`
            Datasets which will be created in task initialization.
        originInfo : `~lsst.daf.butler.DatasetOriginInfo`
            Object which provides names of the input/output collections.
        userQuery : `str`
            String which defines user-defined selection for registry;
            should be empty or `None` if there are no restrictions on data
            selection.
        perDatasetTypeDimensions : iterable of `Dimension` or `str`
            Dimensions (or names thereof) that may have different values
            for different dataset types within the same quantum.

        Returns
        -------
        `QuantumGraph` instance.
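
        Notes
        -----
        ``perDatasetTypeDimensions`` normally comes from
        `_extractPerDatasetTypeDimensions`, but plain dimension names are
        also accepted, e.g. ``perDatasetTypeDimensions=("calibration_label",)``
        (the name here is only an illustrative assumption).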
        """
        rows = self.registry.selectMultipleDatasetTypes(
            originInfo, userQuery,
            required=required, optional=optional, prerequisite=prerequisite,
            perDatasetTypeDimensions=perDatasetTypeDimensions
        )

        # store result locally for multi-pass algorithm below
        # TODO: change it to single pass
        dimensionVerse = []
        try:
            for row in rows:
                _LOG.debug("row: %s", row)
                dimensionVerse.append(row)
        except LookupError as err:
            raise PrerequisiteMissingError(str(err)) from err

        # Next step is to group by task quantum dimensions
        qgraph = QuantumGraph()
        qgraph._inputDatasetTypes = (required | prerequisite)
        qgraph._outputDatasetTypes = optional
        for dsType in initInputs:
            for collection in originInfo.getInputCollections(dsType.name):
                result = self.registry.find(collection, dsType)
                if result is not None:
                    qgraph.initInputs.append(result)
                    break
            else:
                raise GraphBuilderError(f"Could not find initInput {dsType.name} in any input"
                                        " collection")
        for dsType in initOutputs:
            qgraph.initOutputs.append(DatasetRef(dsType, {}))

        for taskDss in taskDatasets:
            taskQuantaInputs = {}    # key is the quantum dataId (as tuple)
            taskQuantaOutputs = {}   # key is the quantum dataId (as tuple)
            qlinks = []
            for dimensionName in taskDss.taskDef.config.quantum.dimensions:
                dimension = self.dimensions[dimensionName]
                qlinks += dimension.links()
            _LOG.debug("task %s qdimensions: %s", taskDss.taskDef.label, qlinks)

            # some rows will be non-unique for subset of dimensions, create
            # temporary structure to remove duplicates
            for row in dimensionVerse:
                qkey = tuple((col, row.dataId[col]) for col in qlinks)
                _LOG.debug("qkey: %s", qkey)

                def _datasetRefKey(datasetRef):
                    return tuple(sorted(datasetRef.dataId.items()))

                qinputs = taskQuantaInputs.setdefault(qkey, {})
                for dsType in taskDss.inputs:
                    datasetRefs = qinputs.setdefault(dsType, {})
                    datasetRef = row.datasetRefs[dsType]
                    datasetRefs[_datasetRefKey(datasetRef)] = datasetRef
                    _LOG.debug("add input datasetRef: %s %s", dsType.name, datasetRef)

                qoutputs = taskQuantaOutputs.setdefault(qkey, {})
                for dsType in taskDss.outputs:
                    datasetRefs = qoutputs.setdefault(dsType, {})
                    datasetRef = row.datasetRefs[dsType]
                    datasetRefs[_datasetRefKey(datasetRef)] = datasetRef
                    _LOG.debug("add output datasetRef: %s %s", dsType.name, datasetRef)

            # all nodes for this task
            quanta = []
            for qkey in taskQuantaInputs:
                # taskQuantaInputs and taskQuantaOutputs have the same keys
                _LOG.debug("make quantum for qkey: %s", qkey)
                quantum = Quantum(run=None, task=None)

                # add all outputs, but check first that outputs don't exist
                outputs = list(chain.from_iterable(datasetRefs.values()
                                                   for datasetRefs in taskQuantaOutputs[qkey].values()))
                for ref in outputs:
                    _LOG.debug("add output: %s", ref)
                if self.skipExisting and all(ref.id is not None for ref in outputs):
                    _LOG.debug("all output datasetRefs already exist, skip quantum")
                    continue
                if any(ref.id is not None for ref in outputs):
                    # some outputs exist, can't override them
                    raise OutputExistsError(taskDss.taskDef.taskName, outputs)

                for ref in outputs:
                    quantum.addOutput(ref)

                # add all inputs
                for datasetRefs in taskQuantaInputs[qkey].values():
                    for ref in datasetRefs.values():
                        quantum.addPredictedInput(ref)
                        _LOG.debug("add input: %s", ref)

                quanta.append(quantum)

            qgraph.append(QuantumGraphTaskNodes(taskDss.taskDef, quanta))

        return qgraph