LSSTApplications  16.0-10-g9d3e444,16.0-11-g09ed895+3,16.0-11-g12e47bd+4,16.0-11-g9bb73b2+10,16.0-12-g5c924a4+10,16.0-15-g7af1f30,16.0-15-gdd5ca33+2,16.0-16-gf0259e2+1,16.0-17-g31abd91+11,16.0-17-g5cf0468+3,16.0-18-g51a54b3+3,16.0-18-ga4d4bcb+5,16.0-18-gcf94535+2,16.0-19-g9d290d5+2,16.0-2-g0febb12+22,16.0-2-g9d5294e+73,16.0-2-ga8830df+7,16.0-21-g3d035912+2,16.0-26-g8e79609,16.0-28-gfc9ea6c+9,16.0-29-ge8801f9+4,16.0-3-ge00e371+38,16.0-4-g18f3627+17,16.0-4-g5f3a788+21,16.0-4-ga3eb747+11,16.0-4-gabf74b7+33,16.0-4-gb13d127+7,16.0-5-g27fb78a+11,16.0-5-g6a53317+38,16.0-5-gb3f8a4b+91,16.0-51-gbbe9c988+3,16.0-6-g9321be7+5,16.0-6-gcbc7b31+47,16.0-6-gf49912c+33,16.0-7-gd2eeba5+56,16.0-75-gbf7a9a820,16.0-8-g21fd5fe+34,16.0-8-g3a9f023+24,16.0-9-gf3bc169+2,16.0-9-gf5c1f43+12,master-gd73dc1d098+5,w.2019.02
LSSTDataManagementBasePackage
lsst.pipe.base.cmdLineTask.TaskRunner Class Reference
Inheritance diagram for lsst.pipe.base.cmdLineTask.TaskRunner (derived classes shown in the diagram):

- lsst.ctrl.pool.parallel.BatchTaskRunner
- lsst.meas.algorithms.debugger.MeasurementDebuggerRunner
- lsst.pipe.base.cmdLineTask.ButlerInitializedTaskRunner
- lsst.pipe.base.cmdLineTask.LegacyTaskRunner
- lsst.pipe.drivers.constructCalibs.CalibTaskRunner
- lsst.pipe.drivers.multiBandDriver.MultiBandDriverTaskRunner
- lsst.pipe.drivers.utils.ButlerTaskRunner
- lsst.pipe.tasks.multiBandUtils.MergeSourcesRunner

Public Member Functions

def __init__ (self, TaskClass, parsedCmd, doReturnResults=False)
 
def prepareForMultiProcessing (self)
 
def run (self, parsedCmd)
 
def makeTask (self, parsedCmd=None, args=None)
 
def precall (self, parsedCmd)
 
def __call__ (self, args)
 
def runTask (self, task, dataRef, kwargs)
 

Static Public Member Functions

def getTargetList (parsedCmd, **kwargs)
 

Public Attributes

 TaskClass
 
 doReturnResults
 
 config
 
 log
 
 doRaise
 
 clobberConfig
 
 doBackup
 
 numProcesses
 
 timeout
 

Static Public Attributes

int TIMEOUT = 3600*24*30
 

Detailed Description

Run a command-line task, using `multiprocessing` if requested.

Parameters
----------
TaskClass : `lsst.pipe.base.Task` subclass
    The class of the task to run.
parsedCmd : `argparse.Namespace`
    The parsed command-line arguments, as returned by the task's argument parser's
    `~lsst.pipe.base.ArgumentParser.parse_args` method.

    .. warning::

       Do not store ``parsedCmd``, as this instance is pickled (if multiprocessing) and parsedCmd may
       contain non-picklable elements. It certainly contains more data than we need to send to each
       instance of the task.
doReturnResults : `bool`, optional
    Should `run` return the collected result from each invocation of the task? This is only intended for
    unit tests and similar use. It can easily exhaust memory (if the task returns enough data and you
    call it enough times), and it will fail under multiprocessing if the returned data cannot be
    pickled.

    Even if ``doReturnResults`` is `False`, a struct with a single member, ``exitStatus``, is returned,
    with value 0 or 1; these values determine the exit status returned to the Unix shell.

Raises
------
ImportError
    If multiprocessing is requested (and the task supports it) but the multiprocessing library cannot be
    imported.

Notes
-----
Each command-line task (subclass of `lsst.pipe.base.CmdLineTask`) has a task runner. By default it is this
class, but some tasks require a subclass. See the manual :ref:`creating-a-command-line-task` for more
information. See `CmdLineTask.parseAndRun` to see how a task runner is used.

You may use this task runner for your command-line task if your task has a runDataRef method that takes
exactly one argument: a butler data reference. Otherwise you must provide a task-specific subclass of
this runner for your task's ``RunnerClass`` that overrides `TaskRunner.getTargetList` and possibly
`TaskRunner.__call__`. See `TaskRunner.getTargetList` for details.

This design matches the common pattern for command-line tasks: the ``runDataRef`` method takes a single data
reference, of some suitable name. Additional arguments are rare and, if present, require a subclass of
`TaskRunner` that passes these additional arguments by name.

Instances of this class must be picklable in order to be compatible with multiprocessing. If
multiprocessing is requested (``parsedCmd.processes > 1``, stored as `TaskRunner.numProcesses`) then `run`
calls `prepareForMultiProcessing` to jettison optional non-picklable elements. If your task runner is not
compatible with multiprocessing then indicate this in your task by setting the class variable
``canMultiprocess=False``.
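A minimal sketch of the picklability requirement, using a hypothetical `PicklableRunner` class rather than the real `TaskRunner`: a `threading.Lock` stands in for an optional non-picklable attribute such as the logger, and `prepareForMultiProcessing` drops it so the instance can be pickled for worker processes.

```python
import pickle
import threading


class PicklableRunner:
    """Hypothetical stand-in for a task runner (not the LSST class)."""

    def __init__(self, config):
        self.config = config          # plain data: pickles fine
        self.lock = threading.Lock()  # stands in for a non-picklable log

    def prepareForMultiProcessing(self):
        # Jettison the optional non-picklable element before the instance
        # is sent to worker processes.
        self.lock = None


runner = PicklableRunner(config={"doWrite": True})
try:
    pickle.dumps(runner)
    picklableBefore = True
except TypeError:  # thread locks cannot be pickled
    picklableBefore = False

runner.prepareForMultiProcessing()
clone = pickle.loads(pickle.dumps(runner))
print(picklableBefore, clone.config)  # False {'doWrite': True}
```

Dropping the attribute (rather than trying to make it picklable) matches the intent described above: the element is optional, so each worker can recreate it if needed.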

Due to a `python bug`__, handling a `KeyboardInterrupt` properly `requires specifying a timeout`__. This
timeout (in seconds) can be specified as the ``timeout`` element in the output from
`~lsst.pipe.base.ArgumentParser` (the ``parsedCmd``), if available; otherwise (or if it is not positive)
`TaskRunner.TIMEOUT` (3600*24*30 s, i.e. 30 days) is used.

By default, we disable "implicit" threading, i.e. threading provided by underlying numerical libraries such
as MKL or BLAS. This is designed to avoid thread contention both when a single command-line task spawns
multiple processes and when multiple users are running on a shared system. Users can override this
behaviour by setting the ``LSST_ALLOW_IMPLICIT_THREADS`` environment variable.
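The idea behind disabling implicit threading can be sketched in Python by pinning the thread-count environment variables that OpenMP, Intel MKL and OpenBLAS commonly read. This is an illustration only: `disable_implicit_threading_sketch` and the variable list are assumptions, not the implementation of LSST's `disableImplicitThreading`.

```python
import os

# Thread-count variables commonly read by OpenMP, Intel MKL and OpenBLAS.
# This list is an illustrative assumption, not LSST's actual set.
THREAD_ENV_VARS = ("OMP_NUM_THREADS", "MKL_NUM_THREADS", "OPENBLAS_NUM_THREADS")


def disable_implicit_threading_sketch(environ=None):
    """Pin numerical-library thread pools to one thread unless the user opts out.

    Returns True if threading was restricted, False if
    LSST_ALLOW_IMPLICIT_THREADS is set to a non-empty value. Variables the
    user has already set are left untouched. Libraries generally read these
    variables when they initialise, so this must run before they are loaded.
    """
    env = os.environ if environ is None else environ
    if env.get("LSST_ALLOW_IMPLICIT_THREADS"):
        return False
    for name in THREAD_ENV_VARS:
        env.setdefault(name, "1")
    return True


# Use plain dicts here so the example does not modify the real environment.
restricted = {}
print(disable_implicit_threading_sketch(restricted), restricted["OMP_NUM_THREADS"])  # True 1
optedOut = {"LSST_ALLOW_IMPLICIT_THREADS": "1"}
print(disable_implicit_threading_sketch(optedOut), "OMP_NUM_THREADS" in optedOut)  # False False
```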

.. __: http://bugs.python.org/issue8296
.. __:  http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool

Definition at line 87 of file cmdLineTask.py.

Constructor & Destructor Documentation

◆ __init__()

def lsst.pipe.base.cmdLineTask.TaskRunner.__init__ (   self,
  TaskClass,
  parsedCmd,
  doReturnResults = False 
)

Definition at line 155 of file cmdLineTask.py.

def __init__(self, TaskClass, parsedCmd, doReturnResults=False):
    self.TaskClass = TaskClass
    self.doReturnResults = bool(doReturnResults)
    self.config = parsedCmd.config
    self.log = parsedCmd.log
    self.doRaise = bool(parsedCmd.doraise)
    self.clobberConfig = bool(parsedCmd.clobberConfig)
    self.doBackup = not bool(parsedCmd.noBackupConfig)
    self.numProcesses = int(getattr(parsedCmd, 'processes', 1))

    self.timeout = getattr(parsedCmd, 'timeout', None)
    if self.timeout is None or self.timeout <= 0:
        self.timeout = self.TIMEOUT

    if self.numProcesses > 1:
        if not TaskClass.canMultiprocess:
            self.log.warn("This task does not support multiprocessing; using one process")
            self.numProcesses = 1
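The fallback rules in `__init__` can be checked in isolation. The helper below, `resolve_runner_settings`, is a hypothetical function that mirrors only the ``timeout`` and ``numProcesses`` logic, with `argparse.Namespace` standing in for the parsed command.

```python
from argparse import Namespace

TIMEOUT = 3600 * 24 * 30  # class default: 2,592,000 s (30 days)


def resolve_runner_settings(parsedCmd, canMultiprocess):
    """Mirror how TaskRunner.__init__ derives numProcesses and timeout.

    Hypothetical helper; the real constructor stores these on ``self``.
    """
    numProcesses = int(getattr(parsedCmd, "processes", 1))
    timeout = getattr(parsedCmd, "timeout", None)
    if timeout is None or timeout <= 0:
        timeout = TIMEOUT  # missing or non-positive: use the class default
    if numProcesses > 1 and not canMultiprocess:
        numProcesses = 1  # the real constructor also logs a warning here
    return numProcesses, timeout


print(resolve_runner_settings(Namespace(processes=4, timeout=0), True))     # (4, 2592000)
print(resolve_runner_settings(Namespace(processes=4, timeout=600), False))  # (1, 600)
print(resolve_runner_settings(Namespace(), True))                           # (1, 2592000)
```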

Member Function Documentation

◆ __call__()

def lsst.pipe.base.cmdLineTask.TaskRunner.__call__ (   self,
  args 
)
Run the Task on a single target.

Parameters
----------
args
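    The ``(dataRef, kwargs)`` tuple for one target, as produced by `TaskRunner.getTargetList`; the task's
    ``runDataRef`` is called with these arguments.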

Returns
-------
struct : `lsst.pipe.base.Struct`
    Contains these fields if ``doReturnResults`` is `True`:

    - ``dataRef``: the provided data reference.
    - ``metadata``: task metadata after execution of run.
    - ``result``: result returned by task run, or `None` if the task fails.
    - ``exitStatus``: 0 if the task completed successfully, 1 otherwise.

    If ``doReturnResults`` is `False` the struct contains:

    - ``exitStatus``: 0 if the task completed successfully, 1 otherwise.

Notes
-----
This default implementation assumes that ``args`` is a tuple containing a data reference and a
dict of keyword arguments.

.. warning::

   If you override this method and wish to return something when ``doReturnResults`` is `False`,
   then it must be picklable to support multiprocessing and it should be small enough that pickling
   and unpickling do not add excessive overhead.
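A simplified, self-contained sketch of the control flow in `__call__`, assuming stand-ins: `SimpleNamespace` replaces `lsst.pipe.base.Struct`, a local `TaskError` replaces the LSST exception, `runTask` is inlined as a direct ``runDataRef`` call, and logging, MDC labels and metadata writing are omitted. `ToyTask` and `call_one_target` are hypothetical.

```python
import sys
import traceback
from types import SimpleNamespace


class TaskError(Exception):
    """Stand-in for lsst.pipe.base.TaskError (an expected failure)."""


def call_one_target(task, args, doRaise=False, doReturnResults=False):
    """Simplified sketch of the control flow in TaskRunner.__call__."""
    dataRef, kwargs = args  # one element of the getTargetList output
    result = None           # stays None if the task fails
    exitStatus = 0          # exit status for the shell
    if doRaise:
        result = task.runDataRef(dataRef, **kwargs)
    else:
        try:
            result = task.runDataRef(dataRef, **kwargs)
        except Exception as e:
            exitStatus = 1
            # Unexpected errors get a traceback; TaskError does not.
            if not isinstance(e, TaskError):
                traceback.print_exc(file=sys.stderr)
    if doReturnResults:
        return SimpleNamespace(exitStatus=exitStatus, dataRef=dataRef, result=result)
    return SimpleNamespace(exitStatus=exitStatus)


class ToyTask:
    """Hypothetical task whose runDataRef rejects negative input."""

    def runDataRef(self, dataRef, scale=1):
        if dataRef < 0:
            raise TaskError("negative data")
        return dataRef * scale


good = call_one_target(ToyTask(), (3, {"scale": 2}), doReturnResults=True)
bad = call_one_target(ToyTask(), (-1, {}))
total = sum(r.exitStatus for r in (good, bad))  # number of failed targets
print(good.result, good.exitStatus, bad.exitStatus, total)  # 6 0 1 1
```

Summing the per-target ``exitStatus`` values, as the comment in the source listing of `__call__` implies, gives the number of failed targets.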

Definition at line 341 of file cmdLineTask.py.

def __call__(self, args):
    """Run the Task on a single target.

    Parameters
    ----------
    args
        Arguments for Task.runDataRef()

    Returns
    -------
    struct : `lsst.pipe.base.Struct`
        Contains these fields if ``doReturnResults`` is `True`:

        - ``dataRef``: the provided data reference.
        - ``metadata``: task metadata after execution of run.
        - ``result``: result returned by task run, or `None` if the task fails.
        - ``exitStatus``: 0 if the task completed successfully, 1 otherwise.

        If ``doReturnResults`` is `False` the struct contains:

        - ``exitStatus``: 0 if the task completed successfully, 1 otherwise.

    Notes
    -----
    This default implementation assumes that the ``args`` is a tuple containing a data reference and a
    dict of keyword arguments.

    .. warning::

       If you override this method and wish to return something when ``doReturnResults`` is `False`,
       then it must be picklable to support multiprocessing and it should be small enough that pickling
       and unpickling do not add excessive overhead.
    """
    dataRef, kwargs = args
    if self.log is None:
        self.log = Log.getDefaultLogger()
    if hasattr(dataRef, "dataId"):
        self.log.MDC("LABEL", str(dataRef.dataId))
    elif isinstance(dataRef, (list, tuple)):
        self.log.MDC("LABEL", str([ref.dataId for ref in dataRef if hasattr(ref, "dataId")]))
    task = self.makeTask(args=args)
    result = None  # in case the task fails
    exitStatus = 0  # exit status for the shell
    if self.doRaise:
        result = self.runTask(task, dataRef, kwargs)
    else:
        try:
            result = self.runTask(task, dataRef, kwargs)
        except Exception as e:
            # The shell exit value will be the number of dataRefs returning
            # non-zero, so the actual value used here is lost.
            exitStatus = 1

            # don't use a try block as we need to preserve the original exception
            eName = type(e).__name__
            if hasattr(dataRef, "dataId"):
                task.log.fatal("Failed on dataId=%s: %s: %s", dataRef.dataId, eName, e)
            elif isinstance(dataRef, (list, tuple)):
                task.log.fatal("Failed on dataIds=[%s]: %s: %s",
                               ", ".join(str(ref.dataId) for ref in dataRef), eName, e)
            else:
                task.log.fatal("Failed on dataRef=%s: %s: %s", dataRef, eName, e)

            if not isinstance(e, TaskError):
                traceback.print_exc(file=sys.stderr)

    # Ensure all errors have been logged and aren't hanging around in a buffer
    sys.stdout.flush()
    sys.stderr.flush()

    task.writeMetadata(dataRef)

    # remove MDC so it does not show up outside of task context
    self.log.MDCRemove("LABEL")

    if self.doReturnResults:
        return Struct(
            exitStatus=exitStatus,
            dataRef=dataRef,
            metadata=task.metadata,
            result=result,
        )
    else:
        return Struct(
            exitStatus=exitStatus,
        )

◆ getTargetList()

def lsst.pipe.base.cmdLineTask.TaskRunner.getTargetList (   parsedCmd,
  **kwargs 
)
static
Get a list of (dataRef, kwargs) for `TaskRunner.__call__`.

Parameters
----------
parsedCmd : `argparse.Namespace`
    The parsed command object returned by `lsst.pipe.base.argumentParser.ArgumentParser.parse_args`.
kwargs
    Any additional keyword arguments. In the default `TaskRunner` this is an empty dict, but having
    it simplifies overriding `TaskRunner` for tasks whose ``runDataRef`` method takes additional arguments
    (see Case 1 below).

Notes
-----
The default implementation of `TaskRunner.getTargetList` and `TaskRunner.__call__` works for any
command-line task whose runDataRef method takes exactly one argument: a data reference. Otherwise you
must provide a variant of TaskRunner that overrides `TaskRunner.getTargetList` and possibly
`TaskRunner.__call__`. There are two cases.

**Case 1**

If your command-line task has a ``runDataRef`` method that takes one data reference followed by
additional arguments, then you need only override `TaskRunner.getTargetList` to return the additional
arguments as an argument dict. To make this easier, your overridden version of
`~TaskRunner.getTargetList` may call `TaskRunner.getTargetList` with the extra arguments as keyword
arguments. For example, the following adds an argument dict containing a single key, "calExpList",
whose value is the list of data IDs for the calexp ID argument::

    @staticmethod
    def getTargetList(parsedCmd):
        return TaskRunner.getTargetList(
            parsedCmd,
            calExpList=parsedCmd.calexp.idList
        )

It is equivalent to this slightly longer version::

    @staticmethod
    def getTargetList(parsedCmd):
        argDict = dict(calExpList=parsedCmd.calexp.idList)
        return [(dataRef, argDict) for dataRef in parsedCmd.id.refList]

**Case 2**

If your task does not meet the conditions of Case 1 then you must override both `TaskRunner.getTargetList`
and `TaskRunner.__call__`. You may do this however you see fit, so long as `TaskRunner.getTargetList`
returns a list, each of whose elements is sent to `TaskRunner.__call__`, which runs your task.
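A runnable sketch of Case 1, assuming stand-ins: `BaseRunnerSketch` reproduces the default `getTargetList` (one ``(dataRef, kwargs)`` pair per entry in ``parsedCmd.id.refList``), `CalExpListRunner` is a hypothetical Case 1 override, and plain strings and dicts stand in for butler data references and data IDs.

```python
from argparse import Namespace


class BaseRunnerSketch:
    """Stand-in reproducing the default TaskRunner.getTargetList."""

    @staticmethod
    def getTargetList(parsedCmd, **kwargs):
        # One (dataRef, kwargs) pair per data reference from the command line.
        return [(ref, kwargs) for ref in parsedCmd.id.refList]


class CalExpListRunner(BaseRunnerSketch):
    """Hypothetical Case 1 runner: runDataRef(dataRef, calExpList)."""

    @staticmethod
    def getTargetList(parsedCmd, **kwargs):
        # Forward the extra argument as a keyword; the base class packs it
        # into the kwargs dict that accompanies every data reference.
        return BaseRunnerSketch.getTargetList(
            parsedCmd, calExpList=parsedCmd.calexp.idList, **kwargs
        )


# Strings and dicts stand in for butler data references and data IDs.
parsedCmd = Namespace(
    id=Namespace(refList=["ref1", "ref2"]),
    calexp=Namespace(idList=[{"visit": 1}]),
)
targets = CalExpListRunner.getTargetList(parsedCmd)
print(targets)
# [('ref1', {'calExpList': [{'visit': 1}]}), ('ref2', {'calExpList': [{'visit': 1}]})]
```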

Definition at line 233 of file cmdLineTask.py.

def getTargetList(parsedCmd, **kwargs):
    """Get a list of (dataRef, kwargs) for `TaskRunner.__call__`.

    Parameters
    ----------
    parsedCmd : `argparse.Namespace`
        The parsed command object returned by `lsst.pipe.base.argumentParser.ArgumentParser.parse_args`.
    kwargs
        Any additional keyword arguments. In the default `TaskRunner` this is an empty dict, but having
        it simplifies overriding `TaskRunner` for tasks whose runDataRef method takes additional arguments
        (see case (1) below).

    Notes
    -----
    The default implementation of `TaskRunner.getTargetList` and `TaskRunner.__call__` works for any
    command-line task whose runDataRef method takes exactly one argument: a data reference. Otherwise you
    must provide a variant of TaskRunner that overrides `TaskRunner.getTargetList` and possibly
    `TaskRunner.__call__`. There are two cases.

    **Case 1**

    If your command-line task has a ``runDataRef`` method that takes one data reference followed by
    additional arguments, then you need only override `TaskRunner.getTargetList` to return the additional
    arguments as an argument dict. To make this easier, your overridden version of
    `~TaskRunner.getTargetList` may call `TaskRunner.getTargetList` with the extra arguments as keyword
    arguments. For example, the following adds an argument dict containing a single key: "calExpList",
    whose value is the list of data IDs for the calexp ID argument::

        def getTargetList(parsedCmd):
            return TaskRunner.getTargetList(
                parsedCmd,
                calExpList=parsedCmd.calexp.idList
            )

    It is equivalent to this slightly longer version::

        @staticmethod
        def getTargetList(parsedCmd):
            argDict = dict(calExpList=parsedCmd.calexp.idList)
            return [(dataId, argDict) for dataId in parsedCmd.id.idList]

    **Case 2**

    If your task does not meet condition (1) then you must override both TaskRunner.getTargetList and
    `TaskRunner.__call__`. You may do this however you see fit, so long as `TaskRunner.getTargetList`
    returns a list, each of whose elements is sent to `TaskRunner.__call__`, which runs your task.
    """
    return [(ref, kwargs) for ref in parsedCmd.id.refList]

◆ makeTask()

def lsst.pipe.base.cmdLineTask.TaskRunner.makeTask (   self,
  parsedCmd = None,
  args = None 
)
Create a Task instance.

Parameters
----------
parsedCmd
    Parsed command-line options (used for extra task args by some task runners).
args
    Args tuple passed to `TaskRunner.__call__` (used for extra task arguments by some task runners).

Notes
-----
``makeTask`` can be called with either the ``parsedCmd`` argument or ``args`` argument set to None,
but it must construct identical Task instances in either case.

Subclasses may ignore this method entirely if they reimplement both `TaskRunner.precall` and
`TaskRunner.__call__`.
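The requirement that both call paths build identical tasks can be sketched as follows. This is modelled loosely on the butler-initialized runner pattern (`ButlerInitializedTaskRunner` appears in the inheritance diagram), but `ButlerAwareRunnerSketch`, `ToyTask` and the ``dataRef.butlerSubset.butler`` attribute path are assumptions made for illustration.

```python
from argparse import Namespace
from types import SimpleNamespace


class ToyTask:
    """Hypothetical task that needs a butler at construction time."""

    def __init__(self, config, log, butler):
        self.config = config
        self.log = log
        self.butler = butler


class ButlerAwareRunnerSketch:
    """Hypothetical runner; only makeTask is sketched."""

    def __init__(self, TaskClass, config, log=None):
        self.TaskClass = TaskClass
        self.config = config
        self.log = log

    def makeTask(self, parsedCmd=None, args=None):
        # Both call paths must yield an equivalent task, so the butler is
        # taken from the parsed command (precall) or from the data
        # reference in the (dataRef, kwargs) tuple (__call__).
        if parsedCmd is not None:
            butler = parsedCmd.butler
        elif args is not None:
            dataRef, kwargs = args
            butler = dataRef.butlerSubset.butler  # attribute path assumed
        else:
            raise RuntimeError("parsedCmd or args must be specified")
        return self.TaskClass(config=self.config, log=self.log, butler=butler)


butler = object()  # stand-in for a Butler
dataRef = SimpleNamespace(butlerSubset=SimpleNamespace(butler=butler))
runner = ButlerAwareRunnerSketch(ToyTask, config={"doWrite": False})
fromCmd = runner.makeTask(parsedCmd=Namespace(butler=butler))
fromArgs = runner.makeTask(args=(dataRef, {}))
print(fromCmd.butler is fromArgs.butler)  # True
```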

Definition at line 282 of file cmdLineTask.py.

def makeTask(self, parsedCmd=None, args=None):
    """Create a Task instance.

    Parameters
    ----------
    parsedCmd
        Parsed command-line options (used for extra task args by some task runners).
    args
        Args tuple passed to `TaskRunner.__call__` (used for extra task arguments by some task runners).

    Notes
    -----
    ``makeTask`` can be called with either the ``parsedCmd`` argument or ``args`` argument set to None,
    but it must construct identical Task instances in either case.

    Subclasses may ignore this method entirely if they reimplement both `TaskRunner.precall` and
    `TaskRunner.__call__`.
    """
    return self.TaskClass(config=self.config, log=self.log)

◆ precall()

def lsst.pipe.base.cmdLineTask.TaskRunner.precall (   self,
  parsedCmd 
)
Hook for code that should run exactly once, before multiprocessing.

Notes
-----
Must return True if `TaskRunner.__call__` should subsequently be called.

.. warning::

   Implementations must take care to ensure that no unpicklable attributes are added to the
   TaskRunner itself, for compatibility with multiprocessing.

The default implementation writes package versions, schemas and configs, or compares them to existing
files on disk if present.
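A minimal sketch of the `precall` contract with a hypothetical runner: one-time work runs in the parent process, failures turn into a `False` return unless ``doRaise`` is set, and only picklable data (a path string, not an open file) is stored on the runner. The ``parsedCmd`` argument and the logging of the real method are omitted, and the config file name is illustrative.

```python
import os
import tempfile


class PrecallRunnerSketch:
    """Hypothetical runner illustrating the precall contract (not LSST code)."""

    def __init__(self, outputDir, doRaise=False):
        self.outputDir = outputDir
        self.doRaise = doRaise
        self.configPath = None  # a plain string, so the runner stays picklable

    def _precallImpl(self):
        # One-time work in the parent process, e.g. persisting the config.
        if not os.path.isdir(self.outputDir):
            raise RuntimeError(f"no such output directory: {self.outputDir}")
        self.configPath = os.path.join(self.outputDir, "config.py")
        with open(self.configPath, "w") as f:
            f.write("# written once, before any worker starts\n")
        # Keep the path, not the file object: open files cannot be pickled.

    def precall(self):
        if self.doRaise:
            self._precallImpl()
        else:
            try:
                self._precallImpl()
            except Exception:
                return False  # run() then skips __call__ entirely
        return True


with tempfile.TemporaryDirectory() as outDir:
    ok = PrecallRunnerSketch(outDir).precall()
    written = os.path.exists(os.path.join(outDir, "config.py"))
    failed = PrecallRunnerSketch(os.path.join(outDir, "missing")).precall()
print(ok, written, failed)  # True True False
```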

Definition at line 312 of file cmdLineTask.py.

def precall(self, parsedCmd):
    """Hook for code that should run exactly once, before multiprocessing.

    Notes
    -----
    Must return True if `TaskRunner.__call__` should subsequently be called.

    .. warning::

       Implementations must take care to ensure that no unpicklable attributes are added to the
       TaskRunner itself, for compatibility with multiprocessing.

    The default implementation writes package versions, schemas and configs, or compares them to existing
    files on disk if present.
    """
    task = self.makeTask(parsedCmd=parsedCmd)

    if self.doRaise:
        self._precallImpl(task, parsedCmd)
    else:
        try:
            self._precallImpl(task, parsedCmd)
        except Exception as e:
            task.log.fatal("Failed in task initialization: %s", e)
            if not isinstance(e, TaskError):
                traceback.print_exc(file=sys.stderr)
            return False
    return True

◆ prepareForMultiProcessing()

def lsst.pipe.base.cmdLineTask.TaskRunner.prepareForMultiProcessing (   self)
Prepare this instance for multiprocessing.

Optional non-picklable elements are removed; the default implementation sets ``log`` to `None`.

This is only called if the task is run under multiprocessing.

Definition at line 174 of file cmdLineTask.py.

def prepareForMultiProcessing(self):
    """Prepare this instance for multiprocessing

    Optional non-picklable elements are removed.

    This is only called if the task is run under multiprocessing.
    """
    self.log = None

◆ run()

def lsst.pipe.base.cmdLineTask.TaskRunner.run (   self,
  parsedCmd 
)
Run the task on all targets.

Parameters
----------
parsedCmd : `argparse.Namespace`
    Parsed command `argparse.Namespace`.

Returns
-------
resultList : `list`
    A list of results returned by `TaskRunner.__call__`, or an empty list if `TaskRunner.__call__`
    is not called (e.g. if `TaskRunner.precall` returns `False`). See `TaskRunner.__call__`
    for details.

Notes
-----
The task is run under multiprocessing if `TaskRunner.numProcesses` is more than 1; otherwise
processing is serial.
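The control flow of `run` can be reproduced without the LSST stack. In this sketch `RunSketch` and `_run_pool` are hypothetical, `multiprocessing.pool.ThreadPool` replaces `multiprocessing.Pool` so the example needs no pickling, and `prepareForMultiProcessing`, profiling and logging are left out. `_run_pool` waits on ``map_async(...).get(timeout)`` with a finite timeout, the usual workaround for the `KeyboardInterrupt` problem mentioned in the class notes; the body of the real `_runPool` is not shown on this page, so treat that as an assumption. Its argument order matches the ``functools.partial(_runPool, pool, self.timeout)`` call in the listing.

```python
import functools
from argparse import Namespace
from multiprocessing.pool import ThreadPool


def _run_pool(pool, timeout, function, iterable):
    # Wait with a finite timeout rather than calling pool.map directly
    # (assumed workaround; the real _runPool body is not shown here).
    return pool.map_async(function, iterable).get(timeout)


class RunSketch:
    """Hypothetical runner reproducing the control flow of TaskRunner.run."""

    timeout = 3600 * 24 * 30  # seconds

    def __init__(self, numProcesses=1, precallOk=True):
        self.numProcesses = numProcesses
        self.precallOk = precallOk

    def precall(self, parsedCmd):
        return self.precallOk

    def getTargetList(self, parsedCmd):
        return [(ref, {}) for ref in parsedCmd.id.refList]

    def __call__(self, args):
        dataRef, kwargs = args
        return dataRef * 10  # stands in for running the task on one target

    def run(self, parsedCmd):
        resultList = []
        if self.numProcesses > 1:
            # The real runner uses multiprocessing.Pool(..., maxtasksperchild=1).
            pool = ThreadPool(processes=self.numProcesses)
            mapFunc = functools.partial(_run_pool, pool, self.timeout)
        else:
            pool = None
            mapFunc = map
        if self.precall(parsedCmd):
            targetList = self.getTargetList(parsedCmd)
            if targetList:
                resultList = list(mapFunc(self, targetList))
        if pool is not None:
            pool.close()
            pool.join()
        return resultList


cmd = Namespace(id=Namespace(refList=[1, 2, 3]))
print(RunSketch(1).run(cmd), RunSketch(3).run(cmd))  # [10, 20, 30] [10, 20, 30]
print(RunSketch(2).run(Namespace(id=Namespace(refList=[]))))  # []
print(RunSketch(1, precallOk=False).run(cmd))  # []
```

Passing the runner instance itself as the mapped function is what lets one object serve both the serial (`map`) and pooled paths.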

Definition at line 183 of file cmdLineTask.py.

def run(self, parsedCmd):
    """Run the task on all targets.

    Parameters
    ----------
    parsedCmd : `argparse.Namespace`
        Parsed command `argparse.Namespace`.

    Returns
    -------
    resultList : `list`
        A list of results returned by `TaskRunner.__call__`, or an empty list if `TaskRunner.__call__`
        is not called (e.g. if `TaskRunner.precall` returns `False`). See `TaskRunner.__call__`
        for details.

    Notes
    -----
    The task is run under multiprocessing if `TaskRunner.numProcesses` is more than 1; otherwise
    processing is serial.
    """
    resultList = []
    disableImplicitThreading()  # To prevent thread contention
    if self.numProcesses > 1:
        import multiprocessing
        self.prepareForMultiProcessing()
        pool = multiprocessing.Pool(processes=self.numProcesses, maxtasksperchild=1)
        mapFunc = functools.partial(_runPool, pool, self.timeout)
    else:
        pool = None
        mapFunc = map

    if self.precall(parsedCmd):
        profileName = parsedCmd.profile if hasattr(parsedCmd, "profile") else None
        log = parsedCmd.log
        targetList = self.getTargetList(parsedCmd)
        if len(targetList) > 0:
            with profile(profileName, log):
                # Run the task using self.__call__
                resultList = list(mapFunc(self, targetList))
        else:
            log.warn("Not running the task because there is no data to process; "
                     "you may preview data using \"--show data\"")

    if pool is not None:
        pool.close()
        pool.join()

    return resultList

◆ runTask()

def lsst.pipe.base.cmdLineTask.TaskRunner.runTask (   self,
  task,
  dataRef,
  kwargs 
)
Make the actual call to `runDataRef` for this task.

Parameters
----------
task : `lsst.pipe.base.CmdLineTask`
    The task instance to run.
dataRef
    Butler data reference that contains the data the task will process.
kwargs
    Any additional keyword arguments. See `TaskRunner.getTargetList` above.

Notes
-----
The default implementation of `TaskRunner.runTask` works for any command-line task which has a
runDataRef method that takes a data reference and an optional set of additional keyword arguments.
This method returns the results generated by the task's `runDataRef` method.
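A small sketch of overriding `runTask`, assuming hypothetical classes: `DefaultRunTask` reproduces the default call ``task.runDataRef(dataRef, **kwargs)``, and `LegacyStyleRunner` dispatches to an older ``run`` method instead. That this is what `LegacyTaskRunner` (listed in the inheritance diagram) does is an assumption based on its name, not something this page confirms.

```python
class DefaultRunTask:
    """Stand-in reproducing the default TaskRunner.runTask."""

    def runTask(self, task, dataRef, kwargs):
        # Unpack the per-target kwargs dict into keyword arguments.
        return task.runDataRef(dataRef, **kwargs)


class LegacyStyleRunner(DefaultRunTask):
    """Hypothetical override that calls an older ``run`` entry point."""

    def runTask(self, task, dataRef, kwargs):
        return task.run(dataRef, **kwargs)


class NewTask:
    def runDataRef(self, dataRef, scale=1):
        return dataRef * scale


class OldTask:
    def run(self, dataRef, scale=1):
        return dataRef * scale


print(DefaultRunTask().runTask(NewTask(), 4, {"scale": 3}))  # 12
print(LegacyStyleRunner().runTask(OldTask(), 4, {}))         # 4
```

Because `__call__` routes every target through `runTask`, overriding this one method is enough to change which task entry point is invoked, without touching the error handling in `__call__`.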

Definition at line 428 of file cmdLineTask.py.

def runTask(self, task, dataRef, kwargs):
    """Make the actual call to `runDataRef` for this task.

    Parameters
    ----------
    task : `lsst.pipe.base.CmdLineTask` class
        The class of the task to run.
    dataRef
        Butler data reference that contains the data the task will process.
    kwargs
        Any additional keyword arguments. See `TaskRunner.getTargetList` above.

    Notes
    -----
    The default implementation of `TaskRunner.runTask` works for any command-line task which has a
    runDataRef method that takes a data reference and an optional set of additional keyword arguments.
    This method returns the results generated by the task's `runDataRef` method.

    """
    return task.runDataRef(dataRef, **kwargs)

Member Data Documentation

◆ clobberConfig

lsst.pipe.base.cmdLineTask.TaskRunner.clobberConfig

Definition at line 161 of file cmdLineTask.py.

◆ config

lsst.pipe.base.cmdLineTask.TaskRunner.config

Definition at line 158 of file cmdLineTask.py.

◆ doBackup

lsst.pipe.base.cmdLineTask.TaskRunner.doBackup

Definition at line 162 of file cmdLineTask.py.

◆ doRaise

lsst.pipe.base.cmdLineTask.TaskRunner.doRaise

Definition at line 160 of file cmdLineTask.py.

◆ doReturnResults

lsst.pipe.base.cmdLineTask.TaskRunner.doReturnResults

Definition at line 157 of file cmdLineTask.py.

◆ log

lsst.pipe.base.cmdLineTask.TaskRunner.log

Definition at line 159 of file cmdLineTask.py.

◆ numProcesses

lsst.pipe.base.cmdLineTask.TaskRunner.numProcesses

Definition at line 163 of file cmdLineTask.py.

◆ TaskClass

lsst.pipe.base.cmdLineTask.TaskRunner.TaskClass

Definition at line 156 of file cmdLineTask.py.

◆ TIMEOUT

int lsst.pipe.base.cmdLineTask.TaskRunner.TIMEOUT = 3600*24*30
static

Definition at line 152 of file cmdLineTask.py.

◆ timeout

lsst.pipe.base.cmdLineTask.TaskRunner.timeout

Definition at line 165 of file cmdLineTask.py.

