import os

from lsst.pex.config import Config, Field, ConfigurableField
from lsst.pipe.base import ArgumentParser, TaskRunner
from lsst.pipe.tasks.multiBand import (DetectCoaddSourcesTask,
                                       MergeDetectionsTask,
                                       DeblendCoaddSourcesTask,
                                       MeasureMergedCoaddSourcesTask,
                                       MergeMeasurementsTask,)
from lsst.ctrl.pool.parallel import BatchPoolTask
from lsst.ctrl.pool.pool import Pool, abortOnError
from lsst.meas.base.references import MultiBandReferencesTask
from lsst.meas.base.forcedPhotCoadd import ForcedPhotCoaddTask
from lsst.pipe.drivers.utils import getDataRef, TractDataIdContainer
import lsst.afw.table as afwTable
class MultiBandDriverConfig(Config):
    coaddName = Field(dtype=str, default="deep", doc="Name of coadd")
    doDetection = Field(dtype=bool, default=False,
                        doc="Re-run detection? (requires *Coadd dataset to have been written)")
    detectCoaddSources = ConfigurableField(target=DetectCoaddSourcesTask,
                                           doc="Detect sources on coadd")
    mergeCoaddDetections = ConfigurableField(target=MergeDetectionsTask,
                                             doc="Merge detections")
    deblendCoaddSources = ConfigurableField(target=DeblendCoaddSourcesTask,
                                            doc="Deblend merged detections")
    measureCoaddSources = ConfigurableField(target=MeasureMergedCoaddSourcesTask,
                                            doc="Measure merged and (optionally) deblended detections")
    mergeCoaddMeasurements = ConfigurableField(target=MergeMeasurementsTask,
                                               doc="Merge measurements")
    forcedPhotCoadd = ConfigurableField(target=ForcedPhotCoaddTask,
                                        doc="Forced measurement on coadded images")
    reprocessing = Field(
        dtype=bool, default=False,
        doc=("Are we reprocessing?\n\n"
             "This exists as a workaround for large deblender footprints causing large memory use "
             "and/or very slow processing. We refuse to deblend those footprints when running on a "
             "cluster and return to reprocess on a machine with larger memory or more time "
             "if we consider those footprints important to recover."),
    )
    hasFakes = Field(
        dtype=bool, default=False,
        doc="Should be set to True if fakes were inserted into the data being processed.")

    def setDefaults(self):
        Config.setDefaults(self)
        self.forcedPhotCoadd.references.retarget(MultiBandReferencesTask)

    def validate(self):
        for subtask in ("mergeCoaddDetections", "deblendCoaddSources",
                        "measureCoaddSources", "mergeCoaddMeasurements",
                        "forcedPhotCoadd"):
            coaddName = getattr(self, subtask).coaddName
            if coaddName != self.coaddName:
                raise RuntimeError("%s.coaddName (%s) doesn't match root coaddName (%s)" %
                                   (subtask, coaddName, self.coaddName))
64 """TaskRunner for running MultiBandTask 66 This is similar to the lsst.pipe.base.ButlerInitializedTaskRunner, 67 except that we have a list of data references instead of a single 68 data reference being passed to the Task.run, and we pass the results 69 of the '--reuse-outputs-from' command option to the Task constructor. 72 def __init__(self, TaskClass, parsedCmd, doReturnResults=False):
73 TaskRunner.__init__(self, TaskClass, parsedCmd, doReturnResults)
77 """A variant of the base version that passes a butler argument to the task's constructor 78 parsedCmd or args must be specified. 80 if parsedCmd
is not None:
81 butler = parsedCmd.butler
82 elif args
is not None:
83 dataRefList, kwargs = args
84 butler = dataRefList[0].butlerSubset.butler
86 raise RuntimeError(
"parsedCmd or args must be specified")
91 """Unpickle something by calling a factory""" 92 return factory(*args, **kwargs)
96 """Multi-node driver for multiband processing""" 97 ConfigClass = MultiBandDriverConfig
98 _DefaultName =
"multiBandDriver" 99 RunnerClass = MultiBandDriverTaskRunner
101 def __init__(self, butler=None, schema=None, refObjLoader=None, reuse=tuple(), **kwargs):
103 @param[in] butler: the butler can be used to retrieve schema or passed to the refObjLoader constructor 104 in case it is needed. 105 @param[in] schema: the schema of the source detection catalog used as input. 106 @param[in] refObjLoader: an instance of LoadReferenceObjectsTasks that supplies an external reference 107 catalog. May be None if the butler argument is provided or all steps requiring a reference 108 catalog are disabled. 110 BatchPoolTask.__init__(self, **kwargs)
112 assert butler
is not None,
"Butler not provided" 113 schema = butler.get(self.
config.coaddName +
114 "Coadd_det_schema", immediate=
True).schema
118 self.
makeSubtask(
"mergeCoaddDetections", schema=schema)
119 if self.
config.measureCoaddSources.inputCatalog.startswith(
"deblended"):
123 if self.
config.deblendCoaddSources.simultaneous:
128 err =
"Measurement input '{0}' is not in the list of deblender output catalogs '{1}'" 133 peakSchema=
afwTable.Schema(self.mergeCoaddDetections.merged.getPeakSchema()),
138 self.
makeSubtask(
"measureCoaddSources", schema=measureInputSchema,
140 self.mergeCoaddDetections.merged.getPeakSchema()),
141 refObjLoader=refObjLoader, butler=butler)
143 self.measureCoaddSources.schema))
145 self.mergeCoaddMeasurements.schema))
153 return unpickle, (self.__class__, [], dict(config=self.
config, name=self.
_name,
    @classmethod
    def _makeArgumentParser(cls, *args, **kwargs):
        kwargs.pop("doBatch", False)
        parser = ArgumentParser(name=cls._DefaultName, *args, **kwargs)
        parser.add_id_argument("--id", "deepCoadd",
                               help="data ID, e.g. --id tract=12345 patch=1,2",
                               ContainerClass=TractDataIdContainer)
        parser.addReuseOption(["detectCoaddSources", "mergeCoaddDetections", "measureCoaddSources",
                               "mergeCoaddMeasurements", "forcedPhotCoadd", "deblendCoaddSources"])
        return parser
169 """!Return walltime request for batch job 171 @param time: Requested time per iteration 172 @param parsedCmd: Results of argument parsing 173 @param numCores: Number of cores 176 for refList
in parsedCmd.id.refList:
177 numTargets += len(refList)
178 return time*numTargets/float(numCpus)
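    # Worked example: 10 patches observed in 3 bands give 30 targets in
    # parsedCmd.id.refList; with time=3600 s per iteration and numCpus=12,
    # the walltime request is 3600*30/12 = 9000 s.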
182 """!Run multiband processing on coadds 184 Only the master node runs this method. 186 No real MPI communication (scatter/gather) takes place: all I/O goes 187 through the disk. We want the intermediate stages on disk, and the 188 component Tasks are implemented around this, so we just follow suit. 190 @param patchRefList: Data references to run measurement 192 for patchRef
in patchRefList:
194 butler = patchRef.getButler()
197 raise RuntimeError(
"No valid patches")
200 pool.storeSet(butler=butler)
        # MultiBand measurements require that the detection stage be completed
        # before measurements are made.
        if self.config.doDetection:
            detectionList = []
            for patchRef in patchRefList:
                if ("detectCoaddSources" in self.reuse and
                        patchRef.datasetExists(self.coaddType + "Coadd_calexp", write=True)):
                    self.log.info("Skipping detectCoaddSources for %s; output already exists." %
                                  patchRef.dataId)
                    continue
                if not patchRef.datasetExists(self.coaddType + "Coadd"):
                    self.log.debug("Not processing %s; required input %sCoadd missing." %
                                   (patchRef.dataId, self.config.coaddName))
                    continue
                detectionList.append(patchRef)
            pool.map(self.runDetection, detectionList)

        # Only process patches for which all required inputs exist
        patchRefList = [patchRef for patchRef in patchRefList if
                        patchRef.datasetExists(self.coaddType + "Coadd_calexp") and
                        patchRef.datasetExists(self.config.coaddName + "Coadd_det",
                                               write=self.config.doDetection)]
        dataIdList = [patchRef.dataId for patchRef in patchRefList]
        # Group by patch
        patches = {}
        tract = None
        for patchRef in patchRefList:
            dataId = patchRef.dataId
            if tract is None:
                tract = dataId["tract"]
            else:
                assert tract == dataId["tract"]
            patch = dataId["patch"]
            if patch not in patches:
                patches[patch] = []
            patches[patch].append(dataId)
        pool.map(self.runMergeDetections, patches.values())

        # Deblend merged detections, noting which patches need reprocessing
        # because the deblender refused their large footprints on the cluster.
        reprocessed = pool.map(self.runDeblendMerged, patches.values())

        if self.config.reprocessing:
            patchReprocessing = {}
            for dataId, reprocess in zip(dataIdList, reprocessed):
                patchId = dataId["patch"]
                patchReprocessing[patchId] = patchReprocessing.get(
                    patchId, False) or reprocess
            # Persist the determination, to make error recovery easier
            reprocessDataset = self.config.coaddName + "Coadd_multibandReprocessing"
            for patchId in patchReprocessing:
                if not patchReprocessing[patchId]:
                    continue
                dataId = dict(tract=tract, patch=patchId)
                if patchReprocessing[patchId]:
                    filename = butler.get(
                        reprocessDataset + "_filename", dataId)[0]
                    open(filename, 'a').close()  # Touch file
                elif butler.datasetExists(reprocessDataset, dataId):
                    # We must have failed at some point while reprocessing
                    # and we're starting over
                    patchReprocessing[patchId] = True

        # Only process patches that have been identified as needing it
        pool.map(self.runMeasurements, [dataId1 for dataId1 in dataIdList if
                                        not self.config.reprocessing or
                                        patchReprocessing[dataId1["patch"]]])
        pool.map(self.runMergeMeasurements, [idList for patchId, idList in patches.items() if
                                             not self.config.reprocessing or
                                             patchReprocessing[patchId]])
        pool.map(self.runForcedPhot, [dataId1 for dataId1 in dataIdList if
                                      not self.config.reprocessing or
                                      patchReprocessing[dataId1["patch"]]])

        # Remove persisted reprocessing determination
        if self.config.reprocessing:
            for patchId in patchReprocessing:
                if not patchReprocessing[patchId]:
                    continue
                dataId = dict(tract=tract, patch=patchId)
                filename = butler.get(
                    reprocessDataset + "_filename", dataId)[0]
                os.unlink(filename)
318 """! Run detection on a patch 320 Only slave nodes execute this method. 322 @param cache: Pool cache, containing butler 323 @param patchRef: Patch on which to do detection 326 idFactory = self.detectCoaddSources.makeIdFactory(patchRef)
327 coadd = patchRef.get(self.
coaddType +
"Coadd", immediate=
True)
328 expId = int(patchRef.get(self.
config.coaddName +
"CoaddId"))
330 detResults = self.detectCoaddSources.
run(coadd, idFactory, expId=expId)
331 self.detectCoaddSources.
write(detResults, patchRef)
335 """!Run detection merging on a patch 337 Only slave nodes execute this method. 339 @param cache: Pool cache, containing butler 340 @param dataIdList: List of data identifiers for the patch in different filters 342 with self.
logOperation(
"merge detections from %s" % (dataIdList,)):
344 dataId
in dataIdList]
345 if (
"mergeCoaddDetections" in self.
reuse and 346 dataRefList[0].datasetExists(self.
config.coaddName +
"Coadd_mergeDet", write=
True)):
347 self.
log.
info(
"Skipping mergeCoaddDetections for %s; output already exists." %
348 dataRefList[0].dataId)
350 self.mergeCoaddDetections.
runDataRef(dataRefList)
353 """Run the deblender on a list of dataId's 355 Only slave nodes execute this method. 360 Pool cache with butler. 362 Data identifier for patch in each band. 367 whether the patch requires reprocessing. 369 with self.
logOperation(
"deblending %s" % (dataIdList,)):
371 dataId
in dataIdList]
373 if (
"deblendCoaddSources" in self.
reuse and 375 write=
True)
for dataRef
in dataRefList])):
376 if not self.
config.reprocessing:
377 self.
log.
info(
"Skipping deblendCoaddSources for %s; output already exists" % dataIdList)
382 bigFlag = catalog[
"deblend_parentTooBig"]
384 numOldBig = bigFlag.sum()
386 self.
log.
info(
"No large footprints in %s" % (dataRefList[0].dataId))
390 if self.
config.deblendCoaddSources.simultaneous:
391 deblender = self.deblendCoaddSources.multiBandDeblend
393 deblender = self.deblendCoaddSources.singleBandDeblend
398 numNewBig = sum((deblender.isLargeFootprint(src.getFootprint())
for 399 src
in catalog[bigFlag]))
400 if numNewBig == numOldBig:
401 self.
log.
info(
"All %d formerly large footprints continue to be large in %s" %
402 (numOldBig, dataRefList[0].dataId,))
404 self.
log.
info(
"Found %d large footprints to be reprocessed in %s" %
405 (numOldBig - numNewBig, [dataRef.dataId
for dataRef
in dataRefList]))
408 self.deblendCoaddSources.
runDataRef(dataRefList)
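    # Worked example of the decision above (illustrative numbers): if the
    # previous run flagged numOldBig = 5 parents as deblend_parentTooBig and
    # isLargeFootprint() still rejects all 5 under the current configuration,
    # nothing would change, so the patch is skipped; if only numNewBig = 2
    # remain too large, the other 3 are now recoverable and the whole patch
    # is deblended again (reprocessing = True).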
412 """Run measurement on a patch for a single filter 414 Only slave nodes execute this method. 419 Pool cache, with butler 421 Data identifier for patch 423 with self.
logOperation(
"measurements on %s" % (dataId,)):
425 if (
"measureCoaddSources" in self.
reuse and 426 not self.
config.reprocessing
and 427 dataRef.datasetExists(self.
config.coaddName +
"Coadd_meas", write=
True)):
428 self.
log.
info(
"Skipping measuretCoaddSources for %s; output already exists" % dataId)
433 """!Run measurement merging on a patch 435 Only slave nodes execute this method. 437 @param cache: Pool cache, containing butler 438 @param dataIdList: List of data identifiers for the patch in different filters 440 with self.
logOperation(
"merge measurements from %s" % (dataIdList,)):
442 dataId
in dataIdList]
443 if (
"mergeCoaddMeasurements" in self.
reuse and 444 not self.
config.reprocessing
and 445 dataRefList[0].datasetExists(self.
config.coaddName +
"Coadd_ref", write=
True)):
446 self.
log.
info(
"Skipping mergeCoaddMeasurements for %s; output already exists" %
447 dataRefList[0].dataId)
449 self.mergeCoaddMeasurements.
runDataRef(dataRefList)
452 """!Run forced photometry on a patch for a single filter 454 Only slave nodes execute this method. 456 @param cache: Pool cache, with butler 457 @param dataId: Data identifier for patch 459 with self.
logOperation(
"forced photometry on %s" % (dataId,)):
462 if (
"forcedPhotCoadd" in self.
reuse and 463 not self.
config.reprocessing
and 464 dataRef.datasetExists(self.
config.coaddName +
"Coadd_forced_src", write=
True)):
465 self.
log.
info(
"Skipping forcedPhotCoadd for %s; output already exists" % dataId)
470 """We don't collect any metadata, so skip"""