LSSTApplications  17.0+11,17.0+34,17.0+56,17.0+57,17.0+59,17.0+7,17.0-1-g377950a+33,17.0.1-1-g114240f+2,17.0.1-1-g4d4fbc4+28,17.0.1-1-g55520dc+49,17.0.1-1-g5f4ed7e+52,17.0.1-1-g6dd7d69+17,17.0.1-1-g8de6c91+11,17.0.1-1-gb9095d2+7,17.0.1-1-ge9fec5e+5,17.0.1-1-gf4e0155+55,17.0.1-1-gfc65f5f+50,17.0.1-1-gfc6fb1f+20,17.0.1-10-g87f9f3f+1,17.0.1-11-ge9de802+16,17.0.1-16-ga14f7d5c+4,17.0.1-17-gc79d625+1,17.0.1-17-gdae4c4a+8,17.0.1-2-g26618f5+29,17.0.1-2-g54f2ebc+9,17.0.1-2-gf403422+1,17.0.1-20-g2ca2f74+6,17.0.1-23-gf3eadeb7+1,17.0.1-3-g7e86b59+39,17.0.1-3-gb5ca14a,17.0.1-3-gd08d533+40,17.0.1-30-g596af8797,17.0.1-4-g59d126d+4,17.0.1-4-gc69c472+5,17.0.1-6-g5afd9b9+4,17.0.1-7-g35889ee+1,17.0.1-7-gc7c8782+18,17.0.1-9-gc4bbfb2+3,w.2019.22
LSSTDataManagementBasePackage
cmdLineTask.py
Go to the documentation of this file.
1 #
2 # LSST Data Management System
3 # Copyright 2008-2015 AURA/LSST.
4 #
5 # This product includes software developed by the
6 # LSST Project (http://www.lsst.org/).
7 #
8 # This program is free software: you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation, either version 3 of the License, or
11 # (at your option) any later version.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
17 #
18 # You should have received a copy of the LSST License Statement and
19 # the GNU General Public License along with this program. If not,
20 # see <https://www.lsstcorp.org/LegalNotices/>.
21 #
22 __all__ = ["CmdLineTask", "TaskRunner", "ButlerInitializedTaskRunner", "LegacyTaskRunner"]
23 
24 import sys
25 import traceback
26 import functools
27 import contextlib
28 
29 import lsst.utils
30 from lsst.base import disableImplicitThreading
31 import lsst.afw.table as afwTable
32 from .task import Task, TaskError
33 from .struct import Struct
34 from .argumentParser import ArgumentParser
35 from lsst.base import Packages
36 from lsst.log import Log
37 
38 
39 def _runPool(pool, timeout, function, iterable):
40  """Wrapper around ``pool.map_async``, to handle timeout
41 
42  This is required so as to trigger an immediate interrupt on the KeyboardInterrupt (Ctrl-C); see
43  http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool
44  """
45  return pool.map_async(function, iterable).get(timeout)
46 
47 
@contextlib.contextmanager
def profile(filename, log=None):
    """Context manager for profiling with cProfile.

    Parameters
    ----------
    filename : `str`
        Filename to which to write profile (profiling disabled if `None` or empty).
    log : `lsst.log.Log`, optional
        Log object for logging the profile operations.

    Notes
    -----
    If profiling is enabled, the context manager returns the `cProfile.Profile`
    object (otherwise it returns `None`), which allows additional control over
    profiling. You can obtain this using the "as" clause, e.g.::

        with profile(filename) as prof:
            runYourCodeHere()

    The output cumulative profile can be printed with a command-line like::

        python -c 'import pstats; pstats.Stats("<filename>").sort_stats("cumtime").print_stats(30)'
    """
    if not filename:
        # Profiling disabled: just run the body unprofiled.
        yield
        return
    from cProfile import Profile

    prof = Profile()
    if log is not None:
        log.info("Enabling cProfile profiling")
    prof.enable()
    try:
        yield prof
    finally:
        # Always disable and dump, even if the profiled code raised, so a
        # partial profile is not silently lost (the exception still propagates).
        prof.disable()
        prof.dump_stats(filename)
        if log is not None:
            # Lazy %-style args: formatting is skipped if the level is disabled.
            log.info("cProfile stats written to %s", filename)
