LSSTApplications  18.0.0+106,18.0.0+50,19.0.0,19.0.0+1,19.0.0+10,19.0.0+11,19.0.0+13,19.0.0+17,19.0.0+2,19.0.0-1-g20d9b18+6,19.0.0-1-g425ff20,19.0.0-1-g5549ca4,19.0.0-1-g580fafe+6,19.0.0-1-g6fe20d0+1,19.0.0-1-g7011481+9,19.0.0-1-g8c57eb9+6,19.0.0-1-gb5175dc+11,19.0.0-1-gdc0e4a7+9,19.0.0-1-ge272bc4+6,19.0.0-1-ge3aa853,19.0.0-10-g448f008b,19.0.0-12-g6990b2c,19.0.0-2-g0d9f9cd+11,19.0.0-2-g3d9e4fb2+11,19.0.0-2-g5037de4,19.0.0-2-gb96a1c4+3,19.0.0-2-gd955cfd+15,19.0.0-3-g2d13df8,19.0.0-3-g6f3c7dc,19.0.0-4-g725f80e+11,19.0.0-4-ga671dab3b+1,19.0.0-4-gad373c5+3,19.0.0-5-ga2acb9c+2,19.0.0-5-gfe96e6c+2,w.2020.01
LSSTDataManagementBasePackage
multiBandDriver.py
Go to the documentation of this file.
1 import os
2 
3 from lsst.pex.config import Config, Field, ConfigurableField
4 from lsst.pipe.base import ArgumentParser, TaskRunner
5 from lsst.pipe.tasks.multiBand import (DetectCoaddSourcesTask,
6  MergeDetectionsTask,
7  DeblendCoaddSourcesTask,
8  MeasureMergedCoaddSourcesTask,
9  MergeMeasurementsTask,)
10 from lsst.ctrl.pool.parallel import BatchPoolTask
11 from lsst.ctrl.pool.pool import Pool, abortOnError
12 from lsst.meas.base.references import MultiBandReferencesTask
13 from lsst.meas.base.forcedPhotCoadd import ForcedPhotCoaddTask
14 from lsst.pipe.drivers.utils import getDataRef, TractDataIdContainer
15 
16 import lsst.afw.table as afwTable
17 
18 
class MultiBandDriverConfig(Config):
    """Configuration for the multi-band coadd processing driver.

    Holds the root ``coaddName`` plus one ConfigurableField per processing
    stage; ``validate`` requires the stage configs listed there to use the
    same ``coaddName`` as the root.
    """
    coaddName = Field(dtype=str, default="deep", doc="Name of coadd")
    doDetection = Field(dtype=bool, default=False,
                        doc="Re-run detection? (requires *Coadd dataset to have been written)")
    detectCoaddSources = ConfigurableField(target=DetectCoaddSourcesTask,
                                           doc="Detect sources on coadd")
    mergeCoaddDetections = ConfigurableField(
        target=MergeDetectionsTask, doc="Merge detections")
    deblendCoaddSources = ConfigurableField(target=DeblendCoaddSourcesTask, doc="Deblend merged detections")
    measureCoaddSources = ConfigurableField(target=MeasureMergedCoaddSourcesTask,
                                            doc="Measure merged and (optionally) deblended detections")
    mergeCoaddMeasurements = ConfigurableField(
        target=MergeMeasurementsTask, doc="Merge measurements")
    forcedPhotCoadd = ConfigurableField(target=ForcedPhotCoaddTask,
                                        doc="Forced measurement on coadded images")
    reprocessing = Field(
        dtype=bool, default=False,
        doc=("Are we reprocessing?\n\n"
             "This exists as a workaround for large deblender footprints causing large memory use "
             "and/or very slow processing. We refuse to deblend those footprints when running on a cluster "
             "and return to reprocess on a machine with larger memory or more time "
             "if we consider those footprints important to recover."),
    )

    hasFakes = Field(
        dtype=bool,
        default=False,
        doc="Should be set to True if fakes were inserted into the data being processed."
    )

    def setDefaults(self):
        """Set defaults, retargeting the forced-photometry reference selection
        to MultiBandReferencesTask."""
        Config.setDefaults(self)
        self.forcedPhotCoadd.references.retarget(MultiBandReferencesTask)

    def validate(self):
        """Validate the configuration.

        Raises
        ------
        RuntimeError
            If the ``coaddName`` of one of the checked subtask configs differs
            from the root ``coaddName``.
        """
        # The base-class implementation validates every field (including the
        # subtask configs); an override that skips it silently disables that.
        Config.validate(self)
        for subtask in ("mergeCoaddDetections", "deblendCoaddSources", "measureCoaddSources",
                        "mergeCoaddMeasurements", "forcedPhotCoadd"):
            coaddName = getattr(self, subtask).coaddName
            if coaddName != self.coaddName:
                raise RuntimeError("%s.coaddName (%s) doesn't match root coaddName (%s)" %
                                   (subtask, coaddName, self.coaddName))
