from __future__ import absolute_import, division

import contextlib
import functools
import sys
import traceback

from .task import Task, TaskError
from .struct import Struct
from .argumentParser import ArgumentParser
# Public API of this module.
__all__ = ["CmdLineTask", "TaskRunner", "ButlerInitializedTaskRunner"]
38 """Wrapper around function to catch exceptions that don't inherit from Exception
40 Such exceptions aren't caught by multiprocessing, which causes the slave
41 process to crash and you end up hitting the timeout.
49 cls, exc, tb = sys.exc_info()
51 log.warn(
"Unhandled exception %s (%s):\n%s" % (cls.__name__, exc, traceback.format_exc()))
52 raise Exception(
"Unhandled exception: %s (%s)" % (cls.__name__, exc))
54 def _runPool(pool, timeout, function, iterable):
55 """Wrapper around pool.map_async, to handle timeout
57 This is required so as to trigger an immediate interrupt on the KeyboardInterrupt (Ctrl-C); see
58 http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool
60 Further wraps the function in _poolFunctionWrapper to catch exceptions
61 that don't inherit from Exception.
63 return pool.map_async(functools.partial(_poolFunctionWrapper, function), iterable).get(timeout)
@contextlib.contextmanager
def profile(filename, log=None):
    """!Context manager for profiling with cProfile

    @param filename filename to which to write profile (profiling disabled if None or empty)
    @param log log object for logging the profile operations

    If profiling is enabled, the context manager returns the cProfile.Profile object (otherwise
    it returns None), which allows additional control over profiling. You can obtain this using
    the "as" clause, e.g.:

        with profile(filename) as prof:
            runYourCodeHere()

    The output cumulative profile can be printed with a command-line like:

        python -c 'import pstats; pstats.Stats("<filename>").sort_stats("cumtime").print_stats(30)'
    """
    if not filename:
        # Profiling disabled: yield a placeholder and do nothing else.
        yield
        return
    from cProfile import Profile

    prof = Profile()
    if log is not None:
        log.info("Enabling cProfile profiling")
    prof.enable()
    yield prof
    prof.disable()
    prof.dump_stats(filename)
    if log is not None:
        log.info("cProfile stats written to %s" % filename)