86 
class TaskRunner:
    """Run a command-line task, using `multiprocessing` if requested.

    Parameters
    ----------
    TaskClass : `lsst.pipe.base.Task` subclass
        The class of the task to run.
    parsedCmd : `argparse.Namespace`
        The parsed command-line arguments, as returned by the task's argument
        parser's `~lsst.pipe.base.ArgumentParser.parse_args` method.

        .. warning::

           Do not store ``parsedCmd``, as this instance is pickled (if
           multiprocessing) and parsedCmd may contain non-picklable elements.
           It certainly contains more data than we need to send to each
           instance of the task.
    doReturnResults : `bool`, optional
        Should run return the collected result from each invocation of the
        task? This is only intended for unit tests and similar use. It can
        easily exhaust memory (if the task returns enough data and you call it
        enough times) and it will fail when using multiprocessing if the
        returned data cannot be pickled.

        Note that even if ``doReturnResults`` is False a struct with a single
        member "exitStatus" is returned, with value 0 or 1 to be returned to
        the unix shell.

    Raises
    ------
    ImportError
        If multiprocessing is requested (and the task supports it) but the
        multiprocessing library cannot be imported.

    Notes
    -----
    Each command-line task (subclass of `lsst.pipe.base.CmdLineTask`) has a
    task runner. By default it is this class, but some tasks require a
    subclass. See the manual :ref:`creating-a-command-line-task` for more
    information. See `CmdLineTask.parseAndRun` to see how a task runner is
    used.

    You may use this task runner for your command-line task if your task has a
    runDataRef method that takes exactly one argument: a butler data
    reference. Otherwise you must provide a task-specific subclass of this
    runner for your task's ``RunnerClass`` that overrides
    `TaskRunner.getTargetList` and possibly `TaskRunner.__call__`. See
    `TaskRunner.getTargetList` for details.

    This design matches the common pattern for command-line tasks: the
    runDataRef method takes a single data reference, of some suitable name.
    Additional arguments are rare, and if present, require a subclass of
    `TaskRunner` that calls these additional arguments by name.

    Instances of this class must be picklable in order to be compatible with
    multiprocessing. If multiprocessing is requested
    (``parsedCmd.numProcesses > 1``) then `runDataRef` calls
    `prepareForMultiProcessing` to jettison optional non-picklable elements.
    If your task runner is not compatible with multiprocessing then indicate
    this in your task by setting class variable ``canMultiprocess=False``.

    Due to a `python bug`__, handling a `KeyboardInterrupt` properly `requires
    specifying a timeout`__. This timeout (in sec) can be specified as the
    ``timeout`` element in the output from `~lsst.pipe.base.ArgumentParser`
    (the ``parsedCmd``), if available, otherwise we use `TaskRunner.TIMEOUT`.

    By default, we disable "implicit" threading -- ie, as provided by
    underlying numerical libraries such as MKL or BLAS. This is designed to
    avoid thread contention both when a single command line task spawns
    multiple processes and when multiple users are running on a shared system.
    Users can override this behaviour by setting the
    ``LSST_ALLOW_IMPLICIT_THREADS`` environment variable.

    .. __: http://bugs.python.org/issue8296
    .. __: http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool
    """

    TIMEOUT = 3600*24*30
    """Default timeout (seconds) for multiprocessing."""

    def __init__(self, TaskClass, parsedCmd, doReturnResults=False):
        self.TaskClass = TaskClass
        self.doReturnResults = bool(doReturnResults)
        self.config = parsedCmd.config
        self.log = parsedCmd.log
        self.doRaise = bool(parsedCmd.doraise)
        self.clobberConfig = bool(parsedCmd.clobberConfig)
        self.doBackup = not bool(parsedCmd.noBackupConfig)
        self.numProcesses = int(getattr(parsedCmd, 'processes', 1))

        # A non-positive or absent timeout falls back to the class default.
        self.timeout = getattr(parsedCmd, 'timeout', None)
        if self.timeout is None or self.timeout <= 0:
            self.timeout = self.TIMEOUT

        if self.numProcesses > 1:
            if not TaskClass.canMultiprocess:
                self.log.warn("This task does not support multiprocessing; using one process")
                self.numProcesses = 1

    def prepareForMultiProcessing(self):
        """Prepare this instance for multiprocessing

        Optional non-picklable elements are removed.

        This is only called if the task is run under multiprocessing.
        """
        self.log = None

    def run(self, parsedCmd):
        """Run the task on all targets.

        Parameters
        ----------
        parsedCmd : `argparse.Namespace`
            Parsed command `argparse.Namespace`.

        Returns
        -------
        resultList : `list`
            A list of results returned by `TaskRunner.__call__`, or an empty
            list if `TaskRunner.__call__` is not called (e.g. if
            `TaskRunner.precall` returns `False`). See `TaskRunner.__call__`
            for details.

        Notes
        -----
        The task is run under multiprocessing if `TaskRunner.numProcesses` is
        more than 1; otherwise processing is serial.
        """
        resultList = []
        disableImplicitThreading()  # To prevent thread contention
        if self.numProcesses > 1:
            import multiprocessing
            # Jettison non-picklable state before the pool pickles this runner.
            self.prepareForMultiProcessing()
            pool = multiprocessing.Pool(processes=self.numProcesses, maxtasksperchild=1)
            mapFunc = functools.partial(_runPool, pool, self.timeout)
        else:
            pool = None
            mapFunc = map

        if self.precall(parsedCmd):
            profileName = parsedCmd.profile if hasattr(parsedCmd, "profile") else None
            log = parsedCmd.log
            targetList = self.getTargetList(parsedCmd)
            if len(targetList) > 0:
                with profile(profileName, log):
                    # Run the task using self.__call__
                    resultList = list(mapFunc(self, targetList))
            else:
                log.warn("Not running the task because there is no data to process; "
                         "you may preview data using \"--show data\"")

        if pool is not None:
            pool.close()
            pool.join()

        return resultList

    @staticmethod
    def getTargetList(parsedCmd, **kwargs):
        """Get a list of (dataRef, kwargs) for `TaskRunner.__call__`.

        Parameters
        ----------
        parsedCmd : `argparse.Namespace`
            The parsed command object returned by
            `lsst.pipe.base.argumentParser.ArgumentParser.parse_args`.
        kwargs
            Any additional keyword arguments. In the default `TaskRunner` this
            is an empty dict, but having it simplifies overriding `TaskRunner`
            for tasks whose runDataRef method takes additional arguments (see
            case (1) below).

        Notes
        -----
        The default implementation of `TaskRunner.getTargetList` and
        `TaskRunner.__call__` works for any command-line task whose runDataRef
        method takes exactly one argument: a data reference. Otherwise you
        must provide a variant of TaskRunner that overrides
        `TaskRunner.getTargetList` and possibly `TaskRunner.__call__`. There
        are two cases.

        **Case 1**

        If your command-line task has a ``runDataRef`` method that takes one
        data reference followed by additional arguments, then you need only
        override `TaskRunner.getTargetList` to return the additional arguments
        as an argument dict. To make this easier, your overridden version of
        `~TaskRunner.getTargetList` may call `TaskRunner.getTargetList` with
        the extra arguments as keyword arguments. For example, the following
        adds an argument dict containing a single key: "calExpList", whose
        value is the list of data IDs for the calexp ID argument::

            def getTargetList(parsedCmd):
                return TaskRunner.getTargetList(
                    parsedCmd,
                    calExpList=parsedCmd.calexp.idList
                )

        It is equivalent to this slightly longer version::

            @staticmethod
            def getTargetList(parsedCmd):
                argDict = dict(calExpList=parsedCmd.calexp.idList)
                return [(dataId, argDict) for dataId in parsedCmd.id.idList]

        **Case 2**

        If your task does not meet condition (1) then you must override both
        TaskRunner.getTargetList and `TaskRunner.__call__`. You may do this
        however you see fit, so long as `TaskRunner.getTargetList` returns a
        list, each of whose elements is sent to `TaskRunner.__call__`, which
        runs your task.
        """
        return [(ref, kwargs) for ref in parsedCmd.id.refList]

    def makeTask(self, parsedCmd=None, args=None):
        """Create a Task instance.

        Parameters
        ----------
        parsedCmd
            Parsed command-line options (used for extra task args by some task
            runners).
        args
            Args tuple passed to `TaskRunner.__call__` (used for extra task
            arguments by some task runners).

        Notes
        -----
        ``makeTask`` can be called with either the ``parsedCmd`` argument or
        ``args`` argument set to None, but it must construct identical Task
        instances in either case.

        Subclasses may ignore this method entirely if they reimplement both
        `TaskRunner.precall` and `TaskRunner.__call__`.
        """
        return self.TaskClass(config=self.config, log=self.log)

    def _precallImpl(self, task, parsedCmd):
        """The main work of `precall`.

        We write package versions, schemas and configs, or compare these to
        existing files on disk if present.
        """
        if not parsedCmd.noVersions:
            task.writePackageVersions(parsedCmd.butler, clobber=parsedCmd.clobberVersions)
        task.writeConfig(parsedCmd.butler, clobber=self.clobberConfig, doBackup=self.doBackup)
        task.writeSchemas(parsedCmd.butler, clobber=self.clobberConfig, doBackup=self.doBackup)

    def precall(self, parsedCmd):
        """Hook for code that should run exactly once, before multiprocessing.

        Notes
        -----
        Must return True if `TaskRunner.__call__` should subsequently be
        called.

        .. warning::

           Implementations must take care to ensure that no unpicklable
           attributes are added to the TaskRunner itself, for compatibility
           with multiprocessing.

        The default implementation writes package versions, schemas and
        configs, or compares them to existing files on disk if present.
        """
        task = self.makeTask(parsedCmd=parsedCmd)

        if self.doRaise:
            self._precallImpl(task, parsedCmd)
        else:
            try:
                self._precallImpl(task, parsedCmd)
            except Exception as e:
                task.log.fatal("Failed in task initialization: %s", e)
                if not isinstance(e, TaskError):
                    traceback.print_exc(file=sys.stderr)
                return False
        return True

    def __call__(self, args):
        """Run the Task on a single target.

        Parameters
        ----------
        args
            Arguments for Task.runDataRef()

        Returns
        -------
        struct : `lsst.pipe.base.Struct`
            Contains these fields if ``doReturnResults`` is `True`:

            - ``dataRef``: the provided data reference.
            - ``metadata``: task metadata after execution of run.
            - ``result``: result returned by task run, or `None` if the task
              fails.
            - ``exitStatus``: 0 if the task completed successfully, 1
              otherwise.

            If ``doReturnResults`` is `False` the struct contains:

            - ``exitStatus``: 0 if the task completed successfully, 1
              otherwise.

        Notes
        -----
        This default implementation assumes that the ``args`` is a tuple
        containing a data reference and a dict of keyword arguments.

        .. warning::

           If you override this method and wish to return something when
           ``doReturnResults`` is `False`, then it must be picklable to
           support multiprocessing and it should be small enough that pickling
           and unpickling do not add excessive overhead.
        """
        dataRef, kwargs = args
        if self.log is None:
            # Under multiprocessing the log was jettisoned; recreate one here.
            self.log = Log.getDefaultLogger()
        if hasattr(dataRef, "dataId"):
            self.log.MDC("LABEL", str(dataRef.dataId))
        elif isinstance(dataRef, (list, tuple)):
            self.log.MDC("LABEL", str([ref.dataId for ref in dataRef if hasattr(ref, "dataId")]))
        task = self.makeTask(args=args)
        result = None  # in case the task fails
        exitStatus = 0  # exit status for the shell
        if self.doRaise:
            result = self.runTask(task, dataRef, kwargs)
        else:
            try:
                result = self.runTask(task, dataRef, kwargs)
            except Exception as e:
                # The shell exit value will be the number of dataRefs returning
                # non-zero, so the actual value used here is lost.
                exitStatus = 1

                # don't use a try block as we need to preserve the original exception
                eName = type(e).__name__
                if hasattr(dataRef, "dataId"):
                    task.log.fatal("Failed on dataId=%s: %s: %s", dataRef.dataId, eName, e)
                elif isinstance(dataRef, (list, tuple)):
                    task.log.fatal("Failed on dataIds=[%s]: %s: %s",
                                   ", ".join(str(ref.dataId) for ref in dataRef), eName, e)
                else:
                    task.log.fatal("Failed on dataRef=%s: %s: %s", dataRef, eName, e)

                if not isinstance(e, TaskError):
                    traceback.print_exc(file=sys.stderr)

        # Ensure all errors have been logged and aren't hanging around in a buffer
        sys.stdout.flush()
        sys.stderr.flush()

        task.writeMetadata(dataRef)

        # remove MDC so it does not show up outside of task context
        self.log.MDCRemove("LABEL")

        if self.doReturnResults:
            return Struct(
                exitStatus=exitStatus,
                dataRef=dataRef,
                metadata=task.metadata,
                result=result,
            )
        else:
            return Struct(
                exitStatus=exitStatus,
            )

    def runTask(self, task, dataRef, kwargs):
        """Make the actual call to `runDataRef` for this task.

        Parameters
        ----------
        task : `lsst.pipe.base.CmdLineTask` class
            The class of the task to run.
        dataRef
            Butler data reference that contains the data the task will
            process.
        kwargs
            Any additional keyword arguments. See `TaskRunner.getTargetList`
            above.

        Notes
        -----
        The default implementation of `TaskRunner.runTask` works for any
        command-line task which has a runDataRef method that takes a data
        reference and an optional set of additional keyword arguments. This
        method returns the results generated by the task's `runDataRef`
        method.
        """
        return task.runDataRef(dataRef, **kwargs)
