LSSTApplications  16.0-10-g0ee56ad+5,16.0-11-ga33d1f2+5,16.0-12-g3ef5c14+3,16.0-12-g71e5ef5+18,16.0-12-gbdf3636+3,16.0-13-g118c103+3,16.0-13-g8f68b0a+3,16.0-15-gbf5c1cb+4,16.0-16-gfd17674+3,16.0-17-g7c01f5c+3,16.0-18-g0a50484+1,16.0-20-ga20f992+8,16.0-21-g0e05fd4+6,16.0-21-g15e2d33+4,16.0-22-g62d8060+4,16.0-22-g847a80f+4,16.0-25-gf00d9b8+1,16.0-28-g3990c221+4,16.0-3-gf928089+3,16.0-32-g88a4f23+5,16.0-34-gd7987ad+3,16.0-37-gc7333cb+2,16.0-4-g10fc685+2,16.0-4-g18f3627+26,16.0-4-g5f3a788+26,16.0-5-gaf5c3d7+4,16.0-5-gcc1f4bb+1,16.0-6-g3b92700+4,16.0-6-g4412fcd+3,16.0-6-g7235603+4,16.0-69-g2562ce1b+2,16.0-8-g14ebd58+4,16.0-8-g2df868b+1,16.0-8-g4cec79c+6,16.0-8-gadf6c7a+1,16.0-8-gfc7ad86,16.0-82-g59ec2a54a+1,16.0-9-g5400cdc+2,16.0-9-ge6233d7+5,master-g2880f2d8cf+3,v17.0.rc1
LSSTDataManagementBasePackage
graphBuilder.py
Go to the documentation of this file.
1 # This file is part of pipe_base.
2 #
3 # Developed for the LSST Data Management System.
4 # This product includes software developed by the LSST Project
5 # (http://www.lsst.org).
6 # See the COPYRIGHT file at the top-level directory of this distribution
7 # for details of code ownership.
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation, either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the GNU General Public License
20 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 
22 """Module defining GraphBuilder class and related methods.
23 """
24 
25 __all__ = ['GraphBuilder']
26 
27 # -------------------------------
28 # Imports of standard modules --
29 # -------------------------------
30 import copy
31 from collections import namedtuple
32 from itertools import chain
33 import logging
34 
35 # -----------------------------
36 # Imports for other modules --
37 # -----------------------------
38 from .graph import QuantumGraphTaskNodes, QuantumGraph
39 from lsst.daf.butler import Quantum, DatasetRef
40 
41 # ----------------------------------
42 # Local non-exported definitions --
43 # ----------------------------------
44 
45 _LOG = logging.getLogger(__name__.partition(".")[2])
46 
47 # Tuple containing TaskDef, its input dataset types and output dataset types
48 #
49 # Attributes
50 # ----------
51 # taskDef : `TaskDef`
52 # inputs : `list` of `DatasetType`
53 # outputs : `list` of `DatasetType`
54 _TaskDatasetTypes = namedtuple("_TaskDatasetTypes", "taskDef inputs outputs initInputs initOutputs")
55 
56 
class GraphBuilderError(Exception):
    """Base class for exceptions generated by graph builder.
    """
    pass


class OutputExistsError(GraphBuilderError):
    """Exception generated when output datasets already exist.

    Parameters
    ----------
    taskName : `str`
        Name of the task whose output datasets already exist.
    refs : iterable of `DatasetRef`
        References to the already-existing output datasets; they are
        rendered into the exception message.
    """

    # NOTE(review): the ``class OutputExistsError(GraphBuilderError):``
    # statement was missing from the extracted source; restored here based
    # on the GraphBuilderError.__init__ call below and the raise site in
    # GraphBuilder._makeGraph.
    def __init__(self, taskName, refs):
        refs = ', '.join(str(ref) for ref in refs)
        msg = "Output datasets already exist for task {}: {}".format(taskName, refs)
        super().__init__(msg)
