"""Module defining GraphBuilder class and related methods.
"""

__all__ = ['GraphBuilder']
import copy
import logging
from collections import namedtuple
from itertools import chain

from lsst.daf.butler import Quantum, DatasetRef

from .graph import QuantumGraphTaskNodes, QuantumGraph
# Module-level logger. ``__name__.partition(".")[2]`` drops the leading
# package component (e.g. "lsst.") from the logger name, so log records are
# emitted under the shorter qualified name.
_LOG = logging.getLogger(__name__.partition(".")[2])
# Per-task record of its dataset types, one field per I/O category:
# the task definition plus its inputs, outputs, initInputs and initOutputs.
_TaskDatasetTypes = namedtuple("_TaskDatasetTypes",
                               "taskDef inputs outputs initInputs initOutputs")
class GraphBuilderError(Exception):
    """Base class for exceptions generated by graph builder."""
    pass


# NOTE(review): extraction dropped the original lines between the two
# docstrings above/below; another exception subclass may have been defined
# there -- verify against upstream.
class OutputExistsError(GraphBuilderError):
    """Exception generated when output datasets already exist."""

    def __init__(self, taskName, refs):
        # Summarize every offending dataset reference in the message.
        refs = ', '.join(str(ref) for ref in refs)
        msg = "Output datasets already exist for task {}: {}".format(taskName, refs)
        GraphBuilderError.__init__(self, msg)
class GraphBuilder(object):
    """GraphBuilder class is responsible for building task execution graph
    from a Pipeline.

    Parameters
    ----------
    taskFactory : `TaskFactory`
        Factory object used to load/instantiate PipelineTasks
    registry : `~lsst.daf.butler.Registry`
    skipExisting : `bool`, optional
        If ``True`` (default) then Quantum is not created if all its outputs
        already exist, otherwise exception is raised.
    """

    def __init__(self, taskFactory, registry, skipExisting=True):
        # NOTE(review): the constructor body was lost in extraction; it is
        # reconstructed here from attribute usage in the methods
        # (self.taskFactory, self.registry) and the documented skipExisting
        # option -- verify against upstream.
        self.taskFactory = taskFactory
        self.registry = registry
        self.skipExisting = skipExisting
# Method of GraphBuilder in the original layout.
def _loadTaskClass(self, taskDef):
    """Make sure task class is loaded.

    Load task class, update task name to make sure it is fully-qualified,
    do not update original taskDef in a Pipeline though.

    Parameters
    ----------
    taskDef : `TaskDef`

    Returns
    -------
    `TaskDef` instance, may be the same as parameter if task class is
    already loaded.
    """
    if taskDef.taskClass is None:
        # Class not loaded yet: resolve it via the factory and store the
        # fully-qualified name on a *copy*, leaving the caller's taskDef
        # untouched.
        tClass, tName = self.taskFactory.loadTaskClass(taskDef.taskName)
        taskDef = copy.copy(taskDef)
        taskDef.taskClass = tClass
        taskDef.taskName = tName
    # NOTE(review): this return was lost in extraction; restored per the
    # docstring ("may be the same as parameter").
    return taskDef
# Method of GraphBuilder in the original layout.
def makeGraph(self, pipeline, originInfo, userQuery):
    """Create execution graph for a pipeline.

    Parameters
    ----------
    pipeline : `Pipeline`
        Pipeline definition, task names/classes and their configs.
    originInfo : `~lsst.daf.butler.DatasetOriginInfo`
        Object which provides names of the input/output collections.
    userQuery : `str`
        String which defines user-defined selection for registry, should be
        empty or `None` if there is no restrictions on data selection.

    Returns
    -------
    graph : `QuantumGraph`

    Raises
    ------
    Exception
        Raised when user expression cannot be parsed.
    OutputExistsError
        Raised when output datasets already exist.
    Exception
        Other exceptions types may be raised by underlying registry
        classes.
    """
    # NOTE(review): the def line and the loop/accumulator initializations
    # below were lost in extraction; they are reconstructed from the
    # visible statements and the docstring -- verify against upstream.
    # Make sure all task classes are loaded (names fully qualified).
    taskList = [self._loadTaskClass(taskDef) for taskDef in pipeline]

    # Collect per-task dataset types for each of the four I/O categories.
    taskDatasets = []
    for taskDef in taskList:
        taskClass = taskDef.taskClass
        taskIo = []
        for attr in ("Input", "Output", "InitInput", "InitOutput"):
            getter = getattr(taskClass, f"get{attr}DatasetTypes")
            ioObject = getter(taskDef.config) or {}
            taskIo.append([dsTypeDescr.datasetType
                           for dsTypeDescr in ioObject.values()])
        taskDatasets.append(_TaskDatasetTypes(taskDef, *taskIo))

    # Full pipeline-wide sets of dataset types.
    inputs, outputs, initInputs, initOutputs = \
        self._makeFullIODatasetTypes(taskDatasets)

    return self._makeGraph(taskDatasets, inputs, outputs, initInputs, initOutputs,
                           originInfo, userQuery)