448 
449 
class LegacyTaskRunner(TaskRunner):
    r"""A `TaskRunner` for `CmdLineTask`\ s which calls the `Task`\ 's `run` method on a `dataRef` rather
    than the `runDataRef` method.
    """

    def runTask(self, task, dataRef, kwargs):
        """Call `run` for this task instead of `runDataRef`. See `TaskRunner.runTask` above for details.
        """
        return task.run(dataRef, **kwargs)
459 
460 
class ButlerInitializedTaskRunner(TaskRunner):
    r"""A `TaskRunner` for `CmdLineTask`\ s that require a ``butler`` keyword argument to be passed to
    their constructor.
    """

    def makeTask(self, parsedCmd=None, args=None):
        """A variant of the base version that passes a butler argument to the task's constructor.

        Parameters
        ----------
        parsedCmd : `argparse.Namespace`
            Parsed command-line options, as returned by the `~lsst.pipe.base.ArgumentParser`; if specified
            then args is ignored.
        args
            Other arguments; if ``parsedCmd`` is `None` then this must be specified.

        Raises
        ------
        RuntimeError
            Raised if ``parsedCmd`` and ``args`` are both `None`.
        """
        if parsedCmd is not None:
            butler = parsedCmd.butler
        elif args is not None:
            # args is the (dataRef, kwargs) tuple from TaskRunner.__call__;
            # recover the butler from the data reference.
            dataRef, kwargs = args
            butler = dataRef.butlerSubset.butler
        else:
            raise RuntimeError("parsedCmd or args must be specified")
        return self.TaskClass(config=self.config, log=self.log, butler=butler)