72 
73 # ------------------------
74 # Exported definitions --
75 # ------------------------
76 
# NOTE(review): the ``class GraphBuilder:`` statement was missing from the
# extracted source; restored here (methods below reference ``self`` and the
# class is instantiated by callers of this module).
class GraphBuilder:
    """
    GraphBuilder class is responsible for building task execution graph from
    a Pipeline.

    Parameters
    ----------
    taskFactory : `TaskFactory`
        Factory object used to load/instantiate PipelineTasks.
    registry : `~lsst.daf.butler.Registry`
        Data butler instance.
    skipExisting : `bool`, optional
        If ``True`` (default) then Quantum is not created if all its outputs
        already exist, otherwise exception is raised.
    """

    def __init__(self, taskFactory, registry, skipExisting=True):
        self.taskFactory = taskFactory
        self.registry = registry
        # cache the registry's dimension universe; used to expand quantum
        # dimensions into their link names in _makeGraph
        self.dimensions = registry.dimensions
        self.skipExisting = skipExisting

    def _loadTaskClass(self, taskDef):
        """Make sure task class is loaded.

        Load task class, update task name to make sure it is fully-qualified,
        do not update original taskDef in a Pipeline though.

        Parameters
        ----------
        taskDef : `TaskDef`

        Returns
        -------
        `TaskDef` instance, may be the same as parameter if task class is
        already loaded.
        """
        if taskDef.taskClass is None:
            tClass, tName = self.taskFactory.loadTaskClass(taskDef.taskName)
            # shallow-copy so the Pipeline's own TaskDef is left untouched
            taskDef = copy.copy(taskDef)
            taskDef.taskClass = tClass
            taskDef.taskName = tName
        return taskDef

    def makeGraph(self, pipeline, originInfo, userQuery):
        """Create execution graph for a pipeline.

        Parameters
        ----------
        pipeline : `Pipeline`
            Pipeline definition, task names/classes and their configs.
        originInfo : `~lsst.daf.butler.DatasetOriginInfo`
            Object which provides names of the input/output collections.
        userQuery : `str`
            String which defines user-defined selection for registry, should
            be empty or `None` if there is no restrictions on data selection.

        Returns
        -------
        graph : `QuantumGraph`

        Raises
        ------
        UserExpressionError
            Raised when user expression cannot be parsed.
        OutputExistsError
            Raised when output datasets already exist.
        Exception
            Other exceptions types may be raised by underlying registry
            classes.
        """
        # make sure all task classes are loaded
        taskList = [self._loadTaskClass(taskDef) for taskDef in pipeline]

        # collect inputs/outputs from each task; the four getters mirror the
        # four slots of _TaskDatasetTypes (after taskDef)
        taskDatasets = []
        for taskDef in taskList:
            taskClass = taskDef.taskClass
            taskIo = []
            for attr in ("Input", "Output", "InitInput", "InitOutput"):
                getter = getattr(taskClass, f"get{attr}DatasetTypes")
                ioObject = getter(taskDef.config) or {}
                taskIo.append([dsTypeDescr.datasetType for dsTypeDescr in ioObject.values()])
            taskDatasets.append(_TaskDatasetTypes(taskDef, *taskIo))

        # build initial dataset graph
        inputs, outputs, initInputs, initOutputs = self._makeFullIODatasetTypes(taskDatasets)

        # make a graph
        return self._makeGraph(taskDatasets, inputs, outputs, initInputs, initOutputs,
                               originInfo, userQuery)

    def _makeFullIODatasetTypes(self, taskDatasets):
        """Returns full set of input and output dataset types for all tasks.

        Parameters
        ----------
        taskDatasets : sequence of `_TaskDatasetTypes`
            Tasks with their inputs, outputs, initInputs and initOutputs.

        Returns
        -------
        inputs : `set` of `butler.DatasetType`
            Datasets used as inputs by the pipeline.
        outputs : `set` of `butler.DatasetType`
            Datasets produced by the pipeline.
        initInputs : `set` of `butler.DatasetType`
            Datasets used as init method inputs by the pipeline.
        initOutputs : `set` of `butler.DatasetType`
            Datasets used as init method outputs by the pipeline.
        """
        # to build initial dataset graph we have to collect info about all
        # datasets to be used by this pipeline; sets hold names so that the
        # same dataset type contributed by several tasks counts once
        allDatasetTypes = {}
        inputs = set()
        outputs = set()
        initInputs = set()
        initOutputs = set()
        for taskDs in taskDatasets:
            for ioType, ioSet in zip(("inputs", "outputs", "initInputs", "initOutputs"),
                                     (inputs, outputs, initInputs, initOutputs)):
                for dsType in getattr(taskDs, ioType):
                    ioSet.add(dsType.name)
                    allDatasetTypes[dsType.name] = dsType

        # remove outputs from inputs: anything produced by some task in the
        # pipeline is an intermediate, not a pipeline-level input
        inputs -= outputs

        # remove initOutputs from initInputs, for the same reason
        initInputs -= initOutputs

        # map the surviving names back to their DatasetType objects
        inputs = {allDatasetTypes[name] for name in inputs}
        outputs = {allDatasetTypes[name] for name in outputs}
        initInputs = {allDatasetTypes[name] for name in initInputs}
        initOutputs = {allDatasetTypes[name] for name in initOutputs}
        return inputs, outputs, initInputs, initOutputs

    def _makeGraph(self, taskDatasets, inputs, outputs, initInputs, initOutputs, originInfo, userQuery):
        """Make QuantumGraph instance.

        Parameters
        ----------
        taskDatasets : sequence of `_TaskDatasetTypes`
            Tasks with their inputs and outputs.
        inputs : `set` of `DatasetType`
            Datasets which should already exist in input repository.
        outputs : `set` of `DatasetType`
            Datasets which will be created by tasks.
        initInputs : `set` of `DatasetType`
            Datasets which should exist in input repository, and will be used
            in task initialization.
        initOutputs : `set` of `DatasetType`
            Datasets which will be created in task initialization.
        originInfo : `DatasetOriginInfo`
            Object which provides names of the input/output collections.
        userQuery : `str`
            String which defines user-defined selection for registry, should
            be empty or `None` if there is no restrictions on data selection.

        Returns
        -------
        `QuantumGraph` instance.

        Raises
        ------
        GraphBuilderError
            Raised when an initInput dataset cannot be found in any input
            collection.
        OutputExistsError
            Raised when some (but not all) outputs of a quantum already
            exist and cannot be overwritten.
        """

        # Hashable key identifying a DatasetRef by its dataId; used to
        # de-duplicate refs coming from non-unique dimension rows.
        # (Hoisted out of the row loop below — it is loop-invariant.)
        def _dataRefKey(dataRef):
            return tuple(sorted(dataRef.dataId.items()))

        rows = self.registry.selectDimensions(originInfo, userQuery, inputs, outputs)

        # store result locally for multi-pass algorithm below
        # TODO: change it to single pass
        dimensionVerse = []
        for row in rows:
            _LOG.debug("row: %s", row)
            dimensionVerse.append(row)

        # Next step is to group by task quantum dimensions
        qgraph = QuantumGraph()
        qgraph._inputDatasetTypes = inputs
        qgraph._outputDatasetTypes = outputs

        # resolve every initInput against the input collections in order;
        # first match wins, a missing dataset is a hard error
        for dsType in initInputs:
            for collection in originInfo.getInputCollections(dsType.name):
                result = self.registry.find(collection, dsType)
                if result is not None:
                    qgraph.initInputs.append(result)
                    break
            else:
                raise GraphBuilderError(f"Could not find initInput {dsType.name} in any input"
                                        " collection")
        for dsType in initOutputs:
            qgraph.initOutputs.append(DatasetRef(dsType, {}))

        for taskDss in taskDatasets:
            taskQuantaInputs = {}    # key is the quantum dataId (as tuple)
            taskQuantaOutputs = {}   # key is the quantum dataId (as tuple)
            qlinks = []
            for dimensionName in taskDss.taskDef.config.quantum.dimensions:
                dimension = self.dimensions[dimensionName]
                qlinks += dimension.links()
            _LOG.debug("task %s qdimensions: %s", taskDss.taskDef.label, qlinks)

            # some rows will be non-unique for subset of dimensions, create
            # temporary structure to remove duplicates
            for row in dimensionVerse:
                qkey = tuple((col, row.dataId[col]) for col in qlinks)
                _LOG.debug("qkey: %s", qkey)

                qinputs = taskQuantaInputs.setdefault(qkey, {})
                for dsType in taskDss.inputs:
                    dataRefs = qinputs.setdefault(dsType, {})
                    dataRef = row.datasetRefs[dsType]
                    dataRefs[_dataRefKey(dataRef)] = dataRef
                    _LOG.debug("add input dataRef: %s %s", dsType.name, dataRef)

                qoutputs = taskQuantaOutputs.setdefault(qkey, {})
                for dsType in taskDss.outputs:
                    dataRefs = qoutputs.setdefault(dsType, {})
                    dataRef = row.datasetRefs[dsType]
                    dataRefs[_dataRefKey(dataRef)] = dataRef
                    _LOG.debug("add output dataRef: %s %s", dsType.name, dataRef)

            # all nodes for this task
            quanta = []
            for qkey in taskQuantaInputs:
                # taskQuantaInputs and taskQuantaOutputs have the same keys
                _LOG.debug("make quantum for qkey: %s", qkey)
                quantum = Quantum(run=None, task=None)

                # add all outputs, but check first that outputs don't exist
                # (renamed from ``outputs``, which shadowed the parameter)
                outputRefs = list(chain.from_iterable(dataRefs.values()
                                                      for dataRefs in taskQuantaOutputs[qkey].values()))
                for ref in outputRefs:
                    _LOG.debug("add output: %s", ref)
                if self.skipExisting and all(ref.id is not None for ref in outputRefs):
                    # every output already has a registry id — with
                    # skipExisting the whole quantum can be dropped
                    _LOG.debug("all output dataRefs already exist, skip quantum")
                    continue
                if any(ref.id is not None for ref in outputRefs):
                    # some outputs exist, can't override them
                    raise OutputExistsError(taskDss.taskDef.taskName, outputRefs)
                for ref in outputRefs:
                    quantum.addOutput(ref)

                # add all inputs
                for dataRefs in taskQuantaInputs[qkey].values():
                    for ref in dataRefs.values():
                        quantum.addPredictedInput(ref)
                        _LOG.debug("add input: %s", ref)

                quanta.append(quantum)

            qgraph.append(QuantumGraphTaskNodes(taskDss.taskDef, quanta))

        return qgraph
def makeGraph(self, pipeline, originInfo, userQuery)
def _makeGraph(self, taskDatasets, inputs, outputs, initInputs, initOutputs, originInfo, userQuery)
def _makeFullIODatasetTypes(self, taskDatasets)
daf::base::PropertySet * set
Definition: fits.cc:832
bool any(CoordinateExpr< N > const &expr) noexcept
Return true if any elements are true.
bool all(CoordinateExpr< N > const &expr) noexcept
Return true if all elements are true.
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)
Definition: history.py:168
def __init__(self, taskFactory, registry, skipExisting=True)
Definition: graphBuilder.py:94
daf::base::PropertyList * list
Definition: fits.cc:833