61 
62 
class MultiBandDriverTaskRunner(TaskRunner):
    """TaskRunner for running the multi-band driver task

    This is similar to the lsst.pipe.base.ButlerInitializedTaskRunner,
    except that we have a list of data references instead of a single
    data reference being passed to the Task.run, and we pass the results
    of the '--reuse-outputs-from' command option to the Task constructor.
    """

    def __init__(self, TaskClass, parsedCmd, doReturnResults=False):
        TaskRunner.__init__(self, TaskClass, parsedCmd, doReturnResults)
        # Keep the parsed reuse option so makeTask can forward it to the task.
        self.reuse = parsedCmd.reuse

    def makeTask(self, parsedCmd=None, args=None):
        """A variant of the base version that passes a butler argument to the task's constructor

        parsedCmd or args must be specified.

        @param parsedCmd: parsed command; its ``butler`` attribute is used if given.
        @param args: ``(dataRefList, kwargs)`` pair; the butler is taken from the
            first data reference when parsedCmd is None.
        @return an instance of ``self.TaskClass``.
        @throws RuntimeError if neither parsedCmd nor args is given.
        """
        if parsedCmd is not None:
            butler = parsedCmd.butler
        elif args is not None:
            dataRefList, _ = args
            butler = dataRefList[0].butlerSubset.butler
        else:
            raise RuntimeError("parsedCmd or args must be specified")
        return self.TaskClass(config=self.config, log=self.log, butler=butler, reuse=self.reuse)
88 
89 
def unpickle(factory, args, kwargs):
    """Rebuild an object by calling ``factory(*args, **kwargs)``.

    Serves as the reconstruction callable returned from ``__reduce__`` so an
    object is pickled as a constructor call rather than as its state.
    """
    rebuilt = factory(*args, **kwargs)
    return rebuilt
