"""This module defines PipelineTask class and related methods.
"""

__all__ = ["PipelineTask"]

from .task import Task
from .butlerQuantumContext import ButlerQuantumContext
from .connections import InputQuantizedConnection, OutputQuantizedConnection
 
   33     """Base class for all pipeline tasks. 
   35     This is an abstract base class for PipelineTasks which represents an 
   36     algorithm executed by framework(s) on data which comes from data butler, 
   37     resulting data is also stored in a data butler. 
   39     PipelineTask inherits from a `pipe.base.Task` and uses the same 
   40     configuration mechanism based on `pex.config`. `PipelineTask` classes also 
   41     have a `PipelineTaskConnections` class associated with their config which 
   42     defines all of the IO a `PipelineTask` will need to do. PipelineTask 
   43     sub-class typically implements `run()` method which receives Python-domain 
   44     data objects and returns `pipe.base.Struct` object with resulting data. 
   45     `run()` method is not supposed to perform any I/O, it operates entirely on 
   46     in-memory objects. `runQuantum()` is the method (can be re-implemented in 
   47     sub-class) where all necessary I/O is performed, it reads all input data 
   48     from data butler into memory, calls `run()` method with that data, examines 
   49     returned `Struct` object and saves some or all of that data back to data 
   50     butler. `runQuantum()` method receives a `ButlerQuantumContext` instance to 
   51     facilitate I/O, a `InputQuantizedConnection` instance which defines all 
   52     input `lsst.daf.butler.DatasetRef`, and a `OutputQuantizedConnection` 
   53     instance which defines all the output `lsst.daf.butler.DatasetRef` for a 
   54     single invocation of PipelineTask. 
   56     Subclasses must be constructable with exactly the arguments taken by the 
   57     PipelineTask base class constructor, but may support other signatures as 
   62     canMultiprocess : bool, True by default (class attribute) 
   63         This class attribute is checked by execution framework, sub-classes 
   64         can set it to ``False`` in case task does not support multiprocessing. 
   68     config : `pex.config.Config`, optional 
   69         Configuration for this task (an instance of ``self.ConfigClass``, 
   70         which is a task-specific subclass of `PipelineTaskConfig`). 
   71         If not specified then it defaults to `self.ConfigClass()`. 
   72     log : `lsst.log.Log`, optional 
   73         Logger instance whose name is used as a log name prefix, or ``None`` 
   75     initInputs : `dict`, optional 
   76         A dictionary of objects needed to construct this PipelineTask, with 
   77         keys matching the keys of the dictionary returned by 
   78         `getInitInputDatasetTypes` and values equivalent to what would be 
   79         obtained by calling `Butler.get` with those DatasetTypes and no data 
   80         IDs.  While it is optional for the base class, subclasses are 
   81         permitted to require this argument. 
   83     canMultiprocess = 
True 
   85     def __init__(self, *, config=None, log=None, initInputs=None, **kwargs):
 
   86         super().
__init__(config=config, log=log, **kwargs)
 
   88     def run(self, **kwargs):
 
   89         """Run task algorithm on in-memory data. 
   91         This method should be implemented in a subclass. This method will 
   92         receive keyword arguments whose names will be the same as names of 
   93         connection fields describing input dataset types. Argument values will 
   94         be data objects retrieved from data butler. If a dataset type is 
   95         configured with ``multiple`` field set to ``True`` then the argument 
   96         value will be a list of objects, otherwise it will be a single object. 
   98         If the task needs to know its input or output DataIds then it has to 
   99         override `runQuantum` method instead. 
  101         This method should return a `Struct` whose attributes share the same 
  102         name as the connection fields describing output dataset types. 
  107             Struct with attribute names corresponding to output connection 
  112         Typical implementation of this method may look like:: 
  114             def run(self, input, calib): 
  115                 # "input", "calib", and "output" are the names of the config fields 
  117                 # Assuming that input/calib datasets are `scalar` they are simple objects, 
  118                 # do something with inputs and calibs, produce output image. 
  119                 image = self.makeImage(input, calib) 
  121                 # If output dataset is `scalar` then return object, not list 
  122                 return Struct(output=image) 
  125         raise NotImplementedError(
"run() is not implemented")
 
  127     def runQuantum(self, butlerQC: ButlerQuantumContext, inputRefs: InputQuantizedConnection,
 
  128                    outputRefs: OutputQuantizedConnection):
 
  129         """Method to do butler IO and or transforms to provide in memory objects for tasks run method 
  133         butlerQC : `ButlerQuantumContext` 
  134             A butler which is specialized to operate in the context of a `lsst.daf.butler.Quantum`. 
  135         inputRefs : `InputQuantizedConnection` 
  136             Datastructure whose attribute names are the names that identify connections defined in 
  137             corresponding `PipelineTaskConnections` class. The values of these attributes are the 
  138             `lsst.daf.butler.DatasetRef` objects associated with the defined input/prerequisite connections. 
  139         outputRefs : `OutputQuantizedConnection` 
  140             Datastructure whose attribute names are the names that identify connections defined in 
  141             corresponding `PipelineTaskConnections` class. The values of these attributes are the 
  142             `lsst.daf.butler.DatasetRef` objects associated with the defined output connections. 
  144         inputs = butlerQC.get(inputRefs)
 
  145         outputs = self.
run(**inputs)
 
  146         butlerQC.put(outputs, outputRefs)
 
  149         """Return resource configuration for this task. 
  153         Object of type `~config.ResourceConfig` or ``None`` if resource 
  154         configuration is not defined for this task. 
  156         return getattr(self.
config, 
"resources", 
None)