from __future__ import absolute_import, division, print_function

import os

from argparse import ArgumentError
from builtins import zip

import lsst.afw.table as afwTable
from lsst.coadd.utils import CoaddDataIdContainer
from lsst.ctrl.pool.parallel import BatchPoolTask
from lsst.ctrl.pool.pool import Pool, abortOnError
from lsst.meas.base.references import MultiBandReferencesTask
from lsst.meas.base.forcedPhotCoadd import ForcedPhotCoaddTask
from lsst.pex.config import Config, Field, ConfigurableField
from lsst.pipe.base import ArgumentParser, TaskRunner
from lsst.pipe.drivers.utils import getDataRef
from lsst.pipe.tasks.multiBand import (DetectCoaddSourcesTask,
                                       MergeDetectionsTask,
                                       DeblendCoaddSourcesTask,
                                       MeasureMergedCoaddSourcesTask,
                                       MergeMeasurementsTask,)
27 """!Make self.refList from self.idList 29 It's difficult to make a data reference that merely points to an entire 30 tract: there is no data product solely at the tract level. Instead, we 31 generate a list of data references for patches within the tract. 33 @param namespace namespace object that is the result of an argument parser 35 datasetType = namespace.config.coaddName +
"Coadd_calexp" 37 def getPatchRefList(tract):
38 return [namespace.butler.dataRef(datasetType=datasetType,
40 filter=dataId[
"filter"],
41 patch=
"%d,%d" % patch.getIndex())
45 for dataId
in self.idList:
48 if "filter" not in dataId:
49 raise ArgumentError(
None,
"--id must include 'filter'")
51 skymap = self.getSkymap(namespace, datasetType)
54 tractId = dataId[
"tract"]
55 if tractId
not in tractRefs:
56 tractRefs[tractId] = []
58 tractRefs[tractId].
append(namespace.butler.dataRef(datasetType=datasetType,
62 patch=dataId[
'patch']))
64 tractRefs[tractId] += getPatchRefList(skymap[tractId])
66 tractRefs = dict((tract.getId(), tractRefs.get(tract.getId(), []) + getPatchRefList(tract))
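
# An illustrative sketch (not authoritative; tract, patch, and filter values
# are invented) of what this container does: a command line like
#
#     --id tract=8766 filter=HSC-I
#
# names no patch, so makeDataRefList expands it into one <coaddName>Coadd_calexp
# data reference per patch in the tract, i.e. dataIds of the form
# {"tract": 8766, "filter": "HSC-I", "patch": "0,0"}, {..., "patch": "0,1"}, etc.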
class MultiBandDriverConfig(Config):
    coaddName = Field(dtype=str, default="deep", doc="Name of coadd")
    doDetection = Field(dtype=bool, default=False,
                        doc="Re-run detection? (requires *Coadd dataset to have been written)")
    detectCoaddSources = ConfigurableField(target=DetectCoaddSourcesTask,
                                           doc="Detect sources on coadd")
    mergeCoaddDetections = ConfigurableField(
        target=MergeDetectionsTask, doc="Merge detections")
    deblendCoaddSources = ConfigurableField(target=DeblendCoaddSourcesTask,
                                            doc="Deblend merged detections")
    measureCoaddSources = ConfigurableField(target=MeasureMergedCoaddSourcesTask,
                                            doc="Measure merged and (optionally) deblended detections")
    mergeCoaddMeasurements = ConfigurableField(
        target=MergeMeasurementsTask, doc="Merge measurements")
    forcedPhotCoadd = ConfigurableField(target=ForcedPhotCoaddTask,
                                        doc="Forced measurement on coadded images")
    reprocessing = Field(
        dtype=bool, default=False,
        doc=("Are we reprocessing?\n\n"
             "This exists as a workaround for large deblender footprints causing large memory use "
             "and/or very slow processing.  We refuse to deblend those footprints when running on a cluster "
             "and return to reprocess on a machine with larger memory or more time "
             "if we consider those footprints important to recover."),
    )
    def setDefaults(self):
        Config.setDefaults(self)
        self.forcedPhotCoadd.references.retarget(MultiBandReferencesTask)

    def validate(self):
        for subtask in ("mergeCoaddDetections", "deblendCoaddSources", "measureCoaddSources",
                        "mergeCoaddMeasurements", "forcedPhotCoadd"):
            coaddName = getattr(self, subtask).coaddName
            if coaddName != self.coaddName:
                raise RuntimeError("%s.coaddName (%s) doesn't match root coaddName (%s)" %
                                   (subtask, coaddName, self.coaddName))
110 """TaskRunner for running MultiBandTask 112 This is similar to the lsst.pipe.base.ButlerInitializedTaskRunner, 113 except that we have a list of data references instead of a single 114 data reference being passed to the Task.run, and we pass the results 115 of the '--reuse-outputs-from' command option to the Task constructor. 118 def __init__(self, TaskClass, parsedCmd, doReturnResults=False):
119 TaskRunner.__init__(self, TaskClass, parsedCmd, doReturnResults)
123 """A variant of the base version that passes a butler argument to the task's constructor 124 parsedCmd or args must be specified. 126 if parsedCmd
is not None:
127 butler = parsedCmd.butler
128 elif args
is not None:
129 dataRefList, kwargs = args
130 butler = dataRefList[0].butlerSubset.butler
132 raise RuntimeError(
"parsedCmd or args must be specified")
137 """Unpickle something by calling a factory""" 138 return factory(*args, **kwargs)
142 """Multi-node driver for multiband processing""" 143 ConfigClass = MultiBandDriverConfig
144 _DefaultName =
"multiBandDriver" 145 RunnerClass = MultiBandDriverTaskRunner
147 def __init__(self, butler=None, schema=None, refObjLoader=None, reuse=tuple(), **kwargs):
149 @param[in] butler: the butler can be used to retrieve schema or passed to the refObjLoader constructor 150 in case it is needed. 151 @param[in] schema: the schema of the source detection catalog used as input. 152 @param[in] refObjLoader: an instance of LoadReferenceObjectsTasks that supplies an external reference 153 catalog. May be None if the butler argument is provided or all steps requiring a reference 154 catalog are disabled. 156 BatchPoolTask.__init__(self, **kwargs)
158 assert butler
is not None,
"Butler not provided" 159 schema = butler.get(self.
config.coaddName +
160 "Coadd_det_schema", immediate=
True).schema
164 self.
makeSubtask(
"mergeCoaddDetections", schema=schema)
165 if self.
config.measureCoaddSources.inputCatalog.startswith(
"deblended"):
169 if self.
config.deblendCoaddSources.simultaneous:
170 if self.
config.deblendCoaddSources.multiBandDeblend.conserveFlux:
172 if self.
config.deblendCoaddSources.multiBandDeblend.saveTemplates:
177 err =
"Measurement input '{0}' is not in the list of deblender output catalogs '{1}'" 182 peakSchema=
afwTable.Schema(self.mergeCoaddDetections.merged.getPeakSchema()),
187 self.
makeSubtask(
"measureCoaddSources", schema=measureInputSchema,
189 self.mergeCoaddDetections.merged.getPeakSchema()),
190 refObjLoader=refObjLoader, butler=butler)
192 self.measureCoaddSources.schema))
194 self.mergeCoaddMeasurements.schema))
198 return unpickle, (self.__class__, [], dict(config=self.
config, name=self.
_name,
    @classmethod
    def _makeArgumentParser(cls, *args, **kwargs):
        kwargs.pop("doBatch", False)
        parser = ArgumentParser(name=cls._DefaultName, *args, **kwargs)
        parser.add_id_argument("--id", "deepCoadd",
                               help="data ID, e.g. --id tract=12345 patch=1,2",
                               ContainerClass=TractDataIdContainer)
        parser.addReuseOption(["detectCoaddSources", "mergeCoaddDetections", "measureCoaddSources",
                               "mergeCoaddMeasurements", "forcedPhotCoadd", "deblendCoaddSources"])
        return parser
214 """!Return walltime request for batch job 216 @param time: Requested time per iteration 217 @param parsedCmd: Results of argument parsing 218 @param numCores: Number of cores 221 for refList
in parsedCmd.id.refList:
222 numTargets += len(refList)
223 return time*numTargets/
float(numCpus)
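
    # For example (numbers invented): 30 patch/filter targets in
    # parsedCmd.id.refList with time=3600 s per iteration and numCpus=15
    # yields a walltime request of 3600*30/15 = 7200 s.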
227 """!Run multiband processing on coadds 229 Only the master node runs this method. 231 No real MPI communication (scatter/gather) takes place: all I/O goes 232 through the disk. We want the intermediate stages on disk, and the 233 component Tasks are implemented around this, so we just follow suit. 235 @param patchRefList: Data references to run measurement 237 for patchRef
in patchRefList:
239 butler = patchRef.getButler()
242 raise RuntimeError(
"No valid patches")
245 pool.storeSet(butler=butler)
        # Measurements require that detection has been run, so optionally
        # (re-)run detection first
        if self.config.doDetection:
            detectionList = []
            for patchRef in patchRefList:
                if ("detectCoaddSources" in self.reuse and
                        patchRef.datasetExists(self.config.coaddName + "Coadd_calexp", write=True)):
                    self.log.info("Skipping detectCoaddSources for %s; output already exists." %
                                  patchRef.dataId)
                    continue
                if not patchRef.datasetExists(self.config.coaddName + "Coadd"):
                    self.log.debug("Not processing %s; required input %sCoadd missing." %
                                   (patchRef.dataId, self.config.coaddName))
                    continue
                detectionList.append(patchRef)
            pool.map(self.runDetection, detectionList)

        patchRefList = [patchRef for patchRef in patchRefList if
                        patchRef.datasetExists(self.config.coaddName + "Coadd_calexp") and
                        patchRef.datasetExists(self.config.coaddName + "Coadd_det",
                                               write=self.config.doDetection)]
        dataIdList = [patchRef.dataId for patchRef in patchRefList]

        # Group by patch
        patches = {}
        tract = None
        for patchRef in patchRefList:
            dataId = patchRef.dataId
            if tract is None:
                tract = dataId["tract"]
            else:
                assert tract == dataId["tract"]
            patch = dataId["patch"]
            if patch not in patches:
                patches[patch] = []
            patches[patch].append(dataId)
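
        # At this point "patches" maps each patch ID to the dataIds for that
        # patch in every band, e.g. (values invented for illustration):
        #     {"1,1": [{"tract": 8766, "patch": "1,1", "filter": "HSC-G"},
        #              {"tract": 8766, "patch": "1,1", "filter": "HSC-I"}], ...}
        # so the merge and deblend stages below can consume all bands of a
        # patch at once, while measurement and forced photometry operate on
        # individual dataIds.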
        pool.map(self.runMergeDetections, patches.values())

        # Deblend merged detections, and test for reprocessing
        #
        # Reprocessing gives us multiple attempts at deblending large
        # footprints.  When enabled, runDeblendMerged reports whether each
        # patch needs another attempt.
        reprocessed = pool.map(self.runDeblendMerged, patches.values())

        if self.config.reprocessing:
            patchReprocessing = {}
            for dataId, reprocess in zip(dataIdList, reprocessed):
                patchId = dataId["patch"]
                patchReprocessing[patchId] = patchReprocessing.get(
                    patchId, False) or reprocess
            # Persist the determination, to make error recovery easier
            reprocessDataset = self.config.coaddName + "Coadd_multibandReprocessing"
            for patchId in patchReprocessing:
                if not patchReprocessing[patchId]:
                    continue
                dataId = dict(tract=tract, patch=patchId)
                if patchReprocessing[patchId]:
                    filename = butler.get(
                        reprocessDataset + "_filename", dataId)[0]
                    open(filename, 'a').close()  # Touch file
                elif butler.datasetExists(reprocessDataset, dataId):
                    # A previous run already marked this patch for
                    # reprocessing, so honour that determination
                    patchReprocessing[patchId] = True

        # When reprocessing is enabled, only the patches marked for
        # reprocessing are carried through the remaining stages
        pool.map(self.runMeasurements, [dataId1 for dataId1 in dataIdList
                                        if not self.config.reprocessing or
                                        patchReprocessing[dataId1["patch"]]])
        pool.map(self.runMergeMeasurements, [idList for patchId, idList in patches.items()
                                             if not self.config.reprocessing or
                                             patchReprocessing[patchId]])
        pool.map(self.runForcedPhot, [dataId1 for dataId1 in dataIdList
                                      if not self.config.reprocessing or
                                      patchReprocessing[dataId1["patch"]]])
        # Cleanup relics of reprocessing
        if self.config.reprocessing:
            for patchId in patchReprocessing:
                if not patchReprocessing[patchId]:
                    continue
                dataId = dict(tract=tract, patch=patchId)
                filename = butler.get(
                    reprocessDataset + "_filename", dataId)[0]
                os.unlink(filename)
364 """! Run detection on a patch 366 Only slave nodes execute this method. 368 @param cache: Pool cache, containing butler 369 @param patchRef: Patch on which to do detection 372 idFactory = self.detectCoaddSources.makeIdFactory(patchRef)
373 coadd = patchRef.get(self.
config.coaddName +
"Coadd",
375 expId =
int(patchRef.get(self.
config.coaddName +
"CoaddId"))
377 detResults = self.detectCoaddSources.
run(coadd, idFactory, expId=expId)
378 self.detectCoaddSources.write(detResults, patchRef)
382 """!Run detection merging on a patch 384 Only slave nodes execute this method. 386 @param cache: Pool cache, containing butler 387 @param dataIdList: List of data identifiers for the patch in different filters 389 with self.
logOperation(
"merge detections from %s" % (dataIdList,)):
390 dataRefList = [
getDataRef(cache.butler, dataId, self.
config.coaddName +
"Coadd_calexp")
for 391 dataId
in dataIdList]
392 if (
"mergeCoaddDetections" in self.
reuse and 393 dataRefList[0].datasetExists(self.
config.coaddName +
"Coadd_mergeDet", write=
True)):
394 self.
log.
info(
"Skipping mergeCoaddDetections for %s; output already exists." %
395 dataRefList[0].dataId)
397 self.mergeCoaddDetections.
runDataRef(dataRefList)
400 """Run the deblender on a list of dataId's 402 Only slave nodes execute this method. 407 Pool cache with butler. 409 Data identifier for patch in each band. 414 whether the patch requires reprocessing. 416 with self.
logOperation(
"deblending %s" % (dataIdList,)):
417 dataRefList = [
getDataRef(cache.butler, dataId, self.
config.coaddName +
"Coadd_calexp")
for 418 dataId
in dataIdList]
420 if (
"deblendCoaddSources" in self.
reuse and 422 write=
True)
for dataRef
in dataRefList])):
423 if not self.
config.reprocessing:
424 self.
log.
info(
"Skipping deblendCoaddSources for %s; output already exists" % dataIdList)
429 bigFlag = catalog[
"deblend_parentTooBig"]
431 numOldBig = bigFlag.sum()
433 self.
log.
info(
"No large footprints in %s" % (dataRefList[0].dataId))
437 if self.
config.deblendCoaddSources.simultaneous:
438 deblender = self.deblendCoaddSources.multiBandDeblend
440 deblender = self.deblendCoaddSources.singleBandDeblend
445 numNewBig = sum((deblender.isLargeFootprint(src.getFootprint())
for 446 src
in catalog[bigFlag]))
447 if numNewBig == numOldBig:
448 self.
log.
info(
"All %d formerly large footprints continue to be large in %s" %
449 (numOldBig, dataRefList[0].dataId,))
451 self.
log.
info(
"Found %d large footprints to be reprocessed in %s" %
452 (numOldBig - numNewBig, [dataRef.dataId
for dataRef
in dataRefList]))
455 self.deblendCoaddSources.
runDataRef(dataRefList)
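
    # A worked example of the reprocessing decision above (numbers invented):
    # if a previous run flagged numOldBig = 5 footprints as too big and the
    # current deblender configuration would still reject all 5
    # (numNewBig == numOldBig), reprocessing cannot help and the patch is
    # skipped; if only 2 would still be rejected, the other 3 have become
    # tractable, so the patch is marked for reprocessing.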
459 """Run measurement on a patch for a single filter 461 Only slave nodes execute this method. 466 Pool cache, with butler 468 Data identifier for patch 470 with self.
logOperation(
"measurements on %s" % (dataId,)):
472 self.
config.coaddName +
"Coadd_calexp")
473 if (
"measureCoaddSources" in self.
reuse and 474 not self.
config.reprocessing
and 475 dataRef.datasetExists(self.
config.coaddName +
"Coadd_meas", write=
True)):
476 self.
log.
info(
"Skipping measuretCoaddSources for %s; output already exists" % dataId)
481 """!Run measurement merging on a patch 483 Only slave nodes execute this method. 485 @param cache: Pool cache, containing butler 486 @param dataIdList: List of data identifiers for the patch in different filters 488 with self.
logOperation(
"merge measurements from %s" % (dataIdList,)):
489 dataRefList = [
getDataRef(cache.butler, dataId, self.
config.coaddName +
"Coadd_calexp")
for 490 dataId
in dataIdList]
491 if (
"mergeCoaddMeasurements" in self.
reuse and 492 not self.
config.reprocessing
and 493 dataRefList[0].datasetExists(self.
config.coaddName +
"Coadd_ref", write=
True)):
494 self.
log.
info(
"Skipping mergeCoaddMeasurements for %s; output already exists" %
495 dataRefList[0].dataId)
497 self.mergeCoaddMeasurements.
runDataRef(dataRefList)
500 """!Run forced photometry on a patch for a single filter 502 Only slave nodes execute this method. 504 @param cache: Pool cache, with butler 505 @param dataId: Data identifier for patch 507 with self.
logOperation(
"forced photometry on %s" % (dataId,)):
509 self.
config.coaddName +
"Coadd_calexp")
510 if (
"forcedPhotCoadd" in self.
reuse and 511 not self.
config.reprocessing
and 512 dataRef.datasetExists(self.
config.coaddName +
"Coadd_forced_src", write=
True)):
513 self.
log.
info(
"Skipping forcedPhotCoadd for %s; output already exists" % dataId)
518 """We don't collect any metadata, so skip""" Defines the fields and offsets for a table.