490 
491 
class CmdLineTask(Task):
    """Base class for command-line tasks: tasks that may be executed from the command-line.

    Notes
    -----
    See :ref:`task-framework-overview` to learn what tasks are and
    :ref:`creating-a-command-line-task` for more information about writing
    command-line tasks.

    Subclasses must specify the following class variables:

    - ``ConfigClass``: configuration class for your task (a subclass of
      `lsst.pex.config.Config`, or if your task needs no configuration, then
      `lsst.pex.config.Config` itself).
    - ``_DefaultName``: default name used for this task (a str).

    Subclasses may also specify the following class variables:

    - ``RunnerClass``: a task runner class. The default is ``TaskRunner``,
      which works for any task with a runDataRef method that takes exactly one
      argument: a data reference. If your task does not meet this requirement
      then you must supply a variant of ``TaskRunner``; see ``TaskRunner`` for
      more information.
    - ``canMultiprocess``: the default is `True`; set `False` if your task
      does not support multiprocessing.

    Subclasses must specify a method named ``runDataRef``:

    - By default ``runDataRef`` accepts a single butler data reference, but
      you can specify an alternate task runner (subclass of ``TaskRunner``) as
      the value of class variable ``RunnerClass`` if your run method needs
      something else.
    - ``runDataRef`` is expected to return its data in a
      `lsst.pipe.base.Struct`. This provides safety for evolution of the task
      since new values may be added without harming existing code.
    - The data returned by ``runDataRef`` must be picklable if your task is to
      support multiprocessing.
    """
    RunnerClass = TaskRunner
    canMultiprocess = True

    @classmethod
    def applyOverrides(cls, config):
        """A hook to allow a task to change the values of its config *after*
        the camera-specific overrides are loaded but before any command-line
        overrides are applied.

        Parameters
        ----------
        config : instance of task's ``ConfigClass``
            Task configuration.

        Notes
        -----
        This is necessary in some cases because the camera-specific overrides
        may retarget subtasks, wiping out changes made in
        ConfigClass.setDefaults. See LSST Trac ticket #2282 for more
        discussion.

        .. warning::

           This is called by CmdLineTask.parseAndRun; other ways of
           constructing a config will not apply these overrides.
        """
        pass

    @classmethod
    def parseAndRun(cls, args=None, config=None, log=None, doReturnResults=False):
        """Parse an argument list and run the command.

        Parameters
        ----------
        args : `list`, optional
            List of command-line arguments; if `None` use `sys.argv`.
        config : `lsst.pex.config.Config`-type, optional
            Config for task. If `None` use `Task.ConfigClass`.
        log : `lsst.log.Log`-type, optional
            Log. If `None` use the default log.
        doReturnResults : `bool`, optional
            If `True`, return the results of this task. Default is `False`.
            This is only intended for unit tests and similar use. It can
            easily exhaust memory (if the task returns enough data and you
            call it enough times) and it will fail when using multiprocessing
            if the returned data cannot be pickled.

        Returns
        -------
        struct : `lsst.pipe.base.Struct`
            Fields are:

            - ``argumentParser``: the argument parser.
            - ``parsedCmd``: the parsed command returned by the argument
              parser's `lsst.pipe.base.ArgumentParser.parse_args` method.
            - ``taskRunner``: the task runner used to run the task (an
              instance of `Task.RunnerClass`).
            - ``resultList``: results returned by the task runner's ``run``
              method, one entry per invocation. This will typically be a list
              of `None` unless ``doReturnResults`` is `True`; see
              `Task.RunnerClass` (`TaskRunner` by default) for more
              information.

        Notes
        -----
        Calling this method with no arguments specified is the standard way to
        run a command-line task from the command-line. For an example see
        ``pipe_tasks`` ``bin/makeSkyMap.py`` or almost any other file in that
        directory.

        If one or more of the dataIds fails then this routine will exit (with
        a status giving the number of failed dataIds) rather than returning
        this struct; this behaviour can be overridden by specifying the
        ``--noExit`` command-line option.
        """
        if args is None:
            commandAsStr = " ".join(sys.argv)
            args = sys.argv[1:]
        else:
            commandAsStr = "{}{}".format(lsst.utils.get_caller_name(skip=1), tuple(args))

        argumentParser = cls._makeArgumentParser()
        if config is None:
            config = cls.ConfigClass()
        parsedCmd = argumentParser.parse_args(config=config, args=args, log=log, override=cls.applyOverrides)
        # print this message after parsing the command so the log is fully configured
        parsedCmd.log.info("Running: %s", commandAsStr)

        taskRunner = cls.RunnerClass(TaskClass=cls, parsedCmd=parsedCmd, doReturnResults=doReturnResults)
        resultList = taskRunner.run(parsedCmd)

        try:
            nFailed = sum(((res.exitStatus != 0) for res in resultList))
        except (TypeError, AttributeError) as e:
            # NOTE: TypeError if resultList is None, AttributeError if it doesn't have exitStatus.
            parsedCmd.log.warn("Unable to retrieve exit status (%s); assuming success", e)
            nFailed = 0

        if nFailed > 0:
            if parsedCmd.noExit:
                parsedCmd.log.error("%d dataRefs failed; not exiting as --noExit was set", nFailed)
            else:
                sys.exit(nFailed)

        return Struct(
            argumentParser=argumentParser,
            parsedCmd=parsedCmd,
            taskRunner=taskRunner,
            resultList=resultList,
        )

    @classmethod
    def _makeArgumentParser(cls):
        """Create and return an argument parser.

        Returns
        -------
        parser : `lsst.pipe.base.ArgumentParser`
            The argument parser for this task.

        Notes
        -----
        By default this returns an `~lsst.pipe.base.ArgumentParser` with one
        ID argument named `--id` of dataset type ``raw``.

        Your task subclass may need to override this method to change the
        dataset type or data ref level, or to add additional data ID
        arguments. If you add additional data ID arguments or your task's
        runDataRef method takes more than a single data reference then you
        will also have to provide a task-specific task runner (see TaskRunner
        for more information).
        """
        parser = ArgumentParser(name=cls._DefaultName)
        parser.add_id_argument(name="--id", datasetType="raw",
                               help="data IDs, e.g. --id visit=12345 ccd=1,2^0,3")
        return parser

    def writeConfig(self, butler, clobber=False, doBackup=True):
        """Write the configuration used for processing the data, or check that
        an existing one is equal to the new one if present.

        Parameters
        ----------
        butler : `lsst.daf.persistence.Butler`
            Data butler used to write the config. The config is written to
            dataset type `CmdLineTask._getConfigName`.
        clobber : `bool`, optional
            A boolean flag that controls what happens if a config already has
            been saved:

            - `True`: overwrite or rename the existing config, depending on
              ``doBackup``.
            - `False`: raise `TaskError` if this config does not match the
              existing config.
        doBackup : `bool`, optional
            Set to `True` to backup the config files if clobbering.
        """
        configName = self._getConfigName()
        if configName is None:
            # This task's config is not persisted.
            return
        if clobber:
            butler.put(self.config, configName, doBackup=doBackup)
        elif butler.datasetExists(configName, write=True):
            # this may be subject to a race condition; see #2789
            try:
                oldConfig = butler.get(configName, immediate=True)
            except Exception as exc:
                raise type(exc)("Unable to read stored config file %s (%s); consider using --clobber-config" %
                                (configName, exc))

            def logConfigMismatch(msg):
                self.log.fatal("Comparing configuration: %s", msg)

            if not self.config.compare(oldConfig, shortcut=False, output=logConfigMismatch):
                raise TaskError(
                    ("Config does not match existing task config %r on disk; tasks configurations " +
                     "must be consistent within the same output repo (override with --clobber-config)") %
                    (configName,))
        else:
            butler.put(self.config, configName)

    def writeSchemas(self, butler, clobber=False, doBackup=True):
        """Write the schemas returned by
        `lsst.pipe.base.Task.getAllSchemaCatalogs`.

        Parameters
        ----------
        butler : `lsst.daf.persistence.Butler`
            Data butler used to write the schema. Each schema is written to
            the dataset type specified as the key in the dict returned by
            `~lsst.pipe.base.Task.getAllSchemaCatalogs`.
        clobber : `bool`, optional
            A boolean flag that controls what happens if a schema already has
            been saved:

            - `True`: overwrite or rename the existing schema, depending on
              ``doBackup``.
            - `False`: raise `TaskError` if this schema does not match the
              existing schema.
        doBackup : `bool`, optional
            Set to `True` to backup the schema files if clobbering.

        Notes
        -----
        If ``clobber`` is `False` and an existing schema does not match a
        current schema, then some schemas may have been saved successfully and
        others may not, and there is no easy way to tell which is which.
        """
        for dataset, catalog in self.getAllSchemaCatalogs().items():
            schemaDataset = dataset + "_schema"
            if clobber:
                butler.put(catalog, schemaDataset, doBackup=doBackup)
            elif butler.datasetExists(schemaDataset, write=True):
                oldSchema = butler.get(schemaDataset, immediate=True).getSchema()
                if not oldSchema.compare(catalog.getSchema(), afwTable.Schema.IDENTICAL):
                    raise TaskError(
                        ("New schema does not match schema %r on disk; schemas must be " +
                         " consistent within the same output repo (override with --clobber-config)") %
                        (dataset,))
            else:
                butler.put(catalog, schemaDataset)

    def writeMetadata(self, dataRef):
        """Write the metadata produced from processing the data.

        Parameters
        ----------
        dataRef
            Butler data reference used to write the metadata.
            The metadata is written to dataset type
            `CmdLineTask._getMetadataName`.
        """
        try:
            metadataName = self._getMetadataName()
            if metadataName is not None:
                dataRef.put(self.getFullMetadata(), metadataName)
        except Exception as e:
            # Metadata persistence is best-effort; a failure must not abort
            # the task, so log a warning and carry on.
            self.log.warn("Could not persist metadata for dataId=%s: %s", dataRef.dataId, e)

    def writePackageVersions(self, butler, clobber=False, doBackup=True, dataset="packages"):
        """Compare and write package versions.

        Parameters
        ----------
        butler : `lsst.daf.persistence.Butler`
            Data butler used to read/write the package versions.
        clobber : `bool`, optional
            A boolean flag that controls what happens if versions already have
            been saved:

            - `True`: overwrite or rename the existing version info, depending
              on ``doBackup``.
            - `False`: raise `TaskError` if this version info does not match
              the existing.
        doBackup : `bool`, optional
            If `True` and clobbering, old package version files are backed up.
        dataset : `str`, optional
            Name of dataset to read/write.

        Raises
        ------
        TaskError
            Raised if there is a version mismatch with current and persisted
            lists of package versions.

        Notes
        -----
        Note that this operation is subject to a race condition.
        """
        packages = Packages.fromSystem()

        if clobber:
            return butler.put(packages, dataset, doBackup=doBackup)
        if not butler.datasetExists(dataset, write=True):
            return butler.put(packages, dataset)

        try:
            old = butler.get(dataset, immediate=True)
        except Exception as exc:
            raise type(exc)("Unable to read stored version dataset %s (%s); "
                            "consider using --clobber-versions or --no-versions" %
                            (dataset, exc))
        # Note that because we can only detect python modules that have been imported, the stored
        # list of products may be more or less complete than what we have now. What's important is
        # that the products that are in common have the same version.
        diff = packages.difference(old)
        if diff:
            raise TaskError(
                "Version mismatch (" +
                "; ".join("%s: %s vs %s" % (pkg, diff[pkg][1], diff[pkg][0]) for pkg in diff) +
                "); consider using --clobber-versions or --no-versions")
        # Update the old set of packages in case we have more packages that haven't been persisted.
        extra = packages.extra(old)
        if extra:
            old.update(packages)
            butler.put(old, dataset, doBackup=doBackup)

    def _getConfigName(self):
        """Get the name of the config dataset type, or `None` if config is not
        to be persisted.

        Notes
        -----
        The name may depend on the config; that is why this is not a class
        method.
        """
        return self._DefaultName + "_config"

    def _getMetadataName(self):
        """Get the name of the metadata dataset type, or `None` if metadata is
        not to be persisted.

        Notes
        -----
        The name may depend on the config; that is why this is not a class
        method.
        """
        return self._DefaultName + "_metadata"