# Method of GraphBuilder in the original layout (does not use ``self``).
def _makeFullIODatasetTypes(self, taskDatasets):
    """Returns full set of input and output dataset types for all tasks.

    Parameters
    ----------
    taskDatasets : sequence of `_TaskDatasetTypes`
        Tasks with their inputs, outputs, initInputs and initOutputs.

    Returns
    -------
    inputs : `set` of `butler.DatasetType`
        Datasets used as inputs by the pipeline.
    outputs : `set` of `butler.DatasetType`
        Datasets produced by the pipeline.
    initInputs : `set` of `butler.DatasetType`
        Datasets used as init method inputs by the pipeline.
    initOutputs : `set` of `butler.DatasetType`
        Datasets used as init method outputs by the pipeline.
    """
    # NOTE(review): the accumulator initializations and the
    # "inputs -= outputs" pruning were lost in extraction and are
    # reconstructed here -- verify against upstream.
    # Collect dataset type *names* per category; remember name -> type.
    inputs = set()
    outputs = set()
    initInputs = set()
    initOutputs = set()
    allDatasetTypes = {}
    for taskDs in taskDatasets:
        for ioType, ioSet in zip(("inputs", "outputs", "initInputs", "initOutputs"),
                                 (inputs, outputs, initInputs, initOutputs)):
            for dsType in getattr(taskDs, ioType):
                ioSet.add(dsType.name)
                allDatasetTypes[dsType.name] = dsType

    # Datasets produced by some task are intermediates, not pipeline-level
    # inputs; same for init datasets.
    inputs -= outputs
    initInputs -= initOutputs

    # Convert names back to DatasetType objects.
    inputs = set(allDatasetTypes[name] for name in inputs)
    outputs = set(allDatasetTypes[name] for name in outputs)
    initInputs = set(allDatasetTypes[name] for name in initInputs)
    initOutputs = set(allDatasetTypes[name] for name in initOutputs)
    return inputs, outputs, initInputs, initOutputs
