Base class for all pipeline tasks.
This is an abstract base class for PipelineTasks, which represent an
algorithm executed by the framework(s) on data that comes from a data
butler; the resulting data is also stored in a data butler.
PipelineTask inherits from `pipe.base.Task` and uses the same
configuration mechanism based on `pex.config`. `PipelineTask` classes also
have a `PipelineTaskConnections` class associated with their config, which
defines all of the I/O a `PipelineTask` will need to do. A PipelineTask
subclass typically implements a `run()` method, which receives Python-domain
data objects and returns a `pipe.base.Struct` object with the resulting data.
The `run()` method is not supposed to perform any I/O; it operates entirely
on in-memory objects. `runQuantum()` is the method (which can be
re-implemented in a subclass) where all necessary I/O is performed: it reads
all input data from the data butler into memory, calls the `run()` method
with that data, examines the returned `Struct` object, and saves some or all
of that data back to the data butler. The `runQuantum()` method receives a
`ButlerQuantumContext` instance to facilitate I/O, an
`InputQuantizedConnection` instance that defines all input
`lsst.daf.butler.DatasetRef`, and an `OutputQuantizedConnection` instance
that defines all output `lsst.daf.butler.DatasetRef` for a single invocation
of the PipelineTask.
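The read-run-write cycle described above can be sketched with plain Python
stand-ins (here `FakeQuantumContext` and the ref names are illustrative
mocks, not part of `lsst.pipe.base`; the real `ButlerQuantumContext`
performs actual butler reads and writes):

```python
from types import SimpleNamespace

class FakeQuantumContext:
    """Stand-in for ButlerQuantumContext: a dict-backed get/put."""

    def __init__(self, store):
        self.store = store

    def get(self, inputRefs):
        # Resolve each named input connection to its in-memory object.
        return {name: self.store[ref] for name, ref in vars(inputRefs).items()}

    def put(self, outputs, outputRefs):
        # Store each output Struct attribute under its output ref.
        for name, ref in vars(outputRefs).items():
            self.store[ref] = getattr(outputs, name)

def runQuantum(run, butlerQC, inputRefs, outputRefs):
    # The canonical pattern: read all inputs, call run(), write outputs.
    inputs = butlerQC.get(inputRefs)
    outputs = run(**inputs)
    butlerQC.put(outputs, outputRefs)

store = {"in#0": 3}
qc = FakeQuantumContext(store)
inputRefs = SimpleNamespace(value="in#0")
outputRefs = SimpleNamespace(result="out#0")
runQuantum(lambda value: SimpleNamespace(result=value * 2), qc, inputRefs, outputRefs)
# store["out#0"] is now 6
```

The key point of the design is that `run()` never touches the butler; all
I/O is confined to `runQuantum()`, which keeps the algorithm testable on
in-memory objects.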
Subclasses must be constructable with exactly the arguments taken by the
PipelineTask base class constructor, but may support other signatures as
well.
Attributes
----------
canMultiprocess : bool, True by default (class attribute)
This class attribute is checked by the execution framework; subclasses
can set it to ``False`` if the task does not support multiprocessing.
Parameters
----------
config : `pex.config.Config`, optional
Configuration for this task (an instance of ``self.ConfigClass``,
which is a task-specific subclass of `PipelineTaskConfig`).
If not specified then it defaults to `self.ConfigClass()`.
log : `lsst.log.Log`, optional
Logger instance whose name is used as a log name prefix, or ``None``
for no prefix.
initInputs : `dict`, optional
A dictionary of objects needed to construct this PipelineTask, with
keys matching the keys of the dictionary returned by
`getInitInputDatasetTypes` and values equivalent to what would be
obtained by calling `Butler.get` with those DatasetTypes and no data
IDs. While it is optional for the base class, subclasses are
permitted to require this argument.
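The constructor contract above can be sketched with stand-in classes
(`MyTask` and `MyConfig` are illustrative names, not part of
`lsst.pipe.base`; the real base class does more work):

```python
class MyConfig:
    """Hypothetical task config standing in for a PipelineTaskConfig subclass."""

    def __init__(self):
        self.threshold = 5.0

class MyTask:
    ConfigClass = MyConfig

    def __init__(self, *, config=None, log=None, initInputs=None, **kwargs):
        # If no config is given, default to an instance of ConfigClass,
        # mirroring the behaviour described above.
        self.config = config if config is not None else self.ConfigClass()
        self.log = log
        # Objects needed at construction time (e.g. input schemas) arrive
        # via initInputs, keyed by init-input connection name.
        self.initInputs = initInputs or {}

# Constructable with no arguments, as the contract requires.
task = MyTask()
```

Because the framework constructs tasks itself, any subclass that requires
`initInputs` must still accept the base-class keyword signature.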
Definition at line 32 of file pipelineTask.py.
def lsst.pipe.base.task.Task.getAllSchemaCatalogs(self)  [inherited]
Get schema catalogs for all tasks in the hierarchy, combining the
results into a single dict.
Returns
-------
schemacatalogs : `dict`
Keys are butler dataset types; values are an empty catalog (an
instance of the appropriate `lsst.afw.table` Catalog type) for each
task in the hierarchy, from the top-level task down through all
subtasks.
Notes
-----
This method may be called on any task in the hierarchy; it will return
the same answer regardless.
The default implementation should always suffice. If your subtask uses
schemas, then override `Task.getSchemaCatalogs`, not this method.
Definition at line 204 of file task.py.
def getAllSchemaCatalogs(self):
    """Get schema catalogs for all tasks in the hierarchy, combining the
    results into a single dict.

    Returns
    -------
    schemacatalogs : `dict`
        Keys are butler dataset types; values are an empty catalog (an
        instance of the appropriate `lsst.afw.table` Catalog type) for
        each task in the hierarchy, from the top-level task down through
        all subtasks.

    Notes
    -----
    This method may be called on any task in the hierarchy; it will
    return the same answer regardless.

    The default implementation should always suffice. If your subtask
    uses schemas, then override `Task.getSchemaCatalogs`, not this
    method.
    """
    schemaDict = self.getSchemaCatalogs()
    for subtask in self._taskDict.values():
        schemaDict.update(subtask.getSchemaCatalogs())
    return schemaDict
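The aggregation pattern can be demonstrated with plain dicts standing in
for `lsst.afw.table` catalogs (`FakeTask` and the dataset-type names here
are illustrative, not part of the real API):

```python
class FakeTask:
    """Stand-in for a Task with schema catalogs and a flat task dict."""

    def __init__(self, catalogs, subtasks=()):
        self._catalogs = catalogs
        # In the real API, _taskDict maps full task names to every task
        # in the hierarchy; a simple dict suffices for this sketch.
        self._taskDict = {f"subtask{i}": t for i, t in enumerate(subtasks)}

    def getSchemaCatalogs(self):
        # Each task contributes its own dataset-type -> catalog mapping.
        return dict(self._catalogs)

    def getAllSchemaCatalogs(self):
        # Merge this task's catalogs with those of every task in the
        # hierarchy, exactly as in the listing above.
        schemaDict = self.getSchemaCatalogs()
        for subtask in self._taskDict.values():
            schemaDict.update(subtask.getSchemaCatalogs())
        return schemaDict

sub = FakeTask({"src_schema": "catalog-A"})
top = FakeTask({"icSrc_schema": "catalog-B"}, subtasks=[sub])
merged = top.getAllSchemaCatalogs()
# merged contains both dataset-type keys
```

Because `_taskDict` holds the whole hierarchy flat, one pass over it is
enough; no recursion into sub-subtasks is needed.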
def lsst.pipe.base.pipelineTask.PipelineTask.run(self, **kwargs)
Run task algorithm on in-memory data.
This method should be implemented in a subclass. It will receive keyword
arguments whose names are the same as the names of the connection fields
describing input dataset types. Argument values will be data objects
retrieved from the data butler. If a dataset type is configured with the
``multiple`` field set to ``True``, then the argument value will be a list
of objects; otherwise it will be a single object.
If the task needs to know its input or output DataIds, then it must
override the `runQuantum` method instead.
This method should return a `Struct` whose attribute names are the same as
the names of the connection fields describing output dataset types.
Returns
-------
struct : `Struct`
Struct with attribute names corresponding to output connection
fields.
Examples
--------
A typical implementation of this method may look like:
.. code-block:: python

    def run(self, input, calib):
        # "input", "calib", and "output" are the names of the config
        # fields.

        # Assuming that input/calib datasets are `scalar` they are
        # simple objects; do something with inputs and calibs, produce
        # an output image.
        image = self.makeImage(input, calib)

        # If the output dataset is `scalar` then return the object,
        # not a list.
        return Struct(output=image)
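The `Struct` return convention can be run end to end with a stand-in
(this `Struct` mock and the toy "makeImage" arithmetic are illustrative;
the real `lsst.pipe.base.Struct` behaves similarly as a named-attribute
container):

```python
class Struct:
    """Stand-in for pipe.base.Struct: outputs held as named attributes."""

    def __init__(self, **kwargs):
        self.__dict__.update(kwargs)

def run(input, calib):
    # "input" and "calib" arrive already loaded from the butler;
    # run() itself performs no I/O.
    image = [pixel - calib for pixel in input]  # toy stand-in for makeImage
    # Attribute name "output" must match the output connection field name.
    return Struct(output=image)

result = run([10, 12], calib=2)
# result.output == [8, 10]
```

The framework's `runQuantum` then matches each `Struct` attribute to the
output connection of the same name when writing results back to the butler.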
Definition at line 88 of file pipelineTask.py.
def run(self, **kwargs):
    """Run task algorithm on in-memory data.

    This method should be implemented in a subclass. It will receive
    keyword arguments whose names are the same as the names of the
    connection fields describing input dataset types. Argument values
    will be data objects retrieved from the data butler. If a dataset
    type is configured with the ``multiple`` field set to ``True``, then
    the argument value will be a list of objects; otherwise it will be a
    single object.

    If the task needs to know its input or output DataIds, then it must
    override the `runQuantum` method instead.

    This method should return a `Struct` whose attribute names are the
    same as the names of the connection fields describing output dataset
    types.
    """
    raise NotImplementedError("run() is not implemented")