93 
94 
class MultiBandDriverTask(BatchPoolTask):
    """Multi-node driver for multiband processing"""
    ConfigClass = MultiBandDriverConfig
    _DefaultName = "multiBandDriver"
    RunnerClass = MultiBandDriverTaskRunner

    def __init__(self, butler=None, schema=None, refObjLoader=None, reuse=tuple(), **kwargs):
        """!
        @param[in] butler: the butler can be used to retrieve schema or passed to the refObjLoader constructor
            in case it is needed.
        @param[in] schema: the schema of the source detection catalog used as input.
        @param[in] refObjLoader: an instance of LoadReferenceObjectsTasks that supplies an external reference
            catalog. May be None if the butler argument is provided or all steps requiring a reference
            catalog are disabled.
        @param[in] reuse: names of processing stages whose existing outputs may be reused
            (the '--reuse-outputs-from' option); stored as a tuple.
        @param[in] kwargs: passed on to BatchPoolTask.__init__.

        @throws RuntimeError if schema is None and no butler is provided.
        @throws ValueError if measureCoaddSources.inputCatalog names a deblended catalog
            that the configured deblender does not produce.
        """
        BatchPoolTask.__init__(self, **kwargs)
        if schema is None:
            # Explicit check instead of assert: asserts vanish under "python -O",
            # which would turn this into an AttributeError on None.
            if butler is None:
                raise RuntimeError("Butler not provided")
            schema = butler.get(self.config.coaddName +
                                "Coadd_det_schema", immediate=True).schema
        self.butler = butler
        self.reuse = tuple(reuse)
        self.makeSubtask("detectCoaddSources")
        self.makeSubtask("mergeCoaddDetections", schema=schema)
        if self.config.measureCoaddSources.inputCatalog.startswith("deblended"):
            # Ensure that the output from deblendCoaddSources matches the input to measureCoaddSources
            self.measurementInput = self.config.measureCoaddSources.inputCatalog
            self.deblenderOutput = []
            if self.config.deblendCoaddSources.simultaneous:
                self.deblenderOutput.append("deblendedModel")
            else:
                self.deblenderOutput.append("deblendedFlux")
            if self.measurementInput not in self.deblenderOutput:
                err = "Measurement input '{0}' is not in the list of deblender output catalogs '{1}'"
                raise ValueError(err.format(self.measurementInput, self.deblenderOutput))

            self.makeSubtask("deblendCoaddSources",
                             schema=afwTable.Schema(self.mergeCoaddDetections.schema),
                             peakSchema=afwTable.Schema(self.mergeCoaddDetections.merged.getPeakSchema()),
                             butler=butler)
            measureInputSchema = afwTable.Schema(self.deblendCoaddSources.schema)
        else:
            # No deblending: measurement runs directly on the merged detections.
            measureInputSchema = afwTable.Schema(self.mergeCoaddDetections.schema)
        self.makeSubtask("measureCoaddSources", schema=measureInputSchema,
                         peakSchema=afwTable.Schema(
                             self.mergeCoaddDetections.merged.getPeakSchema()),
                         refObjLoader=refObjLoader, butler=butler)
        self.makeSubtask("mergeCoaddMeasurements", schema=afwTable.Schema(
            self.measureCoaddSources.schema))
        self.makeSubtask("forcedPhotCoadd", refSchema=afwTable.Schema(
            self.mergeCoaddMeasurements.schema))
        # Prefix used for the coadd image datasets read by the stages.
        if self.config.hasFakes:
            self.coaddType = "fakes_" + self.config.coaddName
        else:
            self.coaddType = self.config.coaddName

    def __reduce__(self):
        """Pickler

        The task is rebuilt through ``unpickle`` by calling the constructor
        again. schema is not passed, so the rebuilt task re-reads it through
        the butler; refObjLoader is not passed either.
        """
        return unpickle, (self.__class__, [], dict(config=self.config, name=self._name,
                                                   parentTask=self._parentTask, log=self.log,
                                                   butler=self.butler, reuse=self.reuse))

    @classmethod
    def _makeArgumentParser(cls, *args, **kwargs):
        """Build the command-line parser: a tract-level ``--id`` argument plus the reuse option.

        A ``doBatch`` keyword, if present, is discarded rather than forwarded to ArgumentParser.
        """
        kwargs.pop("doBatch", False)
        parser = ArgumentParser(name=cls._DefaultName, *args, **kwargs)
        parser.add_id_argument("--id", "deepCoadd", help="data ID, e.g. --id tract=12345 patch=1,2",
                               ContainerClass=TractDataIdContainer)
        parser.addReuseOption(["detectCoaddSources", "mergeCoaddDetections", "measureCoaddSources",
                               "mergeCoaddMeasurements", "forcedPhotCoadd", "deblendCoaddSources"])
        return parser

    @classmethod
    def batchWallTime(cls, time, parsedCmd, numCpus):
        """!Return walltime request for batch job

        @param time: Requested time per iteration
        @param parsedCmd: Results of argument parsing
        @param numCpus: Number of CPUs
        @return time multiplied by the total number of data references, divided by numCpus
        """
        numTargets = sum(len(refList) for refList in parsedCmd.id.refList)
        return time*numTargets/float(numCpus)

    @abortOnError
    def runDataRef(self, patchRefList):
        """!Run multiband processing on coadds

        Only the master node runs this method.

        No real MPI communication (scatter/gather) takes place: all I/O goes
        through the disk. We want the intermediate stages on disk, and the
        component Tasks are implemented around this, so we just follow suit.

        @param patchRefList: Data references to run measurement
        @throws RuntimeError if none of the data references is valid.
        """
        for patchRef in patchRefList:
            if patchRef:
                butler = patchRef.getButler()
                break
        else:
            raise RuntimeError("No valid patches")
        pool = Pool("all")
        pool.cacheClear()
        pool.storeSet(butler=butler)
        # MultiBand measurements require that the detection stage be completed
        # before measurements can be made.
        #
        # The configuration for coaddDriver.py allows detection to be turned
        # off in the event that fake objects are to be added during the
        # detection process. This allows the long co-addition process to be
        # run once, and multiple different MultiBand reruns (with different
        # fake objects) to exist from the same base co-addition.
        #
        # However, we only re-run detection if doDetection is explicitly True
        # here (this should always be the opposite of coaddDriver.doDetection);
        # otherwise we have no way to tell reliably whether any detections
        # present in an input repo are safe to use.
        if self.config.doDetection:
            detectionList = []
            for patchRef in patchRefList:
                if ("detectCoaddSources" in self.reuse and
                        patchRef.datasetExists(self.coaddType + "Coadd_calexp", write=True)):
                    self.log.info("Skipping detectCoaddSources for %s; output already exists." %
                                  patchRef.dataId)
                    continue
                if not patchRef.datasetExists(self.coaddType + "Coadd"):
                    # Name the dataset actually checked (coaddType includes any "fakes_" prefix).
                    self.log.debug("Not processing %s; required input %sCoadd missing." %
                                   (patchRef.dataId, self.coaddType))
                    continue
                detectionList.append(patchRef)

            pool.map(self.runDetection, detectionList)

        patchRefList = [patchRef for patchRef in patchRefList if
                        patchRef.datasetExists(self.coaddType + "Coadd_calexp") and
                        patchRef.datasetExists(self.config.coaddName + "Coadd_det",
                                               write=self.config.doDetection)]
        dataIdList = [patchRef.dataId for patchRef in patchRefList]

        # Group by patch: patch ID -> data IDs of that patch (one per filter).
        patches = {}
        tract = None
        for patchRef in patchRefList:
            dataId = patchRef.dataId
            if tract is None:
                tract = dataId["tract"]
            else:
                assert tract == dataId["tract"]
            patches.setdefault(dataId["patch"], []).append(dataId)

        pool.map(self.runMergeDetections, patches.values())

        # Deblend merged detections, and test for reprocessing
        #
        # The reprocessing allows us to have multiple attempts at deblending large footprints. Large
        # footprints can suck up a lot of memory in the deblender, which means that when we process on a
        # cluster, we want to refuse to deblend them (they're flagged "deblend.parent-too-big"). But since
        # they may have astronomically interesting data, we want the ability to go back and reprocess them
        # with a more permissive configuration when we have more memory or processing time.
        #
        # self.runDeblendMerged returns, for each patch, whether that patch has footprints that required
        # reprocessing. We convert that list of booleans into a dict mapping the patchId (x,y) to a
        # boolean, which tells us whether measurement, merge measurement and forced photometry need to be
        # re-run on a particular patch.
        #
        # This determination exists only in memory (the measurements will be clobbered), so if there was
        # an exception we would lose this information, leaving things in an inconsistent state. To
        # preserve this status, we touch a file (dataset named "deepCoadd_multibandReprocessing") --- if
        # this file exists, we need to re-run the measurements, merge and forced photometry.
        #
        # This is, hopefully, a temporary workaround until we can improve the
        # deblender.
        if self.config.measureCoaddSources.inputCatalog.startswith("deblended"):
            # One result per patch, in the same order as the keys of ``patches``.
            reprocessed = pool.map(self.runDeblendMerged, patches.values())
        else:
            # __init__ builds no deblendCoaddSources subtask in this case, so there is
            # nothing to deblend and no footprint can require reprocessing.
            reprocessed = [False]*len(patches)

        patchReprocessing = {}
        if self.config.reprocessing:
            # Persist the determination, to make error recovery easier
            reprocessDataset = self.config.coaddName + "Coadd_multibandReprocessing"
            for patchId, reprocess in zip(patches, reprocessed):
                dataId = dict(tract=tract, patch=patchId)
                if reprocess:
                    filename = butler.get(reprocessDataset + "_filename", dataId)[0]
                    open(filename, 'a').close()  # Touch file
                    patchReprocessing[patchId] = True
                elif butler.datasetExists(reprocessDataset, dataId):
                    # We must have failed at some point while reprocessing
                    # and we're starting over
                    patchReprocessing[patchId] = True
                else:
                    patchReprocessing[patchId] = False

        def needsProcessing(patchId):
            """Whether the post-deblend stages must run on this patch."""
            return not self.config.reprocessing or patchReprocessing[patchId]

        # Only process patches that have been identified as needing it
        pool.map(self.runMeasurements,
                 [dataId1 for dataId1 in dataIdList if needsProcessing(dataId1["patch"])])
        pool.map(self.runMergeMeasurements,
                 [idList for patchId, idList in patches.items() if needsProcessing(patchId)])
        pool.map(self.runForcedPhot,
                 [dataId1 for dataId1 in dataIdList if needsProcessing(dataId1["patch"])])

        # Remove persisted reprocessing determination
        if self.config.reprocessing:
            for patchId, needed in patchReprocessing.items():
                if not needed:
                    continue
                dataId = dict(tract=tract, patch=patchId)
                filename = butler.get(reprocessDataset + "_filename", dataId)[0]
                os.unlink(filename)

    def runDetection(self, cache, patchRef):
        """! Run detection on a patch

        Only slave nodes execute this method.

        @param cache: Pool cache, containing butler
        @param patchRef: Patch on which to do detection
        """
        with self.logOperation("do detections on {}".format(patchRef.dataId)):
            idFactory = self.detectCoaddSources.makeIdFactory(patchRef)
            coadd = patchRef.get(self.coaddType + "Coadd", immediate=True)
            expId = int(patchRef.get(self.config.coaddName + "CoaddId"))
            self.detectCoaddSources.emptyMetadata()
            detResults = self.detectCoaddSources.run(coadd, idFactory, expId=expId)
            self.detectCoaddSources.write(detResults, patchRef)
            self.detectCoaddSources.writeMetadata(patchRef)

    def runMergeDetections(self, cache, dataIdList):
        """!Run detection merging on a patch

        Only slave nodes execute this method.

        @param cache: Pool cache, containing butler
        @param dataIdList: List of data identifiers for the patch in different filters
        """
        with self.logOperation("merge detections from %s" % (dataIdList,)):
            dataRefList = [getDataRef(cache.butler, dataId, self.coaddType + "Coadd_calexp") for
                           dataId in dataIdList]
            if ("mergeCoaddDetections" in self.reuse and
                    dataRefList[0].datasetExists(self.config.coaddName + "Coadd_mergeDet", write=True)):
                self.log.info("Skipping mergeCoaddDetections for %s; output already exists." %
                              dataRefList[0].dataId)
                return
            self.mergeCoaddDetections.runDataRef(dataRefList)

    def runDeblendMerged(self, cache, dataIdList):
        """Run the deblender on a list of dataId's

        Only slave nodes execute this method.

        Parameters
        ----------
        cache: Pool cache
            Pool cache with butler.
        dataIdList: list
            Data identifier for patch in each band.

        Returns
        -------
        result: bool
            whether the patch requires reprocessing.
        """
        with self.logOperation("deblending %s" % (dataIdList,)):
            dataRefList = [getDataRef(cache.butler, dataId, self.coaddType + "Coadd_calexp") for
                           dataId in dataIdList]
            reprocessing = False  # Does this patch require reprocessing?
            # Generator (not a list) so the existence checks stop at the first missing catalog.
            if ("deblendCoaddSources" in self.reuse and
                    all(dataRef.datasetExists(self.config.coaddName + "Coadd_" + self.measurementInput,
                                              write=True) for dataRef in dataRefList)):
                if not self.config.reprocessing:
                    self.log.info("Skipping deblendCoaddSources for %s; output already exists" % dataIdList)
                    return False

                # Footprints are the same every band, therefore we can check just one
                catalog = dataRefList[0].get(self.config.coaddName + "Coadd_" + self.measurementInput)
                bigFlag = catalog["deblend_parentTooBig"]
                # Footprints marked too large by the previous deblender run
                numOldBig = bigFlag.sum()
                if numOldBig == 0:
                    self.log.info("No large footprints in %s" % (dataRefList[0].dataId))
                    return False

                # This if-statement can be removed after DM-15662
                if self.config.deblendCoaddSources.simultaneous:
                    deblender = self.deblendCoaddSources.multiBandDeblend
                else:
                    deblender = self.deblendCoaddSources.singleBandDeblend

                # isLargeFootprint() can potentially return False for a source that is marked
                # too big in the catalog, because of "new"/different deblender configs.
                # numNewBig is the number of footprints that *will* be too big if reprocessed
                numNewBig = sum(deblender.isLargeFootprint(src.getFootprint()) for
                                src in catalog[bigFlag])
                if numNewBig == numOldBig:
                    self.log.info("All %d formerly large footprints continue to be large in %s" %
                                  (numOldBig, dataRefList[0].dataId,))
                    return False
                self.log.info("Found %d large footprints to be reprocessed in %s" %
                              (numOldBig - numNewBig, [dataRef.dataId for dataRef in dataRefList]))
                reprocessing = True

            self.deblendCoaddSources.runDataRef(dataRefList)
            return reprocessing

    def runMeasurements(self, cache, dataId):
        """Run measurement on a patch for a single filter

        Only slave nodes execute this method.

        Parameters
        ----------
        cache: Pool cache
            Pool cache, with butler
        dataId: dict
            Data identifier for patch
        """
        with self.logOperation("measurements on %s" % (dataId,)):
            dataRef = getDataRef(cache.butler, dataId, self.coaddType + "Coadd_calexp")
            if ("measureCoaddSources" in self.reuse and
                    not self.config.reprocessing and
                    dataRef.datasetExists(self.config.coaddName + "Coadd_meas", write=True)):
                self.log.info("Skipping measureCoaddSources for %s; output already exists" % dataId)
                return
            self.measureCoaddSources.runDataRef(dataRef)

    def runMergeMeasurements(self, cache, dataIdList):
        """!Run measurement merging on a patch

        Only slave nodes execute this method.

        @param cache: Pool cache, containing butler
        @param dataIdList: List of data identifiers for the patch in different filters
        """
        with self.logOperation("merge measurements from %s" % (dataIdList,)):
            dataRefList = [getDataRef(cache.butler, dataId, self.coaddType + "Coadd_calexp") for
                           dataId in dataIdList]
            if ("mergeCoaddMeasurements" in self.reuse and
                    not self.config.reprocessing and
                    dataRefList[0].datasetExists(self.config.coaddName + "Coadd_ref", write=True)):
                self.log.info("Skipping mergeCoaddMeasurements for %s; output already exists" %
                              dataRefList[0].dataId)
                return
            self.mergeCoaddMeasurements.runDataRef(dataRefList)

    def runForcedPhot(self, cache, dataId):
        """!Run forced photometry on a patch for a single filter

        Only slave nodes execute this method.

        @param cache: Pool cache, with butler
        @param dataId: Data identifier for patch
        """
        with self.logOperation("forced photometry on %s" % (dataId,)):
            dataRef = getDataRef(cache.butler, dataId,
                                 self.coaddType + "Coadd_calexp")
            if ("forcedPhotCoadd" in self.reuse and
                    not self.config.reprocessing and
                    dataRef.datasetExists(self.config.coaddName + "Coadd_forced_src", write=True)):
                self.log.info("Skipping forcedPhotCoadd for %s; output already exists" % dataId)
                return
            self.forcedPhotCoadd.runDataRef(dataRef)

    def writeMetadata(self, dataRef):
        """We don't collect any metadata, so skip"""
        pass
