"""Module defining GraphBuilder class and related methods.
"""

__all__ = ['GraphBuilder']
import copy
import logging
from collections import namedtuple
from itertools import chain

from .graph import QuantumGraphTaskNodes, QuantumGraph
from lsst.daf.butler import Quantum, DatasetRef, DimensionSet
# Module-level logger.  The name drops everything up to and including the
# first dot of __name__ (e.g. "lsst.pipe.base.x" -> "pipe.base.x").
_LOG = logging.getLogger(__name__.partition(".")[2])
# Lightweight per-task record of everything the task consumes and produces:
# its definition, regular inputs/outputs, init-time inputs/outputs, the
# dimensions allowed to vary per dataset type, and prerequisite inputs.
_TaskDatasetTypes = namedtuple(
    "_TaskDatasetTypes",
    ("taskDef", "inputs", "outputs", "initInputs", "initOutputs",
     "perDatasetTypeDimensions", "prerequisite"),
)
class GraphBuilderError(Exception):
    """Base class for exceptions generated by graph builder.
    """
    pass


class OutputExistsError(GraphBuilderError):
    """Exception generated when output datasets already exist.

    Parameters
    ----------
    taskName : `str`
        Name of the task whose outputs already exist.
    refs : iterable of `~lsst.daf.butler.DatasetRef`
        The pre-existing dataset references.
    """

    def __init__(self, taskName, refs):
        refs = ', '.join(str(ref) for ref in refs)
        msg = "Output datasets already exist for task {}: {}".format(taskName, refs)
        GraphBuilderError.__init__(self, msg)


class PrerequisiteMissingError(GraphBuilderError):
    """Exception generated when a prerequisite dataset does not exist.
    """
    pass


class GraphBuilder(object):
    """GraphBuilder class is responsible for building task execution graph from
    a Pipeline.

    Parameters
    ----------
    taskFactory : `TaskFactory`
        Factory object used to load/instantiate PipelineTasks.
    registry : `~lsst.daf.butler.Registry`
        Data butler registry instance.
    skipExisting : `bool`, optional
        If ``True`` (default) then Quantum is not created if all its outputs
        already exist, otherwise exception is raised.
    """

    def __init__(self, taskFactory, registry, skipExisting=True):
        # NOTE(review): body reconstructed from the attribute reads in the
        # methods below (self.taskFactory, self.registry, self.skipExisting).
        self.taskFactory = taskFactory
        self.registry = registry
        self.skipExisting = skipExisting
