LSST Data Management Base Package
multiBandDriver.py
import os

from lsst.coadd.utils.getGen3CoaddExposureId import getGen3CoaddExposureId
from lsst.pex.config import Config, Field, ConfigurableField
from lsst.pipe.base import ArgumentParser, TaskRunner
from lsst.pipe.tasks.multiBand import (DetectCoaddSourcesTask,
                                       MergeDetectionsTask,
                                       DeblendCoaddSourcesTask,
                                       MeasureMergedCoaddSourcesTask,
                                       MergeMeasurementsTask,)
from lsst.ctrl.pool.parallel import BatchPoolTask
from lsst.ctrl.pool.pool import Pool, abortOnError
from lsst.meas.base.references import MultiBandReferencesTask
from lsst.meas.base.forcedPhotCoadd import ForcedPhotCoaddTask
from lsst.pipe.drivers.utils import getDataRef, TractDataIdContainer

import lsst.afw.table as afwTable

class MultiBandDriverConfig(Config):
    coaddName = Field(dtype=str, default="deep", doc="Name of coadd")
    doDetection = Field(dtype=bool, default=False,
                        doc="Re-run detection? (requires *Coadd dataset to have been written)")
    detectCoaddSources = ConfigurableField(target=DetectCoaddSourcesTask,
                                           doc="Detect sources on coadd")
    mergeCoaddDetections = ConfigurableField(
        target=MergeDetectionsTask, doc="Merge detections")
    deblendCoaddSources = ConfigurableField(target=DeblendCoaddSourcesTask, doc="Deblend merged detections")
    measureCoaddSources = ConfigurableField(target=MeasureMergedCoaddSourcesTask,
                                            doc="Measure merged and (optionally) deblended detections")
    mergeCoaddMeasurements = ConfigurableField(
        target=MergeMeasurementsTask, doc="Merge measurements")
    forcedPhotCoadd = ConfigurableField(target=ForcedPhotCoaddTask,
                                        doc="Forced measurement on coadded images")
    reprocessing = Field(
        dtype=bool, default=False,
        doc=("Are we reprocessing?\n\n"
             "This exists as a workaround for large deblender footprints causing large memory use "
             "and/or very slow processing. We refuse to deblend those footprints when running on a "
             "cluster and return to reprocess on a machine with larger memory or more time "
             "if we consider those footprints important to recover."),
    )

    hasFakes = Field(
        dtype=bool,
        default=False,
        doc="Should be set to True if fakes were inserted into the data being processed."
    )

    def setDefaults(self):
        Config.setDefaults(self)
        self.forcedPhotCoadd.references.retarget(MultiBandReferencesTask)

    def validate(self):
        for subtask in ("mergeCoaddDetections", "deblendCoaddSources", "measureCoaddSources",
                        "mergeCoaddMeasurements", "forcedPhotCoadd"):
            coaddName = getattr(self, subtask).coaddName
            if coaddName != self.coaddName:
                raise RuntimeError("%s.coaddName (%s) doesn't match root coaddName (%s)" %
                                   (subtask, coaddName, self.coaddName))

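# Illustrative config override (not part of the original file; the value
# "goodSeeing" is hypothetical): because validate() requires every subtask's
# coaddName to match the root coaddName, a --configfile that switches coadd
# flavours must retarget them together, e.g.:
#
#   config.coaddName = "goodSeeing"
#   config.mergeCoaddDetections.coaddName = "goodSeeing"
#   config.deblendCoaddSources.coaddName = "goodSeeing"
#   # ...and likewise for measureCoaddSources, mergeCoaddMeasurements,
#   # and forcedPhotCoadd, the remaining subtasks listed in validate().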

class MultiBandDriverTaskRunner(TaskRunner):
    """TaskRunner for running MultiBandTask

    This is similar to the lsst.pipe.base.ButlerInitializedTaskRunner,
    except that we have a list of data references instead of a single
    data reference being passed to the Task.run, and we pass the results
    of the '--reuse-outputs-from' command option to the Task constructor.
    """

    def __init__(self, TaskClass, parsedCmd, doReturnResults=False):
        TaskRunner.__init__(self, TaskClass, parsedCmd, doReturnResults)
        self.reuse = parsedCmd.reuse

    def makeTask(self, parsedCmd=None, args=None):
        """A variant of the base version that passes a butler argument to the task's constructor.

        Either parsedCmd or args must be specified.
        """
        if parsedCmd is not None:
            butler = parsedCmd.butler
        elif args is not None:
            dataRefList, kwargs = args
            butler = dataRefList[0].butlerSubset.butler
        else:
            raise RuntimeError("parsedCmd or args must be specified")
        return self.TaskClass(config=self.config, log=self.log, butler=butler, reuse=self.reuse)


def unpickle(factory, args, kwargs):
    """Unpickle something by calling a factory"""
    return factory(*args, **kwargs)

class MultiBandDriverTask(BatchPoolTask):
    """Multi-node driver for multiband processing"""
    ConfigClass = MultiBandDriverConfig
    _DefaultName = "multiBandDriver"
    RunnerClass = MultiBandDriverTaskRunner

    def __init__(self, butler=None, schema=None, refObjLoader=None, reuse=tuple(), **kwargs):
        """!
        @param[in] butler: the butler can be used to retrieve schema or passed to the refObjLoader
            constructor in case it is needed.
        @param[in] schema: the schema of the source detection catalog used as input.
        @param[in] refObjLoader: an instance of LoadReferenceObjectsTasks that supplies an external
            reference catalog. May be None if the butler argument is provided or all steps requiring
            a reference catalog are disabled.
        """
        BatchPoolTask.__init__(self, **kwargs)
        if schema is None:
            assert butler is not None, "Butler not provided"
            schema = butler.get(self.config.coaddName +
                                "Coadd_det_schema", immediate=True).schema
        self.butler = butler
        self.reuse = tuple(reuse)
        self.makeSubtask("detectCoaddSources")
        self.makeSubtask("mergeCoaddDetections", schema=schema)
        if self.config.measureCoaddSources.inputCatalog.startswith("deblended"):
            # Ensure that the output from deblendCoaddSources matches the input to measureCoaddSources
            self.measurementInput = self.config.measureCoaddSources.inputCatalog
            self.deblenderOutput = []
            self.deblenderOutput.append("deblendedFlux")
            if self.measurementInput not in self.deblenderOutput:
                err = "Measurement input '{0}' is not in the list of deblender output catalogs '{1}'"
                raise ValueError(err.format(self.measurementInput, self.deblenderOutput))

            self.makeSubtask("deblendCoaddSources",
                             schema=afwTable.Schema(self.mergeCoaddDetections.schema),
                             peakSchema=afwTable.Schema(self.mergeCoaddDetections.merged.getPeakSchema()),
                             butler=butler)
            measureInputSchema = afwTable.Schema(self.deblendCoaddSources.schema)
        else:
            measureInputSchema = afwTable.Schema(self.mergeCoaddDetections.schema)
        self.makeSubtask("measureCoaddSources", schema=measureInputSchema,
                         peakSchema=afwTable.Schema(
                             self.mergeCoaddDetections.merged.getPeakSchema()),
                         refObjLoader=refObjLoader, butler=butler)
        self.makeSubtask("mergeCoaddMeasurements", schema=afwTable.Schema(
            self.measureCoaddSources.schema))
        self.makeSubtask("forcedPhotCoadd", refSchema=afwTable.Schema(
            self.mergeCoaddMeasurements.schema))
        if self.config.hasFakes:
            self.coaddType = "fakes_" + self.config.coaddName
        else:
            self.coaddType = self.config.coaddName

    def __reduce__(self):
        """Pickler"""
        return unpickle, (self.__class__, [], dict(config=self.config, name=self._name,
                                                   parentTask=self._parentTask, log=self.log,
                                                   butler=self.butler, reuse=self.reuse))

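    # Illustrative note (not part of the original source): __reduce__ makes the
    # task picklable for distribution to pool workers by recording a factory
    # call instead of instance state, so a round trip such as
    #
    #   import pickle
    #   clone = pickle.loads(pickle.dumps(task))  # invokes unpickle(cls, [], kwargs)
    #
    # re-runs __init__ with the saved config, butler, and reuse tuple rather
    # than restoring attributes directly.
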
    @classmethod
    def _makeArgumentParser(cls, *args, **kwargs):
        kwargs.pop("doBatch", False)
        parser = ArgumentParser(name=cls._DefaultName, *args, **kwargs)
        parser.add_id_argument("--id", "deepCoadd", help="data ID, e.g. --id tract=12345 patch=1,2",
                               ContainerClass=TractDataIdContainer)
        parser.addReuseOption(["detectCoaddSources", "mergeCoaddDetections", "measureCoaddSources",
                               "mergeCoaddMeasurements", "forcedPhotCoadd", "deblendCoaddSources"])
        return parser

    @classmethod
    def batchWallTime(cls, time, parsedCmd, numCpus):
        """!Return walltime request for batch job

        @param time: Requested time per iteration
        @param parsedCmd: Results of argument parsing
        @param numCpus: Number of CPUs
        """
        numTargets = 0
        for refList in parsedCmd.id.refList:
            numTargets += len(refList)
        return time*numTargets/float(numCpus)

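    # A worked example of the scaling above (numbers illustrative only): with
    # 24 patch references across parsedCmd.id.refList and time=3600 s per
    # iteration on 8 CPUs, the requested walltime is 3600*24/8 = 10800 s.
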
    @abortOnError
    def runDataRef(self, patchRefList):
        """!Run multiband processing on coadds

        Only the master node runs this method.

        No real MPI communication (scatter/gather) takes place: all I/O goes
        through the disk. We want the intermediate stages on disk, and the
        component Tasks are implemented around this, so we just follow suit.

        @param patchRefList: Data references to run measurement
        """
        for patchRef in patchRefList:
            if patchRef:
                butler = patchRef.getButler()
                break
        else:
            raise RuntimeError("No valid patches")
        pool = Pool("all")
        pool.cacheClear()
        pool.storeSet(butler=butler)
        # MultiBand measurements require that the detection stage be completed
        # before measurements can be made.
        #
        # The configuration for coaddDriver.py allows detection to be turned
        # off in the event that fake objects are to be added during the
        # detection process. This allows the long co-addition process to be
        # run once, and multiple different MultiBand reruns (with different
        # fake objects) to exist from the same base co-addition.
        #
        # However, we only re-run detection if doDetection is explicitly True
        # here (this should always be the opposite of coaddDriver.doDetection);
        # otherwise we have no way to tell reliably whether any detections
        # present in an input repo are safe to use.
        if self.config.doDetection:
            detectionList = []
            for patchRef in patchRefList:
                if ("detectCoaddSources" in self.reuse and
                        patchRef.datasetExists(self.coaddType + "Coadd_calexp", write=True)):
                    self.log.info("Skipping detectCoaddSources for %s; output already exists." %
                                  patchRef.dataId)
                    continue
                if not patchRef.datasetExists(self.coaddType + "Coadd"):
                    self.log.debug("Not processing %s; required input %sCoadd missing." %
                                   (patchRef.dataId, self.config.coaddName))
                    continue
                detectionList.append(patchRef)

            pool.map(self.runDetection, detectionList)

        patchRefList = [patchRef for patchRef in patchRefList if
                        patchRef.datasetExists(self.coaddType + "Coadd_calexp") and
                        patchRef.datasetExists(self.config.coaddName + "Coadd_det",
                                               write=self.config.doDetection)]
        dataIdList = [patchRef.dataId for patchRef in patchRefList]

        # Group by patch
        patches = {}
        tract = None
        for patchRef in patchRefList:
            dataId = patchRef.dataId
            if tract is None:
                tract = dataId["tract"]
            else:
                assert tract == dataId["tract"]
            patch = dataId["patch"]
            if patch not in patches:
                patches[patch] = []
            patches[patch].append(dataId)

        pool.map(self.runMergeDetections, patches.values())

        # Deblend merged detections, and test for reprocessing
        #
        # The reprocessing allows us to have multiple attempts at deblending large footprints. Large
        # footprints can suck up a lot of memory in the deblender, which means that when we process on a
        # cluster, we want to refuse to deblend them (they're flagged "deblend.parent-too-big"). But since
        # they may have astronomically interesting data, we want the ability to go back and reprocess them
        # with a more permissive configuration when we have more memory or processing time.
        #
        # self.runDeblendMerged will return whether there are any footprints in that image that required
        # reprocessing. We need to convert that list of booleans into a dict mapping the patchId (x,y) to
        # a boolean. That tells us whether the merge measurement and forced photometry need to be re-run on
        # a particular patch.
        #
        # This determination of which patches need to be reprocessed exists only in memory (the measurements
        # have been written, clobbering the old ones), so if there was an exception we would lose this
        # information, leaving things in an inconsistent state (measurements, merged measurements and
        # forced photometry old). To attempt to preserve this status, we touch a file (dataset named
        # "deepCoadd_multibandReprocessing") --- if this file exists, we need to re-run the measurements,
        # merge and forced photometry.
        #
        # This is, hopefully, a temporary workaround until we can improve the
        # deblender.
        try:
            reprocessed = pool.map(self.runDeblendMerged, patches.values())
        finally:
            if self.config.reprocessing:
                patchReprocessing = {}
                for dataId, reprocess in zip(dataIdList, reprocessed):
                    patchId = dataId["patch"]
                    patchReprocessing[patchId] = patchReprocessing.get(
                        patchId, False) or reprocess
                # Persist the determination, to make error recovery easier
                reprocessDataset = self.config.coaddName + "Coadd_multibandReprocessing"
                for patchId in patchReprocessing:
                    if not patchReprocessing[patchId]:
                        continue
                    dataId = dict(tract=tract, patch=patchId)
                    if patchReprocessing[patchId]:
                        filename = butler.get(
                            reprocessDataset + "_filename", dataId)[0]
                        open(filename, 'a').close()  # Touch file
                    elif butler.datasetExists(reprocessDataset, dataId):
                        # We must have failed at some point while reprocessing
                        # and we're starting over
                        patchReprocessing[patchId] = True

        # Only process patches that have been identified as needing it
        pool.map(self.runMeasurements, [dataId1 for dataId1 in dataIdList if not self.config.reprocessing or
                                        patchReprocessing[dataId1["patch"]]])
        pool.map(self.runMergeMeasurements, [idList for patchId, idList in patches.items() if
                                             not self.config.reprocessing or patchReprocessing[patchId]])
        pool.map(self.runForcedPhot, [dataId1 for dataId1 in dataIdList if not self.config.reprocessing or
                                      patchReprocessing[dataId1["patch"]]])

        # Remove persisted reprocessing determination
        if self.config.reprocessing:
            for patchId in patchReprocessing:
                if not patchReprocessing[patchId]:
                    continue
                dataId = dict(tract=tract, patch=patchId)
                filename = butler.get(
                    reprocessDataset + "_filename", dataId)[0]
                os.unlink(filename)

    def runDetection(self, cache, patchRef):
        """!Run detection on a patch

        Only slave nodes execute this method.

        @param cache: Pool cache, containing butler
        @param patchRef: Patch on which to do detection
        """
        with self.logOperation("do detections on {}".format(patchRef.dataId)):
            idFactory = self.detectCoaddSources.makeIdFactory(patchRef)
            coadd = patchRef.get(self.coaddType + "Coadd", immediate=True)
            expId = getGen3CoaddExposureId(patchRef, coaddName=self.config.coaddName, log=self.log)
            self.detectCoaddSources.emptyMetadata()
            detResults = self.detectCoaddSources.run(coadd, idFactory, expId=expId)
            self.detectCoaddSources.write(detResults, patchRef)
            self.detectCoaddSources.writeMetadata(patchRef)

    def runMergeDetections(self, cache, dataIdList):
        """!Run detection merging on a patch

        Only slave nodes execute this method.

        @param cache: Pool cache, containing butler
        @param dataIdList: List of data identifiers for the patch in different filters
        """
        with self.logOperation("merge detections from %s" % (dataIdList,)):
            dataRefList = [getDataRef(cache.butler, dataId, self.coaddType + "Coadd_calexp") for
                           dataId in dataIdList]
            if ("mergeCoaddDetections" in self.reuse and
                    dataRefList[0].datasetExists(self.config.coaddName + "Coadd_mergeDet", write=True)):
                self.log.info("Skipping mergeCoaddDetections for %s; output already exists." %
                              dataRefList[0].dataId)
                return
            self.mergeCoaddDetections.runDataRef(dataRefList)

    def runDeblendMerged(self, cache, dataIdList):
        """Run the deblender on a list of dataId's

        Only slave nodes execute this method.

        Parameters
        ----------
        cache: Pool cache
            Pool cache with butler.
        dataIdList: list
            Data identifier for patch in each band.

        Returns
        -------
        result: bool
            Whether the patch requires reprocessing.
        """
        with self.logOperation("deblending %s" % (dataIdList,)):
            dataRefList = [getDataRef(cache.butler, dataId, self.coaddType + "Coadd_calexp") for
                           dataId in dataIdList]
            reprocessing = False  # Does this patch require reprocessing?
            if ("deblendCoaddSources" in self.reuse and
                    all([dataRef.datasetExists(self.config.coaddName + "Coadd_" + self.measurementInput,
                                               write=True) for dataRef in dataRefList])):
                if not self.config.reprocessing:
                    self.log.info("Skipping deblendCoaddSources for %s; output already exists" % dataIdList)
                    return False

                # Footprints are the same in every band, therefore we can check just one
                catalog = dataRefList[0].get(self.config.coaddName + "Coadd_" + self.measurementInput)
                bigFlag = catalog["deblend_parentTooBig"]
                # Footprints marked too large by the previous deblender run
                numOldBig = bigFlag.sum()
                if numOldBig == 0:
                    self.log.info("No large footprints in %s" % (dataRefList[0].dataId))
                    return False

                # This if-statement can be removed after DM-15662
                if self.config.deblendCoaddSources.simultaneous:
                    deblender = self.deblendCoaddSources.multiBandDeblend
                else:
                    deblender = self.deblendCoaddSources.singleBandDeblend

                # isLargeFootprint() can potentially return False for a source that is marked
                # too big in the catalog, because of "new"/different deblender configs.
                # numNewBig is the number of footprints that *will* be too big if reprocessed
                numNewBig = sum((deblender.isLargeFootprint(src.getFootprint()) for
                                 src in catalog[bigFlag]))
                if numNewBig == numOldBig:
                    self.log.info("All %d formerly large footprints continue to be large in %s" %
                                  (numOldBig, dataRefList[0].dataId,))
                    return False
                self.log.info("Found %d large footprints to be reprocessed in %s" %
                              (numOldBig - numNewBig, [dataRef.dataId for dataRef in dataRefList]))
                reprocessing = True

            self.deblendCoaddSources.runDataRef(dataRefList)
            return reprocessing

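    # A worked example of the decision above (numbers illustrative only): if
    # the previous run flagged numOldBig = 5 footprints as too big, and under
    # the current config isLargeFootprint() still rejects numNewBig = 5 of
    # them, nothing would change, so the patch is skipped; with numNewBig = 2,
    # the remaining 3 footprints can now be deblended and the patch is
    # reprocessed.
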
    def runMeasurements(self, cache, dataId):
        """Run measurement on a patch for a single filter

        Only slave nodes execute this method.

        Parameters
        ----------
        cache: Pool cache
            Pool cache, with butler
        dataId: dataRef
            Data identifier for patch
        """
        with self.logOperation("measurements on %s" % (dataId,)):
            dataRef = getDataRef(cache.butler, dataId, self.coaddType + "Coadd_calexp")
            if ("measureCoaddSources" in self.reuse and
                    not self.config.reprocessing and
                    dataRef.datasetExists(self.config.coaddName + "Coadd_meas", write=True)):
                self.log.info("Skipping measureCoaddSources for %s; output already exists" % dataId)
                return
            self.measureCoaddSources.runDataRef(dataRef)

    def runMergeMeasurements(self, cache, dataIdList):
        """!Run measurement merging on a patch

        Only slave nodes execute this method.

        @param cache: Pool cache, containing butler
        @param dataIdList: List of data identifiers for the patch in different filters
        """
        with self.logOperation("merge measurements from %s" % (dataIdList,)):
            dataRefList = [getDataRef(cache.butler, dataId, self.coaddType + "Coadd_calexp") for
                           dataId in dataIdList]
            if ("mergeCoaddMeasurements" in self.reuse and
                    not self.config.reprocessing and
                    dataRefList[0].datasetExists(self.config.coaddName + "Coadd_ref", write=True)):
                self.log.info("Skipping mergeCoaddMeasurements for %s; output already exists" %
                              dataRefList[0].dataId)
                return
            self.mergeCoaddMeasurements.runDataRef(dataRefList)

    def runForcedPhot(self, cache, dataId):
        """!Run forced photometry on a patch for a single filter

        Only slave nodes execute this method.

        @param cache: Pool cache, with butler
        @param dataId: Data identifier for patch
        """
        with self.logOperation("forced photometry on %s" % (dataId,)):
            dataRef = getDataRef(cache.butler, dataId,
                                 self.coaddType + "Coadd_calexp")
            if ("forcedPhotCoadd" in self.reuse and
                    not self.config.reprocessing and
                    dataRef.datasetExists(self.config.coaddName + "Coadd_forced_src", write=True)):
                self.log.info("Skipping forcedPhotCoadd for %s; output already exists" % dataId)
                return
            self.forcedPhotCoadd.runDataRef(dataRef)

    def writeMetadata(self, dataRef):
        """We don't collect any metadata, so skip"""
        pass
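
# Illustrative command-line sketch (not part of the original file): this
# driver runs as a Gen2 command-line task. The repository path, rerun name,
# and data ID values below are hypothetical; only --id and
# --reuse-outputs-from are defined by _makeArgumentParser above, and the
# remaining options are assumed to come from the standard BatchPoolTask
# batch machinery in lsst.ctrl.pool.
#
#   multiBandDriver.py /path/to/repo --rerun example \
#       --id tract=12345 patch=1,2 filter=HSC-R^HSC-I \
#       --reuse-outputs-from detectCoaddSources --cores 8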