LSSTApplications  10.0+286,10.0+36,10.0+46,10.0-2-g4f67435,10.1+152,10.1+37,11.0,11.0+1,11.0-1-g47edd16,11.0-1-g60db491,11.0-1-g7418c06,11.0-2-g04d2804,11.0-2-g68503cd,11.0-2-g818369d,11.0-2-gb8b8ce7
LSSTDataManagementBasePackage
lsst.pipe.base.cmdLineTask.TaskRunner Class Reference

Run a command-line task, using multiprocessing if requested.

Inherited by lsst.pipe.base.cmdLineTask.ButlerInitializedTaskRunner.

Public Member Functions

def __init__
 Construct a TaskRunner.

def prepareForMultiProcessing
 Prepare this instance for multiprocessing by removing optional non-picklable elements.

def run
 Run the task on all targets.

def makeTask
 Create a Task instance.

def precall
 Hook for code that should run exactly once, before multiprocessing is invoked.

def __call__
 Run the Task on a single target.


Static Public Member Functions

def getTargetList
 Return a list of (dataRef, kwargs) to be used as arguments for TaskRunner.__call__.
 

Public Attributes

 TaskClass
 
 doReturnResults
 
 config
 
 log
 
 doRaise
 
 clobberConfig
 
 numProcesses
 
 timeout
 

Static Public Attributes

int TIMEOUT = 9999
 

Detailed Description

Run a command-line task, using multiprocessing if requested.

Each command-line task (subclass of CmdLineTask) has a task runner. By default it is this class, but some tasks require a subclass. See the manual "how to write a command-line task" in the pipe_tasks documentation for more information. See CmdLineTask.parseAndRun to see how a task runner is used.

You may use this task runner for your command-line task if your task has a run method that takes exactly one argument: a butler data reference. Otherwise you must provide a task-specific subclass of this runner for your task's RunnerClass that overrides TaskRunner.getTargetList and possibly TaskRunner.__call__. See TaskRunner.getTargetList for details.

This design matches the common pattern for command-line tasks: the run method takes a single data reference, under some suitable name. Additional arguments are rare and, if present, require a subclass of TaskRunner that passes them by name.

Instances of this class must be picklable in order to be compatible with multiprocessing. If multiprocessing is requested (parsedCmd.processes > 1), run() calls prepareForMultiProcessing to jettison optional non-picklable elements. If your task runner is not compatible with multiprocessing, indicate this in your task by setting the class variable canMultiprocess=False.
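The picklability requirement can be illustrated with a minimal, self-contained sketch. SketchRunner and is_picklable are hypothetical stand-ins, not part of pipe_base; a thread lock plays the role of a non-picklable logger, and the check is made on the instance state, which is what pickle would have to serialize.

```python
import pickle
import threading


class SketchRunner:
    """Hypothetical stand-in for a task runner; not the real TaskRunner."""

    def __init__(self):
        self.config = {"threshold": 5}  # plain data pickles fine
        self.log = threading.Lock()     # stand-in for a non-picklable logger

    def prepareForMultiProcessing(self):
        # Drop the optional attribute that cannot be pickled.
        self.log = None


def is_picklable(obj):
    """Return True if obj survives pickle.dumps, False otherwise."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, TypeError, AttributeError):
        return False
    return True


runner = SketchRunner()
picklable_before = is_picklable(vars(runner))
runner.prepareForMultiProcessing()
picklable_after = is_picklable(vars(runner))
```

In this sketch the state cannot be pickled until the lock is dropped, which mirrors why run() discards the log before creating worker processes.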

Due to a Python bug [1], handling a KeyboardInterrupt properly requires specifying a timeout [2]. This timeout (in seconds) can be specified as the "timeout" element in the output from ArgumentParser (the "parsedCmd"), if available; otherwise TaskRunner.TIMEOUT is used.

[1] http://bugs.python.org/issue8296
[2] http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool
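The workaround described in [2] amounts to replacing a blocking map with an asynchronous map followed by a timed wait. The following is a sketch under assumptions: run_pool and square are illustrative names, and ThreadPool is used only because it shares the Pool interface and keeps the example portable.

```python
from multiprocessing.pool import ThreadPool


def run_pool(pool, timeout, function, iterable):
    """Map function over iterable, waiting at most timeout seconds.

    On Python versions affected by [1], a blocking pool.map() may not
    respond to Ctrl-C; map_async(...).get(timeout) keeps the wait
    interruptible.
    """
    return pool.map_async(function, iterable).get(timeout)


def square(x):
    return x * x


# ThreadPool stands in for multiprocessing.Pool in this sketch.
with ThreadPool(processes=2) as pool:
    results = run_pool(pool, 9999, square, [1, 2, 3])
```

A very large timeout behaves like "wait indefinitely" in practice, which is presumably why the class default is as large as 9999 seconds.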

Definition at line 99 of file cmdLineTask.py.

Constructor & Destructor Documentation

def lsst.pipe.base.cmdLineTask.TaskRunner.__init__(self, TaskClass, parsedCmd, doReturnResults=False)

Construct a TaskRunner.

Warning
Do not store parsedCmd, as this instance is pickled (if multiprocessing) and parsedCmd may contain non-picklable elements. It certainly contains more data than we need to send to each instance of the task.
Parameters
TaskClass: The class of the task to run
parsedCmd: The parsed command-line arguments, as returned by the task's argument parser's parse_args method.
doReturnResults: Should run return the collected result from each invocation of the task? This is only intended for unit tests and similar use. It can easily exhaust memory (if the task returns enough data and you call it enough times) and it will fail when using multiprocessing if the returned data cannot be pickled.
Exceptions
ImportError: if multiprocessing is requested (and the task supports it) but the multiprocessing library cannot be imported.

Definition at line 130 of file cmdLineTask.py.

    def __init__(self, TaskClass, parsedCmd, doReturnResults=False):
        """!Construct a TaskRunner

        @warning Do not store parsedCmd, as this instance is pickled (if multiprocessing) and parsedCmd may
        contain non-picklable elements. It certainly contains more data than we need to send to each
        instance of the task.

        @param TaskClass    The class of the task to run
        @param parsedCmd    The parsed command-line arguments, as returned by the task's argument parser's
            parse_args method.
        @param doReturnResults    Should run return the collected result from each invocation of the task?
            This is only intended for unit tests and similar use.
            It can easily exhaust memory (if the task returns enough data and you call it enough times)
            and it will fail when using multiprocessing if the returned data cannot be pickled.

        @throws ImportError if multiprocessing requested (and the task supports it)
        but the multiprocessing library cannot be imported.
        """
        self.TaskClass = TaskClass
        self.doReturnResults = bool(doReturnResults)
        self.config = parsedCmd.config
        self.log = parsedCmd.log
        self.doRaise = bool(parsedCmd.doraise)
        self.clobberConfig = bool(parsedCmd.clobberConfig)
        self.numProcesses = int(getattr(parsedCmd, 'processes', 1))

        self.timeout = getattr(parsedCmd, 'timeout', None)
        if self.timeout is None or self.timeout <= 0:
            self.timeout = self.TIMEOUT

        if self.numProcesses > 1:
            if not TaskClass.canMultiprocess:
                self.log.warn("This task does not support multiprocessing; using one process")
                self.numProcesses = 1
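
How the constructor derives the process count and timeout can be isolated in a small sketch. resolve_options is a hypothetical helper written for illustration; argparse.Namespace stands in for the parsed command, and the warning that the constructor logs is only noted in a comment.

```python
from argparse import Namespace

TIMEOUT = 9999  # default taken from the class attribute documented on this page


def resolve_options(parsedCmd, canMultiprocess=True):
    """Return (numProcesses, timeout) following the constructor's rules."""
    numProcesses = int(getattr(parsedCmd, "processes", 1))
    timeout = getattr(parsedCmd, "timeout", None)
    if timeout is None or timeout <= 0:
        timeout = TIMEOUT  # missing or non-positive values fall back to the default
    if numProcesses > 1 and not canMultiprocess:
        numProcesses = 1  # the constructor also logs a warning in this case
    return numProcesses, timeout


defaults = resolve_options(Namespace())
explicit = resolve_options(Namespace(processes=4, timeout=30))
downgraded = resolve_options(Namespace(processes=4, timeout=0), canMultiprocess=False)
```

Because both options are read with getattr, an argument parser that defines neither still yields a usable serial configuration.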

Member Function Documentation

def lsst.pipe.base.cmdLineTask.TaskRunner.__call__(self, args)

Run the Task on a single target.

This default implementation assumes that the 'args' is a tuple containing a data reference and a dict of keyword arguments.

Warning
If you override this method and wish to return something when doReturnResults is false, then it must be picklable to support multiprocessing and it should be small enough that pickling and unpickling do not add excessive overhead.
Parameters
args: Arguments for Task.run(), as a (dataRef, kwargs) tuple
Returns
  • None if doReturnResults false
  • A pipe_base Struct containing these fields if doReturnResults true:
    • dataRef: the provided data reference
    • metadata: task metadata after execution of run
    • result: result returned by task run, or None if the task fails
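
The return contract listed above can be exercised with stand-ins. In this sketch SketchTask and call_once are hypothetical, SimpleNamespace replaces both the data reference and the pipe_base Struct, and the failure is reported with print rather than a task log.

```python
from types import SimpleNamespace


class SketchTask:
    """Hypothetical task whose run() fails for negative visit numbers."""

    def __init__(self):
        self.metadata = {}

    def run(self, dataRef, **kwargs):
        if dataRef.dataId["visit"] < 0:
            raise RuntimeError("bad visit")
        self.metadata["visit"] = dataRef.dataId["visit"]
        return dataRef.dataId["visit"] * 2


def call_once(args, doRaise=False, doReturnResults=True):
    """Mimic the default __call__: unpack the target, run, package the result."""
    dataRef, kwargs = args
    task = SketchTask()
    result = None  # stays None if the task fails
    if doRaise:
        result = task.run(dataRef, **kwargs)
    else:
        try:
            result = task.run(dataRef, **kwargs)
        except Exception as e:
            print("Failed on dataId=%s: %s" % (dataRef.dataId, e))
    if doReturnResults:
        # SimpleNamespace stands in for the pipe_base Struct.
        return SimpleNamespace(dataRef=dataRef, metadata=task.metadata, result=result)
    return None


good = call_once((SimpleNamespace(dataId={"visit": 21}), {}))
bad = call_once((SimpleNamespace(dataId={"visit": -1}), {}))
silent = call_once((SimpleNamespace(dataId={"visit": 3}), {}), doReturnResults=False)
```

A failed target still yields a struct when results are requested; only its result field is None.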

Definition at line 293 of file cmdLineTask.py.

    def __call__(self, args):
        """!Run the Task on a single target.

        This default implementation assumes that the 'args' is a tuple
        containing a data reference and a dict of keyword arguments.

        @warning if you override this method and wish to return something when
        doReturnResults is false, then it must be picklable to support
        multiprocessing and it should be small enough that pickling and
        unpickling do not add excessive overhead.

        @param args Arguments for Task.run()

        @return:
        - None if doReturnResults false
        - A pipe_base Struct containing these fields if doReturnResults true:
            - dataRef: the provided data reference
            - metadata: task metadata after execution of run
            - result: result returned by task run, or None if the task fails
        """
        dataRef, kwargs = args
        task = self.makeTask(args=args)
        result = None # in case the task fails
        if self.doRaise:
            result = task.run(dataRef, **kwargs)
        else:
            try:
                result = task.run(dataRef, **kwargs)
            except Exception as e:
                task.log.fatal("Failed on dataId=%s: %s" % (dataRef.dataId, e))
                if not isinstance(e, TaskError):
                    traceback.print_exc(file=sys.stderr)
        task.writeMetadata(dataRef)

        if self.doReturnResults:
            return Struct(
                dataRef = dataRef,
                metadata = task.metadata,
                result = result,
            )
def lsst.pipe.base.cmdLineTask.TaskRunner.getTargetList(parsedCmd, **kwargs)
static

Return a list of (dataRef, kwargs) to be used as arguments for TaskRunner.__call__.

Parameters
parsedCmd: the parsed command object (an argparse.Namespace) returned by ArgumentParser.parse_args.
**kwargs: any additional keyword arguments. In the default TaskRunner this is an empty dict, but having it simplifies overriding TaskRunner for tasks whose run method takes additional arguments (see case (1) below).

The default implementation of TaskRunner.getTargetList and TaskRunner.__call__ works for any command-line task whose run method takes exactly one argument: a data reference. Otherwise you must provide a variant of TaskRunner that overrides TaskRunner.getTargetList and possibly TaskRunner.__call__. There are two cases:

(1) If your command-line task has a run method that takes one data reference followed by additional arguments, then you need only override TaskRunner.getTargetList to return the additional arguments as an argument dict. To make this easier, your overridden version of getTargetList may call TaskRunner.getTargetList with the extra arguments as keyword arguments. For example, the following adds an argument dict containing a single key: "calExpList", whose value is the list of data IDs for the calexp ID argument:

@staticmethod
def getTargetList(parsedCmd):
    return TaskRunner.getTargetList(parsedCmd, calExpList=parsedCmd.calexp.idList)

It is equivalent to this slightly longer version:

@staticmethod
def getTargetList(parsedCmd):
    argDict = dict(calExpList=parsedCmd.calexp.idList)
    return [(ref, argDict) for ref in parsedCmd.id.refList]

(2) If your task does not meet condition (1) then you must override both TaskRunner.getTargetList and TaskRunner.__call__. You may do this however you see fit, so long as TaskRunner.getTargetList returns a list, each of whose elements is sent to TaskRunner.__call__, which runs your task.
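
Case (1) can be followed end to end with stand-ins. Everything in this sketch is hypothetical scaffolding: default_get_target_list mirrors the default implementation shown on this page, SimpleNamespace replaces the parsed command, and strings replace data references.

```python
from types import SimpleNamespace


def default_get_target_list(parsedCmd, **kwargs):
    """Stand-in for the default TaskRunner.getTargetList."""
    return [(ref, kwargs) for ref in parsedCmd.id.refList]


def get_target_list(parsedCmd):
    """Case (1): forward the extra run() argument as a keyword argument."""
    return default_get_target_list(parsedCmd, calExpList=parsedCmd.calexp.idList)


def run(dataRef, calExpList):
    """Hypothetical task run method taking one extra argument."""
    return (dataRef, len(calExpList))


parsedCmd = SimpleNamespace(
    id=SimpleNamespace(refList=["refA", "refB"]),
    calexp=SimpleNamespace(idList=[{"visit": 1}, {"visit": 2}, {"visit": 3}]),
)
targets = get_target_list(parsedCmd)
# The default __call__ unpacks each target and passes kwargs on to run.
outputs = [run(dataRef, **kwargs) for dataRef, kwargs in targets]
```

Every target shares the same argument dict, so the run method should treat the extra arguments as read-only.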

Definition at line 210 of file cmdLineTask.py.

    def getTargetList(parsedCmd, **kwargs):
        """!Return a list of (dataRef, kwargs) to be used as arguments for TaskRunner.\_\_call\_\_.

        @param parsedCmd    the parsed command object (an argparse.Namespace) returned by
            \ref argumentParser.ArgumentParser.parse_args "ArgumentParser.parse_args".
        @param **kwargs any additional keyword arguments. In the default TaskRunner
            this is an empty dict, but having it simplifies overriding TaskRunner for tasks
            whose run method takes additional arguments (see case (1) below).

        The default implementation of TaskRunner.getTargetList and TaskRunner.\_\_call\_\_ works for any
        command-line task whose run method takes exactly one argument: a data reference.
        Otherwise you must provide a variant of TaskRunner that overrides TaskRunner.getTargetList
        and possibly TaskRunner.\_\_call\_\_. There are two cases:

        (1) If your command-line task has a `run` method that takes one data reference followed by additional
        arguments, then you need only override TaskRunner.getTargetList to return the additional arguments as
        an argument dict. To make this easier, your overridden version of getTargetList may call
        TaskRunner.getTargetList with the extra arguments as keyword arguments. For example,
        the following adds an argument dict containing a single key: "calExpList", whose value is the list
        of data IDs for the calexp ID argument:

        \code
        \@staticmethod
        def getTargetList(parsedCmd):
            return TaskRunner.getTargetList(parsedCmd, calExpList=parsedCmd.calexp.idList)
        \endcode

        It is equivalent to this slightly longer version:

        \code
        \@staticmethod
        def getTargetList(parsedCmd):
            argDict = dict(calExpList=parsedCmd.calexp.idList)
            return [(ref, argDict) for ref in parsedCmd.id.refList]
        \endcode

        (2) If your task does not meet condition (1) then you must override both TaskRunner.getTargetList
        and TaskRunner.\_\_call\_\_. You may do this however you see fit, so long as TaskRunner.getTargetList
        returns a list, each of whose elements is sent to TaskRunner.\_\_call\_\_, which runs your task.
        """
        return [(ref, kwargs) for ref in parsedCmd.id.refList]
def lsst.pipe.base.cmdLineTask.TaskRunner.makeTask(self, parsedCmd=None, args=None)

Create a Task instance.

Parameters
[in] parsedCmd: parsed command-line options (used for extra task args by some task runners)
[in] args: args tuple passed to TaskRunner.__call__ (used for extra task arguments by some task runners)

makeTask() can be called with either the 'parsedCmd' argument or 'args' argument set to None, but it must construct identical Task instances in either case.

Subclasses may ignore this method entirely if they reimplement both TaskRunner.precall and TaskRunner.__call__.
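
The requirement that both call styles build identical tasks is easiest to meet when neither argument influences construction, as in the default implementation. A minimal sketch, with SketchTask and SketchRunner as hypothetical stand-ins:

```python
class SketchTask:
    """Hypothetical task that only records how it was configured."""

    def __init__(self, config, log):
        self.config = config
        self.log = log


class SketchRunner:
    """Hypothetical runner; only the makeTask contract is modelled."""

    def __init__(self, TaskClass, config, log=None):
        self.TaskClass = TaskClass
        self.config = config
        self.log = log

    def makeTask(self, parsedCmd=None, args=None):
        # Neither argument is used, so the call from precall (parsedCmd set)
        # and the call from __call__ (args set) build equivalent tasks.
        return self.TaskClass(config=self.config, log=self.log)


runner = SketchRunner(SketchTask, config={"doWrite": True})
from_precall = runner.makeTask(parsedCmd=object())
from_call = runner.makeTask(args=("dataRef", {}))
```

A subclass that does consume parsedCmd or args has to derive the same constructor arguments from either source to keep this property.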

Definition at line 252 of file cmdLineTask.py.

    def makeTask(self, parsedCmd=None, args=None):
        """!Create a Task instance

        @param[in] parsedCmd    parsed command-line options (used for extra task args by some task runners)
        @param[in] args         args tuple passed to TaskRunner.\_\_call\_\_ (used for extra task arguments
            by some task runners)

        makeTask() can be called with either the 'parsedCmd' argument or 'args' argument set to None,
        but it must construct identical Task instances in either case.

        Subclasses may ignore this method entirely if they reimplement both TaskRunner.precall and
        TaskRunner.\_\_call\_\_
        """
        return self.TaskClass(config=self.config, log=self.log)
def lsst.pipe.base.cmdLineTask.TaskRunner.precall(self, parsedCmd)

Hook for code that should run exactly once, before multiprocessing is invoked.

Must return True if TaskRunner.__call__ should subsequently be called.

Warning
Implementations must take care to ensure that no unpicklable attributes are added to the TaskRunner itself, for compatibility with multiprocessing.

The default implementation writes schemas and configs, or compares them to existing files on disk if present.
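
The hook's True/False contract, and its interaction with doRaise, can be sketched as follows. The function below is an illustration rather than the pipe_base method: write_outputs is a hypothetical callable standing in for the config and schema writes, and print replaces the task log.

```python
def precall(write_outputs, doRaise=False):
    """Run one-time setup; return True if per-target processing may proceed."""
    if doRaise:
        write_outputs()  # let any exception propagate to the caller
        return True
    try:
        write_outputs()
    except Exception as e:
        print("Failed in task initialization: %s" % e)
        return False
    return True


def ok():
    pass


def broken():
    raise IOError("config on disk differs")


proceed = precall(ok)
halted = precall(broken)
```

With doRaise set, the same failure surfaces as an exception instead of a False return, which is useful when debugging.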

Definition at line 267 of file cmdLineTask.py.

    def precall(self, parsedCmd):
        """!Hook for code that should run exactly once, before multiprocessing is invoked.

        Must return True if TaskRunner.\_\_call\_\_ should subsequently be called.

        @warning Implementations must take care to ensure that no unpicklable attributes are added to
        the TaskRunner itself, for compatibility with multiprocessing.

        The default implementation writes schemas and configs, or compares them to existing
        files on disk if present.
        """
        task = self.makeTask(parsedCmd=parsedCmd)
        if self.doRaise:
            task.writeConfig(parsedCmd.butler, clobber=self.clobberConfig)
            task.writeSchemas(parsedCmd.butler, clobber=self.clobberConfig)
        else:
            try:
                task.writeConfig(parsedCmd.butler, clobber=self.clobberConfig)
                task.writeSchemas(parsedCmd.butler, clobber=self.clobberConfig)
            except Exception as e:
                task.log.fatal("Failed in task initialization: %s" % e)
                if not isinstance(e, TaskError):
                    traceback.print_exc(file=sys.stderr)
                return False
        return True
def lsst.pipe.base.cmdLineTask.TaskRunner.prepareForMultiProcessing(self)

Prepare this instance for multiprocessing by removing optional non-picklable elements.

This is only called if the task is run under multiprocessing.

Definition at line 165 of file cmdLineTask.py.

    def prepareForMultiProcessing(self):
        """!Prepare this instance for multiprocessing by removing optional non-picklable elements.

        This is only called if the task is run under multiprocessing.
        """
        self.log = None
def lsst.pipe.base.cmdLineTask.TaskRunner.run(self, parsedCmd)

Run the task on all targets.

The task is run under multiprocessing if numProcesses > 1; otherwise processing is serial.

Returns
a list of results returned by TaskRunner.__call__, or an empty list if TaskRunner.__call__ is not called (e.g. if TaskRunner.precall returns False). See TaskRunner.__call__ for details.
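
The conditions under which the returned list is empty can be seen in a serial-only sketch of the control flow. run_all is a hypothetical function; the pool and profiling branches are omitted, and the warning is printed rather than logged.

```python
def run_all(call, targetList, precall):
    """Serial-only sketch of run(): gate on precall, then map over targets."""
    resultList = []
    if precall():
        if len(targetList) > 0:
            resultList = [call(target) for target in targetList]
        else:
            print("Not running the task because there is no data to process")
    return resultList


def double(target):
    return target * 2


processed = run_all(double, [1, 2], precall=lambda: True)
vetoed = run_all(double, [1, 2], precall=lambda: False)
no_data = run_all(double, [], precall=lambda: True)
```

Callers cannot tell a vetoed run from one with no data by the return value alone; both produce an empty list.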

Definition at line 172 of file cmdLineTask.py.

    def run(self, parsedCmd):
        """!Run the task on all targets.

        The task is run under multiprocessing if numProcesses > 1; otherwise processing is serial.

        @return a list of results returned by TaskRunner.\_\_call\_\_, or an empty list if
        TaskRunner.\_\_call\_\_ is not called (e.g. if TaskRunner.precall returns `False`).
        See TaskRunner.\_\_call\_\_ for details.
        """
        resultList = []
        if self.numProcesses > 1:
            import multiprocessing
            self.prepareForMultiProcessing()
            pool = multiprocessing.Pool(processes=self.numProcesses, maxtasksperchild=1)
            mapFunc = functools.partial(_runPool, pool, self.timeout)
        else:
            pool = None
            mapFunc = map

        if self.precall(parsedCmd):
            profileName = parsedCmd.profile if hasattr(parsedCmd, "profile") else None
            log = parsedCmd.log
            targetList = self.getTargetList(parsedCmd)
            if len(targetList) > 0:
                with profile(profileName, log):
                    # Run the task using self.__call__
                    resultList = mapFunc(self, targetList)
            else:
                log.warn("Not running the task because there is no data to process; "
                    "you may preview data using \"--show data\"")

        if pool is not None:
            pool.close()
            pool.join()

        return resultList

Member Data Documentation

lsst.pipe.base.cmdLineTask.TaskRunner.clobberConfig

Definition at line 153 of file cmdLineTask.py.

lsst.pipe.base.cmdLineTask.TaskRunner.config

Definition at line 150 of file cmdLineTask.py.

lsst.pipe.base.cmdLineTask.TaskRunner.doRaise

Definition at line 152 of file cmdLineTask.py.

lsst.pipe.base.cmdLineTask.TaskRunner.doReturnResults

Definition at line 149 of file cmdLineTask.py.

lsst.pipe.base.cmdLineTask.TaskRunner.log

Definition at line 151 of file cmdLineTask.py.

lsst.pipe.base.cmdLineTask.TaskRunner.numProcesses

Definition at line 154 of file cmdLineTask.py.

lsst.pipe.base.cmdLineTask.TaskRunner.TaskClass

Definition at line 148 of file cmdLineTask.py.

int lsst.pipe.base.cmdLineTask.TaskRunner.TIMEOUT = 9999
static

Definition at line 129 of file cmdLineTask.py.

lsst.pipe.base.cmdLineTask.TaskRunner.timeout

Definition at line 156 of file cmdLineTask.py.


The documentation for this class was generated from the following file: