from __future__ import absolute_import, division
from .task import Task, TaskError
from .struct import Struct
from .argumentParser import ArgumentParser

__all__ = ["CmdLineTask", "TaskRunner", "ButlerInitializedTaskRunner"]
38 """Wrapper around function to catch exceptions that don't inherit from Exception
40 Such exceptions aren't caught by multiprocessing, which causes the slave
41 process to crash and you end up hitting the timeout.
49 cls, exc, tb = sys.exc_info()
51 log.warn(
"Unhandled exception %s (%s):\n%s" % (cls.__name__, exc, traceback.format_exc()))
52 raise Exception(
"Unhandled exception: %s (%s)" % (cls.__name__, exc))
def _runPool(pool, timeout, function, iterable):
    """Wrapper around pool.map_async, to handle timeout.

    This is required so as to trigger an immediate interrupt on the KeyboardInterrupt (Ctrl-C); see
    http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool

    Further wraps the function in _poolFunctionWrapper to catch exceptions that don't inherit
    from Exception.
    """
    return pool.map_async(functools.partial(_poolFunctionWrapper, function), iterable).get(timeout)
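# A usage sketch of the pattern above (the helper names `_exampleSquare` and
# `_exampleParallelSquares` are hypothetical, not part of this module): map_async(...)
# followed by get(timeout) keeps Ctrl-C responsive in the parent process, unlike a bare
# pool.map().
def _exampleSquare(value):
    """Hypothetical payload function; must live at module scope so it can be pickled."""
    return value * value


def _exampleParallelSquares(values, numProcesses=2, timeout=3600):
    """Sketch: run _exampleSquare over `values` in a pool, delegating timeout handling to _runPool."""
    import multiprocessing
    pool = multiprocessing.Pool(processes=numProcesses, maxtasksperchild=1)
    try:
        return _runPool(pool, timeout, _exampleSquare, values)
    finally:
        pool.close()
        pool.join()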
@contextlib.contextmanager
def profile(filename, log=None):
    """!Context manager for profiling with cProfile

    @param filename     filename to which to write profile (profiling disabled if None or empty)
    @param log          log object for logging the profile operations

    If profiling is enabled, the context manager returns the cProfile.Profile object (otherwise
    it returns None), which allows additional control over profiling. You can obtain this using
    the "as" clause, e.g.:

        with profile(filename) as prof:

    The output cumulative profile can be printed with a command-line like:

        python -c 'import pstats; pstats.Stats("<filename>").sort_stats("cumtime").print_stats(30)'
    """
    from cProfile import Profile
    log.info("Enabling cProfile profiling")
    profile.dump_stats(filename)
    log.info("cProfile stats written to %s" % filename)
100 """!Run a command-line task, using multiprocessing if requested.
102 Each command-line task (subclass of CmdLineTask) has a task runner. By default it is
103 this class, but some tasks require a subclass. See the manual "how to write a command-line task"
104 in the pipe_tasks documentation for more information.
105 See CmdLineTask.parseAndRun to see how a task runner is used.
107 You may use this task runner for your command-line task if your task has a run method
108 that takes exactly one argument: a butler data reference. Otherwise you must
109 provide a task-specific subclass of this runner for your task's `RunnerClass`
110 that overrides TaskRunner.getTargetList and possibly TaskRunner.\_\_call\_\_.
111 See TaskRunner.getTargetList for details.
113 This design matches the common pattern for command-line tasks: the run method takes a single
114 data reference, of some suitable name. Additional arguments are rare, and if present, require
115 a subclass of TaskRunner that calls these additional arguments by name.
117 Instances of this class must be picklable in order to be compatible with multiprocessing.
118 If multiprocessing is requested (parsedCmd.numProcesses > 1) then run() calls prepareForMultiProcessing
119 to jettison optional non-picklable elements. If your task runner is not compatible with multiprocessing
120 then indicate this in your task by setting class variable canMultiprocess=False.
122 Due to a python bug [1], handling a KeyboardInterrupt properly requires specifying a timeout [2]. This
123 timeout (in sec) can be specified as the "timeout" element in the output from ArgumentParser
124 (the "parsedCmd"), if available, otherwise we use TaskRunner.TIMEOUT_DEFAULT.
126 [1] http://bugs.python.org/issue8296
127 [2] http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool)
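# A construction sketch (the helper name `_exampleRunTask` is hypothetical): parseAndRun
# normally builds and invokes the runner, but done by hand the sequence is the same --
# build the runner once from the parsed command, then run it over all targets.
def _exampleRunTask(taskClass, parsedCmd, doReturnResults=False):
    """Sketch: construct taskClass's runner and run it (CmdLineTask.parseAndRun does this for you)."""
    runner = taskClass.RunnerClass(TaskClass=taskClass, parsedCmd=parsedCmd,
                                   doReturnResults=doReturnResults)
    return runner.run(parsedCmd)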
    def __init__(self, TaskClass, parsedCmd, doReturnResults=False):
        """!Construct a TaskRunner

        @warning Do not store parsedCmd, as this instance is pickled (if multiprocessing) and
        parsedCmd may contain non-picklable elements. It certainly contains more data than we need
        to send to each instance of the task.

        @param TaskClass    The class of the task to run
        @param parsedCmd    The parsed command-line arguments, as returned by the task's argument
            parser's parse_args method
        @param doReturnResults    Should run return the collected result from each invocation of the
            task? This is only intended for unit tests and similar use.
            It can easily exhaust memory (if the task returns enough data and you call it enough
            times) and it will fail when using multiprocessing if the returned data cannot be pickled.

        @throws ImportError if multiprocessing is requested (and the task supports it)
            but the multiprocessing library cannot be imported.
        """
        self.timeout = getattr(parsedCmd, 'timeout', None)
        if not TaskClass.canMultiprocess:
            self.log.warn("This task does not support multiprocessing; using one process")
166 """!Prepare this instance for multiprocessing by removing optional non-picklable elements.
168 This is only called if the task is run under multiprocessing.
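# A sketch of the kind of override described above (the class name and the `display`
# attribute are hypothetical): drop a non-picklable member before the pool is created,
# keeping the rest of the base behaviour.
class ExampleDisplayRunner(TaskRunner):
    def prepareForMultiProcessing(self):
        TaskRunner.prepareForMultiProcessing(self)
        self.display = None  # hypothetical non-picklable display connection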
173 """!Run the task on all targets.
175 The task is run under multiprocessing if numProcesses > 1; otherwise processing is serial.
177 @return a list of results returned by TaskRunner.\_\_call\_\_, or an empty list if
178 TaskRunner.\_\_call\_\_ is not called (e.g. if TaskRunner.precall returns `False`).
179 See TaskRunner.\_\_call\_\_ for details.
182 import multiprocessing
184 pool = multiprocessing.Pool(processes=self.
numProcesses, maxtasksperchild=1)
185 mapFunc = functools.partial(_runPool, pool, self.
timeout)
191 profileName = parsedCmd.profile
if hasattr(parsedCmd,
"profile")
else None
192 log = parsedCmd.log
if hasattr(parsedCmd,
"log")
else None
193 with
profile(profileName, log):
207 """!Return a list of (dataRef, kwargs) to be used as arguments for TaskRunner.\_\_call\_\_.
209 @param parsedCmd the parsed command object (an argparse.Namespace) returned by
210 \ref argumentParser.ArgumentParser.parse_args "ArgumentParser.parse_args".
211 @param **kwargs any additional keyword arguments. In the default TaskRunner
212 this is an empty dict, but having it simplifies overriding TaskRunner for tasks
213 whose run method takes additional arguments (see case (1) below).
215 The default implementation of TaskRunner.getTargetList and TaskRunner.\_\_call\_\_ works for any
216 command-line task whose run method takes exactly one argument: a data reference.
217 Otherwise you must provide a variant of TaskRunner that overrides TaskRunner.getTargetList
218 and possibly TaskRunner.\_\_call\_\_. There are two cases:
220 (1) If your command-line task has a `run` method that takes one data reference followed by additional
221 arguments, then you need only override TaskRunner.getTargetList to return the additional arguments as
222 an argument dict. To make this easier, your overridden version of getTargetList may call
223 TaskRunner.getTargetList with the extra arguments as keyword arguments. For example,
224 the following adds an argument dict containing a single key: "calExpList", whose value is the list
225 of data IDs for the calexp ID argument:
229 def getTargetList(parsedCmd):
230 return TaskRunner.getTargetList(parsedCmd, calExpList=parsedCmd.calexp.idList)
233 It is equivalent to this slightly longer version:
237 def getTargetList(parsedCmd):
238 argDict = dict(calExpList=parsedCmd.calexp.idList)
239 return [(dataId, argDict) for dataId in parsedCmd.id.idList]
242 (2) If your task does not meet condition (1) then you must override both TaskRunner.getTargetList
243 and TaskRunner.\_\_call\_\_. You may do this however you see fit, so long as TaskRunner.getTargetList
244 returns a list, each of whose elements is sent to TaskRunner.\_\_call\_\_, which runs your task.
246 return [(ref, kwargs)
for ref
in parsedCmd.id.refList]
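# A sketch for case (2) above (the class name is hypothetical; the task's run method is
# assumed to take a list of data references per target): getTargetList packages whatever
# each __call__ invocation needs, and __call__ unpacks it. Metadata writing and error
# handling from the default __call__ are omitted for brevity.
class ExampleListRunner(TaskRunner):
    @staticmethod
    def getTargetList(parsedCmd, **kwargs):
        # a single target holding every data reference, plus any extra keyword arguments
        return [(parsedCmd.id.refList, kwargs)]

    def __call__(self, args):
        dataRefList, kwargs = args
        task = self.makeTask(args=args)
        return task.run(dataRefList, **kwargs)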
249 """!Create a Task instance
251 @param[in] parsedCmd parsed command-line options (used for extra task args by some task runners)
252 @param[in] args args tuple passed to TaskRunner.\_\_call\_\_ (used for extra task arguments
253 by some task runners)
255 makeTask() can be called with either the 'parsedCmd' argument or 'args' argument set to None,
256 but it must construct identical Task instances in either case.
258 Subclasses may ignore this method entirely if they reimplement both TaskRunner.precall and
259 TaskRunner.\_\_call\_\_
264 """!Hook for code that should run exactly once, before multiprocessing is invoked.
266 Must return True if TaskRunner.\_\_call\_\_ should subsequently be called.
268 @warning Implementations must take care to ensure that no unpicklable attributes are added to
269 the TaskRunner itself, for compatibility with multiprocessing.
271 The default implementation writes schemas and configs, or compares them to existing
272 files on disk if present.
274 task = self.
makeTask(parsedCmd=parsedCmd)
276 task.writeConfig(parsedCmd.butler, clobber=self.
clobberConfig)
277 task.writeSchemas(parsedCmd.butler, clobber=self.
clobberConfig)
280 task.writeConfig(parsedCmd.butler, clobber=self.
clobberConfig)
281 task.writeSchemas(parsedCmd.butler, clobber=self.
clobberConfig)
283 task.log.fatal(
"Failed in task initialization: %s" % e)
284 if not isinstance(e, TaskError):
285 traceback.print_exc(file=sys.stderr)
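# A sketch of a precall override (the class name and the extra check are hypothetical):
# keep the default config/schema writing, then add a one-time validation; returning False
# prevents any subsequent __call__ invocations.
class ExamplePrecallRunner(TaskRunner):
    def precall(self, parsedCmd):
        if not TaskRunner.precall(self, parsedCmd):
            return False
        return hasattr(parsedCmd, "butler")  # hypothetical prerequisite check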
290 """!Run the Task on a single target.
292 This default implementation assumes that the 'args' is a tuple
293 containing a data reference and a dict of keyword arguments.
295 @warning if you override this method and wish to return something when
296 doReturnResults is false, then it must be picklable to support
297 multiprocessing and it should be small enough that pickling and
298 unpickling do not add excessive overhead.
300 @param args Arguments for Task.run()
303 - None if doReturnResults false
304 - A pipe_base Struct containing these fields if doReturnResults true:
305 - dataRef: the provided data reference
306 - metadata: task metadata after execution of run
307 - result: result returned by task run, or None if the task fails
309 dataRef, kwargs = args
313 result = task.run(dataRef, **kwargs)
316 result = task.run(dataRef, **kwargs)
318 task.log.fatal(
"Failed on dataId=%s: %s" % (dataRef.dataId, e))
319 if not isinstance(e, TaskError):
320 traceback.print_exc(file=sys.stderr)
321 task.writeMetadata(dataRef)
326 metadata = task.metadata,
331 """!A TaskRunner for CmdLineTasks that require a 'butler' keyword argument to be passed to
335 """!A variant of the base version that passes a butler argument to the task's constructor
337 @param[in] parsedCmd parsed command-line options, as returned by the argument parser;
338 if specified then args is ignored
339 @param[in] args other arguments; if parsedCmd is None then this must be specified
341 @throw RuntimeError if parsedCmd and args are both None
343 if parsedCmd
is not None:
344 butler = parsedCmd.butler
345 elif args
is not None:
346 dataRef, kwargs = args
347 butler = dataRef.butlerSubset.butler
349 raise RuntimeError(
"parsedCmd or args must be specified")
353 """!Base class for command-line tasks: tasks that may be executed from the command line
355 See \ref pipeBase_introduction "pipe_base introduction" to learn what tasks are,
356 and \ref pipeTasks_writeCmdLineTask "how to write a command-line task" for more information
357 about writing command-line tasks.
358 If the second link is broken (as it will be before the documentation is cross-linked)
359 then look at the main page of pipe_tasks documentation for a link.
361 Subclasses must specify the following class variables:
362 * ConfigClass: configuration class for your task (a subclass of \ref lsst.pex.config.config.Config
363 "lsst.pex.config.Config", or if your task needs no configuration, then
364 \ref lsst.pex.config.config.Config "lsst.pex.config.Config" itself)
365 * _DefaultName: default name used for this task (a str)
367 Subclasses may also specify the following class variables:
368 * RunnerClass: a task runner class. The default is TaskRunner, which works for any task
369 with a run method that takes exactly one argument: a data reference. If your task does
370 not meet this requirement then you must supply a variant of TaskRunner; see TaskRunner
371 for more information.
372 * canMultiprocess: the default is True; set False if your task does not support multiprocessing.
374 Subclasses must specify a method named "run":
375 - By default `run` accepts a single butler data reference, but you can specify an alternate task runner
376 (subclass of TaskRunner) as the value of class variable `RunnerClass` if your run method needs
378 - `run` is expected to return its data in a Struct. This provides safety for evolution of the task
379 since new values may be added without harming existing code.
380 - The data returned by `run` must be picklable if your task is to support multiprocessing.
382 RunnerClass = TaskRunner
383 canMultiprocess =
True
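# A minimal sketch satisfying the requirements above (all names here are hypothetical;
# lsst.pex.config is assumed importable): a ConfigClass, a _DefaultName, and a run()
# method that takes exactly one data reference and returns a Struct.
import lsst.pex.config as pexConfig

class ExampleConfig(pexConfig.Config):
    scale = pexConfig.Field(dtype=float, default=1.0, doc="hypothetical scale factor")

class ExampleTask(CmdLineTask):
    ConfigClass = ExampleConfig
    _DefaultName = "example"
    # RunnerClass defaults to TaskRunner and canMultiprocess defaults to True

    def run(self, dataRef):
        # hypothetical per-data-reference processing
        return Struct(dataId=dataRef.dataId, scale=self.config.scale)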
387 """!A hook to allow a task to change the values of its config *after* the camera-specific
388 overrides are loaded but before any command-line overrides are applied.
390 This is necessary in some cases because the camera-specific overrides may retarget subtasks,
391 wiping out changes made in ConfigClass.setDefaults. See LSST Trac ticket #2282 for more discussion.
393 @warning This is called by CmdLineTask.parseAndRun; other ways of constructing a config
394 will not apply these overrides.
396 @param[in] cls the class object
397 @param[in] config task configuration (an instance of cls.ConfigClass)
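# A sketch of an applyOverrides implementation (hypothetical; it builds on the ExampleTask
# and ExampleConfig sketched earlier): re-assert a default after camera-specific overrides
# have been loaded but before command-line overrides are applied.
class ExampleOverrideTask(ExampleTask):
    @classmethod
    def applyOverrides(cls, config):
        config.scale = 2.0  # hypothetical value that camera overrides might have changed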
    @classmethod
    def parseAndRun(cls, args=None, config=None, log=None, doReturnResults=False):
        """!Parse an argument list and run the command

        Calling this method with no arguments specified is the standard way to run a command-line
        task from the command line. For an example see pipe_tasks `bin/makeSkyMap.py` or almost any
        other file in that directory (a short driver sketch also follows this method).

        @param cls      the class object
        @param args     list of command-line arguments; if `None` use sys.argv
        @param config   config for task (instance of pex_config Config); if `None` use
            cls.ConfigClass()
        @param log      log (instance of lsst.pex.logging.Log); if `None` use the default log
        @param doReturnResults    Return the collected results from each invocation of the task?
            This is only intended for unit tests and similar use.
            It can easily exhaust memory (if the task returns enough data and you call it enough
            times) and it will fail when using multiprocessing if the returned data cannot be pickled.

        @return a Struct containing:
        - argumentParser: the argument parser
        - parsedCmd: the parsed command returned by the argument parser's parse_args method
        - taskRunner: the task runner used to run the task (an instance of cls.RunnerClass)
        - resultList: results returned by the task runner's run method, one entry per invocation.
            This will typically be a list of `None` unless doReturnResults is `True`;
            see cls.RunnerClass (TaskRunner by default) for more information.
        """
        argumentParser = cls._makeArgumentParser()
        if config is None:
            config = cls.ConfigClass()
        parsedCmd = argumentParser.parse_args(config=config, args=args, log=log,
                                              override=cls.applyOverrides)
        taskRunner = cls.RunnerClass(TaskClass=cls, parsedCmd=parsedCmd, doReturnResults=doReturnResults)
        resultList = taskRunner.run(parsedCmd)
        return Struct(
            argumentParser = argumentParser,
            parsedCmd = parsedCmd,
            taskRunner = taskRunner,
            resultList = resultList,
        )
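# A sketch of the usual bin/ driver script described above, and of a test-style invocation
# (the module path, task name, and helper are hypothetical):
#
#     #!/usr/bin/env python
#     from example.exampleTask import ExampleTask
#     ExampleTask.parseAndRun()
#
# and, collecting results for a unit test:
def _exampleTestInvocation(argv):
    """Sketch: run ExampleTask on an explicit argument list and return the per-target results."""
    parsedStruct = ExampleTask.parseAndRun(args=argv, doReturnResults=True)
    return parsedStruct.resultList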
441 """!Create and return an argument parser
443 @param[in] cls the class object
444 @return the argument parser for this task.
446 By default this returns an ArgumentParser with one ID argument named `--id` of dataset type "raw".
448 Your task subclass may need to override this method to change the dataset type or data ref level,
449 or to add additional data ID arguments. If you add additional data ID arguments or your task's
450 run method takes more than a single data reference then you will also have to provide a task-specific
451 task runner (see TaskRunner for more information).
453 parser = ArgumentParser(name=cls._DefaultName)
454 parser.add_id_argument(name=
"--id", datasetType=
"raw", help=
"data ID, e.g. --id visit=12345 ccd=1,2")
458 """!Write the configuration used for processing the data, or check that an existing
459 one is equal to the new one if present.
461 @param[in] butler data butler used to write the config.
462 The config is written to dataset type self._getConfigName()
463 @param[in] clobber a boolean flag that controls what happens if a config already has been saved:
464 - True: overwrite the existing config
465 - False: raise TaskError if this config does not match the existing config
468 if configName
is None:
471 butler.put(self.config, configName, doBackup=
True)
472 elif butler.datasetExists(configName):
474 oldConfig = butler.get(configName, immediate=
True)
475 output =
lambda msg: self.log.fatal(
"Comparing configuration: " + msg)
476 if not self.config.compare(oldConfig, shortcut=
False, output=output):
478 (
"Config does not match existing task config %r on disk; tasks configurations " + \
479 "must be consistent within the same output repo (override with --clobber-config)") % \
482 butler.put(self.config, configName)
485 """!Write the schemas returned by \ref task.Task.getAllSchemaCatalogs "getAllSchemaCatalogs"
487 @param[in] butler data butler used to write the schema.
488 Each schema is written to the dataset type specified as the key in the dict returned by
489 \ref task.Task.getAllSchemaCatalogs "getAllSchemaCatalogs".
490 @param[in] clobber a boolean flag that controls what happens if a schema already has been saved:
491 - True: overwrite the existing schema
492 - False: raise TaskError if this schema does not match the existing schema
494 @warning if clobber is False and an existing schema does not match a current schema,
495 then some schemas may have been saved successfully and others may not, and there is no easy way to
498 for dataset, catalog
in self.getAllSchemaCatalogs().iteritems():
499 schemaDataset = dataset +
"_schema"
501 butler.put(catalog, schemaDataset, doBackup=
True)
502 elif butler.datasetExists(schemaDataset):
503 oldSchema = butler.get(schemaDataset, immediate=
True).getSchema()
504 if not oldSchema.compare(catalog.getSchema(), afwTable.Schema.IDENTICAL):
506 (
"New schema does not match schema %r on disk; schemas must be " + \
507 " consistent within the same output repo (override with --clobber-config)") % \
510 butler.put(catalog, schemaDataset)
513 """!Write the metadata produced from processing the data
515 @param[in] dataRef butler data reference used to write the metadata.
516 The metadata is written to dataset type self._getMetadataName()
520 if metadataName
is not None:
521 dataRef.put(self.getFullMetadata(), metadataName)
523 self.log.warn(
"Could not persist metadata for dataId=%s: %s" % (dataRef.dataId, e,))
526 """!Return the name of the config dataset type, or None if config is not to be persisted
528 @note The name may depend on the config; that is why this is not a class method.
530 return self._DefaultName +
"_config"
533 """!Return the name of the metadata dataset type, or None if metadata is not to be persisted
535 @note The name may depend on the config; that is why this is not a class method.
537 return self._DefaultName +
"_metadata"