7 DeblendCoaddSourcesTask,
8 MeasureMergedCoaddSourcesTask,
9 MergeMeasurementsTask,)
20 coaddName =
Field(dtype=str, default=
"deep", doc=
"Name of coadd")
21 doDetection =
Field(dtype=bool, default=
False,
22 doc=
"Re-run detection? (requires *Coadd dataset to have been written)")
24 doc=
"Detect sources on coadd")
26 target=MergeDetectionsTask, doc=
"Merge detections")
27 deblendCoaddSources =
ConfigurableField(target=DeblendCoaddSourcesTask, doc=
"Deblend merged detections")
29 doc=
"Measure merged and (optionally) deblended detections")
31 target=MergeMeasurementsTask, doc=
"Merge measurements")
33 doc=
"Forced measurement on coadded images")
35 dtype=bool, default=
False,
36 doc=(
"Are we reprocessing?\n\n"
37 "This exists as a workaround for large deblender footprints causing large memory use "
38 "and/or very slow processing. We refuse to deblend those footprints when running on a cluster "
39 "and return to reprocess on a machine with larger memory or more time "
40 "if we consider those footprints important to recover."),
46 doc=
"Should be set to True if fakes were inserted into the data being processed."
50 Config.setDefaults(self)
51 self.
forcedPhotCoaddforcedPhotCoadd.references.retarget(MultiBandReferencesTask)
55 for subtask
in (
"mergeCoaddDetections",
"deblendCoaddSources",
"measureCoaddSources",
56 "mergeCoaddMeasurements",
"forcedPhotCoadd"):
57 coaddName = getattr(self, subtask).coaddName
59 raise RuntimeError(
"%s.coaddName (%s) doesn't match root coaddName (%s)" %
60 (subtask, coaddName, self.
coaddNamecoaddName))
64 """TaskRunner for running MultiBandTask
66 This is similar to the lsst.pipe.base.ButlerInitializedTaskRunner,
67 except that we have a list of data references instead of a single
68 data reference being passed to the Task.run, and we pass the results
69 of the '--reuse-outputs-from' command option to the Task constructor.
def __init__(self, TaskClass, parsedCmd, doReturnResults=False):
    """Construct the runner and remember which outputs may be reused.

    @param TaskClass: task class to run; forwarded to TaskRunner.__init__
    @param parsedCmd: parsed command line; forwarded to TaskRunner.__init__, and its
        ``reuse`` attribute is kept on the runner
    @param doReturnResults: forwarded unchanged to TaskRunner.__init__
    """
    TaskRunner.__init__(self, TaskClass, parsedCmd, doReturnResults)
    # Kept so that makeTask can hand the reuse list to the task constructor.
    self.reuse = parsedCmd.reuse
def makeTask(self, parsedCmd=None, args=None):
    """Construct the task, passing a butler and the reuse list to its constructor.

    A variant of the base version; parsedCmd or args must be specified.

    @param[in] parsedCmd: parsed command line; when given, its ``butler`` attribute is used
    @param[in] args: ``(dataRefList, kwargs)`` pair; used only when parsedCmd is None, in
        which case the butler is taken from the first data reference
    @return an instance of ``self.TaskClass``
    @throw RuntimeError if both parsedCmd and args are None
    """
    if parsedCmd is not None:
        butler = parsedCmd.butler
    elif args is not None:
        # Only the data references are needed here; the kwargs half is ignored.
        dataRefList, _ = args
        butler = dataRefList[0].butlerSubset.butler
    else:
        raise RuntimeError("parsedCmd or args must be specified")
    return self.TaskClass(config=self.config, log=self.log, butler=butler, reuse=self.reuse)
def unpickle(factory, args, kwargs):
    """Unpickle something by calling a factory

    @param factory: callable that builds the object
    @param args: sequence of positional arguments for the factory
    @param kwargs: mapping of keyword arguments for the factory
    @return whatever ``factory(*args, **kwargs)`` returns
    """
    return factory(*args, **kwargs)
96 """Multi-node driver for multiband processing"""
97 ConfigClass = MultiBandDriverConfig
98 _DefaultName =
"multiBandDriver"
99 RunnerClass = MultiBandDriverTaskRunner
101 def __init__(self, butler=None, schema=None, refObjLoader=None, reuse=tuple(), **kwargs):
103 @param[in] butler: the butler can be used to retrieve schema or passed to the refObjLoader constructor
104 in case it is needed.
105 @param[in] schema: the schema of the source detection catalog used as input.
106 @param[in] refObjLoader: an instance of LoadReferenceObjectsTasks that supplies an external reference
107 catalog. May be None if the butler argument is provided or all steps requiring a reference
108 catalog are disabled.
110 BatchPoolTask.__init__(self, **kwargs)
112 assert butler
is not None,
"Butler not provided"
113 schema = butler.get(self.config.coaddName +
114 "Coadd_det_schema", immediate=
True).schema
117 self.makeSubtask(
"detectCoaddSources")
118 self.makeSubtask(
"mergeCoaddDetections", schema=schema)
119 if self.config.measureCoaddSources.inputCatalog.startswith(
"deblended"):
125 err =
"Measurement input '{0}' is not in the list of deblender output catalogs '{1}'"
128 self.makeSubtask(
"deblendCoaddSources",
130 peakSchema=
afwTable.Schema(self.mergeCoaddDetections.merged.getPeakSchema()),
135 self.makeSubtask(
"measureCoaddSources", schema=measureInputSchema,
137 self.mergeCoaddDetections.merged.getPeakSchema()),
138 refObjLoader=refObjLoader, butler=butler)
140 self.measureCoaddSources.schema))
142 self.mergeCoaddMeasurements.schema))
143 if self.config.hasFakes:
144 self.
coaddTypecoaddType =
"fakes_" + self.config.coaddName
146 self.
coaddTypecoaddType = self.config.coaddName
150 return unpickle, (self.__class__, [], dict(config=self.config, name=self._name,
151 parentTask=self._parentTask, log=self.log,
def _makeArgumentParser(cls, *args, **kwargs):
    """Build the command-line argument parser for this driver.

    @param args: positional arguments forwarded to ArgumentParser
    @param kwargs: keyword arguments forwarded to ArgumentParser; any "doBatch" entry is
        removed first and its value discarded
    @return the configured parser (see the review note on the last statement)
    """
    kwargs.pop("doBatch", False)
    parser = ArgumentParser(name=cls._DefaultName, *args, **kwargs)
    parser.add_id_argument("--id", "deepCoadd", help="data ID, e.g. --id tract=12345 patch=1,2",
                           ContainerClass=TractDataIdContainer)
    parser.addReuseOption(["detectCoaddSources", "mergeCoaddDetections", "measureCoaddSources",
                           "mergeCoaddMeasurements", "forcedPhotCoadd", "deblendCoaddSources"])
    # NOTE(review): the scraped listing drops the line after addReuseOption; returning the
    # parser is inferred from the function's purpose -- confirm against the real file.
    return parser
def batchWallTime(cls, time, parsedCmd, numCpus):
    """!Return walltime request for batch job

    @param time: Requested time per iteration
    @param parsedCmd: Results of argument parsing; ``parsedCmd.id.refList`` is a sequence
        of data-reference lists
    @param numCpus: Number of CPUs the total work is divided by
    @return time multiplied by the total number of data references, divided by numCpus
        (always a float)
    """
    # Every data reference in every list counts as one target.
    numTargets = sum(len(refList) for refList in parsedCmd.id.refList)
    return time*numTargets/float(numCpus)
179 """!Run multiband processing on coadds
181 Only the master node runs this method.
183 No real MPI communication (scatter/gather) takes place: all I/O goes
184 through the disk. We want the intermediate stages on disk, and the
185 component Tasks are implemented around this, so we just follow suit.
187 @param patchRefList: Data references to run measurement
189 for patchRef
in patchRefList:
191 butler = patchRef.getButler()
194 raise RuntimeError(
"No valid patches")
197 pool.storeSet(butler=butler)
211 if self.config.doDetection:
213 for patchRef
in patchRefList:
214 if (
"detectCoaddSources" in self.
reusereuse
and
215 patchRef.datasetExists(self.
coaddTypecoaddType +
"Coadd_calexp", write=
True)):
216 self.log.
info(
"Skipping detectCoaddSources for %s; output already exists." %
219 if not patchRef.datasetExists(self.
coaddTypecoaddType +
"Coadd"):
220 self.log.
debug(
"Not processing %s; required input %sCoadd missing." %
221 (patchRef.dataId, self.config.coaddName))
223 detectionList.append(patchRef)
227 patchRefList = [patchRef
for patchRef
in patchRefList
if
228 patchRef.datasetExists(self.
coaddTypecoaddType +
"Coadd_calexp")
and
229 patchRef.datasetExists(self.config.coaddName +
"Coadd_det",
230 write=self.config.doDetection)]
231 dataIdList = [patchRef.dataId
for patchRef
in patchRefList]
236 for patchRef
in patchRefList:
237 dataId = patchRef.dataId
239 tract = dataId[
"tract"]
241 assert tract == dataId[
"tract"]
243 patch = dataId[
"patch"]
244 if patch
not in patches:
246 patches[patch].
append(dataId)
273 reprocessed = pool.map(self.
runDeblendMergedrunDeblendMerged, patches.values())
275 if self.config.reprocessing:
276 patchReprocessing = {}
277 for dataId, reprocess
in zip(dataIdList, reprocessed):
278 patchId = dataId[
"patch"]
279 patchReprocessing[patchId] = patchReprocessing.get(
280 patchId,
False)
or reprocess
282 reprocessDataset = self.config.coaddName +
"Coadd_multibandReprocessing"
283 for patchId
in patchReprocessing:
284 if not patchReprocessing[patchId]:
286 dataId = dict(tract=tract, patch=patchId)
287 if patchReprocessing[patchId]:
288 filename = butler.get(
289 reprocessDataset +
"_filename", dataId)[0]
290 open(filename,
'a').close()
291 elif butler.datasetExists(reprocessDataset, dataId):
294 patchReprocessing[patchId] =
True
297 pool.map(self.
runMeasurementsrunMeasurements, [dataId1
for dataId1
in dataIdList
if not self.config.reprocessing
or
298 patchReprocessing[dataId1[
"patch"]]])
299 pool.map(self.
runMergeMeasurementsrunMergeMeasurements, [idList
for patchId, idList
in patches.items()
if
300 not self.config.reprocessing
or patchReprocessing[patchId]])
301 pool.map(self.
runForcedPhotrunForcedPhot, [dataId1
for dataId1
in dataIdList
if not self.config.reprocessing
or
302 patchReprocessing[dataId1[
"patch"]]])
305 if self.config.reprocessing:
306 for patchId
in patchReprocessing:
307 if not patchReprocessing[patchId]:
309 dataId = dict(tract=tract, patch=patchId)
310 filename = butler.get(
311 reprocessDataset +
"_filename", dataId)[0]
315 """! Run detection on a patch
317 Only slave nodes execute this method.
319 @param cache: Pool cache, containing butler
320 @param patchRef: Patch on which to do detection
323 idFactory = self.detectCoaddSources.makeIdFactory(patchRef)
324 coadd = patchRef.get(self.
coaddTypecoaddType +
"Coadd", immediate=
True)
325 expId = int(patchRef.get(self.config.coaddName +
"CoaddId"))
326 self.detectCoaddSources.emptyMetadata()
327 detResults = self.detectCoaddSources.
run(coadd, idFactory, expId=expId)
328 self.detectCoaddSources.
write(detResults, patchRef)
def runMergeDetections(self, cache, dataIdList):
    """!Run detection merging on a patch

    Only worker (non-master) nodes execute this method.

    @param cache: Pool cache, containing butler
    @param dataIdList: List of data identifiers for the patch in different filters
    """
    with self.logOperation("merge detections from %s" % (dataIdList,)):
        dataRefList = [getDataRef(cache.butler, dataId, self.coaddType + "Coadd_calexp")
                       for dataId in dataIdList]
        if ("mergeCoaddDetections" in self.reuse and
                dataRefList[0].datasetExists(self.config.coaddName + "Coadd_mergeDet", write=True)):
            # Wrapped in a tuple so the data ID is always formatted as one value.
            self.log.info("Skipping mergeCoaddDetections for %s; output already exists." %
                          (dataRefList[0].dataId,))
            # NOTE(review): this statement is absent from the scraped listing (one-line gap);
            # the early exit is inferred from the "Skipping" message -- confirm.
            return
        self.mergeCoaddDetections.runDataRef(dataRefList)
350 """Run the deblender on a list of dataId's
352 Only slave nodes execute this method.
357 Pool cache with butler.
359 Data identifier for patch in each band.
364 whether the patch requires reprocessing.
366 with self.
logOperationlogOperation(
"deblending %s" % (dataIdList,)):
367 dataRefList = [
getDataRef(cache.butler, dataId, self.
coaddTypecoaddType +
"Coadd_calexp")
for
368 dataId
in dataIdList]
370 if (
"deblendCoaddSources" in self.
reusereuse
and
371 all([dataRef.datasetExists(self.config.coaddName +
"Coadd_" + self.
measurementInputmeasurementInput,
372 write=
True)
for dataRef
in dataRefList])):
373 if not self.config.reprocessing:
374 self.log.
info(
"Skipping deblendCoaddSources for %s; output already exists" % dataIdList)
378 catalog = dataRefList[0].get(self.config.coaddName +
"Coadd_" + self.
measurementInputmeasurementInput)
379 bigFlag = catalog[
"deblend_parentTooBig"]
381 numOldBig = bigFlag.sum()
383 self.log.
info(
"No large footprints in %s" % (dataRefList[0].dataId))
387 if self.config.deblendCoaddSources.simultaneous:
388 deblender = self.deblendCoaddSources.multiBandDeblend
390 deblender = self.deblendCoaddSources.singleBandDeblend
395 numNewBig = sum((deblender.isLargeFootprint(src.getFootprint())
for
396 src
in catalog[bigFlag]))
397 if numNewBig == numOldBig:
398 self.log.
info(
"All %d formerly large footprints continue to be large in %s" %
399 (numOldBig, dataRefList[0].dataId,))
401 self.log.
info(
"Found %d large footprints to be reprocessed in %s" %
402 (numOldBig - numNewBig, [dataRef.dataId
for dataRef
in dataRefList]))
405 self.deblendCoaddSources.
runDataRef(dataRefList)
409 """Run measurement on a patch for a single filter
411 Only slave nodes execute this method.
416 Pool cache, with butler
418 Data identifier for patch
420 with self.
logOperationlogOperation(
"measurements on %s" % (dataId,)):
422 if (
"measureCoaddSources" in self.
reusereuse
and
423 not self.config.reprocessing
and
424 dataRef.datasetExists(self.config.coaddName +
"Coadd_meas", write=
True)):
425 self.log.
info(
"Skipping measuretCoaddSources for %s; output already exists" % dataId)
def runMergeMeasurements(self, cache, dataIdList):
    """!Run measurement merging on a patch

    Only worker (non-master) nodes execute this method.

    @param cache: Pool cache, containing butler
    @param dataIdList: List of data identifiers for the patch in different filters
    """
    with self.logOperation("merge measurements from %s" % (dataIdList,)):
        dataRefList = [getDataRef(cache.butler, dataId, self.coaddType + "Coadd_calexp")
                       for dataId in dataIdList]
        # Existing output is reused only when requested and when not reprocessing.
        if ("mergeCoaddMeasurements" in self.reuse and
                not self.config.reprocessing and
                dataRefList[0].datasetExists(self.config.coaddName + "Coadd_ref", write=True)):
            # Wrapped in a tuple so the data ID is always formatted as one value.
            self.log.info("Skipping mergeCoaddMeasurements for %s; output already exists" %
                          (dataRefList[0].dataId,))
            # NOTE(review): this statement is absent from the scraped listing (one-line gap);
            # the early exit is inferred from the "Skipping" message -- confirm.
            return
        self.mergeCoaddMeasurements.runDataRef(dataRefList)
449 """!Run forced photometry on a patch for a single filter
451 Only slave nodes execute this method.
453 @param cache: Pool cache, with butler
454 @param dataId: Data identifier for patch
456 with self.
logOperationlogOperation(
"forced photometry on %s" % (dataId,)):
458 self.
coaddTypecoaddType +
"Coadd_calexp")
459 if (
"forcedPhotCoadd" in self.
reusereuse
and
460 not self.config.reprocessing
and
461 dataRef.datasetExists(self.config.coaddName +
"Coadd_forced_src", write=
True)):
462 self.log.
info(
"Skipping forcedPhotCoadd for %s; output already exists" % dataId)
def writeMetadata(self, dataRef):
    """We don't collect any metadata, so skip

    @param dataRef: data reference; ignored
    """
