multiBandDriver.py
from __future__ import absolute_import, division, print_function
import os
from argparse import ArgumentError

from builtins import zip

from lsst.pex.config import Config, Field, ConfigurableField
from lsst.pipe.base import ArgumentParser, TaskRunner
from lsst.pipe.tasks.multiBand import (DetectCoaddSourcesTask,
                                       MergeDetectionsTask,
                                       DeblendCoaddSourcesTask,
                                       MeasureMergedCoaddSourcesTask,
                                       MergeMeasurementsTask,)
from lsst.ctrl.pool.parallel import BatchPoolTask
from lsst.ctrl.pool.pool import Pool, abortOnError
from lsst.meas.base.references import MultiBandReferencesTask
from lsst.meas.base.forcedPhotCoadd import ForcedPhotCoaddTask
from lsst.pipe.drivers.utils import getDataRef, TractDataIdContainer
from lsst.pipe.tasks.coaddBase import CoaddDataIdContainer

import lsst.afw.table as afwTable

class MultiBandDataIdContainer(CoaddDataIdContainer):

    def makeDataRefList(self, namespace):
        """!Make self.refList from self.idList

        It's difficult to make a data reference that merely points to an entire
        tract: there is no data product solely at the tract level. Instead, we
        generate a list of data references for patches within the tract.

        @param namespace namespace object that is the result of an argument parser
        """
        datasetType = namespace.config.coaddName + "Coadd_calexp"

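        # N.B. getPatchRefList below closes over ``dataId`` from the loop over
        # self.idList further down; it is only meaningful when called inside
        # that loop.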
        def getPatchRefList(tract):
            return [namespace.butler.dataRef(datasetType=datasetType,
                                             tract=tract.getId(),
                                             filter=dataId["filter"],
                                             patch="%d,%d" % patch.getIndex())
                    for patch in tract]

        tractRefs = {}  # Data references for each tract
        for dataId in self.idList:
            # There's no registry of coadds by filter, so we need to be given
            # the filter
            if "filter" not in dataId:
                raise ArgumentError(None, "--id must include 'filter'")

            skymap = self.getSkymap(namespace, datasetType)

            if "tract" in dataId:
                tractId = dataId["tract"]
                if tractId not in tractRefs:
                    tractRefs[tractId] = []
                if "patch" in dataId:
                    tractRefs[tractId].append(namespace.butler.dataRef(datasetType=datasetType,
                                                                       tract=tractId,
                                                                       filter=dataId['filter'],
                                                                       patch=dataId['patch']))
                else:
                    tractRefs[tractId] += getPatchRefList(skymap[tractId])
            else:
                tractRefs = dict((tract.getId(), tractRefs.get(tract.getId(), []) + getPatchRefList(tract))
                                 for tract in skymap)

        self.refList = list(tractRefs.values())

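# Example use of MultiBandDataIdContainer (hypothetical data IDs): specifying
# ``--id tract=9813 filter=HSC-I`` expands to data references for every patch
# in tract 9813, while adding ``patch=3,4`` restricts the list to that patch.
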

class MultiBandDriverConfig(Config):
    coaddName = Field(dtype=str, default="deep", doc="Name of coadd")
    doDetection = Field(dtype=bool, default=False,
                        doc="Re-run detection? (requires *Coadd dataset to have been written)")
    detectCoaddSources = ConfigurableField(target=DetectCoaddSourcesTask,
                                           doc="Detect sources on coadd")
    mergeCoaddDetections = ConfigurableField(
        target=MergeDetectionsTask, doc="Merge detections")
    deblendCoaddSources = ConfigurableField(target=DeblendCoaddSourcesTask, doc="Deblend merged detections")
    measureCoaddSources = ConfigurableField(target=MeasureMergedCoaddSourcesTask,
                                            doc="Measure merged and (optionally) deblended detections")
    mergeCoaddMeasurements = ConfigurableField(
        target=MergeMeasurementsTask, doc="Merge measurements")
    forcedPhotCoadd = ConfigurableField(target=ForcedPhotCoaddTask,
                                        doc="Forced measurement on coadded images")
    reprocessing = Field(
        dtype=bool, default=False,
        doc=("Are we reprocessing?\n\n"
             "This exists as a workaround for large deblender footprints causing large memory use "
             "and/or very slow processing. We refuse to deblend those footprints when running on a cluster "
             "and return to reprocess on a machine with larger memory or more time "
             "if we consider those footprints important to recover."),
    )

    def setDefaults(self):
        Config.setDefaults(self)
        self.forcedPhotCoadd.references.retarget(MultiBandReferencesTask)

    def validate(self):
        for subtask in ("mergeCoaddDetections", "deblendCoaddSources", "measureCoaddSources",
                        "mergeCoaddMeasurements", "forcedPhotCoadd"):
            coaddName = getattr(self, subtask).coaddName
            if coaddName != self.coaddName:
                raise RuntimeError("%s.coaddName (%s) doesn't match root coaddName (%s)" %
                                   (subtask, coaddName, self.coaddName))

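# A config override file (passed with ``--configfile``) might contain, for
# example (hypothetical values):
#     config.doDetection = True
#     config.reprocessing = False
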

class MultiBandDriverTaskRunner(TaskRunner):
    """TaskRunner for running MultiBandDriverTask

    This is similar to the lsst.pipe.base.ButlerInitializedTaskRunner,
    except that we have a list of data references instead of a single
    data reference being passed to the Task.run, and we pass the results
    of the '--reuse-outputs-from' command option to the Task constructor.
    """

    def __init__(self, TaskClass, parsedCmd, doReturnResults=False):
        TaskRunner.__init__(self, TaskClass, parsedCmd, doReturnResults)
        self.reuse = parsedCmd.reuse

    def makeTask(self, parsedCmd=None, args=None):
        """A variant of the base version that passes a butler argument to the task's constructor

        parsedCmd or args must be specified.
        """
        if parsedCmd is not None:
            butler = parsedCmd.butler
        elif args is not None:
            dataRefList, kwargs = args
            butler = dataRefList[0].butlerSubset.butler
        else:
            raise RuntimeError("parsedCmd or args must be specified")
        return self.TaskClass(config=self.config, log=self.log, butler=butler, reuse=self.reuse)

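# ``unpickle`` is the factory used by MultiBandDriverTask.__reduce__ below:
# pickling records the class and constructor arguments, and unpickling
# re-creates the task by calling the class again with those arguments.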
def unpickle(factory, args, kwargs):
    """Unpickle something by calling a factory"""
    return factory(*args, **kwargs)


class MultiBandDriverTask(BatchPoolTask):
    """Multi-node driver for multiband processing"""
    ConfigClass = MultiBandDriverConfig
    _DefaultName = "multiBandDriver"
    RunnerClass = MultiBandDriverTaskRunner

    def __init__(self, butler=None, schema=None, refObjLoader=None, reuse=tuple(), **kwargs):
        """!
        @param[in] butler: the butler can be used to retrieve schema or passed to the refObjLoader
            constructor in case it is needed.
        @param[in] schema: the schema of the source detection catalog used as input.
        @param[in] refObjLoader: an instance of LoadReferenceObjectsTask that supplies an external
            reference catalog. May be None if the butler argument is provided or all steps requiring
            a reference catalog are disabled.
        """
        BatchPoolTask.__init__(self, **kwargs)
        if schema is None:
            assert butler is not None, "Butler not provided"
            schema = butler.get(self.config.coaddName +
                                "Coadd_det_schema", immediate=True).schema
        self.butler = butler
        self.reuse = tuple(reuse)
        self.makeSubtask("detectCoaddSources")
        self.makeSubtask("mergeCoaddDetections", schema=schema)
        if self.config.measureCoaddSources.inputCatalog.startswith("deblended"):
            # Ensure that the output from deblendCoaddSources matches the input to measureCoaddSources
            self.measurementInput = self.config.measureCoaddSources.inputCatalog
            self.deblenderOutput = []
            if self.config.deblendCoaddSources.simultaneous:
                if self.config.deblendCoaddSources.multiBandDeblend.conserveFlux:
                    self.deblenderOutput.append("deblendedFlux")
                if self.config.deblendCoaddSources.multiBandDeblend.saveTemplates:
                    self.deblenderOutput.append("deblendedModel")
            else:
                self.deblenderOutput.append("deblendedFlux")
            if self.measurementInput not in self.deblenderOutput:
                err = "Measurement input '{0}' is not in the list of deblender output catalogs '{1}'"
                raise ValueError(err.format(self.measurementInput, self.deblenderOutput))

            self.makeSubtask("deblendCoaddSources",
                             schema=afwTable.Schema(self.mergeCoaddDetections.schema),
                             peakSchema=afwTable.Schema(self.mergeCoaddDetections.merged.getPeakSchema()),
                             butler=butler)
            measureInputSchema = afwTable.Schema(self.deblendCoaddSources.schema)
        else:
            measureInputSchema = afwTable.Schema(self.mergeCoaddDetections.schema)
        self.makeSubtask("measureCoaddSources", schema=measureInputSchema,
                         peakSchema=afwTable.Schema(
                             self.mergeCoaddDetections.merged.getPeakSchema()),
                         refObjLoader=refObjLoader, butler=butler)
        self.makeSubtask("mergeCoaddMeasurements", schema=afwTable.Schema(
            self.measureCoaddSources.schema))
        self.makeSubtask("forcedPhotCoadd", refSchema=afwTable.Schema(
            self.mergeCoaddMeasurements.schema))

    def __reduce__(self):
        """Pickler"""
        return unpickle, (self.__class__, [], dict(config=self.config, name=self._name,
                                                   parentTask=self._parentTask, log=self.log,
                                                   butler=self.butler, reuse=self.reuse))

    @classmethod
    def _makeArgumentParser(cls, *args, **kwargs):
        kwargs.pop("doBatch", False)
        parser = ArgumentParser(name=cls._DefaultName, *args, **kwargs)
        parser.add_id_argument("--id", "deepCoadd", help="data ID, e.g. --id tract=12345 patch=1,2",
                               ContainerClass=TractDataIdContainer)
        parser.addReuseOption(["detectCoaddSources", "mergeCoaddDetections", "measureCoaddSources",
                               "mergeCoaddMeasurements", "forcedPhotCoadd", "deblendCoaddSources"])
        return parser

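    # Example invocation (hypothetical repository path and data IDs):
    #   multiBandDriver.py /path/to/repo --rerun user/multiband \
    #       --id tract=9813 filter=HSC-G^HSC-R^HSC-I
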
    @classmethod
    def batchWallTime(cls, time, parsedCmd, numCpus):
        """!Return walltime request for batch job

        @param time: Requested time per iteration
        @param parsedCmd: Results of argument parsing
        @param numCpus: Number of CPUs
        """
        numTargets = 0
        for refList in parsedCmd.id.refList:
            numTargets += len(refList)
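        # Scale the requested per-iteration time by the number of targets per
        # CPU; e.g. (hypothetical numbers) time=3600 s, 10 patches and 5 CPUs
        # yields a request of 3600*10/5 = 7200 s.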
        return time*numTargets/float(numCpus)

    @abortOnError
    def runDataRef(self, patchRefList):
        """!Run multiband processing on coadds

        Only the master node runs this method.

        No real MPI communication (scatter/gather) takes place: all I/O goes
        through the disk. We want the intermediate stages on disk, and the
        component Tasks are implemented around this, so we just follow suit.

        @param patchRefList: Data references to run measurement
        """
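        # Find the first valid patch reference to obtain a butler; the
        # for/else below raises only if the loop completes without a break,
        # i.e. no patchRef evaluated as true.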
        for patchRef in patchRefList:
            if patchRef:
                butler = patchRef.getButler()
                break
        else:
            raise RuntimeError("No valid patches")
        pool = Pool("all")
        pool.cacheClear()
        pool.storeSet(butler=butler)

        # MultiBand measurements require that the detection stage be completed
        # before measurements can be made.
        #
        # The configuration for coaddDriver.py allows detection to be turned
        # off in the event that fake objects are to be added during the
        # detection process. This allows the long co-addition process to be
        # run once, and multiple different MultiBand reruns (with different
        # fake objects) to exist from the same base co-addition.
        #
        # However, we only re-run detection if doDetection is explicitly True
        # here (this should always be the opposite of coaddDriver.doDetection);
        # otherwise we have no way to tell reliably whether any detections
        # present in an input repo are safe to use.
        if self.config.doDetection:
            detectionList = []
            for patchRef in patchRefList:
                if ("detectCoaddSources" in self.reuse and
                        patchRef.datasetExists(self.config.coaddName + "Coadd_calexp", write=True)):
                    self.log.info("Skipping detectCoaddSources for %s; output already exists." %
                                  patchRef.dataId)
                    continue
                if not patchRef.datasetExists(self.config.coaddName + "Coadd"):
                    self.log.debug("Not processing %s; required input %sCoadd missing." %
                                   (patchRef.dataId, self.config.coaddName))
                    continue
                detectionList.append(patchRef)

            pool.map(self.runDetection, detectionList)

        patchRefList = [patchRef for patchRef in patchRefList if
                        patchRef.datasetExists(self.config.coaddName + "Coadd_calexp") and
                        patchRef.datasetExists(self.config.coaddName + "Coadd_det",
                                               write=self.config.doDetection)]
        dataIdList = [patchRef.dataId for patchRef in patchRefList]

        # Group by patch
        patches = {}
        tract = None
        for patchRef in patchRefList:
            dataId = patchRef.dataId
            if tract is None:
                tract = dataId["tract"]
            else:
                assert tract == dataId["tract"]
            patch = dataId["patch"]
            if patch not in patches:
                patches[patch] = []
            patches[patch].append(dataId)
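        # ``patches`` now maps each patch ID to the list of per-filter data
        # IDs for that patch; each such list is the unit of work for the
        # merge, deblend and merge-measurements steps below.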

        pool.map(self.runMergeDetections, patches.values())

        # Deblend merged detections, and test for reprocessing
        #
        # The reprocessing allows us to have multiple attempts at deblending large footprints. Large
        # footprints can suck up a lot of memory in the deblender, which means that when we process on a
        # cluster, we want to refuse to deblend them (they're flagged "deblend_parentTooBig"). But since
        # they may have astronomically interesting data, we want the ability to go back and reprocess them
        # with a more permissive configuration when we have more memory or processing time.
        #
        # self.runDeblendMerged will return whether there are any footprints in that image that required
        # reprocessing. We need to convert that list of booleans into a dict mapping the patchId (x,y) to
        # a boolean. That tells us whether the merge measurement and forced photometry need to be re-run on
        # a particular patch.
        #
        # This determination of which patches need to be reprocessed exists only in memory (the measurements
        # have been written, clobbering the old ones), so if there was an exception we would lose this
        # information, leaving things in an inconsistent state (measurements, merged measurements and
        # forced photometry old). To attempt to preserve this status, we touch a file (dataset named
        # "deepCoadd_multibandReprocessing") --- if this file exists, we need to re-run the measurements,
        # merge and forced photometry.
        #
        # This is, hopefully, a temporary workaround until we can improve the
        # deblender.
        try:
            reprocessed = pool.map(self.runDeblendMerged, patches.values())
        finally:
            if self.config.reprocessing:
                patchReprocessing = {}
                for dataId, reprocess in zip(dataIdList, reprocessed):
                    patchId = dataId["patch"]
                    patchReprocessing[patchId] = patchReprocessing.get(
                        patchId, False) or reprocess
                # Persist the determination, to make error recovery easier
                reprocessDataset = self.config.coaddName + "Coadd_multibandReprocessing"
                for patchId in patchReprocessing:
                    if not patchReprocessing[patchId]:
                        continue
                    dataId = dict(tract=tract, patch=patchId)
                    if patchReprocessing[patchId]:
                        filename = butler.get(
                            reprocessDataset + "_filename", dataId)[0]
                        open(filename, 'a').close()  # Touch file
                    elif butler.datasetExists(reprocessDataset, dataId):
                        # We must have failed at some point while reprocessing
                        # and we're starting over
                        patchReprocessing[patchId] = True

        # Only process patches that have been identified as needing it
        pool.map(self.runMeasurements, [dataId1 for dataId1 in dataIdList if not self.config.reprocessing or
                                        patchReprocessing[dataId1["patch"]]])
        pool.map(self.runMergeMeasurements, [idList for patchId, idList in patches.items() if
                                             not self.config.reprocessing or patchReprocessing[patchId]])
        pool.map(self.runForcedPhot, [dataId1 for dataId1 in dataIdList if not self.config.reprocessing or
                                      patchReprocessing[dataId1["patch"]]])

        # Remove persisted reprocessing determination
        if self.config.reprocessing:
            for patchId in patchReprocessing:
                if not patchReprocessing[patchId]:
                    continue
                dataId = dict(tract=tract, patch=patchId)
                filename = butler.get(
                    reprocessDataset + "_filename", dataId)[0]
                os.unlink(filename)

    def runDetection(self, cache, patchRef):
        """!Run detection on a patch

        Only slave nodes execute this method.

        @param cache: Pool cache, containing butler
        @param patchRef: Patch on which to do detection
        """
        with self.logOperation("do detections on {}".format(patchRef.dataId)):
            idFactory = self.detectCoaddSources.makeIdFactory(patchRef)
            coadd = patchRef.get(self.config.coaddName + "Coadd",
                                 immediate=True)
            expId = int(patchRef.get(self.config.coaddName + "CoaddId"))
            self.detectCoaddSources.emptyMetadata()
            detResults = self.detectCoaddSources.run(coadd, idFactory, expId=expId)
            self.detectCoaddSources.write(detResults, patchRef)
            self.detectCoaddSources.writeMetadata(patchRef)

    def runMergeDetections(self, cache, dataIdList):
        """!Run detection merging on a patch

        Only slave nodes execute this method.

        @param cache: Pool cache, containing butler
        @param dataIdList: List of data identifiers for the patch in different filters
        """
        with self.logOperation("merge detections from %s" % (dataIdList,)):
            dataRefList = [getDataRef(cache.butler, dataId, self.config.coaddName + "Coadd_calexp") for
                           dataId in dataIdList]
            if ("mergeCoaddDetections" in self.reuse and
                    dataRefList[0].datasetExists(self.config.coaddName + "Coadd_mergeDet", write=True)):
                self.log.info("Skipping mergeCoaddDetections for %s; output already exists." %
                              dataRefList[0].dataId)
                return
            self.mergeCoaddDetections.runDataRef(dataRefList)

    def runDeblendMerged(self, cache, dataIdList):
        """Run the deblender on a list of dataId's

        Only slave nodes execute this method.

        Parameters
        ----------
        cache: Pool cache
            Pool cache with butler.
        dataIdList: list
            Data identifier for patch in each band.

        Returns
        -------
        result: bool
            whether the patch requires reprocessing.
        """
        with self.logOperation("deblending %s" % (dataIdList,)):
            dataRefList = [getDataRef(cache.butler, dataId, self.config.coaddName + "Coadd_calexp") for
                           dataId in dataIdList]
            reprocessing = False  # Does this patch require reprocessing?
            if ("deblendCoaddSources" in self.reuse and
                    all([dataRef.datasetExists(self.config.coaddName + "Coadd_" + self.measurementInput,
                                               write=True) for dataRef in dataRefList])):
                if not self.config.reprocessing:
                    self.log.info("Skipping deblendCoaddSources for %s; output already exists" % dataIdList)
                    return False

                # Footprints are the same in every band, therefore we can check just one
                catalog = dataRefList[0].get(self.config.coaddName + "Coadd_" + self.measurementInput)
                bigFlag = catalog["deblend_parentTooBig"]
                # Footprints marked too large by the previous deblender run
                numOldBig = bigFlag.sum()
                if numOldBig == 0:
                    self.log.info("No large footprints in %s" % (dataRefList[0].dataId))
                    return False

                # This if-statement can be removed after DM-15662
                if self.config.deblendCoaddSources.simultaneous:
                    deblender = self.deblendCoaddSources.multiBandDeblend
                else:
                    deblender = self.deblendCoaddSources.singleBandDeblend

                # isLargeFootprint() can potentially return False for a source that is marked
                # too big in the catalog, because of "new"/different deblender configs.
                # numNewBig is the number of footprints that *will* be too big if reprocessed
                numNewBig = sum((deblender.isLargeFootprint(src.getFootprint()) for
                                 src in catalog[bigFlag]))
                if numNewBig == numOldBig:
                    self.log.info("All %d formerly large footprints continue to be large in %s" %
                                  (numOldBig, dataRefList[0].dataId,))
                    return False
                self.log.info("Found %d large footprints to be reprocessed in %s" %
                              (numOldBig - numNewBig, [dataRef.dataId for dataRef in dataRefList]))
                reprocessing = True

            self.deblendCoaddSources.runDataRef(dataRefList)
            return reprocessing

    def runMeasurements(self, cache, dataId):
        """Run measurement on a patch for a single filter

        Only slave nodes execute this method.

        Parameters
        ----------
        cache: Pool cache
            Pool cache, with butler
        dataId: dataRef
            Data identifier for patch
        """
        with self.logOperation("measurements on %s" % (dataId,)):
            dataRef = getDataRef(cache.butler, dataId,
                                 self.config.coaddName + "Coadd_calexp")
            if ("measureCoaddSources" in self.reuse and
                    not self.config.reprocessing and
                    dataRef.datasetExists(self.config.coaddName + "Coadd_meas", write=True)):
                self.log.info("Skipping measureCoaddSources for %s; output already exists" % dataId)
                return
            self.measureCoaddSources.runDataRef(dataRef)

    def runMergeMeasurements(self, cache, dataIdList):
        """!Run measurement merging on a patch

        Only slave nodes execute this method.

        @param cache: Pool cache, containing butler
        @param dataIdList: List of data identifiers for the patch in different filters
        """
        with self.logOperation("merge measurements from %s" % (dataIdList,)):
            dataRefList = [getDataRef(cache.butler, dataId, self.config.coaddName + "Coadd_calexp") for
                           dataId in dataIdList]
            if ("mergeCoaddMeasurements" in self.reuse and
                    not self.config.reprocessing and
                    dataRefList[0].datasetExists(self.config.coaddName + "Coadd_ref", write=True)):
                self.log.info("Skipping mergeCoaddMeasurements for %s; output already exists" %
                              dataRefList[0].dataId)
                return
            self.mergeCoaddMeasurements.runDataRef(dataRefList)

    def runForcedPhot(self, cache, dataId):
        """!Run forced photometry on a patch for a single filter

        Only slave nodes execute this method.

        @param cache: Pool cache, with butler
        @param dataId: Data identifier for patch
        """
        with self.logOperation("forced photometry on %s" % (dataId,)):
            dataRef = getDataRef(cache.butler, dataId,
                                 self.config.coaddName + "Coadd_calexp")
            if ("forcedPhotCoadd" in self.reuse and
                    not self.config.reprocessing and
                    dataRef.datasetExists(self.config.coaddName + "Coadd_forced_src", write=True)):
                self.log.info("Skipping forcedPhotCoadd for %s; output already exists" % dataId)
                return
            self.forcedPhotCoadd.runDataRef(dataRef)

    def writeMetadata(self, dataRef):
        """We don't collect any metadata, so skip"""
        pass