3from lsst.coadd.utils.getGen3CoaddExposureId
import getGen3CoaddExposureId
8 DeblendCoaddSourcesTask,
9 MeasureMergedCoaddSourcesTask,
10 MergeMeasurementsTask,)
13from lsst.meas.base.references
import MultiBandReferencesTask
21 coaddName =
Field(dtype=str, default=
"deep", doc=
"Name of coadd")
22 doDetection =
Field(dtype=bool, default=
False,
23 doc=
"Re-run detection? (requires *Coadd dataset to have been written)")
25 doc=
"Detect sources on coadd")
27 target=MergeDetectionsTask, doc=
"Merge detections")
28 deblendCoaddSources =
ConfigurableField(target=DeblendCoaddSourcesTask, doc=
"Deblend merged detections")
30 doc=
"Measure merged and (optionally) deblended detections")
32 target=MergeMeasurementsTask, doc=
"Merge measurements")
34 doc=
"Forced measurement on coadded images")
36 dtype=bool, default=
False,
37 doc=(
"Are we reprocessing?\n\n"
38 "This exists as a workaround for large deblender footprints causing large memory use "
39 "and/or very slow processing. We refuse to deblend those footprints when running on a cluster "
40 "and return to reprocess on a machine with larger memory or more time "
41 "if we consider those footprints important to recover."),
47 doc=
"Should be set to True if fakes were inserted into the data being processed."
51 Config.setDefaults(self)
56 for subtask
in (
"mergeCoaddDetections",
"deblendCoaddSources",
"measureCoaddSources",
57 "mergeCoaddMeasurements",
"forcedPhotCoadd"):
58 coaddName = getattr(self, subtask).coaddName
60 raise RuntimeError(
"%s.coaddName (%s) doesn't match root coaddName (%s)" %
"""TaskRunner for running MultiBandTask

This is similar to the lsst.pipe.base.ButlerInitializedTaskRunner,
except that we have a list of data references instead of a single
data reference being passed to the Task.run, and we pass the results
of the '--reuse-outputs-from' command option to the Task constructor.
"""
73 def __init__(self, TaskClass, parsedCmd, doReturnResults=False):
74 TaskRunner.__init__(self, TaskClass, parsedCmd, doReturnResults)
def makeTask(self, parsedCmd=None, args=None):
    """Construct the task, passing a butler and the reuse selection to it.

    A variant of the base version that passes a butler argument to the
    task's constructor. parsedCmd or args must be specified.

    @param parsedCmd: parsed command line; when not None its ``butler``
        attribute is used and ``args`` is ignored.
    @param args: a ``(dataRefList, kwargs)`` pair, consulted only when
        ``parsedCmd`` is None; the butler is taken from the first data
        reference (``dataRefList[0].butlerSubset.butler``).
    @return the result of calling ``self.TaskClass`` with ``config``,
        ``log``, ``butler`` and ``reuse`` keyword arguments.
    @throw RuntimeError if both ``parsedCmd`` and ``args`` are None.
    """
    if parsedCmd is not None:
        butler = parsedCmd.butler
    elif args is not None:
        # Only the data references are needed here; the keyword part of
        # the pair is not used when locating the butler.
        dataRefList, _ = args
        butler = dataRefList[0].butlerSubset.butler
    else:
        raise RuntimeError("parsedCmd or args must be specified")
    return self.TaskClass(config=self.config, log=self.log, butler=butler, reuse=self.reuse)
def unpickle(factory, args, kwargs):
    """Unpickle something by calling a factory

    @param factory: callable that rebuilds the object
    @param args: positional arguments, unpacked into the call
    @param kwargs: keyword arguments, unpacked into the call
    @return whatever ``factory(*args, **kwargs)`` returns
    """
    return factory(*args, **kwargs)
97 """Multi-node driver for multiband processing"""
98 ConfigClass = MultiBandDriverConfig
99 _DefaultName =
"multiBandDriver"
100 RunnerClass = MultiBandDriverTaskRunner
102 def __init__(self, butler=None, schema=None, refObjLoader=None, reuse=tuple(), **kwargs):
@param[in] butler: the butler can be used to retrieve schema or passed to the refObjLoader constructor
    in case it is needed.
@param[in] schema: the schema of the source detection catalog used as input.
@param[in] refObjLoader: an instance of LoadReferenceObjectsTasks that supplies an external reference
    catalog.  May be None if the butler argument is provided or all steps requiring a reference
    catalog are disabled.
111 BatchPoolTask.__init__(self, **kwargs)
113 assert butler
is not None,
"Butler not provided"
114 schema = butler.get(self.config.coaddName +
115 "Coadd_det_schema", immediate=
True).schema
118 self.makeSubtask(
"detectCoaddSources")
119 self.makeSubtask(
"mergeCoaddDetections", schema=schema)
120 if self.config.measureCoaddSources.inputCatalog.startswith(
"deblended"):
126 err =
"Measurement input '{0}' is not in the list of deblender output catalogs '{1}'"
129 self.makeSubtask(
"deblendCoaddSources",
131 peakSchema=
afwTable.Schema(self.mergeCoaddDetections.merged.getPeakSchema()),
136 self.makeSubtask(
"measureCoaddSources", schema=measureInputSchema,
138 self.mergeCoaddDetections.merged.getPeakSchema()),
139 refObjLoader=refObjLoader, butler=butler)
141 self.measureCoaddSources.schema))
143 self.mergeCoaddMeasurements.schema))
144 if self.config.hasFakes:
151 return unpickle, (self.__class__, [], dict(config=self.config, name=self._name,
152 parentTask=self._parentTask, log=self.log,
156 def _makeArgumentParser(cls, *args, **kwargs):
157 kwargs.pop(
"doBatch",
False)
158 parser = ArgumentParser(name=cls.
_DefaultName, *args, **kwargs)
159 parser.add_id_argument(
"--id",
"deepCoadd", help=
"data ID, e.g. --id tract=12345 patch=1,2",
160 ContainerClass=TractDataIdContainer)
161 parser.addReuseOption([
"detectCoaddSources",
"mergeCoaddDetections",
"measureCoaddSources",
162 "mergeCoaddMeasurements",
"forcedPhotCoadd",
"deblendCoaddSources"])
def batchWallTime(cls, time, parsedCmd, numCpus):
    """!Return walltime request for batch job

    The request is the per-iteration time multiplied by the total number
    of data references found in ``parsedCmd.id.refList`` and divided by
    ``numCpus``.

    @param time: Requested time per iteration
    @param parsedCmd: Results of argument parsing
    @param numCpus: Number of cores
    @return the walltime request
    """
    # NOTE(review): the first parameter is ``cls``; any decorator that goes
    # with it is not visible in this extract -- confirm before merging.
    numTargets = sum(len(refList) for refList in parsedCmd.id.refList)
    return time*numTargets/float(numCpus)
180 """!Run multiband processing on coadds
182 Only the master node runs this method.
184 No real MPI communication (scatter/gather) takes place: all I/O goes
185 through the disk. We want the intermediate stages on disk, and the
186 component Tasks are implemented around this, so we just follow suit.
188 @param patchRefList: Data references to run measurement
190 for patchRef
in patchRefList:
192 butler = patchRef.getButler()
195 raise RuntimeError(
"No valid patches")
198 pool.storeSet(butler=butler)
212 if self.config.doDetection:
214 for patchRef
in patchRefList:
215 if (
"detectCoaddSources" in self.
reuse and
216 patchRef.datasetExists(self.
coaddType +
"Coadd_calexp", write=
True)):
217 self.log.info(
"Skipping detectCoaddSources for %s; output already exists." %
220 if not patchRef.datasetExists(self.
coaddType +
"Coadd"):
221 self.log.debug(
"Not processing %s; required input %sCoadd missing." %
222 (patchRef.dataId, self.config.coaddName))
224 detectionList.append(patchRef)
228 patchRefList = [patchRef
for patchRef
in patchRefList
if
229 patchRef.datasetExists(self.
coaddType +
"Coadd_calexp")
and
230 patchRef.datasetExists(self.config.coaddName +
"Coadd_det",
231 write=self.config.doDetection)]
232 dataIdList = [patchRef.dataId
for patchRef
in patchRefList]
237 for patchRef
in patchRefList:
238 dataId = patchRef.dataId
240 tract = dataId[
"tract"]
242 assert tract == dataId[
"tract"]
244 patch = dataId[
"patch"]
245 if patch
not in patches:
247 patches[patch].append(dataId)
276 if self.config.reprocessing:
277 patchReprocessing = {}
278 for dataId, reprocess
in zip(dataIdList, reprocessed):
279 patchId = dataId[
"patch"]
280 patchReprocessing[patchId] = patchReprocessing.get(
281 patchId,
False)
or reprocess
283 reprocessDataset = self.config.coaddName +
"Coadd_multibandReprocessing"
284 for patchId
in patchReprocessing:
285 if not patchReprocessing[patchId]:
287 dataId = dict(tract=tract, patch=patchId)
288 if patchReprocessing[patchId]:
289 filename = butler.get(
290 reprocessDataset +
"_filename", dataId)[0]
291 open(filename,
'a').close()
292 elif butler.datasetExists(reprocessDataset, dataId):
295 patchReprocessing[patchId] =
True
298 pool.map(self.
runMeasurements, [dataId1
for dataId1
in dataIdList
if not self.config.reprocessing
or
299 patchReprocessing[dataId1[
"patch"]]])
301 not self.config.reprocessing
or patchReprocessing[patchId]])
302 pool.map(self.
runForcedPhot, [dataId1
for dataId1
in dataIdList
if not self.config.reprocessing
or
303 patchReprocessing[dataId1[
"patch"]]])
306 if self.config.reprocessing:
307 for patchId
in patchReprocessing:
308 if not patchReprocessing[patchId]:
310 dataId = dict(tract=tract, patch=patchId)
311 filename = butler.get(
312 reprocessDataset +
"_filename", dataId)[0]
def runDetection(self, cache, patchRef):
    """! Run detection on a patch

    Only slave nodes execute this method.

    The coadd is read from ``patchRef`` as the ``<coaddType>Coadd``
    dataset, passed to the ``detectCoaddSources`` subtask together with
    an id factory and an exposure id, and the detection results are
    written back through ``patchRef``.

    @param cache: Pool cache, containing butler
    @param patchRef: Patch on which to do detection
    """
    with self.logOperation("do detections on {}".format(patchRef.dataId)):
        idFactory = self.detectCoaddSources.makeIdFactory(patchRef)
        coadd = patchRef.get(self.coaddType + "Coadd", immediate=True)
        expId = getGen3CoaddExposureId(patchRef, coaddName=self.config.coaddName, log=self.log)
        # Metadata is emptied before each run of the subtask.
        self.detectCoaddSources.emptyMetadata()
        detResults = self.detectCoaddSources.run(coadd, idFactory, expId=expId)
        self.detectCoaddSources.write(detResults, patchRef)
333 """!Run detection merging on a patch
335 Only slave nodes execute this method.
337 @param cache: Pool cache, containing butler
338 @param dataIdList: List of data identifiers
for the patch
in different filters
340 with self.
logOperation(
"merge detections from %s" % (dataIdList,)):
341 dataRefList = [getDataRef(cache.butler, dataId, self.
coaddType +
"Coadd_calexp")
for
342 dataId
in dataIdList]
343 if (
"mergeCoaddDetections" in self.
reuse and
344 dataRefList[0].datasetExists(self.config.coaddName +
"Coadd_mergeDet", write=
True)):
345 self.log.info(
"Skipping mergeCoaddDetections for %s; output already exists." %
346 dataRefList[0].dataId)
348 self.mergeCoaddDetections.
runDataRef(dataRefList)
351 """Run the deblender on a list of dataId's
353 Only slave nodes execute this method.
358 Pool cache with butler.
360 Data identifier
for patch
in each band.
365 whether the patch requires reprocessing.
367 with self.
logOperation(
"deblending %s" % (dataIdList,)):
368 dataRefList = [getDataRef(cache.butler, dataId, self.
coaddType +
"Coadd_calexp")
for
369 dataId
in dataIdList]
371 if (
"deblendCoaddSources" in self.
reuse and
372 all([dataRef.datasetExists(self.config.coaddName +
"Coadd_" + self.
measurementInput,
373 write=
True)
for dataRef
in dataRefList])):
374 if not self.config.reprocessing:
375 self.log.info(
"Skipping deblendCoaddSources for %s; output already exists" % dataIdList)
379 catalog = dataRefList[0].get(self.config.coaddName +
"Coadd_" + self.
measurementInput)
380 bigFlag = catalog[
"deblend_parentTooBig"]
382 numOldBig = bigFlag.sum()
384 self.log.info(
"No large footprints in %s" % (dataRefList[0].dataId))
388 if self.config.deblendCoaddSources.simultaneous:
389 deblender = self.deblendCoaddSources.multiBandDeblend
391 deblender = self.deblendCoaddSources.singleBandDeblend
396 numNewBig = sum((deblender.isLargeFootprint(src.getFootprint())
for
397 src
in catalog[bigFlag]))
398 if numNewBig == numOldBig:
399 self.log.info(
"All %d formerly large footprints continue to be large in %s" %
400 (numOldBig, dataRefList[0].dataId,))
402 self.log.info(
"Found %d large footprints to be reprocessed in %s" %
403 (numOldBig - numNewBig, [dataRef.dataId
for dataRef
in dataRefList]))
406 self.deblendCoaddSources.
runDataRef(dataRefList)
410 """Run measurement on a patch for a single filter
412 Only slave nodes execute this method.
417 Pool cache, with butler
419 Data identifier
for patch
421 with self.
logOperation(
"measurements on %s" % (dataId,)):
422 dataRef = getDataRef(cache.butler, dataId, self.
coaddType +
"Coadd_calexp")
423 if (
"measureCoaddSources" in self.
reuse and
424 not self.config.reprocessing
and
425 dataRef.datasetExists(self.config.coaddName +
"Coadd_meas", write=
True)):
426 self.log.info(
"Skipping measuretCoaddSources for %s; output already exists" % dataId)
431 """!Run measurement merging on a patch
433 Only slave nodes execute this method.
435 @param cache: Pool cache, containing butler
436 @param dataIdList: List of data identifiers
for the patch
in different filters
438 with self.
logOperation(
"merge measurements from %s" % (dataIdList,)):
439 dataRefList = [getDataRef(cache.butler, dataId, self.
coaddType +
"Coadd_calexp")
for
440 dataId
in dataIdList]
441 if (
"mergeCoaddMeasurements" in self.
reuse and
442 not self.config.reprocessing
and
443 dataRefList[0].datasetExists(self.config.coaddName +
"Coadd_ref", write=
True)):
444 self.log.info(
"Skipping mergeCoaddMeasurements for %s; output already exists" %
445 dataRefList[0].dataId)
447 self.mergeCoaddMeasurements.
runDataRef(dataRefList)
450 """!Run forced photometry on a patch for a single filter
452 Only slave nodes execute this method.
454 @param cache: Pool cache,
with butler
455 @param dataId: Data identifier
for patch
457 with self.
logOperation(
"forced photometry on %s" % (dataId,)):
458 dataRef = getDataRef(cache.butler, dataId,
460 if (
"forcedPhotCoadd" in self.
reuse and
461 not self.config.reprocessing
and
462 dataRef.datasetExists(self.config.coaddName +
"Coadd_forced_src", write=
True)):
463 self.log.info(
"Skipping forcedPhotCoadd for %s; output already exists" % dataId)
def writeMetadata(self, dataRef):
    """We don't collect any metadata, so skip

    @param dataRef: data reference; not used.
    """
Defines the fields and offsets for a table.
def logOperation(self, operation, catch=False, trace=True)
Provide a context manager for logging an operation.
def batchWallTime(cls, time, parsedCmd, numCpus)
Return walltime request for batch job.
def runDetection(self, cache, patchRef)
Run detection on a patch.
def writeMetadata(self, dataRef)
def runMergeMeasurements(self, cache, dataIdList)
Run measurement merging on a patch.
def runDeblendMerged(self, cache, dataIdList)
def runMergeDetections(self, cache, dataIdList)
Run detection merging on a patch.
def runMeasurements(self, cache, dataId)
def runDataRef(self, patchRefList)
Run multiband processing on coadds.
def runForcedPhot(self, cache, dataId)
Run forced photometry on a patch for a single filter.
def __init__(self, butler=None, schema=None, refObjLoader=None, reuse=tuple(), **kwargs)
def makeTask(self, parsedCmd=None, args=None)
def __init__(self, TaskClass, parsedCmd, doReturnResults=False)
def unpickle(factory, args, kwargs)