7 DeblendCoaddSourcesTask,
8 MeasureMergedCoaddSourcesTask,
9 MergeMeasurementsTask,)
# NOTE(review): this span is a line-wrapped excerpt of a pex_config Config
# subclass (the class header and several original source lines are elided; the
# bare numbers at line starts are the original file's line numbers). Code is
# left byte-identical; comments only.
# Visible fields: coaddName, doDetection, ConfigurableFields for the
# detect/merge/deblend/measure/forced-photometry subtasks, a `reprocessing`
# flag, and (per the final doc string) a fakes-related flag.
20 coaddName =
Field(dtype=str, default=
"deep", doc=
"Name of coadd")
21 doDetection =
Field(dtype=bool, default=
False,
22 doc=
"Re-run detection? (requires *Coadd dataset to have been written)")
# NOTE(review): the field names/targets for the next several entries are
# partially elided; only fragments survive.
24 doc=
"Detect sources on coadd")
26 target=MergeDetectionsTask, doc=
"Merge detections")
27 deblendCoaddSources =
ConfigurableField(target=DeblendCoaddSourcesTask, doc=
"Deblend merged detections")
29 doc=
"Measure merged and (optionally) deblended detections")
31 target=MergeMeasurementsTask, doc=
"Merge measurements")
33 doc=
"Forced measurement on coadded images")
# Reprocessing flag (defaults False); the doc text explains the large-footprint
# deblender workaround it enables.
35 dtype=bool, default=
False,
36 doc=(
"Are we reprocessing?\n\n"
37 "This exists as a workaround for large deblender footprints causing large memory use "
38 "and/or very slow processing. We refuse to deblend those footprints when running on a cluster "
39 "and return to reprocess on a machine with larger memory or more time "
40 "if we consider those footprints important to recover."),
46 doc=
"Should be set to True if fakes were inserted into the data being processed."
# setDefaults fragment: delegates to the base Config (other lines elided).
50 Config.setDefaults(self)
# validate fragment: each listed subtask's coaddName must match the root
# coaddName, otherwise RuntimeError (the comparison line itself is elided).
55 for subtask
in (
"mergeCoaddDetections",
"deblendCoaddSources",
"measureCoaddSources",
56 "mergeCoaddMeasurements",
"forcedPhotCoadd"):
57 coaddName = getattr(self, subtask).coaddName
59 raise RuntimeError(
"%s.coaddName (%s) doesn't match root coaddName (%s)" %
class MultiBandDriverTaskRunner(TaskRunner):
    """TaskRunner for running MultiBandTask

    This is similar to the lsst.pipe.base.ButlerInitializedTaskRunner,
    except that we have a list of data references instead of a single
    data reference being passed to the Task.run, and we pass the results
    of the '--reuse-outputs-from' command option to the Task constructor.
    """

    def __init__(self, TaskClass, parsedCmd, doReturnResults=False):
        TaskRunner.__init__(self, TaskClass, parsedCmd, doReturnResults)
        # NOTE(review): line elided in the excerpt; per the class docstring the
        # '--reuse-outputs-from' result is kept so makeTask can hand it to the task.
        self.reuse = parsedCmd.reuse

    def makeTask(self, parsedCmd=None, args=None):
        """A variant of the base version that passes a butler argument to the task's constructor
        parsedCmd or args must be specified.
        """
        # The butler comes either from the parsed command line or from the
        # first data reference of an (dataRefList, kwargs) argument tuple.
        if parsedCmd is not None:
            butler = parsedCmd.butler
        elif args is not None:
            dataRefList, kwargs = args
            butler = dataRefList[0].butlerSubset.butler
        else:
            raise RuntimeError("parsedCmd or args must be specified")
        # NOTE(review): return line elided in the excerpt; reconstructed from the
        # stated contract (pass butler and reuse to the Task constructor) — confirm
        # against the upstream file.
        return self.TaskClass(config=self.config, log=self.log, butler=butler, reuse=self.reuse)
def unpickle(factory, args, kwargs):
    """Unpickle something by calling a factory.

    Used as the pickling entry point (see the task's __reduce__, which returns
    ``unpickle, (self.__class__, [], dict(...))``): the factory is invoked as
    ``factory(*args, **kwargs)``.
    """
    return factory(*args, **kwargs)
# NOTE(review): excerpt of the MultiBandDriverTask class body (the class header
# and many original source lines are elided). Code left byte-identical;
# comments only.
96 """Multi-node driver for multiband processing"""
97 ConfigClass = MultiBandDriverConfig
98 _DefaultName =
"multiBandDriver"
99 RunnerClass = MultiBandDriverTaskRunner
101 def __init__(self, butler=None, schema=None, refObjLoader=None, reuse=tuple(), **kwargs):
103 @param[in] butler: the butler can be used to retrieve schema or passed to the refObjLoader constructor
104 in case it is needed.
105 @param[in] schema: the schema of the source detection catalog used as input.
106 @param[in] refObjLoader: an instance of LoadReferenceObjectsTasks that supplies an external reference
107 catalog. May be None if the butler argument is provided or all steps requiring a reference
108 catalog are disabled.
110 BatchPoolTask.__init__(self, **kwargs)
# When no schema is supplied, fetch the detection-catalog schema through the
# butler — hence the assertion that a butler was provided.
112 assert butler
is not None,
"Butler not provided"
113 schema = butler.get(self.
config.coaddName +
114 "Coadd_det_schema", immediate=
True).schema
118 self.
makeSubtask(
"mergeCoaddDetections", schema=schema)
# Validation of the deblended-input configuration (details elided): the error
# template below is raised/used when the measurement inputCatalog is not one of
# the deblender's output catalogs — TODO confirm against the full file.
119 if self.
config.measureCoaddSources.inputCatalog.startswith(
"deblended"):
123 if self.
config.deblendCoaddSources.simultaneous:
128 err =
"Measurement input '{0}' is not in the list of deblender output catalogs '{1}'"
133 peakSchema=
afwTable.Schema(self.mergeCoaddDetections.merged.getPeakSchema()),
# Construct the measurement subtask, sharing the merged-detection peak schema
# and forwarding the reference-object loader and butler.
138 self.
makeSubtask(
"measureCoaddSources", schema=measureInputSchema,
140 self.mergeCoaddDetections.merged.getPeakSchema()),
141 refObjLoader=refObjLoader, butler=butler)
143 self.measureCoaddSources.schema))
145 self.mergeCoaddMeasurements.schema))
# __reduce__ fragment: pickles the task via the module-level unpickle() factory.
153 return unpickle, (self.__class__, [], dict(config=self.
config, name=self.
_name,
@classmethod
def _makeArgumentParser(cls, *args, **kwargs):
    """Build and return the argument parser for this driver.

    The "doBatch" keyword is popped because it is meant for the batch
    machinery, not for the parser constructor.
    """
    kwargs.pop("doBatch", False)
    # NOTE(review): the parser-construction line was elided in the excerpt;
    # reconstructed as the conventional pipe_base ArgumentParser — confirm
    # against the upstream file.
    parser = ArgumentParser(name=cls._DefaultName, *args, **kwargs)
    parser.add_id_argument("--id", "deepCoadd",
                           help="data ID, e.g. --id tract=12345 patch=1,2",
                           ContainerClass=TractDataIdContainer)
    # Stages whose existing outputs may be reused via --reuse-outputs-from.
    parser.addReuseOption(["detectCoaddSources", "mergeCoaddDetections",
                           "measureCoaddSources", "mergeCoaddMeasurements",
                           "forcedPhotCoadd", "deblendCoaddSources"])
    return parser
@classmethod
def batchWallTime(cls, time, parsedCmd, numCpus):
    """!Return walltime request for batch job

    @param time: Requested time per iteration
    @param numCpus: Number of cores
    @param parsedCmd: Results of argument parsing
    @return Requested walltime: the per-iteration time scaled by the total
        number of targets and spread across the available cores.
    """
    # Fix: the docstring previously documented this parameter as "numCores"
    # while the code uses "numCpus"; the names are now consistent.
    numTargets = 0
    for refList in parsedCmd.id.refList:
        numTargets += len(refList)
    return time*numTargets/float(numCpus)
# NOTE(review): excerpt of the master-node runDataRef method (the def line and
# many original source lines are elided — notably original lines 250-277, which
# contain the merge/deblend scatter calls). Code left byte-identical; comments
# only.
182 """!Run multiband processing on coadds
184 Only the master node runs this method.
186 No real MPI communication (scatter/gather) takes place: all I/O goes
187 through the disk. We want the intermediate stages on disk, and the
188 component Tasks are implemented around this, so we just follow suit.
190 @param patchRefList: Data references to run measurement
# Locate a butler from the patch references; fail if there is no valid patch.
192 for patchRef
in patchRefList:
194 butler = patchRef.getButler()
197 raise RuntimeError(
"No valid patches")
200 pool.storeSet(butler=butler)
# Optional detection stage: skip patches whose output already exists (when
# "detectCoaddSources" reuse was requested) or whose input coadd is missing.
214 if self.
config.doDetection:
216 for patchRef
in patchRefList:
217 if (
"detectCoaddSources" in self.
reuse and
218 patchRef.datasetExists(self.
coaddType +
"Coadd_calexp", write=
True)):
219 self.
log.
info(
"Skipping detectCoaddSources for %s; output already exists." %
222 if not patchRef.datasetExists(self.
coaddType +
"Coadd"):
223 self.
log.
debug(
"Not processing %s; required input %sCoadd missing." %
224 (patchRef.dataId, self.
config.coaddName))
226 detectionList.append(patchRef)
# Keep only patches that have both the calexp coadd and a detection catalog.
230 patchRefList = [patchRef
for patchRef
in patchRefList
if
231 patchRef.datasetExists(self.
coaddType +
"Coadd_calexp")
and
232 patchRef.datasetExists(self.
config.coaddName +
"Coadd_det",
233 write=self.
config.doDetection)]
234 dataIdList = [patchRef.dataId
for patchRef
in patchRefList]
# Group dataIds by patch; all inputs must share one tract (hence the assert).
239 for patchRef
in patchRefList:
240 dataId = patchRef.dataId
242 tract = dataId[
"tract"]
244 assert tract == dataId[
"tract"]
246 patch = dataId[
"patch"]
247 if patch
not in patches:
249 patches[patch].
append(dataId)
# Reprocessing bookkeeping: OR the per-dataId deblender results into a
# per-patch flag, then persist it by touching a
# <coaddName>Coadd_multibandReprocessing file for the patch.
278 if self.
config.reprocessing:
279 patchReprocessing = {}
280 for dataId, reprocess
in zip(dataIdList, reprocessed):
281 patchId = dataId[
"patch"]
282 patchReprocessing[patchId] = patchReprocessing.get(
283 patchId,
False)
or reprocess
285 reprocessDataset = self.
config.coaddName +
"Coadd_multibandReprocessing"
286 for patchId
in patchReprocessing:
287 if not patchReprocessing[patchId]:
289 dataId = dict(tract=tract, patch=patchId)
290 if patchReprocessing[patchId]:
291 filename = butler.get(
292 reprocessDataset +
"_filename", dataId)[0]
# Touch the marker file (append-mode open, immediately closed).
293 open(filename,
'a').close()
294 elif butler.datasetExists(reprocessDataset, dataId):
297 patchReprocessing[patchId] =
True
# Scatter the remaining stages (fragments below include forced photometry),
# restricted to patches that need no reprocessing or were marked reprocessed.
301 patchReprocessing[dataId1[
"patch"]]])
303 not self.
config.reprocessing
or patchReprocessing[patchId]])
304 pool.map(self.
runForcedPhot, [dataId1
for dataId1
in dataIdList
if not self.
config.reprocessing
or
305 patchReprocessing[dataId1[
"patch"]]])
# Final pass over the reprocessing markers: the filename is fetched for
# patches that did not need reprocessing — presumably to remove the marker
# (the following line is elided; confirm against the full file).
308 if self.
config.reprocessing:
309 for patchId
in patchReprocessing:
310 if not patchReprocessing[patchId]:
312 dataId = dict(tract=tract, patch=patchId)
313 filename = butler.get(
314 reprocessDataset +
"_filename", dataId)[0]
# NOTE(review): excerpt of the slave-side runDetection method (def line and a
# few original lines elided). Code left byte-identical; comments only.
318 """! Run detection on a patch
320 Only slave nodes execute this method.
322 @param cache: Pool cache, containing butler
323 @param patchRef: Patch on which to do detection
# Read the coadd, build a source-ID factory and exposure ID for the patch,
# then run the detection subtask and persist its results.
326 idFactory = self.detectCoaddSources.makeIdFactory(patchRef)
327 coadd = patchRef.get(self.
coaddType +
"Coadd", immediate=
True)
328 expId = int(patchRef.get(self.
config.coaddName +
"CoaddId"))
330 detResults = self.detectCoaddSources.
run(coadd, idFactory, expId=expId)
331 self.detectCoaddSources.
write(detResults, patchRef)
# NOTE(review): excerpt of runMergeDetections (def line and some original
# lines elided). Code left byte-identical; comments only.
335 """!Run detection merging on a patch
337 Only slave nodes execute this method.
339 @param cache: Pool cache, containing butler
340 @param dataIdList: List of data identifiers for the patch in different filters
342 with self.
logOperation(
"merge detections from %s" % (dataIdList,)):
# Build data references from the dataIds (construction line elided); skip if
# "mergeCoaddDetections" reuse was requested and the merged detection catalog
# already exists, otherwise delegate to the merge subtask.
344 dataId
in dataIdList]
345 if (
"mergeCoaddDetections" in self.
reuse and
346 dataRefList[0].datasetExists(self.
config.coaddName +
"Coadd_mergeDet", write=
True)):
347 self.
log.
info(
"Skipping mergeCoaddDetections for %s; output already exists." %
348 dataRefList[0].dataId)
350 self.mergeCoaddDetections.
runDataRef(dataRefList)
# NOTE(review): excerpt of runDeblendMerged (def line and many original lines
# elided). Per the docstring fragments it deblends one patch and reports
# whether the patch requires reprocessing. Code left byte-identical.
353 """Run the deblender on a list of dataId's
355 Only slave nodes execute this method.
360 Pool cache with butler.
362 Data identifier for patch in each band.
367 whether the patch requires reprocessing.
369 with self.
logOperation(
"deblending %s" % (dataIdList,)):
# Build dataRefs (construction elided); skip when "deblendCoaddSources" reuse
# was requested and all outputs exist — unless reprocessing is enabled, in
# which case fall through to re-examine large footprints.
371 dataId
in dataIdList]
373 if (
"deblendCoaddSources" in self.
reuse and
375 write=
True)
for dataRef
in dataRefList])):
376 if not self.
config.reprocessing:
377 self.
log.
info(
"Skipping deblendCoaddSources for %s; output already exists" % dataIdList)
# Count parents previously flagged as too big in the existing catalog.
382 bigFlag = catalog[
"deblend_parentTooBig"]
384 numOldBig = bigFlag.sum()
386 self.
log.
info(
"No large footprints in %s" % (dataRefList[0].dataId))
# Pick the multi- or single-band deblender per configuration.
390 if self.
config.deblendCoaddSources.simultaneous:
391 deblender = self.deblendCoaddSources.multiBandDeblend
393 deblender = self.deblendCoaddSources.singleBandDeblend
# If every formerly-large footprint is still considered large, log and
# (per the message) leave the patch as-is.
398 numNewBig = sum((deblender.isLargeFootprint(src.getFootprint())
for
399 src
in catalog[bigFlag]))
400 if numNewBig == numOldBig:
401 self.
log.
info(
"All %d formerly large footprints continue to be large in %s" %
402 (numOldBig, dataRefList[0].dataId,))
404 self.
log.
info(
"Found %d large footprints to be reprocessed in %s" %
405 (numOldBig - numNewBig, [dataRef.dataId
for dataRef
in dataRefList]))
# Otherwise run the deblender subtask over the patch's data references.
408 self.deblendCoaddSources.
runDataRef(dataRefList)
# NOTE(review): excerpt of the per-filter measurement method (def line elided;
# the log text indicates it is the measureCoaddSources stage). Code left
# byte-identical; comments only.
412 """Run measurement on a patch for a single filter
414 Only slave nodes execute this method.
419 Pool cache, with butler
421 Data identifier for patch
423 with self.
logOperation(
"measurements on %s" % (dataId,)):
# Skip when reuse was requested, we are not reprocessing, and the measurement
# catalog already exists.
425 if (
"measureCoaddSources" in self.
reuse and
426 not self.
config.reprocessing
and
427 dataRef.datasetExists(self.
config.coaddName +
"Coadd_meas", write=
True)):
428 self.
log.
info(
# NOTE(review): "measuretCoaddSources" below is a typo in the log message
# (should read "measureCoaddSources"); left unchanged because this edit
# changes comments only.
"Skipping measuretCoaddSources for %s; output already exists" % dataId)
# NOTE(review): excerpt of runMergeMeasurements (def line and some original
# lines elided). Code left byte-identical; comments only.
433 """!Run measurement merging on a patch
435 Only slave nodes execute this method.
437 @param cache: Pool cache, containing butler
438 @param dataIdList: List of data identifiers for the patch in different filters
440 with self.
logOperation(
"merge measurements from %s" % (dataIdList,)):
# Build dataRefs (construction elided); skip when reuse was requested, we are
# not reprocessing, and the reference catalog already exists; otherwise
# delegate to the merge-measurements subtask.
442 dataId
in dataIdList]
443 if (
"mergeCoaddMeasurements" in self.
reuse and
444 not self.
config.reprocessing
and
445 dataRefList[0].datasetExists(self.
config.coaddName +
"Coadd_ref", write=
True)):
446 self.
log.
info(
"Skipping mergeCoaddMeasurements for %s; output already exists" %
447 dataRefList[0].dataId)
449 self.mergeCoaddMeasurements.
runDataRef(dataRefList)
# NOTE(review): excerpt of runForcedPhot (def line and trailing lines elided).
# Code left byte-identical; comments only.
452 """!Run forced photometry on a patch for a single filter
454 Only slave nodes execute this method.
456 @param cache: Pool cache, with butler
457 @param dataId: Data identifier for patch
459 with self.
logOperation(
"forced photometry on %s" % (dataId,)):
# Skip when reuse was requested, we are not reprocessing, and the forced-
# photometry catalog already exists.
462 if (
"forcedPhotCoadd" in self.
reuse and
463 not self.
config.reprocessing
and
464 dataRef.datasetExists(self.
config.coaddName +
"Coadd_forced_src", write=
True)):
465 self.
log.
info(
"Skipping forcedPhotCoadd for %s; output already exists" % dataId)
# NOTE(review): docstring fragment of a writeMetadata override (def line
# elided); per the docstring the override exists to skip persisting task
# metadata.
470 """We don't collect any metadata, so skip"""