"""This module defines PipelineTask class and related methods.
"""

__all__ = ["DatasetTypeDescriptor", "PipelineTask"]

from lsst.daf.butler import DatasetType
from .config import (InputDatasetConfig, OutputDatasetConfig,
                     InitInputDatasetConfig, InitOutputDatasetConfig)
from .task import Task


class ScalarError(TypeError):
    """Exception raised when a dataset type is configured as scalar
    but there are multiple DataIds in a Quantum for that dataset.

    Parameters
    ----------
    key : `str`
        Name of the configuration field for the dataset type.
    numDataIds : `int`
        Actual number of DataIds in a Quantum for this dataset type.
    """
    def __init__(self, key, numDataIds):
        super().__init__(("Expected scalar for output dataset field {}, "
                          "received {} DataIds").format(key, numDataIds))


class DatasetTypeDescriptor:
    """Describe DatasetType and its options for PipelineTask.

    This class contains a DatasetType and all relevant options that are used
    by PipelineTask. Typically this is derived from configuration classes,
    but sub-classes of PipelineTask can also define additional DatasetTypes
    that are not part of the task configuration.

    Parameters
    ----------
    datasetType : `DatasetType`
    scalar : `bool`
        `True` if this is a scalar dataset.
    manualLoad : `bool`
        `True` if the task will handle loading the data.
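
    Examples
    --------
    Descriptors are normally built from task configuration fields; the
    ``calexp`` field name here is hypothetical::

        descriptor = DatasetTypeDescriptor.fromConfig(config.calexp)
        print(descriptor.datasetType.name, descriptor.scalar)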
    """

    def __init__(self, datasetType, scalar, manualLoad):
        self._datasetType = datasetType
        self._scalar = scalar
        self._manualLoad = manualLoad

    @classmethod
    def fromConfig(cls, datasetConfig):
        """Make a DatasetTypeDescriptor instance from a configuration object.

        Parameters
        ----------
        datasetConfig : `lsst.pex.config.Config`
            Instance of one of the `InputDatasetConfig`,
            `OutputDatasetConfig`, `InitInputDatasetConfig`, or
            `InitOutputDatasetConfig` types.

        Returns
        -------
        descriptor : `DatasetTypeDescriptor`
        """
        datasetType = DatasetType(name=datasetConfig.name,
                                  dimensions=datasetConfig.dimensions,
                                  storageClass=datasetConfig.storageClass)
        # Configs for Init dataset types lack the "scalar" and "manualLoad"
        # fields; fall back to defaults when the attribute is absent.
        scalar = getattr(datasetConfig, 'scalar', True)
        manualLoad = getattr(datasetConfig, 'manualLoad', False)
        return cls(datasetType=datasetType, scalar=scalar, manualLoad=manualLoad)

    @property
    def datasetType(self):
        """`DatasetType` instance.
        """
        return self._datasetType

    @property
    def scalar(self):
        """`True` if this is a scalar dataset.
        """
        return self._scalar

    @property
    def manualLoad(self):
        """`True` if the task will handle loading the data.
        """
        return self._manualLoad


class PipelineTask(Task):
    """Base class for all pipeline tasks.

    This is an abstract base class for PipelineTasks, which represent
    algorithms executed by a framework on data that comes from the data
    butler; resulting data is also stored in a data butler.

    PipelineTask inherits from `pipe.base.Task` and uses the same
    configuration mechanism based on `pex.config`. A PipelineTask sub-class
    typically implements the `run()` method, which receives Python-domain
    data objects and returns a `pipe.base.Struct` object with resulting data.
    The `run()` method is not supposed to perform any I/O; it operates
    entirely on in-memory objects. `runQuantum()` is the method (which can be
    re-implemented in a sub-class) where all necessary I/O is performed: it
    reads all input data from the data butler into memory, calls the `run()`
    method with that data, examines the returned `Struct` object, and saves
    some or all of that data back to the data butler. The `runQuantum()`
    method receives a `daf.butler.Quantum` instance which defines all input
    and output datasets for a single invocation of PipelineTask.

    Subclasses must be constructable with exactly the arguments taken by the
    PipelineTask base class constructor, but may support other signatures as
    well.

    Attributes
    ----------
    canMultiprocess : bool, True by default (class attribute)
        This class attribute is checked by the execution framework;
        sub-classes can set it to ``False`` if the task does not support
        multiprocessing.

    Parameters
    ----------
    config : `pex.config.Config`, optional
        Configuration for this task (an instance of ``self.ConfigClass``,
        which is a task-specific subclass of `PipelineTaskConfig`).
        If not specified then it defaults to ``self.ConfigClass()``.
    log : `lsst.log.Log`, optional
        Logger instance whose name is used as a log name prefix, or ``None``
        for no prefix.
    initInputs : `dict`, optional
        A dictionary of objects needed to construct this PipelineTask, with
        keys matching the keys of the dictionary returned by
        `getInitInputDatasetTypes` and values equivalent to what would be
        obtained by calling `Butler.get` with those DatasetTypes and no data
        IDs. While it is optional for the base class, subclasses are
        permitted to require this argument.
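
    Notes
    -----
    A minimal sketch of a concrete sub-class; ``ExampleConfig``,
    ``doSomething``, and the config field names ``input`` and ``output``
    are hypothetical::

        class ExampleTask(PipelineTask):
            ConfigClass = ExampleConfig
            _DefaultName = "example"

            def run(self, input):
                # "input" is the name of a config field defining an input
                # dataset type; "output" names an output dataset type
                return Struct(output=doSomething(input))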
    """

    canMultiprocess = True

    def __init__(self, *, config=None, log=None, initInputs=None, **kwargs):
        super().__init__(config=config, log=log, **kwargs)

    def getInitOutputDatasets(self):
        """Return persistable outputs that are available immediately after
        the task has been constructed.

        Subclasses that operate on catalogs should override this method to
        return the schema(s) of the catalog(s) they produce.

        It is not necessary to return the PipelineTask's configuration or
        other provenance information in order for it to be persisted; that is
        the responsibility of the execution system.

        Returns
        -------
        Dictionary with keys that match those of the dict returned by
        `getInitOutputDatasetTypes` and values that can be written by calling
        `Butler.put` with those DatasetTypes and no data IDs. An empty
        `dict` should be returned by tasks that produce no initialization
        outputs.
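
        Examples
        --------
        A sketch for a task that produces a catalog; ``outputSchema`` is a
        hypothetical config field name for an init-output dataset type, and
        ``self.schema`` a hypothetical attribute set in the constructor::

            def getInitOutputDatasets(self):
                return dict(outputSchema=self.schema)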
        """
        return {}

    @classmethod
    def getInputDatasetTypes(cls, config):
        """Return input dataset type descriptors for this task.

        Default implementation finds all fields of type `InputDatasetConfig`
        in configuration (non-recursively) and uses them for constructing
        `DatasetTypeDescriptor` instances. The names of these fields are used
        as keys in the returned dictionary. Subclasses can override this
        behavior.

        Parameters
        ----------
        config : `Config`
            Configuration for this task. Typically datasets are defined in
            a task configuration.

        Returns
        -------
        Dictionary where key is the name (arbitrary) of the input dataset
        and value is the `DatasetTypeDescriptor` instance. Default
        implementation uses configuration field name as dictionary key.
        """
        return cls.getDatasetTypes(config, InputDatasetConfig)

    @classmethod
    def getOutputDatasetTypes(cls, config):
        """Return output dataset type descriptors for this task.

        Default implementation finds all fields of type `OutputDatasetConfig`
        in configuration (non-recursively) and uses them for constructing
        `DatasetTypeDescriptor` instances. The names of these fields are used
        as keys in the returned dictionary. Subclasses can override this
        behavior.

        Parameters
        ----------
        config : `Config`
            Configuration for this task. Typically datasets are defined in
            a task configuration.

        Returns
        -------
        Dictionary where key is the name (arbitrary) of the output dataset
        and value is the `DatasetTypeDescriptor` instance. Default
        implementation uses configuration field name as dictionary key.
        """
        return cls.getDatasetTypes(config, OutputDatasetConfig)

    @classmethod
    def getInitInputDatasetTypes(cls, config):
        """Return dataset type descriptors that can be used to retrieve the
        ``initInputs`` constructor argument.

        Datasets used in initialization may not be associated with any
        Dimension (i.e. their data IDs must be empty dictionaries).

        Default implementation finds all fields of type
        `InitInputDatasetConfig` in configuration (non-recursively) and
        uses them for constructing `DatasetTypeDescriptor` instances. The
        names of these fields are used as keys in the returned dictionary.
        Subclasses can override this behavior.

        Parameters
        ----------
        config : `Config`
            Configuration for this task. Typically datasets are defined in
            a task configuration.

        Returns
        -------
        Dictionary where key is the name (arbitrary) of the input dataset
        and value is the `DatasetTypeDescriptor` instance. Default
        implementation uses configuration field name as dictionary key.

        When the task requires no initialization inputs, should return an
        empty `dict`.
        """
        return cls.getDatasetTypes(config, InitInputDatasetConfig)

    @classmethod
    def getInitOutputDatasetTypes(cls, config):
        """Return dataset type descriptors that can be used to write the
        objects returned by `getInitOutputDatasets`.

        Datasets used in initialization may not be associated with any
        Dimension (i.e. their data IDs must be empty dictionaries).

        Default implementation finds all fields of type
        `InitOutputDatasetConfig` in configuration (non-recursively) and uses
        them for constructing `DatasetTypeDescriptor` instances. The names of
        these fields are used as keys in the returned dictionary. Subclasses
        can override this behavior.

        Parameters
        ----------
        config : `Config`
            Configuration for this task. Typically datasets are defined in
            a task configuration.

        Returns
        -------
        Dictionary where key is the name (arbitrary) of the output dataset
        and value is the `DatasetTypeDescriptor` instance. Default
        implementation uses configuration field name as dictionary key.

        When the task produces no initialization outputs, should return an
        empty `dict`.
        """
        return cls.getDatasetTypes(config, InitOutputDatasetConfig)

    @classmethod
    def getDatasetTypes(cls, config, configClass):
        """Return dataset type descriptors defined in task configuration.

        This method can be used by other methods that need to extract dataset
        types from task configuration (e.g. `getInputDatasetTypes` or
        sub-class methods).

        Parameters
        ----------
        config : `Config`
            Configuration for this task. Typically datasets are defined in
            a task configuration.
        configClass : `type`
            Class of the configuration object which defines dataset type.

        Returns
        -------
        Dictionary where key is the name (arbitrary) of the dataset
        and value is the `DatasetTypeDescriptor` instance. Default
        implementation uses configuration field name as dictionary key.
        Returns empty dict if configuration has no fields with the specified
        ``configClass``.
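
        Examples
        --------
        A sketch of how the returned mapping is typically used; ``config``
        is a hypothetical task configuration instance with one
        `InputDatasetConfig` field::

            descriptors = PipelineTask.getDatasetTypes(config, InputDatasetConfig)
            for key, descriptor in descriptors.items():
                # ``key`` is the configuration field name; the dataset type
                # name may differ from it
                print(key, descriptor.datasetType.name)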
        """
        dsTypes = {}
        for key, value in config.items():
            if isinstance(value, configClass):
                dsTypes[key] = DatasetTypeDescriptor.fromConfig(value)
        return dsTypes

    def adaptArgsAndRun(self, inputData, inputDataIds, outputDataIds, butler):
        """Run task algorithm on in-memory data.

        This method is called by `runQuantum` to operate on input in-memory
        data and produce corresponding output in-memory data. It receives
        arguments which are dictionaries with input data and input/output
        DataIds. Many simple tasks do not need to know DataIds, so the
        default implementation of this method calls the `run` method, passing
        input data objects as keyword arguments. Most simple tasks will
        implement the `run` method; more complex tasks that need to know
        about output DataIds will override this method instead.

        All three arguments to this method are dictionaries with keys equal
        to the names of the configuration fields for dataset types. If a
        dataset type is configured with the ``scalar`` field set to ``True``
        then it is expected that only one dataset appears on input or output
        for that dataset type, and the dictionary value will be a single data
        object or DataId. Otherwise, if ``scalar`` is ``False`` (default),
        the value will be a list (even if only one item is in the list).

        The method returns a `Struct` instance with attributes matching the
        configuration fields for output dataset types. Values stored in the
        returned struct are single objects if ``scalar`` is ``True``, or
        lists of objects otherwise. If a task produces more than one object
        for some dataset type then the data objects returned in ``struct``
        must match in count and order the corresponding DataIds in
        ``outputDataIds``.

        Parameters
        ----------
        inputData : `dict`
            Dictionary whose keys are the names of the configuration fields
            describing input dataset types and values are Python-domain data
            objects (or lists of objects) retrieved from data butler.
        inputDataIds : `dict`
            Dictionary whose keys are the names of the configuration fields
            describing input dataset types and values are DataIds (or lists
            of DataIds) that task consumes for corresponding dataset type.
            DataIds are guaranteed to match data objects in ``inputData``.
        outputDataIds : `dict`
            Dictionary whose keys are the names of the configuration fields
            describing output dataset types and values are DataIds (or lists
            of DataIds) that task is to produce for corresponding dataset
            type.
        butler : object
            Data butler instance.

        Returns
        -------
        struct : `Struct`
            Standard convention is that this method should return a `Struct`
            instance containing all output data. Struct attribute names
            should correspond to the names of the configuration fields
            describing task output dataset types. If something different
            is returned then the `saveStruct` method has to be re-implemented
            accordingly.
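
        Examples
        --------
        A sketch of an override for a task that needs to know its output
        DataIds; ``input`` and ``output`` are hypothetical config field
        names for scalar dataset types::

            def adaptArgsAndRun(self, inputData, inputDataIds, outputDataIds,
                                butler):
                # forward the output DataId to run() along with input data
                return self.run(input=inputData["input"],
                                outputDataId=outputDataIds["output"])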
        """
        return self.run(**inputData)

    def run(self, **kwargs):
        """Run task algorithm on in-memory data.

        This method should be implemented in a subclass unless the task
        overrides `adaptArgsAndRun` to do something different from its
        default implementation. With the default implementation of
        `adaptArgsAndRun` this method will receive keyword arguments whose
        names will be the same as the names of configuration fields
        describing input dataset types. Argument values will be data objects
        retrieved from data butler. If a dataset type is configured with the
        ``scalar`` field set to ``True`` then the argument value will be a
        single object; otherwise it will be a list of objects.

        If the task needs to know its input or output DataIds then it has to
        override the `adaptArgsAndRun` method instead.

        Returns
        -------
        struct : `Struct`
            See description of `adaptArgsAndRun` method.

        Examples
        --------
        Typical implementation of this method may look like::

            def run(self, input, calib):
                # "input", "calib", and "output" are the names of the config
                # fields

                # Assuming that input/calib datasets are `scalar` they are
                # simple objects, do something with inputs and calibs,
                # produce output image.
                image = self.makeImage(input, calib)

                # If output dataset is `scalar` then return object, not list
                return Struct(output=image)
        """
        raise NotImplementedError("run() is not implemented")

    def runQuantum(self, quantum, butler):
        """Execute PipelineTask algorithm on a single quantum of data.

        Typical implementation of this method will use inputs from quantum
        to retrieve Python-domain objects from data butler and call the
        `adaptArgsAndRun` method on that data. On return from
        `adaptArgsAndRun` this method will extract data from the returned
        `Struct` instance and save that data to butler.

        The `Struct` returned from `adaptArgsAndRun` is expected to contain
        data attributes with the names equal to the names of the
        configuration fields defining output dataset types. The values of
        the data attributes must be data objects corresponding to
        the DataIds of output dataset types. All data objects will be
        saved in butler using DataRefs from Quantum's output dictionary.

        This method does not return anything to the caller; on errors a
        corresponding exception is raised.

        Parameters
        ----------
        quantum : `Quantum`
            Object describing input and output corresponding to this
            invocation of PipelineTask instance.
        butler : object
            Data butler instance.

        Raises
        ------
        `ScalarError` if a dataset type is configured as scalar but receives
        multiple DataIds in `quantum`. Any exceptions that happen in data
        butler or in `adaptArgsAndRun` method.
        """

        def makeDataRefs(descriptors, refMap):
            """Generate map of DatasetRefs and DataIds.

            Given a map of DatasetTypeDescriptor and a map of Quantum
            DatasetRefs, makes maps of DataIds and DatasetRefs.
            For scalar dataset types unpacks DatasetRefs and DataIds.

            Parameters
            ----------
            descriptors : `dict`
                Map of (dataset key, DatasetTypeDescriptor).
            refMap : `dict`
                Map of (dataset type name, DatasetRefs).

            Returns
            -------
            dataIds : `dict`
                Map of (dataset key, DataIds)
            dataRefs : `dict`
                Map of (dataset key, DatasetRefs)

            Raises
            ------
            ScalarError
                Raised if dataset type is configured as scalar but more than
                one DatasetRef exists for it.
            """
            dataIds = {}
            dataRefs = {}
            for key, descriptor in descriptors.items():
                keyDataRefs = refMap[descriptor.datasetType.name]
                keyDataIds = [dataRef.dataId for dataRef in keyDataRefs]
                if descriptor.scalar:
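                    # Scalar dataset type: exactly one DataRef is expected;
                    # unpack it from its one-element list.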
                    if len(keyDataRefs) != 1:
                        raise ScalarError(key, len(keyDataRefs))
                    keyDataRefs = keyDataRefs[0]
                    keyDataIds = keyDataIds[0]
                dataIds[key] = keyDataIds
                # Tasks with manualLoad retrieve their own data, so do not
                # pass DataRefs for those dataset types.
                if not descriptor.manualLoad:
                    dataRefs[key] = keyDataRefs
            return dataIds, dataRefs
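
        # Lookup descriptors and DataRefs for all input dataset types.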
        descriptors = self.getInputDatasetTypes(self.config)
        inputDataIds, inputDataRefs = makeDataRefs(descriptors, quantum.predictedInputs)

        # Get all input data from butler.
        inputs = {}
        for key, dataRefs in inputDataRefs.items():
            if isinstance(dataRefs, list):
                inputs[key] = [butler.get(dataRef) for dataRef in dataRefs]
            else:
                inputs[key] = butler.get(dataRefs)
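
        # Lookup descriptors and DataRefs for all output dataset types.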
        descriptors = self.getOutputDatasetTypes(self.config)
        outputDataIds, outputDataRefs = makeDataRefs(descriptors, quantum.outputs)

        # Call the task's algorithm on the in-memory data.
        struct = self.adaptArgsAndRun(inputs, inputDataIds, outputDataIds, butler)

        # Store produced output data back to butler.
        self.saveStruct(struct, outputDataRefs, butler)

    def saveStruct(self, struct, outputDataRefs, butler):
        """Save data in butler.

        Convention is that the struct returned from the ``run()`` method has
        data field(s) with the same names as the config fields defining
        output DatasetTypes. Subclasses may override this method to implement
        a different convention for `Struct` content, or in case any
        post-processing of the data is needed.

        Parameters
        ----------
        struct : `Struct`
            Data produced by the task packed into a `Struct` instance.
        outputDataRefs : `dict`
            Dictionary whose keys are the names of the configuration fields
            describing output dataset types and values are lists of DataRefs.
            DataRefs must match corresponding data objects in ``struct`` in
            count and order.
        butler : object
            Data butler instance.
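
        Notes
        -----
        A minimal sketch of an override that post-processes data before
        saving it; ``validate`` is a hypothetical method::

            def saveStruct(self, struct, outputDataRefs, butler):
                struct.output = self.validate(struct.output)
                super().saveStruct(struct, outputDataRefs, butler)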
        """
        structDict = struct.getDict()
        descriptors = self.getOutputDatasetTypes(self.config)
        for key in descriptors.keys():
            dataList = structDict[key]
            dataRefs = outputDataRefs[key]
            if not isinstance(dataRefs, list):
                # scalar outputs: wrap both refs and data into lists so the
                # save loop below works uniformly
                dataRefs = [dataRefs]
                dataList = [dataList]
            for dataRef, data in zip(dataRefs, dataList):
                butler.put(data, dataRef.datasetType.name, dataRef.dataId)

    def getResourceConfig(self):
        """Return resource configuration for this task.

        Returns
        -------
        Object of type `~config.ResourceConfig` or ``None`` if resource
        configuration is not defined for this task.
        """
        return getattr(self.config, "resources", None)