22 """This module defines PipelineTask class and related methods. 25 __all__ = [
"DatasetTypeDescriptor",
"PipelineTask"]
from lsst.daf.butler import DatasetType
from .config import (InputDatasetConfig, OutputDatasetConfig,
                     InitInputDatasetConfig, InitOutputDatasetConfig)
from .task import Task
34 """Exception raised when dataset type is configured as scalar 35 but there are multiple DataIds in a Quantum for that dataset. 40 Name of the configuration field for dataset type. 42 Actual number of DataIds in a Quantum for this dataset type. 45 super().
__init__((
"Expected scalar for output dataset field {}, " 46 "received {} DataIds").
format(key, numDataIds))
50 """Describe DatasetType and its options for PipelineTask. 52 This class contains DatasetType and all relevant options that are used by 53 PipelineTask. Typically this is derived from configuration classes but 54 sub-classes of PipelineTask can also define additional DatasetTypes that 55 are not part of the task configuration. 59 datasetType : `DatasetType` 61 `True` if this is a scalar dataset. 63 `True` if this dataset will be manually loaded by a concrete 64 `PipelineTask` instead of loaded automatically by the base class. 67 def __init__(self, datasetType, scalar, manualLoad):
74 """Make DatasetTypeDescriptor instance from configuration object. 78 datasetConfig : `lsst.pex.config.Config` 79 Instance of one the `InputDatasetConfig`, `OutputDatasetConfig`, 80 `InitInputDatasetConfig`, or `InitOutputDatasetConfig` types 84 descriptor : `DatasetTypeDescriptor` 86 datasetType = DatasetType(name=datasetConfig.name,
87 dimensions=datasetConfig.dimensions,
88 storageClass=datasetConfig.storageClass)
90 scalar = getattr(datasetConfig,
'scalar',
True)
91 manualLoad = getattr(datasetConfig,
'manualLoad',
False)
92 return cls(datasetType=datasetType, scalar=scalar, manualLoad=manualLoad)
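    # A rough illustration (hypothetical values, not part of this class's API):
    # a config object with ``name``, ``dimensions`` and ``storageClass`` fields,
    # such as an `InputDatasetConfig`, would translate into a descriptor
    # roughly as
    #
    #     inputConfig = InputDatasetConfig()
    #     inputConfig.name = "calexp"                    # hypothetical name
    #     inputConfig.dimensions = ["Instrument", "Visit", "Detector"]
    #     inputConfig.storageClass = "ExposureF"
    #     descriptor = DatasetTypeDescriptor.fromConfig(inputConfig)
    #     descriptor.scalar       # True unless the config sets scalar=False
    #     descriptor.manualLoad   # False unless the config sets manualLoad=True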
96 """`DatasetType` instance. 102 """`True` if this is a scalar dataset. 108 """`True` if the task will handle loading the data 114 """Base class for all pipeline tasks. 116 This is an abstract base class for PipelineTasks which represents an 117 algorithm executed by framework(s) on data which comes from data butler, 118 resulting data is also stored in a data butler. 120 PipelineTask inherits from a `pipe.base.Task` and uses the same 121 configuration mechanism based on `pex.config`. PipelineTask sub-class 122 typically implements `run()` method which receives Python-domain data 123 objects and returns `pipe.base.Struct` object with resulting data. 124 `run()` method is not supposed to perform any I/O, it operates entirely 125 on in-memory objects. `runQuantum()` is the method (can be re-implemented 126 in sub-class) where all necessary I/O is performed, it reads all input 127 data from data butler into memory, calls `run()` method with that data, 128 examines returned `Struct` object and saves some or all of that data back 129 to data butler. `runQuantum()` method receives `daf.butler.Quantum` 130 instance which defines all input and output datasets for a single 131 invocation of PipelineTask. 133 Subclasses must be constructable with exactly the arguments taken by the 134 PipelineTask base class constructor, but may support other signatures as 139 canMultiprocess : bool, True by default (class attribute) 140 This class attribute is checked by execution framework, sub-classes 141 can set it to ``False`` in case task does not support multiprocessing. 145 config : `pex.config.Config`, optional 146 Configuration for this task (an instance of ``self.ConfigClass``, 147 which is a task-specific subclass of `PipelineTaskConfig`). 148 If not specified then it defaults to `self.ConfigClass()`. 149 log : `lsst.log.Log`, optional 150 Logger instance whose name is used as a log name prefix, or ``None`` 152 initInputs : `dict`, optional 153 A dictionary of objects needed to construct this PipelineTask, with 154 keys matching the keys of the dictionary returned by 155 `getInitInputDatasetTypes` and values equivalent to what would be 156 obtained by calling `Butler.get` with those DatasetTypes and no data 157 IDs. While it is optional for the base class, subclasses are 158 permitted to require this argument. 161 canMultiprocess =
True 163 def __init__(self, *, config=None, log=None, initInputs=None, **kwargs):
164 super().
__init__(config=config, log=log, **kwargs)
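    # A minimal sketch of a concrete subclass (hypothetical names throughout),
    # illustrating the convention described in the class docstring: dataset
    # types are declared in the task configuration, `run()` works purely on
    # in-memory objects, and all butler I/O is left to `runQuantum()`:
    #
    #     class ExampleTask(PipelineTask):
    #         ConfigClass = ExampleConfig  # PipelineTaskConfig subclass with
    #                                      # "input" and "output" dataset fields
    #         _DefaultName = "example"
    #
    #         def run(self, input):
    #             # pure computation, no butler access here
    #             output = self.doSomething(input)
    #             # attribute name matches the "output" config field name
    #             return Struct(output=output)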
167 """Return persistable outputs that are available immediately after 168 the task has been constructed. 170 Subclasses that operate on catalogs should override this method to 171 return the schema(s) of the catalog(s) they produce. 173 It is not necessary to return the PipelineTask's configuration or 174 other provenance information in order for it to be persisted; that is 175 the responsibility of the execution system. 180 Dictionary with keys that match those of the dict returned by 181 `getInitOutputDatasetTypes` values that can be written by calling 182 `Butler.put` with those DatasetTypes and no data IDs. An empty 183 `dict` should be returned by tasks that produce no initialization 190 """Return input dataset type descriptors for this task. 192 Default implementation finds all fields of type `InputDatasetConfig` 193 in configuration (non-recursively) and uses them for constructing 194 `DatasetTypeDescriptor` instances. The names of these fields are used 195 as keys in returned dictionary. Subclasses can override this behavior. 200 Configuration for this task. Typically datasets are defined in 201 a task configuration. 205 Dictionary where key is the name (arbitrary) of the input dataset 206 and value is the `DatasetTypeDescriptor` instance. Default 207 implementation uses configuration field name as dictionary key. 213 """Return output dataset type descriptors for this task. 215 Default implementation finds all fields of type `OutputDatasetConfig` 216 in configuration (non-recursively) and uses them for constructing 217 `DatasetTypeDescriptor` instances. The keys of these fields are used 218 as keys in returned dictionary. Subclasses can override this behavior. 223 Configuration for this task. Typically datasets are defined in 224 a task configuration. 228 Dictionary where key is the name (arbitrary) of the output dataset 229 and value is the `DatasetTypeDescriptor` instance. Default 230 implementation uses configuration field name as dictionary key. 236 """Return the local names of input dataset types that should be 237 assumed to exist instead of constraining what data to process with 240 Usually, when running a `PipelineTask`, the presence of input datasets 241 constrains the processing to be done (as defined by the `QuantumGraph` 242 generated during "preflight"). "Prerequisites" are special input 243 datasets that do not constrain that graph, but instead cause a hard 244 failure when missing. Calibration products and reference catalogs 245 are examples of dataset types that should usually be marked as 251 Configuration for this task. Typically datasets are defined in 252 a task configuration. 256 prerequisite : `~collections.abc.Set` of `str` 257 The keys in the dictionary returned by `getInputDatasetTypes` that 258 represent dataset types that should be considered prerequisites. 259 Names returned here that are not keys in that dictionary are 260 ignored; that way, if a config option removes an input dataset type 261 only `getInputDatasetTypes` needs to be updated. 267 """Return dataset type descriptors that can be used to retrieve the 268 ``initInputs`` constructor argument. 270 Datasets used in initialization may not be associated with any 271 Dimension (i.e. their data IDs must be empty dictionaries). 273 Default implementation finds all fields of type 274 `InitInputInputDatasetConfig` in configuration (non-recursively) and 275 uses them for constructing `DatasetTypeDescriptor` instances. The 276 names of these fields are used as keys in returned dictionary. 
277 Subclasses can override this behavior. 282 Configuration for this task. Typically datasets are defined in 283 a task configuration. 287 Dictionary where key is the name (arbitrary) of the input dataset 288 and value is the `DatasetTypeDescriptor` instance. Default 289 implementation uses configuration field name as dictionary key. 291 When the task requires no initialization inputs, should return an 298 """Return dataset type descriptors that can be used to write the 299 objects returned by `getOutputDatasets`. 301 Datasets used in initialization may not be associated with any 302 Dimension (i.e. their data IDs must be empty dictionaries). 304 Default implementation finds all fields of type 305 `InitOutputDatasetConfig` in configuration (non-recursively) and uses 306 them for constructing `DatasetTypeDescriptor` instances. The names of 307 these fields are used as keys in returned dictionary. Subclasses can 308 override this behavior. 313 Configuration for this task. Typically datasets are defined in 314 a task configuration. 318 Dictionary where key is the name (arbitrary) of the output dataset 319 and value is the `DatasetTypeDescriptor` instance. Default 320 implementation uses configuration field name as dictionary key. 322 When the task produces no initialization outputs, should return an 329 """Return dataset type descriptors defined in task configuration. 331 This method can be used by other methods that need to extract dataset 332 types from task configuration (e.g. `getInputDatasetTypes` or 338 Configuration for this task. Typically datasets are defined in 339 a task configuration. 341 Class of the configuration object which defines dataset type. 345 Dictionary where key is the name (arbitrary) of the output dataset 346 and value is the `DatasetTypeDescriptor` instance. Default 347 implementation uses configuration field name as dictionary key. 348 Returns empty dict if configuration has no fields with the specified 352 for key, value
in config.items():
353 if isinstance(value, configClass):
354 dsTypes[key] = DatasetTypeDescriptor.fromConfig(value)
359 """Return any Dimensions that are permitted to have different values 360 for different DatasetTypes within the same quantum. 365 Configuration for this task. 369 dimensions : `~collections.abc.Set` of `Dimension` or `str` 370 The dimensions or names thereof that should be considered 375 Any Dimension declared to be per-DatasetType by a PipelineTask must 376 also be declared to be per-DatasetType by other PipelineTasks in the 379 The classic example of a per-DatasetType dimension is the 380 ``CalibrationLabel`` dimension that maps to a validity range for 381 master calibrations. When running Instrument Signature Removal, one 382 does not care that different dataset types like flat, bias, and dark 383 have different validity ranges, as long as those validity ranges all 384 overlap the relevant observation. 389 """Run task algorithm on in-memory data. 391 This method is called by `runQuantum` to operate on input in-memory 392 data and produce coressponding output in-memory data. It receives 393 arguments which are dictionaries with input data and input/output 394 DataIds. Many simple tasks do not need to know DataIds so default 395 implementation of this method calls `run` method passing input data 396 objects as keyword arguments. Most simple tasks will implement `run` 397 method, more complex tasks that need to know about output DataIds 398 will override this method instead. 400 All three arguments to this method are dictionaries with keys equal 401 to the name of the configuration fields for dataset type. If dataset 402 type is configured with ``scalar`` fiels set to ``True`` then it is 403 expected that only one dataset appears on input or output for that 404 dataset type and dictionary value will be a single data object or 405 DataId. Otherwise if ``scalar`` is ``False`` (default) then value 406 will be a list (even if only one item is in the list). 408 The method returns `Struct` instance with attributes matching the 409 configuration fields for output dataset types. Values stored in 410 returned struct are single object if ``scalar`` is ``True`` or 411 list of objects otherwise. If tasks produces more than one object 412 for some dataset type then data objects returned in ``struct`` must 413 match in count and order corresponding DataIds in ``outputDataIds``. 418 Dictionary whose keys are the names of the configuration fields 419 describing input dataset types and values are Python-domain data 420 objects (or lists of objects) retrieved from data butler. 421 inputDataIds : `dict` 422 Dictionary whose keys are the names of the configuration fields 423 describing input dataset types and values are DataIds (or lists 424 of DataIds) that task consumes for corresponding dataset type. 425 DataIds are guaranteed to match data objects in ``inputData`` 426 outputDataIds : `dict` 427 Dictionary whose keys are the names of the configuration fields 428 describing output dataset types and values are DataIds (or lists 429 of DataIds) that task is to produce for corresponding dataset 435 Standard convention is that this method should return `Struct` 436 instance containing all output data. Struct attribute names 437 should correspond to the names of the configuration fields 438 describing task output dataset types. If something different 439 is returned then `saveStruct` method has to be re-implemented 442 return self.
run(**inputData)
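    # Tasks that need DataIds can override `adaptArgsAndRun` instead of `run`;
    # a rough sketch (hypothetical "input" and "output" config fields, both
    # configured with ``scalar=True``):
    #
    #     def adaptArgsAndRun(self, inputData, inputDataIds, outputDataIds, butler):
    #         # single DataId because the "output" dataset type is scalar
    #         outputDataId = outputDataIds["output"]
    #         image = self.makeImage(inputData["input"], outputDataId)
    #         return Struct(output=image)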
445 """Run task algorithm on in-memory data. 447 This method should be implemented in a subclass unless tasks overrides 448 `adaptArgsAndRun` to do something different from its default 449 implementation. With default implementation of `adaptArgsAndRun` this 450 method will receive keyword arguments whose names will be the same as 451 names of configuration fields describing input dataset types. Argument 452 values will be data objects retrieved from data butler. If a dataset 453 type is configured with ``scalar`` field set to ``True`` then argument 454 value will be a single object, otherwise it will be a list of objects. 456 If the task needs to know its input or output DataIds then it has to 457 override `adaptArgsAndRun` method instead. 462 See description of `adaptArgsAndRun` method. 466 Typical implementation of this method may look like:: 468 def run(self, input, calib): 469 # "input", "calib", and "output" are the names of the config fields 471 # Assuming that input/calib datasets are `scalar` they are simple objects, 472 # do something with inputs and calibs, produce output image. 473 image = self.makeImage(input, calib) 475 # If output dataset is `scalar` then return object, not list 476 return Struct(output=image) 479 raise NotImplementedError(
"run() is not implemented")
482 """Execute PipelineTask algorithm on single quantum of data. 484 Typical implementation of this method will use inputs from quantum 485 to retrieve Python-domain objects from data butler and call 486 `adaptArgsAndRun` method on that data. On return from 487 `adaptArgsAndRun` this method will extract data from returned 488 `Struct` instance and save that data to butler. 490 The `Struct` returned from `adaptArgsAndRun` is expected to contain 491 data attributes with the names equal to the names of the 492 configuration fields defining output dataset types. The values of 493 the data attributes must be data objects corresponding to 494 the DataIds of output dataset types. All data objects will be 495 saved in butler using DataRefs from Quantum's output dictionary. 497 This method does not return anything to the caller, on errors 498 corresponding exception is raised. 503 Object describing input and output corresponding to this 504 invocation of PipelineTask instance. 506 Data butler instance. 510 `ScalarError` if a dataset type is configured as scalar but receives 511 multiple DataIds in `quantum`. Any exceptions that happen in data 512 butler or in `adaptArgsAndRun` method. 515 def makeDataRefs(descriptors, refMap):
516 """Generate map of DatasetRefs and DataIds. 518 Given a map of DatasetTypeDescriptor and a map of Quantum 519 DatasetRefs makes maps of DataIds and and DatasetRefs. 520 For scalar dataset types unpacks DatasetRefs and DataIds. 525 Map of (dataset key, DatasetTypeDescriptor). 527 Map of (dataset type name, DatasetRefs). 532 Map of (dataset key, DataIds) 534 Map of (dataset key, DatasetRefs) 539 Raised if dataset type is configured as scalar but more than 540 one DatasetRef exists for it. 544 for key, descriptor
in descriptors.items():
545 keyDataRefs = refMap[descriptor.datasetType.name]
546 keyDataIds = [dataRef.dataId
for dataRef
in keyDataRefs]
547 if descriptor.scalar:
549 if len(keyDataRefs) != 1:
551 keyDataRefs = keyDataRefs[0]
552 keyDataIds = keyDataIds[0]
553 dataIds[key] = keyDataIds
554 if not descriptor.manualLoad:
555 dataRefs[key] = keyDataRefs
556 return dataIds, dataRefs
        # lists of DataRefs/DataIds for input datasets
        descriptors = self.getInputDatasetTypes(self.config)
        inputDataIds, inputDataRefs = makeDataRefs(descriptors, quantum.predictedInputs)

        # retrieve all input data from butler
        inputs = {}
        for key, dataRefs in inputDataRefs.items():
            if isinstance(dataRefs, list):
                inputs[key] = [butler.get(dataRef) for dataRef in dataRefs]
            else:
                inputs[key] = butler.get(dataRefs)

        # lists of DataRefs/DataIds for output datasets
        descriptors = self.getOutputDatasetTypes(self.config)
        outputDataIds, outputDataRefs = makeDataRefs(descriptors, quantum.outputs)

        # call task algorithm on in-memory data
        struct = self.adaptArgsAndRun(inputs, inputDataIds, outputDataIds, butler)

        # save produced output data to butler
        self.saveStruct(struct, outputDataRefs, butler)
582 """Save data in butler. 584 Convention is that struct returned from ``run()`` method has data 585 field(s) with the same names as the config fields defining 586 output DatasetTypes. Subclasses may override this method to implement 587 different convention for `Struct` content or in case any 588 post-processing of data may be needed. 593 Data produced by the task packed into `Struct` instance 594 outputDataRefs : `dict` 595 Dictionary whose keys are the names of the configuration fields 596 describing output dataset types and values are lists of DataRefs. 597 DataRefs must match corresponding data objects in ``struct`` in 600 Data butler instance. 602 structDict = struct.getDict()
604 for key
in descriptors.keys():
605 dataList = structDict[key]
606 dataRefs = outputDataRefs[key]
607 if not isinstance(dataRefs, list):
609 dataRefs = [dataRefs]
610 dataList = [dataList]
612 for dataRef, data
in zip(dataRefs, dataList):
613 butler.put(data, dataRef.datasetType.name, dataRef.dataId)
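    # Subclasses that need to post-process results before writing can override
    # `saveStruct`; a rough sketch (hypothetical "catalog" output field and
    # hypothetical `_finalizeCatalog` helper):
    #
    #     def saveStruct(self, struct, outputDataRefs, butler):
    #         struct.catalog = self._finalizeCatalog(struct.catalog)
    #         super().saveStruct(struct, outputDataRefs, butler)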
616 """Return resource configuration for this task. 620 Object of type `~config.ResourceConfig` or ``None`` if resource 621 configuration is not defined for this task. 623 return getattr(self.
config,
"resources",
None)
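# How an execution framework might drive a PipelineTask, in rough outline
# (a sketch only, not part of this module's API; ``taskClass``, ``butler`` and
# ``quantum`` are assumed to be supplied by the framework):
#
#     config = taskClass.ConfigClass()
#     initInputs = {key: butler.get(descriptor.datasetType.name)
#                   for key, descriptor
#                   in taskClass.getInitInputDatasetTypes(config).items()}
#     task = taskClass(config=config, initInputs=initInputs)
#     task.runQuantum(quantum, butler)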