100 """!Run a command-line task, using multiprocessing if requested.
102 Each command-line task (subclass of CmdLineTask) has a task runner. By default it is
103 this class, but some tasks require a subclass. See the manual "how to write a command-line task"
104 in the pipe_tasks documentation for more information.
105 See CmdLineTask.parseAndRun to see how a task runner is used.
107 You may use this task runner for your command-line task if your task has a run method
108 that takes exactly one argument: a butler data reference. Otherwise you must
109 provide a task-specific subclass of this runner for your task's `RunnerClass`
110 that overrides TaskRunner.getTargetList and possibly TaskRunner.\_\_call\_\_.
111 See TaskRunner.getTargetList for details.
113 This design matches the common pattern for command-line tasks: the run method takes a single
114 data reference, of some suitable name. Additional arguments are rare, and if present, require
115 a subclass of TaskRunner that calls these additional arguments by name.
117 Instances of this class must be picklable in order to be compatible with multiprocessing.
118 If multiprocessing is requested (parsedCmd.numProcesses > 1) then run() calls prepareForMultiProcessing
119 to jettison optional non-picklable elements. If your task runner is not compatible with multiprocessing
120 then indicate this in your task by setting class variable canMultiprocess=False.
122 Due to a python bug [1], handling a KeyboardInterrupt properly requires specifying a timeout [2]. This
123 timeout (in sec) can be specified as the "timeout" element in the output from ArgumentParser
124 (the "parsedCmd"), if available, otherwise we use TaskRunner.TIMEOUT_DEFAULT.
126 [1] http://bugs.python.org/issue8296
127 [2] http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool)
130 def __init__(self, TaskClass, parsedCmd, doReturnResults=False):
131 """!Construct a TaskRunner
133 @warning Do not store parsedCmd, as this instance is pickled (if multiprocessing) and parsedCmd may
134 contain non-picklable elements. It certainly contains more data than we need to send to each
135 instance of the task.
137 @param TaskClass The class of the task to run
138 @param parsedCmd The parsed command-line arguments, as returned by the task's argument parser's
140 @param doReturnResults Should run return the collected result from each invocation of the task?
141 This is only intended for unit tests and similar use.
142 It can easily exhaust memory (if the task returns enough data and you call it enough times)
143 and it will fail when using multiprocessing if the returned data cannot be pickled.
145 @throws ImportError if multiprocessing requested (and the task supports it)
146 but the multiprocessing library cannot be imported.
156 self.
timeout = getattr(parsedCmd,
'timeout',
None)
161 if not TaskClass.canMultiprocess:
162 self.log.warn(
"This task does not support multiprocessing; using one process")
166 """!Prepare this instance for multiprocessing by removing optional non-picklable elements.
168 This is only called if the task is run under multiprocessing.
173 """!Run the task on all targets.
175 The task is run under multiprocessing if numProcesses > 1; otherwise processing is serial.
177 @return a list of results returned by TaskRunner.\_\_call\_\_, or an empty list if
178 TaskRunner.\_\_call\_\_ is not called (e.g. if TaskRunner.precall returns `False`).
179 See TaskRunner.\_\_call\_\_ for details.
183 import multiprocessing
185 pool = multiprocessing.Pool(processes=self.
numProcesses, maxtasksperchild=1)
186 mapFunc = functools.partial(_runPool, pool, self.
timeout)
192 profileName = parsedCmd.profile
if hasattr(parsedCmd,
"profile")
else None
195 if len(targetList) > 0:
196 with
profile(profileName, log):
198 resultList = mapFunc(self, targetList)
200 log.warn(
"Not running the task because there is no data to process; "
201 "you may preview data using \"--show data\"")
211 """!Return a list of (dataRef, kwargs) to be used as arguments for TaskRunner.\_\_call\_\_.
213 @param parsedCmd the parsed command object (an argparse.Namespace) returned by
214 \ref argumentParser.ArgumentParser.parse_args "ArgumentParser.parse_args".
215 @param **kwargs any additional keyword arguments. In the default TaskRunner
216 this is an empty dict, but having it simplifies overriding TaskRunner for tasks
217 whose run method takes additional arguments (see case (1) below).
219 The default implementation of TaskRunner.getTargetList and TaskRunner.\_\_call\_\_ works for any
220 command-line task whose run method takes exactly one argument: a data reference.
221 Otherwise you must provide a variant of TaskRunner that overrides TaskRunner.getTargetList
222 and possibly TaskRunner.\_\_call\_\_. There are two cases:
224 (1) If your command-line task has a `run` method that takes one data reference followed by additional
225 arguments, then you need only override TaskRunner.getTargetList to return the additional arguments as
226 an argument dict. To make this easier, your overridden version of getTargetList may call
227 TaskRunner.getTargetList with the extra arguments as keyword arguments. For example,
228 the following adds an argument dict containing a single key: "calExpList", whose value is the list
229 of data IDs for the calexp ID argument:
233 def getTargetList(parsedCmd):
234 return TaskRunner.getTargetList(parsedCmd, calExpList=parsedCmd.calexp.idList)
237 It is equivalent to this slightly longer version:
241 def getTargetList(parsedCmd):
242 argDict = dict(calExpList=parsedCmd.calexp.idList)
243 return [(dataId, argDict) for dataId in parsedCmd.id.idList]
246 (2) If your task does not meet condition (1) then you must override both TaskRunner.getTargetList
247 and TaskRunner.\_\_call\_\_. You may do this however you see fit, so long as TaskRunner.getTargetList
248 returns a list, each of whose elements is sent to TaskRunner.\_\_call\_\_, which runs your task.
250 return [(ref, kwargs)
for ref
in parsedCmd.id.refList]
253 """!Create a Task instance
255 @param[in] parsedCmd parsed command-line options (used for extra task args by some task runners)
256 @param[in] args args tuple passed to TaskRunner.\_\_call\_\_ (used for extra task arguments
257 by some task runners)
259 makeTask() can be called with either the 'parsedCmd' argument or 'args' argument set to None,
260 but it must construct identical Task instances in either case.
262 Subclasses may ignore this method entirely if they reimplement both TaskRunner.precall and
263 TaskRunner.\_\_call\_\_
268 """!Hook for code that should run exactly once, before multiprocessing is invoked.
270 Must return True if TaskRunner.\_\_call\_\_ should subsequently be called.
272 @warning Implementations must take care to ensure that no unpicklable attributes are added to
273 the TaskRunner itself, for compatibility with multiprocessing.
275 The default implementation writes schemas and configs, or compares them to existing
276 files on disk if present.
278 task = self.
makeTask(parsedCmd=parsedCmd)
280 task.writeConfig(parsedCmd.butler, clobber=self.
clobberConfig)
281 task.writeSchemas(parsedCmd.butler, clobber=self.
clobberConfig)
284 task.writeConfig(parsedCmd.butler, clobber=self.
clobberConfig)
285 task.writeSchemas(parsedCmd.butler, clobber=self.
clobberConfig)
287 task.log.fatal(
"Failed in task initialization: %s" % e)
288 if not isinstance(e, TaskError):
289 traceback.print_exc(file=sys.stderr)
294 """!Run the Task on a single target.
296 This default implementation assumes that the 'args' is a tuple
297 containing a data reference and a dict of keyword arguments.
299 @warning if you override this method and wish to return something when
300 doReturnResults is false, then it must be picklable to support
301 multiprocessing and it should be small enough that pickling and
302 unpickling do not add excessive overhead.
304 @param args Arguments for Task.run()
307 - None if doReturnResults false
308 - A pipe_base Struct containing these fields if doReturnResults true:
309 - dataRef: the provided data reference
310 - metadata: task metadata after execution of run
311 - result: result returned by task run, or None if the task fails
313 dataRef, kwargs = args
317 result = task.run(dataRef, **kwargs)
320 result = task.run(dataRef, **kwargs)
322 task.log.fatal(
"Failed on dataId=%s: %s" % (dataRef.dataId, e))
323 if not isinstance(e, TaskError):
324 traceback.print_exc(file=sys.stderr)
325 task.writeMetadata(dataRef)
330 metadata = task.metadata,
335 """!A TaskRunner for CmdLineTasks that require a 'butler' keyword argument to be passed to
339 """!A variant of the base version that passes a butler argument to the task's constructor
341 @param[in] parsedCmd parsed command-line options, as returned by the argument parser;
342 if specified then args is ignored
343 @param[in] args other arguments; if parsedCmd is None then this must be specified
345 @throw RuntimeError if parsedCmd and args are both None
347 if parsedCmd
is not None:
348 butler = parsedCmd.butler
349 elif args
is not None:
350 dataRef, kwargs = args
351 butler = dataRef.butlerSubset.butler
353 raise RuntimeError(
"parsedCmd or args must be specified")
357 """!Base class for command-line tasks: tasks that may be executed from the command line
359 See \ref pipeBase_introduction "pipe_base introduction" to learn what tasks are,
360 and \ref pipeTasks_writeCmdLineTask "how to write a command-line task" for more information
361 about writing command-line tasks.
362 If the second link is broken (as it will be before the documentation is cross-linked)
363 then look at the main page of pipe_tasks documentation for a link.
365 Subclasses must specify the following class variables:
366 * ConfigClass: configuration class for your task (a subclass of \ref lsst.pex.config.config.Config
367 "lsst.pex.config.Config", or if your task needs no configuration, then
368 \ref lsst.pex.config.config.Config "lsst.pex.config.Config" itself)
369 * _DefaultName: default name used for this task (a str)
371 Subclasses may also specify the following class variables:
372 * RunnerClass: a task runner class. The default is TaskRunner, which works for any task
373 with a run method that takes exactly one argument: a data reference. If your task does
374 not meet this requirement then you must supply a variant of TaskRunner; see TaskRunner
375 for more information.
376 * canMultiprocess: the default is True; set False if your task does not support multiprocessing.
378 Subclasses must specify a method named "run":
379 - By default `run` accepts a single butler data reference, but you can specify an alternate task runner
380 (subclass of TaskRunner) as the value of class variable `RunnerClass` if your run method needs
382 - `run` is expected to return its data in a Struct. This provides safety for evolution of the task
383 since new values may be added without harming existing code.
384 - The data returned by `run` must be picklable if your task is to support multiprocessing.
386 RunnerClass = TaskRunner
387 canMultiprocess =
True
391 """!A hook to allow a task to change the values of its config *after* the camera-specific
392 overrides are loaded but before any command-line overrides are applied.
394 This is necessary in some cases because the camera-specific overrides may retarget subtasks,
395 wiping out changes made in ConfigClass.setDefaults. See LSST Trac ticket #2282 for more discussion.
397 @warning This is called by CmdLineTask.parseAndRun; other ways of constructing a config
398 will not apply these overrides.
400 @param[in] cls the class object
401 @param[in] config task configuration (an instance of cls.ConfigClass)
406 def parseAndRun(cls, args=None, config=None, log=None, doReturnResults=False):
407 """!Parse an argument list and run the command
409 Calling this method with no arguments specified is the standard way to run a command-line task
410 from the command line. For an example see pipe_tasks `bin/makeSkyMap.py` or almost any other
411 file in that directory.
413 @param cls the class object
414 @param args list of command-line arguments; if `None` use sys.argv
415 @param config config for task (instance of pex_config Config); if `None` use cls.ConfigClass()
416 @param log log (instance of lsst.pex.logging.Log); if `None` use the default log
417 @param doReturnResults Return the collected results from each invocation of the task?
418 This is only intended for unit tests and similar use.
419 It can easily exhaust memory (if the task returns enough data and you call it enough times)
420 and it will fail when using multiprocessing if the returned data cannot be pickled.
422 @return a Struct containing:
423 - argumentParser: the argument parser
424 - parsedCmd: the parsed command returned by the argument parser's parse_args method
425 - taskRunner: the task runner used to run the task (an instance of cls.RunnerClass)
426 - resultList: results returned by the task runner's run method, one entry per invocation.
427 This will typically be a list of `None` unless doReturnResults is `True`;
428 see cls.RunnerClass (TaskRunner by default) for more information.
430 argumentParser = cls._makeArgumentParser()
432 config = cls.ConfigClass()
433 parsedCmd = argumentParser.parse_args(config=config, args=args, log=log, override=cls.applyOverrides)
434 taskRunner = cls.RunnerClass(TaskClass=cls, parsedCmd=parsedCmd, doReturnResults=doReturnResults)
435 resultList = taskRunner.run(parsedCmd)
437 argumentParser = argumentParser,
438 parsedCmd = parsedCmd,
439 taskRunner = taskRunner,
440 resultList = resultList,
445 """!Create and return an argument parser
447 @param[in] cls the class object
448 @return the argument parser for this task.
450 By default this returns an ArgumentParser with one ID argument named `--id` of dataset type "raw".
452 Your task subclass may need to override this method to change the dataset type or data ref level,
453 or to add additional data ID arguments. If you add additional data ID arguments or your task's
454 run method takes more than a single data reference then you will also have to provide a task-specific
455 task runner (see TaskRunner for more information).
457 parser = ArgumentParser(name=cls._DefaultName)
458 parser.add_id_argument(name=
"--id", datasetType=
"raw", help=
"data ID, e.g. --id visit=12345 ccd=1,2")
462 """!Write the configuration used for processing the data, or check that an existing
463 one is equal to the new one if present.
465 @param[in] butler data butler used to write the config.
466 The config is written to dataset type self._getConfigName()
467 @param[in] clobber a boolean flag that controls what happens if a config already has been saved:
468 - True: overwrite the existing config
469 - False: raise TaskError if this config does not match the existing config
472 if configName
is None:
475 butler.put(self.config, configName, doBackup=
True)
476 elif butler.datasetExists(configName):
478 oldConfig = butler.get(configName, immediate=
True)
479 output =
lambda msg: self.log.fatal(
"Comparing configuration: " + msg)
480 if not self.config.compare(oldConfig, shortcut=
False, output=output):
482 (
"Config does not match existing task config %r on disk; tasks configurations " + \
483 "must be consistent within the same output repo (override with --clobber-config)") % \
486 butler.put(self.config, configName)
489 """!Write the schemas returned by \ref task.Task.getAllSchemaCatalogs "getAllSchemaCatalogs"
491 @param[in] butler data butler used to write the schema.
492 Each schema is written to the dataset type specified as the key in the dict returned by
493 \ref task.Task.getAllSchemaCatalogs "getAllSchemaCatalogs".
494 @param[in] clobber a boolean flag that controls what happens if a schema already has been saved:
495 - True: overwrite the existing schema
496 - False: raise TaskError if this schema does not match the existing schema
498 @warning if clobber is False and an existing schema does not match a current schema,
499 then some schemas may have been saved successfully and others may not, and there is no easy way to
502 for dataset, catalog
in self.getAllSchemaCatalogs().iteritems():
503 schemaDataset = dataset +
"_schema"
505 butler.put(catalog, schemaDataset, doBackup=
True)
506 elif butler.datasetExists(schemaDataset):
507 oldSchema = butler.get(schemaDataset, immediate=
True).getSchema()
508 if not oldSchema.compare(catalog.getSchema(), afwTable.Schema.IDENTICAL):
510 (
"New schema does not match schema %r on disk; schemas must be " + \
511 " consistent within the same output repo (override with --clobber-config)") % \
514 butler.put(catalog, schemaDataset)
517 """!Write the metadata produced from processing the data
519 @param[in] dataRef butler data reference used to write the metadata.
520 The metadata is written to dataset type self._getMetadataName()
524 if metadataName
is not None:
525 dataRef.put(self.getFullMetadata(), metadataName)
527 self.log.warn(
"Could not persist metadata for dataId=%s: %s" % (dataRef.dataId, e,))
530 """!Return the name of the config dataset type, or None if config is not to be persisted
532 @note The name may depend on the config; that is why this is not a class method.
534 return self._DefaultName +
"_config"
537 """!Return the name of the metadata dataset type, or None if metadata is not to be persisted
539 @note The name may depend on the config; that is why this is not a class method.
541 return self._DefaultName +
"_metadata"
def precall
Hook for code that should run exactly once, before multiprocessing is invoked.
def __init__
Construct a TaskRunner.
def makeTask
Create a Task instance.
def _makeArgumentParser
Create and return an argument parser.
def makeTask
A variant of the base version that passes a butler argument to the task's constructor.
def run
Run the task on all targets.
def __call__
Run the Task on a single target.
def writeConfig
Write the configuration used for processing the data, or check that an existing one is equal to the n...
def _getConfigName
Return the name of the config dataset type, or None if config is not to be persisted.
def applyOverrides
A hook to allow a task to change the values of its config after the camera-specific overrides are loa...
def parseAndRun
Parse an argument list and run the command.
def _getMetadataName
Return the name of the metadata dataset type, or None if metadata is not to be persisted.
def writeMetadata
Write the metadata produced from processing the data.
def prepareForMultiProcessing
Prepare this instance for multiprocessing by removing optional non-picklable elements.
def getTargetList
Return a list of (dataRef, kwargs) to be used as arguments for TaskRunner.
A TaskRunner for CmdLineTasks that require a 'butler' keyword argument to be passed to their construc...
Run a command-line task, using multiprocessing if requested.
def profile
Context manager for profiling with cProfile.
def writeSchemas
Write the schemas returned by getAllSchemaCatalogs.
Base class for command-line tasks: tasks that may be executed from the command line.