def write(self, patchRef, catalog)
Write the output.
Defines the fields and offsets for a table.
Definition: Schema.h:50
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)
Definition: history.py:174
def makeSubtask(self, name, keyArgs)
Definition: task.py:275
def emptyMetadata(self)
Definition: task.py:153
def unpickle(factory, args, kwargs)
def runDataRef(self, patchRefList)
Run multiband processing on coadds.
def __init__(self, butler=None, schema=None, refObjLoader=None, reuse=tuple(), kwargs)
std::shared_ptr< FrameSet > append(FrameSet const &first, FrameSet const &second)
Construct a FrameSet that performs two transformations in series.
Definition: functional.cc:33
def getDataRef(butler, dataId, datasetType="raw")
Definition: utils.py:16
bool all(CoordinateExpr< N > const &expr) noexcept
Return true if all elements are true.
def runForcedPhot(self, cache, dataId)
Run forced photometry on a patch for a single filter.
def batchWallTime(cls, time, parsedCmd, numCpus)
Return walltime request for batch job.
def __init__(self, TaskClass, parsedCmd, doReturnResults=False)
def run(self, skyInfo, tempExpRefList, imageScalerList, weightList, altMaskList=None, mask=None, supplementaryData=None)
def logOperation(self, operation, catch=False, trace=True)
Provide a context manager for logging an operation.
Definition: parallel.py:497
def runDetection(self, cache, patchRef)
Run detection on a patch.
def runMergeDetections(self, cache, dataIdList)
Run detection merging on a patch.
def runMergeMeasurements(self, cache, dataIdList)
Run measurement merging on a patch.