1 from __future__
import absolute_import, division, print_function
4 from builtins
import zip
6 from lsst.pex.config
import Config, Field, ConfigurableField
10 DeblendCoaddSourcesTask,
11 MeasureMergedCoaddSourcesTask,
12 MergeMeasurementsTask,)
23 coaddName = Field(dtype=str, default=
"deep", doc=
"Name of coadd")
24 doDetection = Field(dtype=bool, default=
False,
25 doc=
"Re-run detection? (requires *Coadd dataset to have been written)")
26 detectCoaddSources = ConfigurableField(target=DetectCoaddSourcesTask,
27 doc=
"Detect sources on coadd")
28 mergeCoaddDetections = ConfigurableField(
29 target=MergeDetectionsTask, doc=
"Merge detections")
30 deblendCoaddSources = ConfigurableField(target=DeblendCoaddSourcesTask, doc=
"Deblend merged detections")
31 measureCoaddSources = ConfigurableField(target=MeasureMergedCoaddSourcesTask,
32 doc=
"Measure merged and (optionally) deblended detections")
33 mergeCoaddMeasurements = ConfigurableField(
34 target=MergeMeasurementsTask, doc=
"Merge measurements")
35 forcedPhotCoadd = ConfigurableField(target=ForcedPhotCoaddTask,
36 doc=
"Forced measurement on coadded images")
38 dtype=bool, default=
False,
39 doc=(
"Are we reprocessing?\n\n" 40 "This exists as a workaround for large deblender footprints causing large memory use " 41 "and/or very slow processing. We refuse to deblend those footprints when running on a cluster " 42 "and return to reprocess on a machine with larger memory or more time " 43 "if we consider those footprints important to recover."),
49 doc=
"Should be set to True if fakes were inserted into the data being processed." 53 Config.setDefaults(self)
58 for subtask
in (
"mergeCoaddDetections",
"deblendCoaddSources",
"measureCoaddSources",
59 "mergeCoaddMeasurements",
"forcedPhotCoadd"):
60 coaddName = getattr(self, subtask).coaddName
62 raise RuntimeError(
"%s.coaddName (%s) doesn't match root coaddName (%s)" %
67 """TaskRunner for running MultiBandTask 69 This is similar to the lsst.pipe.base.ButlerInitializedTaskRunner, 70 except that we have a list of data references instead of a single 71 data reference being passed to the Task.run, and we pass the results 72 of the '--reuse-outputs-from' command option to the Task constructor. 75 def __init__(self, TaskClass, parsedCmd, doReturnResults=False):
76 TaskRunner.__init__(self, TaskClass, parsedCmd, doReturnResults)
80 """A variant of the base version that passes a butler argument to the task's constructor 81 parsedCmd or args must be specified. 83 if parsedCmd
is not None:
84 butler = parsedCmd.butler
85 elif args
is not None:
86 dataRefList, kwargs = args
87 butler = dataRefList[0].butlerSubset.butler
89 raise RuntimeError(
"parsedCmd or args must be specified")
94 """Unpickle something by calling a factory""" 95 return factory(*args, **kwargs)
99 """Multi-node driver for multiband processing""" 100 ConfigClass = MultiBandDriverConfig
101 _DefaultName =
"multiBandDriver" 102 RunnerClass = MultiBandDriverTaskRunner
104 def __init__(self, butler=None, schema=None, refObjLoader=None, reuse=tuple(), **kwargs):
106 @param[in] butler: the butler can be used to retrieve schema or passed to the refObjLoader constructor 107 in case it is needed. 108 @param[in] schema: the schema of the source detection catalog used as input. 109 @param[in] refObjLoader: an instance of LoadReferenceObjectsTasks that supplies an external reference 110 catalog. May be None if the butler argument is provided or all steps requiring a reference 111 catalog are disabled. 113 BatchPoolTask.__init__(self, **kwargs)
115 assert butler
is not None,
"Butler not provided" 116 schema = butler.get(self.
config.coaddName +
117 "Coadd_det_schema", immediate=
True).schema
121 self.
makeSubtask(
"mergeCoaddDetections", schema=schema)
122 if self.
config.measureCoaddSources.inputCatalog.startswith(
"deblended"):
126 if self.
config.deblendCoaddSources.simultaneous:
131 err =
"Measurement input '{0}' is not in the list of deblender output catalogs '{1}'" 136 peakSchema=
afwTable.Schema(self.mergeCoaddDetections.merged.getPeakSchema()),
141 self.
makeSubtask(
"measureCoaddSources", schema=measureInputSchema,
143 self.mergeCoaddDetections.merged.getPeakSchema()),
144 refObjLoader=refObjLoader, butler=butler)
146 self.measureCoaddSources.schema))
148 self.mergeCoaddMeasurements.schema))
156 return unpickle, (self.__class__, [], dict(config=self.
config, name=self.
_name,
161 def _makeArgumentParser(cls, *args, **kwargs):
162 kwargs.pop(
"doBatch",
False)
164 parser.add_id_argument(
"--id",
"deepCoadd", help=
"data ID, e.g. --id tract=12345 patch=1,2",
165 ContainerClass=TractDataIdContainer)
166 parser.addReuseOption([
"detectCoaddSources",
"mergeCoaddDetections",
"measureCoaddSources",
167 "mergeCoaddMeasurements",
"forcedPhotCoadd",
"deblendCoaddSources"])
172 """!Return walltime request for batch job 174 @param time: Requested time per iteration 175 @param parsedCmd: Results of argument parsing 176 @param numCores: Number of cores 179 for refList
in parsedCmd.id.refList:
180 numTargets += len(refList)
181 return time*numTargets/float(numCpus)
185 """!Run multiband processing on coadds 187 Only the master node runs this method. 189 No real MPI communication (scatter/gather) takes place: all I/O goes 190 through the disk. We want the intermediate stages on disk, and the 191 component Tasks are implemented around this, so we just follow suit. 193 @param patchRefList: Data references to run measurement 195 for patchRef
in patchRefList:
197 butler = patchRef.getButler()
200 raise RuntimeError(
"No valid patches")
203 pool.storeSet(butler=butler)
217 if self.
config.doDetection:
219 for patchRef
in patchRefList:
220 if (
"detectCoaddSources" in self.
reuse and 221 patchRef.datasetExists(self.
coaddType +
"Coadd_calexp", write=
True)):
222 self.
log.
info(
"Skipping detectCoaddSources for %s; output already exists." %
225 if not patchRef.datasetExists(self.
coaddType +
"Coadd"):
226 self.
log.
debug(
"Not processing %s; required input %sCoadd missing." %
227 (patchRef.dataId, self.
config.coaddName))
229 detectionList.append(patchRef)
233 patchRefList = [patchRef
for patchRef
in patchRefList
if 234 patchRef.datasetExists(self.
coaddType +
"Coadd_calexp")
and 235 patchRef.datasetExists(self.
config.coaddName +
"Coadd_det",
236 write=self.
config.doDetection)]
237 dataIdList = [patchRef.dataId
for patchRef
in patchRefList]
242 for patchRef
in patchRefList:
243 dataId = patchRef.dataId
245 tract = dataId[
"tract"]
247 assert tract == dataId[
"tract"]
249 patch = dataId[
"patch"]
250 if patch
not in patches:
252 patches[patch].
append(dataId)
281 if self.
config.reprocessing:
282 patchReprocessing = {}
283 for dataId, reprocess
in zip(dataIdList, reprocessed):
284 patchId = dataId[
"patch"]
285 patchReprocessing[patchId] = patchReprocessing.get(
286 patchId,
False)
or reprocess
288 reprocessDataset = self.
config.coaddName +
"Coadd_multibandReprocessing" 289 for patchId
in patchReprocessing:
290 if not patchReprocessing[patchId]:
292 dataId = dict(tract=tract, patch=patchId)
293 if patchReprocessing[patchId]:
294 filename = butler.get(
295 reprocessDataset +
"_filename", dataId)[0]
296 open(filename,
'a').close()
297 elif butler.datasetExists(reprocessDataset, dataId):
300 patchReprocessing[patchId] =
True 304 patchReprocessing[dataId1[
"patch"]]])
306 not self.
config.reprocessing
or patchReprocessing[patchId]])
307 pool.map(self.
runForcedPhot, [dataId1
for dataId1
in dataIdList
if not self.
config.reprocessing
or 308 patchReprocessing[dataId1[
"patch"]]])
311 if self.
config.reprocessing:
312 for patchId
in patchReprocessing:
313 if not patchReprocessing[patchId]:
315 dataId = dict(tract=tract, patch=patchId)
316 filename = butler.get(
317 reprocessDataset +
"_filename", dataId)[0]
321 """! Run detection on a patch 323 Only slave nodes execute this method. 325 @param cache: Pool cache, containing butler 326 @param patchRef: Patch on which to do detection 329 idFactory = self.detectCoaddSources.makeIdFactory(patchRef)
330 coadd = patchRef.get(self.
coaddType +
"Coadd", immediate=
True)
331 expId = int(patchRef.get(self.
config.coaddName +
"CoaddId"))
333 detResults = self.detectCoaddSources.
run(coadd, idFactory, expId=expId)
334 self.detectCoaddSources.
write(detResults, patchRef)
338 """!Run detection merging on a patch 340 Only slave nodes execute this method. 342 @param cache: Pool cache, containing butler 343 @param dataIdList: List of data identifiers for the patch in different filters 345 with self.
logOperation(
"merge detections from %s" % (dataIdList,)):
347 dataId
in dataIdList]
348 if (
"mergeCoaddDetections" in self.
reuse and 349 dataRefList[0].datasetExists(self.
config.coaddName +
"Coadd_mergeDet", write=
True)):
350 self.
log.
info(
"Skipping mergeCoaddDetections for %s; output already exists." %
351 dataRefList[0].dataId)
353 self.mergeCoaddDetections.
runDataRef(dataRefList)
356 """Run the deblender on a list of dataId's 358 Only slave nodes execute this method. 363 Pool cache with butler. 365 Data identifier for patch in each band. 370 whether the patch requires reprocessing. 372 with self.
logOperation(
"deblending %s" % (dataIdList,)):
374 dataId
in dataIdList]
376 if (
"deblendCoaddSources" in self.
reuse and 378 write=
True)
for dataRef
in dataRefList])):
379 if not self.
config.reprocessing:
380 self.
log.
info(
"Skipping deblendCoaddSources for %s; output already exists" % dataIdList)
385 bigFlag = catalog[
"deblend_parentTooBig"]
387 numOldBig = bigFlag.sum()
389 self.
log.
info(
"No large footprints in %s" % (dataRefList[0].dataId))
393 if self.
config.deblendCoaddSources.simultaneous:
394 deblender = self.deblendCoaddSources.multiBandDeblend
396 deblender = self.deblendCoaddSources.singleBandDeblend
401 numNewBig = sum((deblender.isLargeFootprint(src.getFootprint())
for 402 src
in catalog[bigFlag]))
403 if numNewBig == numOldBig:
404 self.
log.
info(
"All %d formerly large footprints continue to be large in %s" %
405 (numOldBig, dataRefList[0].dataId,))
407 self.
log.
info(
"Found %d large footprints to be reprocessed in %s" %
408 (numOldBig - numNewBig, [dataRef.dataId
for dataRef
in dataRefList]))
411 self.deblendCoaddSources.
runDataRef(dataRefList)
415 """Run measurement on a patch for a single filter 417 Only slave nodes execute this method. 422 Pool cache, with butler 424 Data identifier for patch 426 with self.
logOperation(
"measurements on %s" % (dataId,)):
428 if (
"measureCoaddSources" in self.
reuse and 429 not self.
config.reprocessing
and 430 dataRef.datasetExists(self.
config.coaddName +
"Coadd_meas", write=
True)):
431 self.
log.
info(
"Skipping measuretCoaddSources for %s; output already exists" % dataId)
436 """!Run measurement merging on a patch 438 Only slave nodes execute this method. 440 @param cache: Pool cache, containing butler 441 @param dataIdList: List of data identifiers for the patch in different filters 443 with self.
logOperation(
"merge measurements from %s" % (dataIdList,)):
445 dataId
in dataIdList]
446 if (
"mergeCoaddMeasurements" in self.
reuse and 447 not self.
config.reprocessing
and 448 dataRefList[0].datasetExists(self.
config.coaddName +
"Coadd_ref", write=
True)):
449 self.
log.
info(
"Skipping mergeCoaddMeasurements for %s; output already exists" %
450 dataRefList[0].dataId)
452 self.mergeCoaddMeasurements.
runDataRef(dataRefList)
455 """!Run forced photometry on a patch for a single filter 457 Only slave nodes execute this method. 459 @param cache: Pool cache, with butler 460 @param dataId: Data identifier for patch 462 with self.
logOperation(
"forced photometry on %s" % (dataId,)):
465 if (
"forcedPhotCoadd" in self.
reuse and 466 not self.
config.reprocessing
and 467 dataRef.datasetExists(self.
config.coaddName +
"Coadd_forced_src", write=
True)):
468 self.
log.
info(
"Skipping forcedPhotCoadd for %s; output already exists" % dataId)
473 """We don't collect any metadata, so skip"""
def write(self, patchRef, catalog)
Write the output.
Defines the fields and offsets for a table.
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)
def makeSubtask(self, name, keyArgs)
def unpickle(factory, args, kwargs)
def runDataRef(self, patchRefList)
Run multiband processing on coadds.
def __init__(self, butler=None, schema=None, refObjLoader=None, reuse=tuple(), kwargs)
std::shared_ptr< FrameSet > append(FrameSet const &first, FrameSet const &second)
Construct a FrameSet that performs two transformations in series.
def writeMetadata(self, dataRef)
def runDeblendMerged(self, cache, dataIdList)
def getDataRef(butler, dataId, datasetType="raw")
bool all(CoordinateExpr< N > const &expr) noexcept
Return true if all elements are true.
def runMeasurements(self, cache, dataId)
def runForcedPhot(self, cache, dataId)
Run forced photometry on a patch for a single filter.
def batchWallTime(cls, time, parsedCmd, numCpus)
Return walltime request for batch job.
def __init__(self, TaskClass, parsedCmd, doReturnResults=False)
def run(self, skyInfo, tempExpRefList, imageScalerList, weightList, altMaskList=None, mask=None, supplementaryData=None)
def logOperation(self, operation, catch=False, trace=True)
Provide a context manager for logging an operation.
def runDetection(self, cache, patchRef)
Run detection on a patch.
def runMergeDetections(self, cache, dataIdList)
Run detection merging on a patch.
def makeTask(self, parsedCmd=None, args=None)
def runMergeMeasurements(self, cache, dataIdList)
Run measurement merging on a patch.