Defines the fields and offsets for a table.
def logOperation(self, operation, catch=False, trace=True)
Provide a context manager for logging an operation.
def batchWallTime(cls, time, parsedCmd, numCpus)
Return walltime request for batch job.
def runDetection(self, cache, patchRef)
Run detection on a patch.
def writeMetadata(self, dataRef)
def runMergeMeasurements(self, cache, dataIdList)
Run measurement merging on a patch.
def runDeblendMerged(self, cache, dataIdList)
def runMergeDetections(self, cache, dataIdList)
Run detection merging on a patch.
def runMeasurements(self, cache, dataId)
def runDataRef(self, patchRefList)
Run multiband processing on coadds.
def runForcedPhot(self, cache, dataId)
Run forced photometry on a patch for a single filter.
def __init__(self, butler=None, schema=None, refObjLoader=None, reuse=tuple(), **kwargs)
def makeTask(self, parsedCmd=None, args=None)
def __init__(self, TaskClass, parsedCmd, doReturnResults=False)
std::shared_ptr< FrameSet > append(FrameSet const &first, FrameSet const &second)
Construct a FrameSet that performs two transformations in series.
void write(OutputArchiveHandle &handle) const override
bool all(CoordinateExpr< N > const &expr) noexcept
Return true if all elements are true.
def run(self, coaddExposures, bbox, wcs)
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)
def unpickle(factory, args, kwargs)
def getDataRef(butler, dataId, datasetType="raw")