Run a command-line task, using `multiprocessing` if requested.
Parameters
----------
TaskClass : `lsst.pipe.base.Task` subclass
The class of the task to run.
parsedCmd : `argparse.Namespace`
The parsed command-line arguments, as returned by the task's argument parser's
`~lsst.pipe.base.ArgumentParser.parse_args` method.
.. warning::
Do not store ``parsedCmd``: this runner instance is pickled when multiprocessing, and ``parsedCmd`` may
contain non-picklable elements. It also contains more data than needs to be sent to each
instance of the task.
doReturnResults : `bool`, optional
If True, `TaskRunner.run` returns the collected result from each invocation of the task. This is intended
only for unit tests and similar uses: it can easily exhaust memory (if the task returns enough data and is
called enough times), and it fails under multiprocessing if the returned data cannot be
pickled.
Even if ``doReturnResults`` is False, a struct with a single member ``exitStatus`` is returned; its value
(0 or 1) is meant to be passed back to the Unix shell.
Raises
------
ImportError
If multiprocessing is requested (and the task supports it) but the multiprocessing library cannot be
imported.
Notes
-----
Each command-line task (subclass of `lsst.pipe.base.CmdLineTask`) has a task runner. By default it is this
class, but some tasks require a subclass. See the manual :ref:`creating-a-command-line-task` for more
information, and `CmdLineTask.parseAndRun` for how a task runner is used.
You may use this task runner for your command-line task if your task has a ``runDataRef`` method that takes
exactly one argument: a butler data reference. Otherwise you must provide a task-specific subclass of
this runner for your task's ``RunnerClass`` that overrides `TaskRunner.getTargetList` and possibly
`TaskRunner.__call__`. See `TaskRunner.getTargetList` for details.
This design matches the common pattern for command-line tasks: the ``runDataRef`` method takes a single data
reference (under any suitable argument name). Additional arguments are rare; if present, they require a
subclass of `TaskRunner` that passes them to ``runDataRef`` by name.
Instances of this class must be picklable in order to be compatible with multiprocessing. If
multiprocessing is requested (``parsedCmd.numProcesses > 1``), then `TaskRunner.run` calls
`prepareForMultiProcessing` to jettison optional non-picklable elements. If your task runner is not
compatible with multiprocessing, indicate this by setting the class variable
``canMultiprocess = False`` on your task.
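A minimal sketch of the pickling requirement, using a hypothetical `SketchRunner` class (not the real `TaskRunner`). Its `prepareForMultiProcessing` drops a `threading.Lock` attribute, a typical object that cannot be pickled; the check pickles the instance's attribute dict so it does not depend on where the class is defined.

```python
import pickle
import threading


class SketchRunner:
    """Hypothetical stand-in for a task runner; not the real TaskRunner."""

    def __init__(self, numProcesses):
        self.numProcesses = numProcesses
        # A lock is a typical non-picklable attribute (hypothetical example).
        self.lock = threading.Lock()

    def prepareForMultiProcessing(self):
        # Jettison optional attributes that cannot be pickled.
        self.lock = None

    def checkPicklable(self):
        """Return True if all attributes of this runner can be pickled."""
        try:
            pickle.dumps(self.__dict__)
        except TypeError:
            return False
        return True


runner = SketchRunner(numProcesses=4)
print(runner.checkPicklable())  # False: threading.Lock cannot be pickled
if runner.numProcesses > 1:
    runner.prepareForMultiProcessing()
print(runner.checkPicklable())  # True
```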
Due to a `python bug`__, handling a `KeyboardInterrupt` properly `requires specifying a timeout`__. This
timeout (in seconds) can be specified as the ``timeout`` element in the output from
`~lsst.pipe.base.ArgumentParser` (the ``parsedCmd``), if available; otherwise `TaskRunner.TIMEOUT` is used.
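The timeout workaround described in the linked references can be sketched with the standard library: wait on `Pool.map_async(...).get(timeout)` rather than calling `Pool.map` directly. `DEFAULT_TIMEOUT` is a hypothetical stand-in for `TaskRunner.TIMEOUT` (its real value may differ), and `square` is a placeholder for running the task.

```python
import argparse
import multiprocessing

# Hypothetical default standing in for TaskRunner.TIMEOUT; the real value may differ.
DEFAULT_TIMEOUT = 3600.0  # seconds


def resolveTimeout(parsedCmd):
    """Use parsedCmd.timeout if it is available, else the default."""
    timeout = getattr(parsedCmd, "timeout", None)
    return DEFAULT_TIMEOUT if timeout is None else timeout


def square(x):
    # Placeholder for the per-target work.
    return x * x


def runWithTimeout(func, targets, numProcesses, timeout):
    """Map func over targets in a pool, waiting with a finite timeout.

    Waiting via AsyncResult.get(timeout) instead of Pool.map is the
    workaround that lets Ctrl-C (KeyboardInterrupt) reach the parent.
    """
    with multiprocessing.Pool(processes=numProcesses) as pool:
        return pool.map_async(func, targets).get(timeout)


if __name__ == "__main__":
    parsedCmd = argparse.Namespace(timeout=None)
    print(runWithTimeout(square, [1, 2, 3], 2, resolveTimeout(parsedCmd)))  # [1, 4, 9]
```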
By default, we disable "implicit" threading (i.e., threading provided by underlying numerical libraries such
as MKL or BLAS). This is designed to avoid thread contention both when a single command-line task spawns
multiple processes and when multiple users are running on a shared system. Users can override this
behaviour by setting the ``LSST_ALLOW_IMPLICIT_THREADS`` environment variable.
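One common way to get this behaviour is through environment variables that numerical libraries read when they initialize. The sketch below is hypothetical: the function name and the list of variables are assumptions, and the real LSST mechanism may differ. It only takes effect if it runs before those libraries are loaded.

```python
import os

# Hypothetical list of thread-count variables read by common numerical libraries;
# the real LSST implementation may use a different mechanism.
THREAD_ENV_VARS = ("OMP_NUM_THREADS", "MKL_NUM_THREADS", "OPENBLAS_NUM_THREADS")


def disableImplicitThreadingSketch(environ=os.environ):
    """Force single-threaded numerical libraries unless the user opts out.

    Returns True if threading was limited, False if
    LSST_ALLOW_IMPLICIT_THREADS is set. Must run before the libraries
    are imported for the variables to have any effect.
    """
    if "LSST_ALLOW_IMPLICIT_THREADS" in environ:
        return False
    for name in THREAD_ENV_VARS:
        environ[name] = "1"
    return True
```

Passing a plain dict as `environ` makes the function easy to exercise without touching the real process environment.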
.. __: http://bugs.python.org/issue8296
.. __: http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool
Definition at line 87 of file cmdLineTask.py.
def lsst.pipe.base.cmdLineTask.TaskRunner.getTargetList(parsedCmd, **kwargs)
static
Get a list of (dataRef, kwargs) for `TaskRunner.__call__`.
Parameters
----------
parsedCmd : `argparse.Namespace`
The parsed command object returned by `lsst.pipe.base.argumentParser.ArgumentParser.parse_args`.
**kwargs
Any additional keyword arguments. In the default `TaskRunner` this is an empty dict, but having
it simplifies overriding `TaskRunner` for tasks whose runDataRef method takes additional arguments
(see Case 1 below).
Notes
-----
The default implementation of `TaskRunner.getTargetList` and `TaskRunner.__call__` works for any
command-line task whose runDataRef method takes exactly one argument: a data reference. Otherwise you
must provide a variant of `TaskRunner` that overrides `TaskRunner.getTargetList` and possibly
`TaskRunner.__call__`. There are two cases.
**Case 1**
If your command-line task has a ``runDataRef`` method that takes one data reference followed by
additional arguments, then you need only override `TaskRunner.getTargetList` to return the additional
arguments as an argument dict. To make this easier, your overridden version of
`~TaskRunner.getTargetList` may call `TaskRunner.getTargetList` with the extra arguments as keyword
arguments. For example, the following adds an argument dict containing a single key, ``calExpList``,
whose value is the list of data IDs for the calexp ID argument::

    @staticmethod
    def getTargetList(parsedCmd):
        return TaskRunner.getTargetList(
            parsedCmd,
            calExpList=parsedCmd.calexp.idList
        )

It is equivalent to this slightly longer version::

    @staticmethod
    def getTargetList(parsedCmd):
        argDict = dict(calExpList=parsedCmd.calexp.idList)
        return [(dataRef, argDict) for dataRef in parsedCmd.id.refList]

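To see what Case 1 produces, here is a runnable sketch. `TaskRunnerSketch` is a hypothetical stand-in that copies the default `getTargetList` body from the source listing; `CalExpRunner` is a hypothetical subclass; `SimpleNamespace` objects and plain strings stand in for the real `parsedCmd`, butler data references and data IDs.

```python
from types import SimpleNamespace


class TaskRunnerSketch:
    """Stand-in reproducing the default getTargetList from the listing."""

    @staticmethod
    def getTargetList(parsedCmd, **kwargs):
        return [(ref, kwargs) for ref in parsedCmd.id.refList]


class CalExpRunner(TaskRunnerSketch):
    """Hypothetical Case 1 runner: one extra keyword argument for runDataRef."""

    @staticmethod
    def getTargetList(parsedCmd):
        return TaskRunnerSketch.getTargetList(
            parsedCmd, calExpList=parsedCmd.calexp.idList
        )


# Fake parsed command; strings and dicts stand in for data references and data IDs.
parsedCmd = SimpleNamespace(
    id=SimpleNamespace(refList=["ref1", "ref2"]),
    calexp=SimpleNamespace(idList=[{"visit": 1}, {"visit": 2}]),
)
targets = CalExpRunner.getTargetList(parsedCmd)
print(targets[0])  # ('ref1', {'calExpList': [{'visit': 1}, {'visit': 2}]})
```

Note that every target tuple shares the same keyword dict object, so code that receives a target should treat that dict as read-only.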
**Case 2**
If your task does not meet Case 1, you must override both `TaskRunner.getTargetList` and
`TaskRunner.__call__`. You may do this however you see fit, so long as `TaskRunner.getTargetList`
returns a list, each of whose elements is sent to `TaskRunner.__call__`, which runs your task.
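A hypothetical Case 2 sketch: `PairTask` is an invented task whose `runDataRef` takes two data references, so `PairRunner` overrides both `getTargetList` (returning pairs) and `__call__` (unpacking each pair). A real runner would also build the task through its own machinery and handle errors; that is omitted, and the serial loop stands in for dispatching targets to worker processes.

```python
from types import SimpleNamespace


class PairTask:
    """Hypothetical task whose runDataRef takes two data references."""

    def runDataRef(self, templateRef, scienceRef):
        return f"{scienceRef}-minus-{templateRef}"


class PairRunner:
    """Hypothetical Case 2 runner: overrides getTargetList and __call__."""

    TaskClass = PairTask

    @staticmethod
    def getTargetList(parsedCmd):
        # One target per science ref, each paired with the same template ref.
        template = parsedCmd.template.refList[0]
        return [(template, ref) for ref in parsedCmd.id.refList]

    def __call__(self, target):
        # Each element returned by getTargetList arrives here unchanged.
        templateRef, scienceRef = target
        task = self.TaskClass()
        return task.runDataRef(templateRef, scienceRef)


parsedCmd = SimpleNamespace(
    id=SimpleNamespace(refList=["sci1", "sci2"]),
    template=SimpleNamespace(refList=["tmpl"]),
)
runner = PairRunner()
results = [runner(target) for target in runner.getTargetList(parsedCmd)]
print(results)  # ['sci1-minus-tmpl', 'sci2-minus-tmpl']
```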
Definition at line 233 of file cmdLineTask.py.
def getTargetList(parsedCmd, **kwargs):
    """Get a list of (dataRef, kwargs) for `TaskRunner.__call__`."""
    return [(ref, kwargs) for ref in parsedCmd.id.refList]
def lsst.pipe.base.cmdLineTask.TaskRunner.precall(self, parsedCmd)
Hook for code that should run exactly once, before multiprocessing.
Notes
-----
Must return True if `TaskRunner.__call__` should subsequently be called.
.. warning::
Implementations must take care to ensure that no unpicklable attributes are added to the
TaskRunner itself, for compatibility with multiprocessing.
The default implementation writes package versions, schemas and configs, or compares them to existing
files on disk if present.
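The contract above (run once before any worker starts, return a bool that gates `__call__`, add only picklable state) can be sketched with a hypothetical `PrecallRunnerSketch`. It does not model what the default implementation actually does (writing or comparing package versions, schemas and configs).

```python
class PrecallRunnerSketch:
    """Hypothetical runner illustrating the precall contract; not the real code."""

    def __init__(self, failInit=False):
        self.failInit = failInit
        self.calls = []  # only picklable data (strings) is stored on self

    def precall(self, parsedCmd):
        # Runs once in the parent process, before any worker is started.
        if self.failInit:
            return False  # tells run() to skip __call__ entirely
        self.calls.append("precall")
        return True

    def __call__(self, target):
        self.calls.append(f"call:{target}")

    def run(self, parsedCmd, targets):
        if not self.precall(parsedCmd):
            return []
        for target in targets:
            self(target)
        return self.calls


print(PrecallRunnerSketch().run(None, ["a", "b"]))  # ['precall', 'call:a', 'call:b']
print(PrecallRunnerSketch(failInit=True).run(None, ["a"]))  # []
```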
Definition at line 312 of file cmdLineTask.py.
def precall(self, parsedCmd):
    """Hook for code that should run exactly once, before multiprocessing."""
    task = self.makeTask(parsedCmd=parsedCmd)

    if self.doRaise:
        self._precallImpl(task, parsedCmd)
    else:
        try:
            self._precallImpl(task, parsedCmd)
        except Exception as e:
            task.log.fatal("Failed in task initialization: %s", e)
            if not isinstance(e, TaskError):
                traceback.print_exc(file=sys.stderr)