107 def _loadTaskClass(self, taskDef):
108 """Make sure task class is loaded. 110 Load task class, update task name to make sure it is fully-qualified, 111 do not update original taskDef in a Pipeline though. 119 `TaskDef` instance, may be the same as parameter if task class is 122 if taskDef.taskClass
is None:
123 tClass, tName = self.
taskFactory.loadTaskClass(taskDef.taskName)
124 taskDef = copy.copy(taskDef)
125 taskDef.taskClass = tClass
126 taskDef.taskName = tName
130 """Create execution graph for a pipeline. 134 pipeline : `Pipeline` 135 Pipeline definition, task names/classes and their configs. 136 originInfo : `~lsst.daf.butler.DatasetOriginInfo` 137 Object which provides names of the input/output collections. 139 String which defunes user-defined selection for registry, should be 140 empty or `None` if there is no restrictions on data selection. 144 graph : `QuantumGraph` 149 Raised when user expression cannot be parsed. 151 Raised when output datasets already exist. 153 Other exceptions types may be raised by underlying registry 162 for taskDef
in taskList:
163 taskClass = taskDef.taskClass
164 inputs = {k: v.datasetType
for k, v
in taskClass.getInputDatasetTypes(taskDef.config).
items()}
165 prerequisite =
set(inputs[k]
for k
in taskClass.getPrerequisiteDatasetTypes(taskDef.config))
166 taskIo = [inputs.values()]
167 for attr
in (
"Output",
"InitInput",
"InitOutput"):
168 getter = getattr(taskClass, f
"get{attr}DatasetTypes")
169 ioObject = getter(taskDef.config)
or {}
170 taskIo.append(
set(dsTypeDescr.datasetType
for dsTypeDescr
in ioObject.values()))
171 perDatasetTypeDimensions = DimensionSet(self.
registry.dimensions,
172 taskClass.getPerDatasetTypeDimensions(taskDef.config))
173 taskDatasets.append(_TaskDatasetTypes(taskDef, *taskIo, prerequisite=prerequisite,
174 perDatasetTypeDimensions=perDatasetTypeDimensions))
182 return self.
_makeGraph(taskDatasets, required, optional, prerequisite, initInputs, initOutputs,
183 originInfo, userQuery, perDatasetTypeDimensions=perDatasetTypeDimensions)
185 def _extractPerDatasetTypeDimensions(self, taskDatasets):
186 """Return the complete set of all per-DatasetType dimensions declared 189 Per-DatasetType dimensions are those that need not have the same values 190 for different Datasets within a Quantum. 194 taskDatasets : sequence of `_TaskDatasetTypes` 195 Information for each task in the pipeline. 199 perDatasetTypeDimensions : `~lsst.daf.butler.DimensionSet` 200 All per-DatasetType dimensions. 205 Raised if tasks disagree on whether a dimension is declared 210 noDimensions = DimensionSet(self.
registry.dimensions, ())
213 perDatasetTypeDimensions = noDimensions.union(
214 *[taskDs.perDatasetTypeDimensions
for taskDs
in taskDatasets]
218 for taskDs
in taskDatasets:
219 allTaskDimensions = noDimensions.union(
220 *[datasetType.dimensions
for datasetType
in chain(taskDs.inputs, taskDs.outputs)]
222 commonTaskDimensions = allTaskDimensions - taskDs.perDatasetTypeDimensions
223 if not commonTaskDimensions.isdisjoint(perDatasetTypeDimensions):
224 overlap = commonTaskDimensions.intersections(perDatasetTypeDimensions)
226 f
"Task {taskDs.taskDef.taskName} uses dimensions {overlap} without declaring them " 227 f
"per-DatasetType, but they are declared per-DatasetType by another task." 229 return perDatasetTypeDimensions
231 def _makeFullIODatasetTypes(self, taskDatasets):
232 """Returns full set of input and output dataset types for all tasks. 236 taskDatasets : sequence of `_TaskDatasetTypes` 237 Tasks with their inputs, outputs, initInputs and initOutputs. 241 required : `set` of `~lsst.daf.butler.DatasetType` 242 Datasets that must exist in the repository in order to generate 243 a QuantumGraph node that consumes them. 244 optional : `set` of `~lsst.daf.butler.DatasetType` 245 Datasets that will be produced by the graph, but may exist in the 246 repository. If ``self.skipExisting`` is `True` and all outputs of 247 a particular node already exist, it will be skipped. Otherwise 248 pre-existing datasets of these types will cause 249 `OutputExistsError` to be raised. 250 prerequisite : `set` of `~lsst.daf.butler.DatasetType` 251 Datasets that must exist in the repository, but whose absence 252 should cause `PrerequisiteMissingError` to be raised if they 253 are needed by any graph node that would otherwise be created. 254 initInputs : `set` of `~lsst.daf.butler.DatasetType` 255 Datasets used as init method inputs by the pipeline. 256 initOutputs : `set` of `~lsst.daf.butler.DatasetType` 257 Datasets used as init method outputs by the pipeline. 267 for taskDs
in taskDatasets:
268 for ioType, ioSet
in zip((
"inputs",
"outputs",
"prerequisite",
"initInputs",
"initOutputs"),
269 (required, optional, prerequisite, initInputs, initOutputs)):
270 for dsType
in getattr(taskDs, ioType):
271 ioSet.add(dsType.name)
272 allDatasetTypes[dsType.name] = dsType
276 prerequisite -= optional
279 initInputs -= initOutputs
281 required =
set(allDatasetTypes[name]
for name
in required)
282 optional =
set(allDatasetTypes[name]
for name
in optional)
283 prerequisite =
set(allDatasetTypes[name]
for name
in prerequisite)
284 initInputs =
set(allDatasetTypes[name]
for name
in initInputs)
285 initOutputs =
set(allDatasetTypes[name]
for name
in initOutputs)
286 return required, optional, prerequisite, initInputs, initOutputs
288 def _makeGraph(self, taskDatasets, required, optional, prerequisite,
289 initInputs, initOutputs, originInfo, userQuery,
290 perDatasetTypeDimensions=()):
291 """Make QuantumGraph instance. 295 taskDatasets : sequence of `_TaskDatasetTypes` 296 Tasks with their inputs and outputs. 297 required : `set` of `~lsst.daf.butler.DatasetType` 298 Datasets that must exist in the repository in order to generate 299 a QuantumGraph node that consumes them. 300 optional : `set` of `~lsst.daf.butler.DatasetType` 301 Datasets that will be produced by the graph, but may exist in 302 the repository. If ``self.skipExisting`` and all outputs of a 303 particular node already exist, it will be skipped. Otherwise 304 pre-existing datasets of these types will cause 305 `OutputExistsError` to be raised. 306 prerequisite : `set` of `~lsst.daf.butler.DatasetType` 307 Datasets that must exist in the repository, but whose absence 308 should cause `PrerequisiteMissingError` to be raised if they 309 are needed by any graph node that would otherwise be created. 310 initInputs : `set` of `DatasetType` 311 Datasets which should exist in input repository, and will be used 312 in task initialization 313 initOutputs : `set` of `DatasetType` 314 Datasets which which will be created in task initialization 315 originInfo : `DatasetOriginInfo` 316 Object which provides names of the input/output collections. 318 String which defines user-defined selection for registry, should be 319 empty or `None` if there is no restrictions on data selection. 320 perDatasetTypeDimensions : iterable of `Dimension` or `str` 321 Dimensions (or names thereof) that may have different values for 322 different dataset types within the same quantum. 326 `QuantumGraph` instance. 328 rows = self.
registry.selectMultipleDatasetTypes(
329 originInfo, userQuery,
330 required=required, optional=optional, prerequisite=prerequisite,
331 perDatasetTypeDimensions=perDatasetTypeDimensions
339 _LOG.debug(
"row: %s", row)
340 dimensionVerse.append(row)
341 except LookupError
as err:
346 qgraph._inputDatasetTypes = (required | prerequisite)
347 qgraph._outputDatasetTypes = optional
348 for dsType
in initInputs:
349 for collection
in originInfo.getInputCollections(dsType.name):
350 result = self.
registry.find(collection, dsType)
351 if result
is not None:
352 qgraph.initInputs.append(result)
357 for dsType
in initOutputs:
358 qgraph.initOutputs.append(DatasetRef(dsType, {}))
360 for taskDss
in taskDatasets:
361 taskQuantaInputs = {}
362 taskQuantaOutputs = {}
364 for dimensionName
in taskDss.taskDef.config.quantum.dimensions:
366 qlinks += dimension.links()
367 _LOG.debug(
"task %s qdimensions: %s", taskDss.taskDef.label, qlinks)
371 for row
in dimensionVerse:
372 qkey = tuple((col, row.dataId[col])
for col
in qlinks)
373 _LOG.debug(
"qkey: %s", qkey)
375 def _datasetRefKey(datasetRef):
376 return tuple(sorted(datasetRef.dataId.items()))
378 qinputs = taskQuantaInputs.setdefault(qkey, {})
379 for dsType
in taskDss.inputs:
380 datasetRefs = qinputs.setdefault(dsType, {})
381 datasetRef = row.datasetRefs[dsType]
382 datasetRefs[_datasetRefKey(datasetRef)] = datasetRef
383 _LOG.debug(
"add input datasetRef: %s %s", dsType.name, datasetRef)
385 qoutputs = taskQuantaOutputs.setdefault(qkey, {})
386 for dsType
in taskDss.outputs:
387 datasetRefs = qoutputs.setdefault(dsType, {})
388 datasetRef = row.datasetRefs[dsType]
389 datasetRefs[_datasetRefKey(datasetRef)] = datasetRef
390 _LOG.debug(
"add output datasetRef: %s %s", dsType.name, datasetRef)
394 for qkey
in taskQuantaInputs:
396 _LOG.debug(
"make quantum for qkey: %s", qkey)
397 quantum = Quantum(run=
None, task=
None)
400 outputs =
list(chain.from_iterable(datasetRefs.values()
401 for datasetRefs
in taskQuantaOutputs[qkey].values()))
403 _LOG.debug(
"add output: %s", ref)
405 _LOG.debug(
"all output datasetRefs already exist, skip quantum")
407 if any(ref.id
is not None for ref
in outputs):
412 quantum.addOutput(ref)
415 for datasetRefs
in taskQuantaInputs[qkey].values():
416 for ref
in datasetRefs.values():
417 quantum.addPredictedInput(ref)
418 _LOG.debug(
"add input: %s", ref)
420 quanta.append(quantum)
def __init__(self, taskName, refs)
def makeGraph(self, pipeline, originInfo, userQuery)
def _makeFullIODatasetTypes(self, taskDatasets)
daf::base::PropertySet * set
bool any(CoordinateExpr< N > const &expr) noexcept
Return true if any elements are true.
def _makeGraph(self, taskDatasets, required, optional, prerequisite, initInputs, initOutputs, originInfo, userQuery, perDatasetTypeDimensions=())
bool all(CoordinateExpr< N > const &expr) noexcept
Return true if all elements are true.
def _loadTaskClass(self, taskDef)
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)
def __init__(self, taskFactory, registry, skipExisting=True)
std::vector< SchemaItem< Flag > > * items
daf::base::PropertyList * list
def _extractPerDatasetTypeDimensions(self, taskDatasets)