def parseAndRun(cls, args=None, config=None, log=None, doReturnResults=False)
Definition: cmdLineTask.py:549
def _precallImpl(self, task, parsedCmd)
Definition: cmdLineTask.py:302
def runTask(self, task, dataRef, kwargs)
Definition: cmdLineTask.py:428
def getFullMetadata(self)
Definition: task.py:210
def writePackageVersions(self, butler, clobber=False, doBackup=True, dataset="packages")
Definition: cmdLineTask.py:740
def getAllSchemaCatalogs(self)
Definition: task.py:188
def writeSchemas(self, butler, clobber=False, doBackup=True)
Definition: cmdLineTask.py:689
bool disableImplicitThreading()
Disable threading that has not been set explicitly.
Definition: threads.cc:132
Definition: Log.h:691
def makeTask(self, parsedCmd=None, args=None)
Definition: cmdLineTask.py:282
table::Key< int > type
Definition: Detector.cc:167
def __init__(self, TaskClass, parsedCmd, doReturnResults=False)
Definition: cmdLineTask.py:155
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)
Definition: history.py:168
def profile(filename, log=None)
Definition: cmdLineTask.py:49
def makeTask(self, parsedCmd=None, args=None)
Definition: cmdLineTask.py:466
def getTargetList(parsedCmd, kwargs)
Definition: cmdLineTask.py:233
def writeConfig(self, butler, clobber=False, doBackup=True)
Definition: cmdLineTask.py:649
std::vector< SchemaItem< Flag > > * items
daf::base::PropertyList * list
Definition: fits.cc:885
def runTask(self, task, dataRef, kwargs)
Definition: cmdLineTask.py:455