LSST Applications  21.0.0+04719a4bac,21.0.0-1-ga51b5d4+f5e6047307,21.0.0-11-g2b59f77+a9c1acf22d,21.0.0-11-ga42c5b2+86977b0b17,21.0.0-12-gf4ce030+76814010d2,21.0.0-13-g1721dae+760e7a6536,21.0.0-13-g3a573fe+768d78a30a,21.0.0-15-g5a7caf0+f21cbc5713,21.0.0-16-g0fb55c1+b60e2d390c,21.0.0-19-g4cded4ca+71a93a33c0,21.0.0-2-g103fe59+bb20972958,21.0.0-2-g45278ab+04719a4bac,21.0.0-2-g5242d73+3ad5d60fb1,21.0.0-2-g7f82c8f+8babb168e8,21.0.0-2-g8f08a60+06509c8b61,21.0.0-2-g8faa9b5+616205b9df,21.0.0-2-ga326454+8babb168e8,21.0.0-2-gde069b7+5e4aea9c2f,21.0.0-2-gecfae73+1d3a86e577,21.0.0-2-gfc62afb+3ad5d60fb1,21.0.0-25-g1d57be3cd+e73869a214,21.0.0-3-g357aad2+ed88757d29,21.0.0-3-g4a4ce7f+3ad5d60fb1,21.0.0-3-g4be5c26+3ad5d60fb1,21.0.0-3-g65f322c+e0b24896a3,21.0.0-3-g7d9da8d+616205b9df,21.0.0-3-ge02ed75+a9c1acf22d,21.0.0-4-g591bb35+a9c1acf22d,21.0.0-4-g65b4814+b60e2d390c,21.0.0-4-gccdca77+0de219a2bc,21.0.0-4-ge8a399c+6c55c39e83,21.0.0-5-gd00fb1e+05fce91b99,21.0.0-6-gc675373+3ad5d60fb1,21.0.0-64-g1122c245+4fb2b8f86e,21.0.0-7-g04766d7+cd19d05db2,21.0.0-7-gdf92d54+04719a4bac,21.0.0-8-g5674e7b+d1bd76f71f,master-gac4afde19b+a9c1acf22d,w.2021.13
LSST Data Management Base Package
cmdLineTask.py
Go to the documentation of this file.
1 #
2 # LSST Data Management System
3 # Copyright 2008-2015 AURA/LSST.
4 #
5 # This product includes software developed by the
6 # LSST Project (http://www.lsst.org/).
7 #
8 # This program is free software: you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation, either version 3 of the License, or
11 # (at your option) any later version.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
17 #
18 # You should have received a copy of the LSST License Statement and
19 # the GNU General Public License along with this program. If not,
20 # see <https://www.lsstcorp.org/LegalNotices/>.
21 #
22 __all__ = ["CmdLineTask", "TaskRunner", "ButlerInitializedTaskRunner", "LegacyTaskRunner"]
23 
24 import sys
25 import traceback
26 import functools
27 import contextlib
28 
29 import lsst.utils
30 from lsst.base import disableImplicitThreading
31 import lsst.afw.table as afwTable
32 from .task import Task, TaskError
33 from .struct import Struct
34 from .argumentParser import ArgumentParser
35 from lsst.base import Packages
36 from lsst.log import Log
37 
38 
39 def _runPool(pool, timeout, function, iterable):
40  """Wrapper around ``pool.map_async``, to handle timeout
41 
42  This is required so as to trigger an immediate interrupt on the
43  KeyboardInterrupt (Ctrl-C); see
44  http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool
45  """
46  return pool.map_async(function, iterable).get(timeout)
47 
48 
49 @contextlib.contextmanager
50 def profile(filename, log=None):
51  """Context manager for profiling with cProfile.
52 
53 
54  Parameters
55  ----------
56  filename : `str`
57  Filename to which to write profile (profiling disabled if `None` or
58  empty).
59  log : `lsst.log.Log`, optional
60  Log object for logging the profile operations.
61 
62  If profiling is enabled, the context manager returns the cProfile.Profile
63  object (otherwise it returns None), which allows additional control over
64  profiling. You can obtain this using the "as" clause, e.g.:
65 
66  .. code-block:: python
67 
68  with profile(filename) as prof:
69  runYourCodeHere()
70 
71  The output cumulative profile can be printed with a command-line like:
72 
73  .. code-block:: bash
74 
75  python -c 'import pstats; \
76  pstats.Stats("<filename>").sort_stats("cumtime").print_stats(30)'
77  """
78  if not filename:
79  # Nothing to do
80  yield
81  return
82  from cProfile import Profile
83  profile = Profile()
84  if log is not None:
85  log.info("Enabling cProfile profiling")
86  profile.enable()
87  yield profile
88  profile.disable()
89  profile.dump_stats(filename)
90  if log is not None:
91  log.info("cProfile stats written to %s", filename)
92 
93 
94 class TaskRunner:
95  """Run a command-line task, using `multiprocessing` if requested.
96 
97  Parameters
98  ----------
99  TaskClass : `lsst.pipe.base.Task` subclass
100  The class of the task to run.
101  parsedCmd : `argparse.Namespace`
102  The parsed command-line arguments, as returned by the task's argument
103  parser's `~lsst.pipe.base.ArgumentParser.parse_args` method.
104 
105  .. warning::
106 
107  Do not store ``parsedCmd``, as this instance is pickled (if
108  multiprocessing) and parsedCmd may contain non-picklable elements.
109  It certainly contains more data than we need to send to each
110  instance of the task.
111  doReturnResults : `bool`, optional
112  Should run return the collected result from each invocation of the
113  task? This is only intended for unit tests and similar use. It can
114  easily exhaust memory (if the task returns enough data and you call it
115  enough times) and it will fail when using multiprocessing if the
116  returned data cannot be pickled.
117 
118  Note that even if ``doReturnResults`` is False a struct with a single
119  member "exitStatus" is returned, with value 0 or 1 to be returned to
120  the unix shell.
121 
122  Raises
123  ------
124  ImportError
125  Raised if multiprocessing is requested (and the task supports it) but
126  the multiprocessing library cannot be imported.
127 
128  Notes
129  -----
130  Each command-line task (subclass of `lsst.pipe.base.CmdLineTask`) has a
131  task runner. By default it is this class, but some tasks require a
132  subclass. See the manual :ref:`creating-a-command-line-task` for more
133  information. See `CmdLineTask.parseAndRun` to see how a task runner is
134  used.
135 
136  You may use this task runner for your command-line task if your task has a
137  ``runDataRef`` method that takes exactly one argument: a butler data
138  reference. Otherwise you must provide a task-specific subclass of
139  this runner for your task's ``RunnerClass`` that overrides
140  `TaskRunner.getTargetList` and possibly
141  `TaskRunner.__call__`. See `TaskRunner.getTargetList` for details.
142 
143  This design matches the common pattern for command-line tasks: the
144  ``runDataRef`` method takes a single data reference, of some suitable name.
145  Additional arguments are rare, and if present, require a subclass of
146  `TaskRunner` that calls these additional arguments by name.
147 
148  Instances of this class must be picklable in order to be compatible with
149  multiprocessing. If multiprocessing is requested
150  (``parsedCmd.numProcesses > 1``) then `runDataRef` calls
151  `prepareForMultiProcessing` to jettison optional non-picklable elements.
152  If your task runner is not compatible with multiprocessing then indicate
153  this in your task by setting class variable ``canMultiprocess=False``.
154 
155  Due to a `python bug`__, handling a `KeyboardInterrupt` properly `requires
156  specifying a timeout`__. This timeout (in sec) can be specified as the
157  ``timeout`` element in the output from `~lsst.pipe.base.ArgumentParser`
158  (the ``parsedCmd``), if available, otherwise we use `TaskRunner.TIMEOUT`.
159 
160  By default, we disable "implicit" threading -- ie, as provided by
161  underlying numerical libraries such as MKL or BLAS. This is designed to
162  avoid thread contention both when a single command line task spawns
163  multiple processes and when multiple users are running on a shared system.
164  Users can override this behaviour by setting the
165  ``LSST_ALLOW_IMPLICIT_THREADS`` environment variable.
166 
167  .. __: http://bugs.python.org/issue8296
168  .. __: http://stackoverflow.com/questions/1408356/
169  """
170 
171  TIMEOUT = 3600*24*30
172  """Default timeout (seconds) for multiprocessing."""
173 
174  def __init__(self, TaskClass, parsedCmd, doReturnResults=False):
175  self.TaskClassTaskClass = TaskClass
176  self.doReturnResultsdoReturnResults = bool(doReturnResults)
177  self.configconfig = parsedCmd.config
178  self.loglog = parsedCmd.log
179  self.doRaisedoRaise = bool(parsedCmd.doraise)
180  self.clobberConfigclobberConfig = bool(parsedCmd.clobberConfig)
181  self.doBackupdoBackup = not bool(parsedCmd.noBackupConfig)
182  self.numProcessesnumProcesses = int(getattr(parsedCmd, 'processes', 1))
183 
184  self.timeouttimeout = getattr(parsedCmd, 'timeout', None)
185  if self.timeouttimeout is None or self.timeouttimeout <= 0:
186  self.timeouttimeout = self.TIMEOUTTIMEOUT
187 
188  if self.numProcessesnumProcesses > 1:
189  if not TaskClass.canMultiprocess:
190  self.loglog.warn("This task does not support multiprocessing; using one process")
191  self.numProcessesnumProcesses = 1
192 
194  """Prepare this instance for multiprocessing
195 
196  Optional non-picklable elements are removed.
197 
198  This is only called if the task is run under multiprocessing.
199  """
200  self.loglog = None
201 
202  def run(self, parsedCmd):
203  """Run the task on all targets.
204 
205  Parameters
206  ----------
207  parsedCmd : `argparse.Namespace`
208  Parsed command `argparse.Namespace`.
209 
210  Returns
211  -------
212  resultList : `list`
213  A list of results returned by `TaskRunner.__call__`, or an empty
214  list if `TaskRunner.__call__` is not called (e.g. if
215  `TaskRunner.precall` returns `False`). See `TaskRunner.__call__`
216  for details.
217 
218  Notes
219  -----
220  The task is run under multiprocessing if `TaskRunner.numProcesses`
221  is more than 1; otherwise processing is serial.
222  """
223  resultList = []
224  disableImplicitThreading() # To prevent thread contention
225  if self.numProcessesnumProcesses > 1:
226  import multiprocessing
227  self.prepareForMultiProcessingprepareForMultiProcessing()
228  pool = multiprocessing.Pool(processes=self.numProcessesnumProcesses, maxtasksperchild=1)
229  mapFunc = functools.partial(_runPool, pool, self.timeouttimeout)
230  else:
231  pool = None
232  mapFunc = map
233 
234  if self.precallprecall(parsedCmd):
235  profileName = parsedCmd.profile if hasattr(parsedCmd, "profile") else None
236  log = parsedCmd.log
237  targetList = self.getTargetListgetTargetList(parsedCmd)
238  if len(targetList) > 0:
239  with profile(profileName, log):
240  # Run the task using self.__call__
241  resultList = list(mapFunc(self, targetList))
242  else:
243  log.warn("Not running the task because there is no data to process; "
244  "you may preview data using \"--show data\"")
245 
246  if pool is not None:
247  pool.close()
248  pool.join()
249 
250  return resultList
251 
252  @staticmethod
253  def getTargetList(parsedCmd, **kwargs):
254  """Get a list of (dataRef, kwargs) for `TaskRunner.__call__`.
255 
256  Parameters
257  ----------
258  parsedCmd : `argparse.Namespace`
259  The parsed command object returned by
260  `lsst.pipe.base.argumentParser.ArgumentParser.parse_args`.
261  kwargs
262  Any additional keyword arguments. In the default `TaskRunner` this
263  is an empty dict, but having it simplifies overriding `TaskRunner`
264  for tasks whose runDataRef method takes additional arguments
265  (see case (1) below).
266 
267  Notes
268  -----
269  The default implementation of `TaskRunner.getTargetList` and
270  `TaskRunner.__call__` works for any command-line task whose
271  ``runDataRef`` method takes exactly one argument: a data reference.
272  Otherwise you must provide a variant of TaskRunner that overrides
273  `TaskRunner.getTargetList` and possibly `TaskRunner.__call__`.
274  There are two cases.
275 
276  **Case 1**
277 
278  If your command-line task has a ``runDataRef`` method that takes one
279  data reference followed by additional arguments, then you need only
280  override `TaskRunner.getTargetList` to return the additional
281  arguments as an argument dict. To make this easier, your overridden
282  version of `~TaskRunner.getTargetList` may call
283  `TaskRunner.getTargetList` with the extra arguments as keyword
284  arguments. For example, the following adds an argument dict containing
285  a single key: "calExpList", whose value is the list of data IDs for
286  the calexp ID argument:
287 
288  .. code-block:: python
289 
290  def getTargetList(parsedCmd):
291  return TaskRunner.getTargetList(
292  parsedCmd,
293  calExpList=parsedCmd.calexp.idList
294  )
295 
296  It is equivalent to this slightly longer version:
297 
298  .. code-block:: python
299 
300  @staticmethod
301  def getTargetList(parsedCmd):
302  argDict = dict(calExpList=parsedCmd.calexp.idList)
303  return [(dataId, argDict) for dataId in parsedCmd.id.idList]
304 
305  **Case 2**
306 
307  If your task does not meet condition (1) then you must override both
308  TaskRunner.getTargetList and `TaskRunner.__call__`. You may do this
309  however you see fit, so long as `TaskRunner.getTargetList`
310  returns a list, each of whose elements is sent to
311  `TaskRunner.__call__`, which runs your task.
312  """
313  return [(ref, kwargs) for ref in parsedCmd.id.refList]
314 
315  def makeTask(self, parsedCmd=None, args=None):
316  """Create a Task instance.
317 
318  Parameters
319  ----------
320  parsedCmd
321  Parsed command-line options (used for extra task args by some task
322  runners).
323  args
324  Args tuple passed to `TaskRunner.__call__` (used for extra task
325  arguments by some task runners).
326 
327  Notes
328  -----
329  ``makeTask`` can be called with either the ``parsedCmd`` argument or
330  ``args`` argument set to None, but it must construct identical Task
331  instances in either case.
332 
333  Subclasses may ignore this method entirely if they reimplement both
334  `TaskRunner.precall` and `TaskRunner.__call__`.
335  """
336  return self.TaskClassTaskClass(config=self.configconfig, log=self.loglog)
337 
338  def _precallImpl(self, task, parsedCmd):
339  """The main work of `precall`.
340 
341  We write package versions, schemas and configs, or compare these to
342  existing files on disk if present.
343  """
344  if not parsedCmd.noVersions:
345  task.writePackageVersions(parsedCmd.butler, clobber=parsedCmd.clobberVersions)
346  task.writeConfig(parsedCmd.butler, clobber=self.clobberConfigclobberConfig, doBackup=self.doBackupdoBackup)
347  task.writeSchemas(parsedCmd.butler, clobber=self.clobberConfigclobberConfig, doBackup=self.doBackupdoBackup)
348 
349  def precall(self, parsedCmd):
350  """Hook for code that should run exactly once, before multiprocessing.
351 
352  Notes
353  -----
354  Must return True if `TaskRunner.__call__` should subsequently be
355  called.
356 
357  .. warning::
358 
359  Implementations must take care to ensure that no unpicklable
360  attributes are added to the TaskRunner itself, for compatibility
361  with multiprocessing.
362 
363  The default implementation writes package versions, schemas and
364  configs, or compares them to existing files on disk if present.
365  """
366  task = self.makeTaskmakeTask(parsedCmd=parsedCmd)
367 
368  if self.doRaisedoRaise:
369  self._precallImpl_precallImpl(task, parsedCmd)
370  else:
371  try:
372  self._precallImpl_precallImpl(task, parsedCmd)
373  except Exception as e:
374  task.log.fatal("Failed in task initialization: %s", e)
375  if not isinstance(e, TaskError):
376  traceback.print_exc(file=sys.stderr)
377  return False
378  return True
379 
380  def __call__(self, args):
381  """Run the Task on a single target.
382 
383  Parameters
384  ----------
385  args
386  Arguments for Task.runDataRef()
387 
388  Returns
389  -------
390  struct : `lsst.pipe.base.Struct`
391  Contains these fields if ``doReturnResults`` is `True`:
392 
393  - ``dataRef``: the provided data reference.
394  - ``metadata``: task metadata after execution of run.
395  - ``result``: result returned by task run, or `None` if the task
396  fails.
397  - ``exitStatus``: 0 if the task completed successfully, 1
398  otherwise.
399 
400  If ``doReturnResults`` is `False` the struct contains:
401 
402  - ``exitStatus``: 0 if the task completed successfully, 1
403  otherwise.
404 
405  Notes
406  -----
407  This default implementation assumes that the ``args`` is a tuple
408  containing a data reference and a dict of keyword arguments.
409 
410  .. warning::
411 
412  If you override this method and wish to return something when
413  ``doReturnResults`` is `False`, then it must be picklable to
414  support multiprocessing and it should be small enough that pickling
415  and unpickling do not add excessive overhead.
416  """
417  dataRef, kwargs = args
418  if self.loglog is None:
419  self.loglog = Log.getDefaultLogger()
420  if hasattr(dataRef, "dataId"):
421  self.loglog.MDC("LABEL", str(dataRef.dataId))
422  elif isinstance(dataRef, (list, tuple)):
423  self.loglog.MDC("LABEL", str([ref.dataId for ref in dataRef if hasattr(ref, "dataId")]))
424  task = self.makeTaskmakeTask(args=args)
425  result = None # in case the task fails
426  exitStatus = 0 # exit status for the shell
427  if self.doRaisedoRaise:
428  result = self.runTaskrunTask(task, dataRef, kwargs)
429  else:
430  try:
431  result = self.runTaskrunTask(task, dataRef, kwargs)
432  except Exception as e:
433  # The shell exit value will be the number of dataRefs returning
434  # non-zero, so the actual value used here is lost.
435  exitStatus = 1
436 
437  # don't use a try block as we need to preserve the original
438  # exception
439  eName = type(e).__name__
440  if hasattr(dataRef, "dataId"):
441  task.log.fatal("Failed on dataId=%s: %s: %s", dataRef.dataId, eName, e)
442  elif isinstance(dataRef, (list, tuple)):
443  task.log.fatal("Failed on dataIds=[%s]: %s: %s",
444  ", ".join(str(ref.dataId) for ref in dataRef), eName, e)
445  else:
446  task.log.fatal("Failed on dataRef=%s: %s: %s", dataRef, eName, e)
447 
448  if not isinstance(e, TaskError):
449  traceback.print_exc(file=sys.stderr)
450 
451  # Ensure all errors have been logged and aren't hanging around in a
452  # buffer
453  sys.stdout.flush()
454  sys.stderr.flush()
455 
456  task.writeMetadata(dataRef)
457 
458  # remove MDC so it does not show up outside of task context
459  self.loglog.MDCRemove("LABEL")
460 
461  if self.doReturnResultsdoReturnResults:
462  return Struct(
463  exitStatus=exitStatus,
464  dataRef=dataRef,
465  metadata=task.metadata,
466  result=result,
467  )
468  else:
469  return Struct(
470  exitStatus=exitStatus,
471  )
472 
473  def runTask(self, task, dataRef, kwargs):
474  """Make the actual call to `runDataRef` for this task.
475 
476  Parameters
477  ----------
478  task : `lsst.pipe.base.CmdLineTask` class
479  The class of the task to run.
480  dataRef
481  Butler data reference that contains the data the task will process.
482  kwargs
483  Any additional keyword arguments. See `TaskRunner.getTargetList`
484  above.
485 
486  Notes
487  -----
488  The default implementation of `TaskRunner.runTask` works for any
489  command-line task which has a ``runDataRef`` method that takes a data
490  reference and an optional set of additional keyword arguments.
491  This method returns the results generated by the task's `runDataRef`
492  method.
493 
494  """
495  return task.runDataRef(dataRef, **kwargs)
496 
497 
499  r"""A `TaskRunner` for `CmdLineTask`\ s which calls the `Task`\ 's `run`
500  method on a `dataRef` rather than the `runDataRef` method.
501  """
502 
503  def runTask(self, task, dataRef, kwargs):
504  """Call `run` for this task instead of `runDataRef`. See
505  `TaskRunner.runTask` above for details.
506  """
507  return task.run(dataRef, **kwargs)
508 
509 
511  r"""A `TaskRunner` for `CmdLineTask`\ s that require a ``butler`` keyword
512  argument to be passed to their constructor.
513  """
514 
515  def makeTask(self, parsedCmd=None, args=None):
516  """A variant of the base version that passes a butler argument to the
517  task's constructor.
518 
519  Parameters
520  ----------
521  parsedCmd : `argparse.Namespace`
522  Parsed command-line options, as returned by the
523  `~lsst.pipe.base.ArgumentParser`; if specified then args is
524  ignored.
525  args
526  Other arguments; if ``parsedCmd`` is `None` then this must be
527  specified.
528 
529  Raises
530  ------
531  RuntimeError
532  Raised if ``parsedCmd`` and ``args`` are both `None`.
533  """
534  if parsedCmd is not None:
535  butler = parsedCmd.butler
536  elif args is not None:
537  dataRef, kwargs = args
538  butler = dataRef.butlerSubset.butler
539  else:
540  raise RuntimeError("parsedCmd or args must be specified")
541  return self.TaskClassTaskClass(config=self.configconfig, log=self.loglog, butler=butler)
542 
543 
545  """Base class for command-line tasks: tasks that may be executed from the
546  command-line.
547 
548  Notes
549  -----
550  See :ref:`task-framework-overview` to learn what tasks are and
551  :ref:`creating-a-command-line-task` for more information about writing
552  command-line tasks.
553 
554  Subclasses must specify the following class variables:
555 
556  - ``ConfigClass``: configuration class for your task (a subclass of
557  `lsst.pex.config.Config`, or if your task needs no configuration, then
558  `lsst.pex.config.Config` itself).
559  - ``_DefaultName``: default name used for this task (a `str`).
560 
561  Subclasses may also specify the following class variables:
562 
563  - ``RunnerClass``: a task runner class. The default is ``TaskRunner``,
564  which works for any task with a runDataRef method that takes exactly one
565  argument: a data reference. If your task does not meet this requirement
566  then you must supply a variant of ``TaskRunner``; see ``TaskRunner``
567  for more information.
568  - ``canMultiprocess``: the default is `True`; set `False` if your task
569  does not support multiprocessing.
570 
571  Subclasses must specify a method named ``runDataRef``:
572 
573  - By default ``runDataRef`` accepts a single butler data reference, but
574  you can specify an alternate task runner (subclass of ``TaskRunner``) as
575  the value of class variable ``RunnerClass`` if your run method needs
576  something else.
577  - ``runDataRef`` is expected to return its data in a
578  `lsst.pipe.base.Struct`. This provides safety for evolution of the task
579  since new values may be added without harming existing code.
580  - The data returned by ``runDataRef`` must be picklable if your task is to
581  support multiprocessing.
582  """
583  RunnerClass = TaskRunner
584  canMultiprocess = True
585 
586  @classmethod
587  def applyOverrides(cls, config):
588  """A hook to allow a task to change the values of its config *after*
589  the camera-specific overrides are loaded but before any command-line
590  overrides are applied.
591 
592  Parameters
593  ----------
594  config : instance of task's ``ConfigClass``
595  Task configuration.
596 
597  Notes
598  -----
599  This is necessary in some cases because the camera-specific overrides
600  may retarget subtasks, wiping out changes made in
601  ConfigClass.setDefaults. See LSST Trac ticket #2282 for more
602  discussion.
603 
604  .. warning::
605 
606  This is called by CmdLineTask.parseAndRun; other ways of
607  constructing a config will not apply these overrides.
608  """
609  pass
610 
611  @classmethod
612  def parseAndRun(cls, args=None, config=None, log=None, doReturnResults=False):
613  """Parse an argument list and run the command.
614 
615  Parameters
616  ----------
617  args : `list`, optional
618  List of command-line arguments; if `None` use `sys.argv`.
619  config : `lsst.pex.config.Config`-type, optional
620  Config for task. If `None` use `Task.ConfigClass`.
621  log : `lsst.log.Log`-type, optional
622  Log. If `None` use the default log.
623  doReturnResults : `bool`, optional
624  If `True`, return the results of this task. Default is `False`.
625  This is only intended for unit tests and similar use. It can
626  easily exhaust memory (if the task returns enough data and you
627  call it enough times) and it will fail when using multiprocessing
628  if the returned data cannot be pickled.
629 
630  Returns
631  -------
632  struct : `lsst.pipe.base.Struct`
633  Fields are:
634 
635  ``argumentParser``
636  the argument parser (`lsst.pipe.base.ArgumentParser`).
637  ``parsedCmd``
638  the parsed command returned by the argument parser's
639  `~lsst.pipe.base.ArgumentParser.parse_args` method
640  (`argparse.Namespace`).
641  ``taskRunner``
642  the task runner used to run the task (an instance of
643  `Task.RunnerClass`).
644  ``resultList``
645  results returned by the task runner's ``run`` method, one entry
646  per invocation (`list`). This will typically be a list of
647  `Struct`, each containing at least an ``exitStatus`` integer
648  (0 or 1); see `Task.RunnerClass` (`TaskRunner` by default) for
649  more details.
650 
651  Notes
652  -----
653  Calling this method with no arguments specified is the standard way to
654  run a command-line task from the command-line. For an example see
655  ``pipe_tasks`` ``bin/makeSkyMap.py`` or almost any other file in that
656  directory.
657 
658  If one or more of the dataIds fails then this routine will exit (with
659  a status giving the number of failed dataIds) rather than returning
660  this struct; this behaviour can be overridden by specifying the
661  ``--noExit`` command-line option.
662  """
663  if args is None:
664  commandAsStr = " ".join(sys.argv)
665  args = sys.argv[1:]
666  else:
667  commandAsStr = "{}{}".format(lsst.utils.get_caller_name(skip=1), tuple(args))
668 
669  argumentParser = cls._makeArgumentParser_makeArgumentParser()
670  if config is None:
671  config = cls.ConfigClass()
672  parsedCmd = argumentParser.parse_args(config=config, args=args, log=log, override=cls.applyOverridesapplyOverrides)
673  # print this message after parsing the command so the log is fully
674  # configured
675  parsedCmd.log.info("Running: %s", commandAsStr)
676 
677  taskRunner = cls.RunnerClassRunnerClass(TaskClass=cls, parsedCmd=parsedCmd, doReturnResults=doReturnResults)
678  resultList = taskRunner.run(parsedCmd)
679 
680  try:
681  nFailed = sum(((res.exitStatus != 0) for res in resultList))
682  except (TypeError, AttributeError) as e:
683  # NOTE: TypeError if resultList is None, AttributeError if it
684  # doesn't have exitStatus.
685  parsedCmd.log.warn("Unable to retrieve exit status (%s); assuming success", e)
686  nFailed = 0
687 
688  if nFailed > 0:
689  if parsedCmd.noExit:
690  parsedCmd.log.error("%d dataRefs failed; not exiting as --noExit was set", nFailed)
691  else:
692  sys.exit(nFailed)
693 
694  return Struct(
695  argumentParser=argumentParser,
696  parsedCmd=parsedCmd,
697  taskRunner=taskRunner,
698  resultList=resultList,
699  )
700 
701  @classmethod
702  def _makeArgumentParser(cls):
703  """Create and return an argument parser.
704 
705  Returns
706  -------
707  parser : `lsst.pipe.base.ArgumentParser`
708  The argument parser for this task.
709 
710  Notes
711  -----
712  By default this returns an `~lsst.pipe.base.ArgumentParser` with one
713  ID argument named `--id` of dataset type ``raw``.
714 
715  Your task subclass may need to override this method to change the
716  dataset type or data ref level, or to add additional data ID arguments.
717  If you add additional data ID arguments or your task's runDataRef
718  method takes more than a single data reference then you will also have
719  to provide a task-specific task runner (see TaskRunner for more
720  information).
721  """
722  parser = ArgumentParser(name=cls._DefaultName)
723  parser.add_id_argument(name="--id", datasetType="raw",
724  help="data IDs, e.g. --id visit=12345 ccd=1,2^0,3")
725  return parser
726 
727  def writeConfig(self, butler, clobber=False, doBackup=True):
728  """Write the configuration used for processing the data, or check that
729  an existing one is equal to the new one if present.
730 
731  Parameters
732  ----------
733  butler : `lsst.daf.persistence.Butler`
734  Data butler used to write the config. The config is written to
735  dataset type `CmdLineTask._getConfigName`.
736  clobber : `bool`, optional
737  A boolean flag that controls what happens if a config already has
738  been saved:
739 
740  - `True`: overwrite or rename the existing config, depending on
741  ``doBackup``.
742  - `False`: raise `TaskError` if this config does not match the
743  existing config.
744  doBackup : `bool`, optional
745  Set to `True` to backup the config files if clobbering.
746  """
747  configName = self._getConfigName_getConfigName()
748  if configName is None:
749  return
750  if clobber:
751  butler.put(self.configconfig, configName, doBackup=doBackup)
752  elif butler.datasetExists(configName, write=True):
753  # this may be subject to a race condition; see #2789
754  try:
755  oldConfig = butler.get(configName, immediate=True)
756  except Exception as exc:
757  raise type(exc)(f"Unable to read stored config file {configName} (exc); "
758  "consider using --clobber-config")
759 
760  def logConfigMismatch(msg):
761  self.loglog.fatal("Comparing configuration: %s", msg)
762 
763  if not self.configconfig.compare(oldConfig, shortcut=False, output=logConfigMismatch):
764  raise TaskError(
765  f"Config does not match existing task config {configName!r} on disk; "
766  "tasks configurations must be consistent within the same output repo "
767  "(override with --clobber-config)")
768  else:
769  butler.put(self.configconfig, configName)
770 
771  def writeSchemas(self, butler, clobber=False, doBackup=True):
772  """Write the schemas returned by
773  `lsst.pipe.base.Task.getAllSchemaCatalogs`.
774 
775  Parameters
776  ----------
777  butler : `lsst.daf.persistence.Butler`
778  Data butler used to write the schema. Each schema is written to the
779  dataset type specified as the key in the dict returned by
780  `~lsst.pipe.base.Task.getAllSchemaCatalogs`.
781  clobber : `bool`, optional
782  A boolean flag that controls what happens if a schema already has
783  been saved:
784 
785  - `True`: overwrite or rename the existing schema, depending on
786  ``doBackup``.
787  - `False`: raise `TaskError` if this schema does not match the
788  existing schema.
789  doBackup : `bool`, optional
790  Set to `True` to backup the schema files if clobbering.
791 
792  Notes
793  -----
794  If ``clobber`` is `False` and an existing schema does not match a
795  current schema, then some schemas may have been saved successfully
796  and others may not, and there is no easy way to tell which is which.
797  """
798  for dataset, catalog in self.getAllSchemaCatalogsgetAllSchemaCatalogs().items():
799  schemaDataset = dataset + "_schema"
800  if clobber:
801  butler.put(catalog, schemaDataset, doBackup=doBackup)
802  elif butler.datasetExists(schemaDataset, write=True):
803  oldSchema = butler.get(schemaDataset, immediate=True).getSchema()
804  if not oldSchema.compare(catalog.getSchema(), afwTable.Schema.IDENTICAL):
805  raise TaskError(
806  f"New schema does not match schema {dataset!r} on disk; "
807  "schemas must be consistent within the same output repo "
808  "(override with --clobber-config)")
809  else:
810  butler.put(catalog, schemaDataset)
811 
812  def writeMetadata(self, dataRef):
813  """Write the metadata produced from processing the data.
814 
815  Parameters
816  ----------
817  dataRef
818  Butler data reference used to write the metadata.
819  The metadata is written to dataset type
820  `CmdLineTask._getMetadataName`.
821  """
822  try:
823  metadataName = self._getMetadataName_getMetadataName()
824  if metadataName is not None:
825  dataRef.put(self.getFullMetadatagetFullMetadata(), metadataName)
826  except Exception as e:
827  self.loglog.warn("Could not persist metadata for dataId=%s: %s", dataRef.dataId, e)
828 
829  def writePackageVersions(self, butler, clobber=False, doBackup=True, dataset="packages"):
830  """Compare and write package versions.
831 
832  Parameters
833  ----------
834  butler : `lsst.daf.persistence.Butler`
835  Data butler used to read/write the package versions.
836  clobber : `bool`, optional
837  A boolean flag that controls what happens if versions already have
838  been saved:
839 
840  - `True`: overwrite or rename the existing version info, depending
841  on ``doBackup``.
842  - `False`: raise `TaskError` if this version info does not match
843  the existing.
844  doBackup : `bool`, optional
845  If `True` and clobbering, old package version files are backed up.
846  dataset : `str`, optional
847  Name of dataset to read/write.
848 
849  Raises
850  ------
851  TaskError
852  Raised if there is a version mismatch with current and persisted
853  lists of package versions.
854 
855  Notes
856  -----
857  Note that this operation is subject to a race condition.
858  """
859  packages = Packages.fromSystem()
860 
861  if clobber:
862  return butler.put(packages, dataset, doBackup=doBackup)
863  if not butler.datasetExists(dataset, write=True):
864  return butler.put(packages, dataset)
865 
866  try:
867  old = butler.get(dataset, immediate=True)
868  except Exception as exc:
869  raise type(exc)(f"Unable to read stored version dataset {dataset} ({exc}); "
870  "consider using --clobber-versions or --no-versions")
871  # Note that because we can only detect python modules that have been
872  # imported, the stored list of products may be more or less complete
873  # than what we have now. What's important is that the products that
874  # are in common have the same version.
875  diff = packages.difference(old)
876  if diff:
877  versions_str = "; ".join(f"{pkg}: {diff[pkg][1]} vs {diff[pkg][0]}" for pkg in diff)
878  raise TaskError(
879  f"Version mismatch ({versions_str}); consider using --clobber-versions or --no-versions")
880  # Update the old set of packages in case we have more packages that
881  # haven't been persisted.
882  extra = packages.extra(old)
883  if extra:
884  old.update(packages)
885  butler.put(old, dataset, doBackup=doBackup)
886 
887  def _getConfigName(self):
888  """Get the name of the config dataset type, or `None` if config is not
889  to be persisted.
890 
891  Notes
892  -----
893  The name may depend on the config; that is why this is not a class
894  method.
895  """
896  return self._DefaultName + "_config"
897 
898  def _getMetadataName(self):
899  """Get the name of the metadata dataset type, or `None` if metadata is
900  not to be persisted.
901 
902  Notes
903  -----
904  The name may depend on the config; that is why this is not a class
905  method.
906  """
907  return self._DefaultName + "_metadata"
std::vector< SchemaItem< Flag > > * items
table::Key< int > type
Definition: Detector.cc:163
def makeTask(self, parsedCmd=None, args=None)
Definition: cmdLineTask.py:515
def writeConfig(self, butler, clobber=False, doBackup=True)
Definition: cmdLineTask.py:727
def parseAndRun(cls, args=None, config=None, log=None, doReturnResults=False)
Definition: cmdLineTask.py:612
def writeSchemas(self, butler, clobber=False, doBackup=True)
Definition: cmdLineTask.py:771
def writePackageVersions(self, butler, clobber=False, doBackup=True, dataset="packages")
Definition: cmdLineTask.py:829
def runTask(self, task, dataRef, kwargs)
Definition: cmdLineTask.py:503
def _precallImpl(self, task, parsedCmd)
Definition: cmdLineTask.py:338
def __init__(self, TaskClass, parsedCmd, doReturnResults=False)
Definition: cmdLineTask.py:174
def getTargetList(parsedCmd, **kwargs)
Definition: cmdLineTask.py:253
def runTask(self, task, dataRef, kwargs)
Definition: cmdLineTask.py:473
def makeTask(self, parsedCmd=None, args=None)
Definition: cmdLineTask.py:315
def getAllSchemaCatalogs(self)
Definition: task.py:204
def getFullMetadata(self)
Definition: task.py:229
daf::base::PropertyList * list
Definition: fits.cc:913
bool disableImplicitThreading()
Disable threading that has not been set explicitly.
Definition: threads.cc:132
Definition: Log.h:706
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)
Definition: history.py:174
def profile(filename, log=None)
Definition: cmdLineTask.py:50