# NOTE(review): this method is mangled by extraction -- statements are split
# across lines, original line numbers are fused into the text, and several
# original lines are missing entirely (e.g. the QuantumGraph construction,
# the qlinks/quanta initializations, and the skipExisting branch bodies).
# Code below is kept byte-identical; only comments are added. Verify the
# full method against upstream before use.
215 def _makeGraph(self, taskDatasets, inputs, outputs, initInputs, initOutputs, originInfo, userQuery):
216 """Make QuantumGraph instance. 220 taskDatasets : sequence of `_TaskDatasetTypes` 221 Tasks with their inputs and outputs. 222 inputs : `set` of `DatasetType` 223 Datasets which should already exist in input repository 224 outputs : `set` of `DatasetType` 225 Datasets which will be created by tasks 226 initInputs : `set` of `DatasetType` 227 Datasets which should exist in input repository, and will be used 228 in task initialization 229 initOutputs : `set` of `DatasetType` 230 Datasets which which will be created in task initialization 231 originInfo : `DatasetOriginInfo` 232 Object which provides names of the input/output collections. 234 String which defines user-defined selection for registry, should be 235 empty or `None` if there is no restrictions on data selection. 239 `QuantumGraph` instance. 241 rows = self.
registry.selectDimensions(originInfo, userQuery, inputs, outputs)
# dimensionVerse accumulates every row of the registry dimension query
# (its list initialization was lost in extraction).
247 _LOG.debug(
"row: %s", row)
248 dimensionVerse.append(row)
# Record pipeline-level dataset types on the graph (the qgraph
# construction line itself was lost in extraction).
252 qgraph._inputDatasetTypes = inputs
253 qgraph._outputDatasetTypes = outputs
# Resolve init-input datasets: look each one up in every input
# collection via registry.find and keep the first hit found.
254 for dsType
in initInputs:
255 for collection
in originInfo.getInputCollections(dsType.name):
256 result = self.
registry.find(collection, dsType)
257 if result
is not None:
258 qgraph.initInputs.append(result)
# Init outputs get unresolved DatasetRefs with an empty dataId.
263 for dsType
in initOutputs:
264 qgraph.initOutputs.append(DatasetRef(dsType, {}))
# Per task: group the query rows into quanta, keyed by the values of
# the task's quantum dimension links (qlinks initialization lost).
266 for taskDss
in taskDatasets:
267 taskQuantaInputs = {}
268 taskQuantaOutputs = {}
270 for dimensionName
in taskDss.taskDef.config.quantum.dimensions:
272 qlinks += dimension.links()
273 _LOG.debug(
"task %s qdimensions: %s", taskDss.taskDef.label, qlinks)
# qkey identifies one quantum: the (link, value) pairs for this row.
277 for row
in dimensionVerse:
278 qkey = tuple((col, row.dataId[col])
for col
in qlinks)
279 _LOG.debug(
"qkey: %s", qkey)
# Local helper: stable hashable key for a DatasetRef's dataId, used
# to de-duplicate refs that appear in multiple rows.
281 def _dataRefKey(dataRef):
282 return tuple(sorted(dataRef.dataId.items()))
# Collect this row's input refs under the quantum key, per dataset type.
284 qinputs = taskQuantaInputs.setdefault(qkey, {})
285 for dsType
in taskDss.inputs:
286 dataRefs = qinputs.setdefault(dsType, {})
287 dataRef = row.datasetRefs[dsType]
288 dataRefs[_dataRefKey(dataRef)] = dataRef
289 _LOG.debug(
"add input dataRef: %s %s", dsType.name, dataRef)
# Same for this row's output refs.
291 qoutputs = taskQuantaOutputs.setdefault(qkey, {})
292 for dsType
in taskDss.outputs:
293 dataRefs = qoutputs.setdefault(dsType, {})
294 dataRef = row.datasetRefs[dsType]
295 dataRefs[_dataRefKey(dataRef)] = dataRef
296 _LOG.debug(
"add output dataRef: %s %s", dsType.name, dataRef)
# Build one Quantum per key. The surrounding skipExisting logic is
# only partially visible here: a ref with non-None id means the
# output dataset already exists in the registry -- presumably the
# quantum is skipped or OutputExistsError is raised; the branch
# bodies were lost in extraction, so confirm against upstream.
300 for qkey
in taskQuantaInputs:
302 _LOG.debug(
"make quantum for qkey: %s", qkey)
303 quantum = Quantum(run=
None, task=
None)
# Flatten all output refs for this quantum across dataset types.
306 outputs =
list(chain.from_iterable(dataRefs.values()
307 for dataRefs
in taskQuantaOutputs[qkey].values()))
309 _LOG.debug(
"add output: %s", ref)
311 _LOG.debug(
"all output dataRefs already exist, skip quantum")
313 if any(ref.id
is not None for ref
in outputs):
317 quantum.addOutput(ref)
# Register every collected input ref as a predicted input.
320 for dataRefs
in taskQuantaInputs[qkey].values():
321 for ref
in dataRefs.values():
322 quantum.addPredictedInput(ref)
323 _LOG.debug(
"add input: %s", ref)
# quanta accumulates the finished Quantum objects (its initialization
# was lost in extraction).
325 quanta.append(quantum)
def __init__(self, taskName, refs)
def makeGraph(self, pipeline, originInfo, userQuery)
def _makeGraph(self, taskDatasets, inputs, outputs, initInputs, initOutputs, originInfo, userQuery)
def _makeFullIODatasetTypes(self, taskDatasets)
daf::base::PropertySet * set
bool any(CoordinateExpr< N > const &expr) noexcept
Return true if any elements are true.
bool all(CoordinateExpr< N > const &expr) noexcept
Return true if all elements are true.
def _loadTaskClass(self, taskDef)
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)
def __init__(self, taskFactory, registry, skipExisting=True)
daf::base::PropertyList * list