cmdLineTask.py
1 #
2 # LSST Data Management System
3 # Copyright 2008-2015 AURA/LSST.
4 #
5 # This product includes software developed by the
6 # LSST Project (http://www.lsst.org/).
7 #
8 # This program is free software: you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation, either version 3 of the License, or
11 # (at your option) any later version.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
17 #
18 # You should have received a copy of the LSST License Statement and
19 # the GNU General Public License along with this program. If not,
20 # see <https://www.lsstcorp.org/LegalNotices/>.
21 #
22 __all__ = ["CmdLineTask", "TaskRunner", "ButlerInitializedTaskRunner", "LegacyTaskRunner"]
23 
24 import sys
25 import traceback
26 import functools
27 import contextlib
28 
29 import lsst.utils
30 from lsst.base import disableImplicitThreading
31 import lsst.afw.table as afwTable
32 from .task import Task, TaskError
33 from .struct import Struct
34 from .argumentParser import ArgumentParser
35 from lsst.base import Packages
36 from lsst.log import Log
37 
38 
39 def _runPool(pool, timeout, function, iterable):
40  """Wrapper around ``pool.map_async``, to handle timeout
41 
42  This is required to trigger an immediate interrupt on KeyboardInterrupt (Ctrl-C); see
43  http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool
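
 As an illustration, `TaskRunner.run` below builds its parallel map function from this wrapper
 roughly like this (a minimal sketch of the call pattern; ``someCallable`` and ``targetList`` are
 placeholder names)::

     import functools
     import multiprocessing

     pool = multiprocessing.Pool(processes=4, maxtasksperchild=1)
     mapFunc = functools.partial(_runPool, pool, 3600)  # fix the pool and a 3600 s timeout
     results = list(mapFunc(someCallable, targetList))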
44  """
45  return pool.map_async(function, iterable).get(timeout)
46 
47 
48 @contextlib.contextmanager
49 def profile(filename, log=None):
50  """Context manager for profiling with cProfile.
51 
52 
53  Parameters
54  ----------
55  filename : `str`
56  Filename to which to write profile (profiling disabled if `None` or empty).
57  log : `lsst.log.Log`, optional
58  Log object for logging the profile operations.
59 
60  If profiling is enabled, the context manager returns the cProfile.Profile object (otherwise
61  it returns None), which allows additional control over profiling. You can obtain this using
62  the "as" clause, e.g.::
63 
64  with profile(filename) as prof:
65  runYourCodeHere()
66 
67  The output cumulative profile can be printed with a command-line like::
68 
69  python -c 'import pstats; pstats.Stats("<filename>").sort_stats("cumtime").print_stats(30)'
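
 The returned profiler can also be inspected in the same process once the context exits, since the
 stats have been dumped to ``filename`` by then (a minimal sketch; ``runYourCodeHere`` is a
 placeholder for your own code)::

     import pstats

     with profile("profile.out") as prof:
         runYourCodeHere()
     if prof is not None:  # prof is None when profiling is disabled
         pstats.Stats("profile.out").sort_stats("cumtime").print_stats(10)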
70  """
71  if not filename:
72  # Nothing to do
73  yield
74  return
75  from cProfile import Profile
76  profile = Profile()
77  if log is not None:
78  log.info("Enabling cProfile profiling")
79  profile.enable()
80  yield profile
81  profile.disable()
82  profile.dump_stats(filename)
83  if log is not None:
84  log.info("cProfile stats written to %s", filename)
85 
86 
87 class TaskRunner:
88  """Run a command-line task, using `multiprocessing` if requested.
89 
90  Parameters
91  ----------
92  TaskClass : `lsst.pipe.base.Task` subclass
93  The class of the task to run.
94  parsedCmd : `argparse.Namespace`
95  The parsed command-line arguments, as returned by the task's argument parser's
96  `~lsst.pipe.base.ArgumentParser.parse_args` method.
97 
98  .. warning::
99 
100  Do not store ``parsedCmd``, as this instance is pickled (if multiprocessing) and parsedCmd may
101  contain non-picklable elements. It certainly contains more data than we need to send to each
102  instance of the task.
103  doReturnResults : `bool`, optional
104  Should run return the collected result from each invocation of the task? This is only intended for
105  unit tests and similar use. It can easily exhaust memory (if the task returns enough data and you
106  call it enough times) and it will fail when using multiprocessing if the returned data cannot be
107  pickled.
108 
109  Note that even if ``doReturnResults`` is `False`, a `Struct` with a single member ``exitStatus`` is
110  returned, holding the value 0 or 1 to be passed back to the unix shell.
111 
112  Raises
113  ------
114  ImportError
115  If multiprocessing is requested (and the task supports it) but the multiprocessing library cannot be
116  imported.
117 
118  Notes
119  -----
120  Each command-line task (subclass of `lsst.pipe.base.CmdLineTask`) has a task runner. By default it is this
121  class, but some tasks require a subclass. See the manual :ref:`creating-a-command-line-task` for more
122  information. See `CmdLineTask.parseAndRun` to see how a task runner is used.
123 
124  You may use this task runner for your command-line task if your task has a runDataRef method that takes
125  exactly one argument: a butler data reference. Otherwise you must provide a task-specific subclass of
126  this runner for your task's ``RunnerClass`` that overrides `TaskRunner.getTargetList` and possibly
127  `TaskRunner.__call__`. See `TaskRunner.getTargetList` for details.
128 
129  This design matches the common pattern for command-line tasks: the runDataRef method takes a single data
130  reference, of some suitable name. Additional arguments are rare, and if present, require a subclass of
131  `TaskRunner` that passes these additional arguments by name.
132 
133  Instances of this class must be picklable in order to be compatible with multiprocessing. If
134  multiprocessing is requested (``parsedCmd.numProcesses > 1``) then `runDataRef` calls
135  `prepareForMultiProcessing` to jettison optional non-picklable elements. If your task runner is not
136  compatible with multiprocessing then indicate this in your task by setting class variable
137  ``canMultiprocess=False``.
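
 For example, a task whose runner is not compatible with multiprocessing might declare the
 following (a minimal sketch; ``ExampleTask``, ``ExampleConfig`` and ``ExampleTaskRunner`` are
 hypothetical names)::

     class ExampleTask(CmdLineTask):
         ConfigClass = ExampleConfig
         _DefaultName = "example"
         RunnerClass = ExampleTaskRunner  # a runner holding unpicklable state
         canMultiprocess = False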
138 
139  Due to a `python bug`__, handling a `KeyboardInterrupt` properly `requires specifying a timeout`__. This
140  timeout (in sec) can be specified as the ``timeout`` element in the output from
141  `~lsst.pipe.base.ArgumentParser` (the ``parsedCmd``), if available, otherwise we use `TaskRunner.TIMEOUT`.
142 
143  By default, we disable "implicit" threading -- i.e., as provided by underlying numerical libraries such as
144  MKL or BLAS. This is designed to avoid thread contention both when a single command line task spawns
145  multiple processes and when multiple users are running on a shared system. Users can override this
146  behaviour by setting the ``LSST_ALLOW_IMPLICIT_THREADS`` environment variable.
147 
148  .. __: http://bugs.python.org/issue8296
149  .. __: http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool
150  """
151 
152  TIMEOUT = 3600*24*30
153  """Default timeout (seconds) for multiprocessing."""
154 
155  def __init__(self, TaskClass, parsedCmd, doReturnResults=False):
156  self.TaskClass = TaskClass
157  self.doReturnResults = bool(doReturnResults)
158  self.config = parsedCmd.config
159  self.log = parsedCmd.log
160  self.doRaise = bool(parsedCmd.doraise)
161  self.clobberConfig = bool(parsedCmd.clobberConfig)
162  self.doBackup = not bool(parsedCmd.noBackupConfig)
163  self.numProcesses = int(getattr(parsedCmd, 'processes', 1))
164 
165  self.timeout = getattr(parsedCmd, 'timeout', None)
166  if self.timeout is None or self.timeout <= 0:
167  self.timeout = self.TIMEOUT
168 
169  if self.numProcesses > 1:
170  if not TaskClass.canMultiprocess:
171  self.log.warn("This task does not support multiprocessing; using one process")
172  self.numProcesses = 1
173 
174  def prepareForMultiProcessing(self):
175  """Prepare this instance for multiprocessing
176 
177  Optional non-picklable elements are removed.
178 
179  This is only called if the task is run under multiprocessing.
180  """
181  self.log = None
182 
183  def run(self, parsedCmd):
184  """Run the task on all targets.
185 
186  Parameters
187  ----------
188  parsedCmd : `argparse.Namespace`
189  The parsed command-line arguments.
190 
191  Returns
192  -------
193  resultList : `list`
194  A list of results returned by `TaskRunner.__call__`, or an empty list if `TaskRunner.__call__`
195  is not called (e.g. if `TaskRunner.precall` returns `False`). See `TaskRunner.__call__`
196  for details.
197 
198  Notes
199  -----
200  The task is run under multiprocessing if `TaskRunner.numProcesses` is more than 1; otherwise
201  processing is serial.
202  """
203  resultList = []
204  disableImplicitThreading() # To prevent thread contention
205  if self.numProcesses > 1:
206  import multiprocessing
207  self.prepareForMultiProcessing()
208  pool = multiprocessing.Pool(processes=self.numProcesses, maxtasksperchild=1)
209  mapFunc = functools.partial(_runPool, pool, self.timeout)
210  else:
211  pool = None
212  mapFunc = map
213 
214  if self.precall(parsedCmd):
215  profileName = parsedCmd.profile if hasattr(parsedCmd, "profile") else None
216  log = parsedCmd.log
217  targetList = self.getTargetList(parsedCmd)
218  if len(targetList) > 0:
219  with profile(profileName, log):
220  # Run the task using self.__call__
221  resultList = list(mapFunc(self, targetList))
222  else:
223  log.warn("Not running the task because there is no data to process; "
224  "you may preview data using \"--show data\"")
225 
226  if pool is not None:
227  pool.close()
228  pool.join()
229 
230  return resultList
231 
232  @staticmethod
233  def getTargetList(parsedCmd, **kwargs):
234  """Get a list of (dataRef, kwargs) for `TaskRunner.__call__`.
235 
236  Parameters
237  ----------
238  parsedCmd : `argparse.Namespace`
239  The parsed command object returned by `lsst.pipe.base.argumentParser.ArgumentParser.parse_args`.
240  kwargs
241  Any additional keyword arguments. In the default `TaskRunner` this is an empty dict, but having
242  it simplifies overriding `TaskRunner` for tasks whose runDataRef method takes additional arguments
243  (see case (1) below).
244 
245  Notes
246  -----
247  The default implementation of `TaskRunner.getTargetList` and `TaskRunner.__call__` works for any
248  command-line task whose runDataRef method takes exactly one argument: a data reference. Otherwise you
249  must provide a variant of TaskRunner that overrides `TaskRunner.getTargetList` and possibly
250  `TaskRunner.__call__`. There are two cases.
251 
252  **Case 1**
253 
254  If your command-line task has a ``runDataRef`` method that takes one data reference followed by
255  additional arguments, then you need only override `TaskRunner.getTargetList` to return the additional
256  arguments as an argument dict. To make this easier, your overridden version of
257  `~TaskRunner.getTargetList` may call `TaskRunner.getTargetList` with the extra arguments as keyword
258  arguments. For example, the following adds an argument dict containing a single key: "calExpList",
259  whose value is the list of data IDs for the calexp ID argument::
260 
261  def getTargetList(parsedCmd):
262  return TaskRunner.getTargetList(
263  parsedCmd,
264  calExpList=parsedCmd.calexp.idList
265  )
266 
267  It is equivalent to this slightly longer version::
268 
269  @staticmethod
270  def getTargetList(parsedCmd):
271  argDict = dict(calExpList=parsedCmd.calexp.idList)
272  return [(dataId, argDict) for dataId in parsedCmd.id.idList]
273 
274  **Case 2**
275 
276  If your task does not meet condition (1) then you must override both TaskRunner.getTargetList and
277  `TaskRunner.__call__`. You may do this however you see fit, so long as `TaskRunner.getTargetList`
278  returns a list, each of whose elements is sent to `TaskRunner.__call__`, which runs your task.
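
 For example, a runner that groups data references by visit and sends each group to the task as a
 single target might look like this (a minimal sketch that omits the error handling and metadata
 writing of the default `TaskRunner.__call__`; the grouping by ``visit`` is purely illustrative)::

     class ExampleGroupingRunner(TaskRunner):
         @staticmethod
         def getTargetList(parsedCmd):
             # each target is a plain list of data references sharing a visit
             refsByVisit = {}
             for ref in parsedCmd.id.refList:
                 refsByVisit.setdefault(ref.dataId["visit"], []).append(ref)
             return list(refsByVisit.values())

         def __call__(self, refList):
             # each target is the bare list built above, not a (dataRef, kwargs) tuple
             task = self.makeTask()
             result = task.runDataRef(refList)
             return Struct(exitStatus=0, result=result)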
279  """
280  return [(ref, kwargs) for ref in parsedCmd.id.refList]
281 
282  def makeTask(self, parsedCmd=None, args=None):
283  """Create a Task instance.
284 
285  Parameters
286  ----------
287  parsedCmd
288  Parsed command-line options (used for extra task args by some task runners).
289  args
290  Args tuple passed to `TaskRunner.__call__` (used for extra task arguments by some task runners).
291 
292  Notes
293  -----
294  ``makeTask`` can be called with either the ``parsedCmd`` argument or ``args`` argument set to None,
295  but it must construct identical Task instances in either case.
296 
297  Subclasses may ignore this method entirely if they reimplement both `TaskRunner.precall` and
298  `TaskRunner.__call__`.
299  """
300  return self.TaskClass(config=self.config, log=self.log)
301 
302  def _precallImpl(self, task, parsedCmd):
303  """The main work of `precall`.
304 
305  We write package versions, schemas and configs, or compare these to existing files on disk if present.
306  """
307  if not parsedCmd.noVersions:
308  task.writePackageVersions(parsedCmd.butler, clobber=parsedCmd.clobberVersions)
309  task.writeConfig(parsedCmd.butler, clobber=self.clobberConfig, doBackup=self.doBackup)
310  task.writeSchemas(parsedCmd.butler, clobber=self.clobberConfig, doBackup=self.doBackup)
311 
312  def precall(self, parsedCmd):
313  """Hook for code that should run exactly once, before multiprocessing.
314 
315  Notes
316  -----
317  Must return True if `TaskRunner.__call__` should subsequently be called.
318 
319  .. warning::
320 
321  Implementations must take care to ensure that no unpicklable attributes are added to the
322  TaskRunner itself, for compatibility with multiprocessing.
323 
324  The default implementation writes package versions, schemas and configs, or compares them to existing
325  files on disk if present.
326  """
327  task = self.makeTask(parsedCmd=parsedCmd)
328 
329  if self.doRaise:
330  self._precallImpl(task, parsedCmd)
331  else:
332  try:
333  self._precallImpl(task, parsedCmd)
334  except Exception as e:
335  task.log.fatal("Failed in task initialization: %s", e)
336  if not isinstance(e, TaskError):
337  traceback.print_exc(file=sys.stderr)
338  return False
339  return True
340 
341  def __call__(self, args):
342  """Run the Task on a single target.
343 
344  Parameters
345  ----------
346  args
347  Arguments for Task.runDataRef()
348 
349  Returns
350  -------
351  struct : `lsst.pipe.base.Struct`
352  Contains these fields if ``doReturnResults`` is `True`:
353 
354  - ``dataRef``: the provided data reference.
355  - ``metadata``: task metadata after execution of run.
356  - ``result``: result returned by task run, or `None` if the task fails.
357  - ``exitStatus``: 0 if the task completed successfully, 1 otherwise.
358 
359  If ``doReturnResults`` is `False` the struct contains:
360 
361  - ``exitStatus``: 0 if the task completed successfully, 1 otherwise.
362 
363  Notes
364  -----
365  This default implementation assumes that ``args`` is a tuple containing a data reference and a
366  dict of keyword arguments.
367 
368  .. warning::
369 
370  If you override this method and wish to return something when ``doReturnResults`` is `False`,
371  then it must be picklable to support multiprocessing and it should be small enough that pickling
372  and unpickling do not add excessive overhead.
373  """
374  dataRef, kwargs = args
375  if self.log is None:
376  self.log = Log.getDefaultLogger()
377  if hasattr(dataRef, "dataId"):
378  self.log.MDC("LABEL", str(dataRef.dataId))
379  elif isinstance(dataRef, (list, tuple)):
380  self.log.MDC("LABEL", str([ref.dataId for ref in dataRef if hasattr(ref, "dataId")]))
381  task = self.makeTask(args=args)
382  result = None # in case the task fails
383  exitStatus = 0 # exit status for the shell
384  if self.doRaise:
385  result = self.runTask(task, dataRef, kwargs)
386  else:
387  try:
388  result = self.runTask(task, dataRef, kwargs)
389  except Exception as e:
390  # The shell exit value will be the number of dataRefs returning
391  # non-zero, so the actual value used here is lost.
392  exitStatus = 1
393 
394  # don't use a try block as we need to preserve the original exception
395  eName = type(e).__name__
396  if hasattr(dataRef, "dataId"):
397  task.log.fatal("Failed on dataId=%s: %s: %s", dataRef.dataId, eName, e)
398  elif isinstance(dataRef, (list, tuple)):
399  task.log.fatal("Failed on dataIds=[%s]: %s: %s",
400  ", ".join(str(ref.dataId) for ref in dataRef), eName, e)
401  else:
402  task.log.fatal("Failed on dataRef=%s: %s: %s", dataRef, eName, e)
403 
404  if not isinstance(e, TaskError):
405  traceback.print_exc(file=sys.stderr)
406 
407  # Ensure all errors have been logged and aren't hanging around in a buffer
408  sys.stdout.flush()
409  sys.stderr.flush()
410 
411  task.writeMetadata(dataRef)
412 
413  # remove MDC so it does not show up outside of task context
414  self.log.MDCRemove("LABEL")
415 
416  if self.doReturnResults:
417  return Struct(
418  exitStatus=exitStatus,
419  dataRef=dataRef,
420  metadata=task.metadata,
421  result=result,
422  )
423  else:
424  return Struct(
425  exitStatus=exitStatus,
426  )
427 
428  def runTask(self, task, dataRef, kwargs):
429  """Make the actual call to `runDataRef` for this task.
430 
431  Parameters
432  ----------
433  task : `lsst.pipe.base.CmdLineTask`
434  The task instance to run.
435  dataRef
436  Butler data reference that contains the data the task will process.
437  kwargs
438  Any additional keyword arguments. See `TaskRunner.getTargetList` above.
439 
440  Notes
441  -----
442  The default implementation of `TaskRunner.runTask` works for any command-line task which has a
443  runDataRef method that takes a data reference and an optional set of additional keyword arguments.
444  This method returns the results generated by the task's `runDataRef` method.
445 
446  """
447  return task.runDataRef(dataRef, **kwargs)
448 
449 
450 class LegacyTaskRunner(TaskRunner):
451  r"""A `TaskRunner` for `CmdLineTask`\ s which calls the `Task`\ 's `run` method on a `dataRef` rather
452  than the `runDataRef` method.
453  """
454 
455  def runTask(self, task, dataRef, kwargs):
456  """Call `run` for this task instead of `runDataRef`. See `TaskRunner.runTask` above for details.
457  """
458  return task.run(dataRef, **kwargs)
459 
460 
461 class ButlerInitializedTaskRunner(TaskRunner):
462  r"""A `TaskRunner` for `CmdLineTask`\ s that require a ``butler`` keyword argument to be passed to
463  their constructor.
464  """
465 
466  def makeTask(self, parsedCmd=None, args=None):
467  """A variant of the base version that passes a butler argument to the task's constructor.
468 
469  Parameters
470  ----------
471  parsedCmd : `argparse.Namespace`
472  Parsed command-line options, as returned by the `~lsst.pipe.base.ArgumentParser`; if specified
473  then args is ignored.
474  args
475  Other arguments; if ``parsedCmd`` is `None` then this must be specified.
476 
477  Raises
478  ------
479  RuntimeError
480  Raised if ``parsedCmd`` and ``args`` are both `None`.
481  """
482  if parsedCmd is not None:
483  butler = parsedCmd.butler
484  elif args is not None:
485  dataRef, kwargs = args
486  butler = dataRef.butlerSubset.butler
487  else:
488  raise RuntimeError("parsedCmd or args must be specified")
489  return self.TaskClass(config=self.config, log=self.log, butler=butler)
490 
491 
492 class CmdLineTask(Task):
493  """Base class for command-line tasks: tasks that may be executed from the command-line.
494 
495  Notes
496  -----
497  See :ref:`task-framework-overview` to learn what tasks are and :ref:`creating-a-command-line-task` for
498  more information about writing command-line tasks.
499 
500  Subclasses must specify the following class variables:
501 
502  - ``ConfigClass``: configuration class for your task (a subclass of `lsst.pex.config.Config`, or if your
503  task needs no configuration, then `lsst.pex.config.Config` itself).
504  - ``_DefaultName``: default name used for this task (a str).
505 
506  Subclasses may also specify the following class variables:
507 
508  - ``RunnerClass``: a task runner class. The default is ``TaskRunner``, which works for any task
509  with a runDataRef method that takes exactly one argument: a data reference. If your task does
510  not meet this requirement then you must supply a variant of ``TaskRunner``; see ``TaskRunner``
511  for more information.
512  - ``canMultiprocess``: the default is `True`; set `False` if your task does not support multiprocessing.
513 
514  Subclasses must specify a method named ``runDataRef``:
515 
516  - By default ``runDataRef`` accepts a single butler data reference, but you can specify an alternate
517  task runner (subclass of ``TaskRunner``) as the value of class variable ``RunnerClass`` if your run
518  method needs something else.
519  - ``runDataRef`` is expected to return its data in a `lsst.pipe.base.Struct`. This provides safety for
520  evolution of the task since new values may be added without harming existing code.
521  - The data returned by ``runDataRef`` must be picklable if your task is to support multiprocessing.
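
 Putting these requirements together, a minimal command-line task might look like this (a sketch
 only; the names, the config field and the ``raw`` dataset access are purely illustrative)::

     import lsst.pex.config as pexConfig

     class ExampleConfig(pexConfig.Config):
         doSomething = pexConfig.Field(dtype=bool, default=True, doc="Toggle the processing step")

     class ExampleTask(CmdLineTask):
         ConfigClass = ExampleConfig
         _DefaultName = "example"

         def runDataRef(self, dataRef):
             exposure = dataRef.get("raw")
             # ... process the exposure here, honouring self.config.doSomething ...
             return Struct(exposure=exposure)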
522  """
523  RunnerClass = TaskRunner
524  canMultiprocess = True
525 
526  @classmethod
527  def applyOverrides(cls, config):
528  """A hook to allow a task to change the values of its config *after* the camera-specific
529  overrides are loaded but before any command-line overrides are applied.
530 
531  Parameters
532  ----------
533  config : instance of task's ``ConfigClass``
534  Task configuration.
535 
536  Notes
537  -----
538  This is necessary in some cases because the camera-specific overrides may retarget subtasks,
539  wiping out changes made in ConfigClass.setDefaults. See LSST Trac ticket #2282 for more discussion.
540 
541  .. warning::
542 
543  This is called by CmdLineTask.parseAndRun; other ways of constructing a config will not apply
544  these overrides.
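
 For example, a subclass might pin a subtask configuration value regardless of what the
 camera-specific overrides chose (a minimal sketch; ``calibrate.doAstrometry`` is an illustrative
 config field)::

     @classmethod
     def applyOverrides(cls, config):
         config.calibrate.doAstrometry = False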
545  """
546  pass
547 
548  @classmethod
549  def parseAndRun(cls, args=None, config=None, log=None, doReturnResults=False):
550  """Parse an argument list and run the command.
551 
552  Parameters
553  ----------
554  args : `list`, optional
555  List of command-line arguments; if `None` use `sys.argv`.
556  config : `lsst.pex.config.Config`-type, optional
557  Config for task. If `None` use `Task.ConfigClass`.
558  log : `lsst.log.Log`-type, optional
559  Log. If `None` use the default log.
560  doReturnResults : `bool`, optional
561  If `True`, return the results of this task. Default is `False`. This is only intended for
562  unit tests and similar use. It can easily exhaust memory (if the task returns enough data and you
563  call it enough times) and it will fail when using multiprocessing if the returned data cannot be
564  pickled.
565 
566  Returns
567  -------
568  struct : `lsst.pipe.base.Struct`
569  Fields are:
570 
571  ``argumentParser``
572  the argument parser (`lsst.pipe.base.ArgumentParser`).
573  ``parsedCmd``
574  the parsed command returned by the argument parser's
575  `~lsst.pipe.base.ArgumentParser.parse_args` method
576  (`argparse.Namespace`).
577  ``taskRunner``
578  the task runner used to run the task (an instance of `Task.RunnerClass`).
579  ``resultList``
580  results returned by the task runner's ``run`` method, one entry
581  per invocation (`list`). This will typically be a list of
582  `Struct`, each containing at least an ``exitStatus`` integer
583  (0 or 1); see `Task.RunnerClass` (`TaskRunner` by default) for
584  more details.
585 
586  Notes
587  -----
588  Calling this method with no arguments specified is the standard way to run a command-line task
589  from the command-line. For an example see ``pipe_tasks`` ``bin/makeSkyMap.py`` or almost any other
590  file in that directory.
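
 A typical executable script therefore reduces to a few lines (a sketch; the module path and task
 name are hypothetical)::

     #!/usr/bin/env python
     from lsst.example.exampleTask import ExampleTask

     ExampleTask.parseAndRun()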
591 
592  If one or more of the dataIds fails then this routine will exit (with a status giving the
593  number of failed dataIds) rather than returning this struct; this behaviour can be
594  overridden by specifying the ``--noExit`` command-line option.
595  """
596  if args is None:
597  commandAsStr = " ".join(sys.argv)
598  args = sys.argv[1:]
599  else:
600  commandAsStr = "{}{}".format(lsst.utils.get_caller_name(skip=1), tuple(args))
601 
602  argumentParser = cls._makeArgumentParser()
603  if config is None:
604  config = cls.ConfigClass()
605  parsedCmd = argumentParser.parse_args(config=config, args=args, log=log, override=cls.applyOverrides)
606  # print this message after parsing the command so the log is fully configured
607  parsedCmd.log.info("Running: %s", commandAsStr)
608 
609  taskRunner = cls.RunnerClass(TaskClass=cls, parsedCmd=parsedCmd, doReturnResults=doReturnResults)
610  resultList = taskRunner.run(parsedCmd)
611 
612  try:
613  nFailed = sum(((res.exitStatus != 0) for res in resultList))
614  except (TypeError, AttributeError) as e:
615  # NOTE: TypeError if resultList is None, AttributeError if it doesn't have exitStatus.
616  parsedCmd.log.warn("Unable to retrieve exit status (%s); assuming success", e)
617  nFailed = 0
618 
619  if nFailed > 0:
620  if parsedCmd.noExit:
621  parsedCmd.log.error("%d dataRefs failed; not exiting as --noExit was set", nFailed)
622  else:
623  sys.exit(nFailed)
624 
625  return Struct(
626  argumentParser=argumentParser,
627  parsedCmd=parsedCmd,
628  taskRunner=taskRunner,
629  resultList=resultList,
630  )
631 
632  @classmethod
633  def _makeArgumentParser(cls):
634  """Create and return an argument parser.
635 
636  Returns
637  -------
638  parser : `lsst.pipe.base.ArgumentParser`
639  The argument parser for this task.
640 
641  Notes
642  -----
643  By default this returns an `~lsst.pipe.base.ArgumentParser` with one ID argument named `--id` of
644  dataset type ``raw``.
645 
646  Your task subclass may need to override this method to change the dataset type or data ref level,
647  or to add additional data ID arguments. If you add additional data ID arguments or your task's
648  runDataRef method takes more than a single data reference then you will also have to provide a
649  task-specific task runner (see TaskRunner for more information).
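
 For example, a task that operates on calibrated exposures might override this method as follows
 (a sketch; the dataset type and help string are illustrative)::

     @classmethod
     def _makeArgumentParser(cls):
         parser = ArgumentParser(name=cls._DefaultName)
         parser.add_id_argument(name="--id", datasetType="calexp",
                                help="data IDs, e.g. --id visit=12345 ccd=1,2")
         return parser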
650  """
651  parser = ArgumentParser(name=cls._DefaultName)
652  parser.add_id_argument(name="--id", datasetType="raw",
653  help="data IDs, e.g. --id visit=12345 ccd=1,2^0,3")
654  return parser
655 
656  def writeConfig(self, butler, clobber=False, doBackup=True):
657  """Write the configuration used for processing the data, or check that an existing
658  one is equal to the new one if present.
659 
660  Parameters
661  ----------
662  butler : `lsst.daf.persistence.Butler`
663  Data butler used to write the config. The config is written to dataset type
664  `CmdLineTask._getConfigName`.
665  clobber : `bool`, optional
666  A boolean flag that controls what happens if a config already has been saved:
667  - `True`: overwrite or rename the existing config, depending on ``doBackup``.
668  - `False`: raise `TaskError` if this config does not match the existing config.
669  doBackup : bool, optional
670  Set to `True` to backup the config files if clobbering.
671  """
672  configName = self._getConfigName()
673  if configName is None:
674  return
675  if clobber:
676  butler.put(self.config, configName, doBackup=doBackup)
677  elif butler.datasetExists(configName, write=True):
678  # this may be subject to a race condition; see #2789
679  try:
680  oldConfig = butler.get(configName, immediate=True)
681  except Exception as exc:
682  raise type(exc)(f"Unable to read stored config file {configName} ({exc}); "
683  "consider using --clobber-config")
684 
685  def logConfigMismatch(msg):
686  self.log.fatal("Comparing configuration: %s", msg)
687 
688  if not self.config.compare(oldConfig, shortcut=False, output=logConfigMismatch):
689  raise TaskError(
690  f"Config does not match existing task config {configName!r} on disk; "
691  "task configurations must be consistent within the same output repo "
692  "(override with --clobber-config)")
693  else:
694  butler.put(self.config, configName)
695 
696  def writeSchemas(self, butler, clobber=False, doBackup=True):
697  """Write the schemas returned by `lsst.pipe.base.Task.getAllSchemaCatalogs`.
698 
699  Parameters
700  ----------
701  butler : `lsst.daf.persistence.Butler`
702  Data butler used to write the schema. Each schema is written to the dataset type specified as the
703  key in the dict returned by `~lsst.pipe.base.Task.getAllSchemaCatalogs`.
704  clobber : `bool`, optional
705  A boolean flag that controls what happens if a schema already has been saved:
706  - `True`: overwrite or rename the existing schema, depending on ``doBackup``.
707  - `False`: raise `TaskError` if this schema does not match the existing schema.
708  doBackup : `bool`, optional
709  Set to `True` to backup the schema files if clobbering.
710 
711  Notes
712  -----
713  If ``clobber`` is `False` and an existing schema does not match a current schema,
714  then some schemas may have been saved successfully and others may not, and there is no easy way to
715  tell which is which.
716  """
717  for dataset, catalog in self.getAllSchemaCatalogs().items():
718  schemaDataset = dataset + "_schema"
719  if clobber:
720  butler.put(catalog, schemaDataset, doBackup=doBackup)
721  elif butler.datasetExists(schemaDataset, write=True):
722  oldSchema = butler.get(schemaDataset, immediate=True).getSchema()
723  if not oldSchema.compare(catalog.getSchema(), afwTable.Schema.IDENTICAL):
724  raise TaskError(
725  f"New schema does not match schema {dataset!r} on disk; "
726  "schemas must be consistent within the same output repo "
727  "(override with --clobber-config)")
728  else:
729  butler.put(catalog, schemaDataset)
730 
731  def writeMetadata(self, dataRef):
732  """Write the metadata produced from processing the data.
733 
734  Parameters
735  ----------
736  dataRef
737  Butler data reference used to write the metadata.
738  The metadata is written to dataset type `CmdLineTask._getMetadataName`.
739  """
740  try:
741  metadataName = self._getMetadataName()
742  if metadataName is not None:
743  dataRef.put(self.getFullMetadata(), metadataName)
744  except Exception as e:
745  self.log.warn("Could not persist metadata for dataId=%s: %s", dataRef.dataId, e)
746 
747  def writePackageVersions(self, butler, clobber=False, doBackup=True, dataset="packages"):
748  """Compare and write package versions.
749 
750  Parameters
751  ----------
752  butler : `lsst.daf.persistence.Butler`
753  Data butler used to read/write the package versions.
754  clobber : `bool`, optional
755  A boolean flag that controls what happens if versions already have been saved:
756  - `True`: overwrite or rename the existing version info, depending on ``doBackup``.
757  - `False`: raise `TaskError` if this version info does not match the existing.
758  doBackup : `bool`, optional
759  If `True` and clobbering, old package version files are backed up.
760  dataset : `str`, optional
761  Name of dataset to read/write.
762 
763  Raises
764  ------
765  TaskError
766  Raised if there is a version mismatch with current and persisted lists of package versions.
767 
768  Notes
769  -----
770  Note that this operation is subject to a race condition.
771  """
772  packages = Packages.fromSystem()
773 
774  if clobber:
775  return butler.put(packages, dataset, doBackup=doBackup)
776  if not butler.datasetExists(dataset, write=True):
777  return butler.put(packages, dataset)
778 
779  try:
780  old = butler.get(dataset, immediate=True)
781  except Exception as exc:
782  raise type(exc)(f"Unable to read stored version dataset {dataset} ({exc}); "
783  "consider using --clobber-versions or --no-versions")
784  # Note that because we can only detect python modules that have been imported, the stored
785  # list of products may be more or less complete than what we have now. What's important is
786  # that the products that are in common have the same version.
787  diff = packages.difference(old)
788  if diff:
789  versions_str = "; ".join(f"{pkg}: {diff[pkg][1]} vs {diff[pkg][0]}" for pkg in diff)
790  raise TaskError(
791  f"Version mismatch ({versions_str}); consider using --clobber-versions or --no-versions")
792  # Update the old set of packages in case we have more packages that haven't been persisted.
793  extra = packages.extra(old)
794  if extra:
795  old.update(packages)
796  butler.put(old, dataset, doBackup=doBackup)
797 
798  def _getConfigName(self):
799  """Get the name of the config dataset type, or `None` if config is not to be persisted.
800 
801  Notes
802  -----
803  The name may depend on the config; that is why this is not a class method.
804  """
805  return self._DefaultName + "_config"
806 
807  def _getMetadataName(self):
808  """Get the name of the metadata dataset type, or `None` if metadata is not to be persisted.
809 
810  Notes
811  -----
812  The name may depend on the config; that is why this is not a class method.
813  """
814  return self._DefaultName + "_metadata"