24 from lsst.pipe.base import (CmdLineTask, Struct, ArgumentParser, ButlerInitializedTaskRunner,
25 PipelineTask, PipelineTaskConfig, PipelineTaskConnections)
27 from lsst.pex.config
import Config, Field, ConfigurableField
29 from lsst.meas.base import SingleFrameMeasurementTask, ApplyApCorrTask, CatalogCalculationTask
31 from lsst.meas.extensions.scarlet
import ScarletDeblendTask
43 from .mergeDetections
import MergeDetectionsConfig, MergeDetectionsTask
44 from .mergeMeasurements
import MergeMeasurementsConfig, MergeMeasurementsTask
45 from .multiBandUtils
import MergeSourcesRunner, CullPeaksConfig, _makeGetSchemaCatalogs
46 from .multiBandUtils
import getInputSchema, getShortFilterName, readCatalog, _makeMakeIdFactory
47 from .deblendCoaddSourcesPipeline
import DeblendCoaddSourcesSingleConfig
48 from .deblendCoaddSourcesPipeline
import DeblendCoaddSourcesSingleTask
49 from .deblendCoaddSourcesPipeline
import DeblendCoaddSourcesMultiConfig
50 from .deblendCoaddSourcesPipeline
import DeblendCoaddSourcesMultiTask
55 * deepCoadd_det: detections from what used to be processCoadd (tract, patch, filter) 56 * deepCoadd_mergeDet: merged detections (tract, patch) 57 * deepCoadd_meas: measurements of merged detections (tract, patch, filter) 58 * deepCoadd_ref: reference sources (tract, patch) 59 All of these have associated *_schema catalogs that require no data ID and hold no records. 61 In addition, we have a schema-only dataset, which saves the schema for the PeakRecords in 62 the mergeDet, meas, and ref dataset Footprints: 63 * deepCoadd_peak_schema 69 defaultTemplates={
"inputCoaddName":
"deep",
"outputCoaddName":
"deep"}):
70 detectionSchema = cT.InitOutput(
71 doc=
"Schema of the detection catalog",
72 name=
"{outputCoaddName}Coadd_det_schema",
73 storageClass=
"SourceCatalog",
76 doc=
"Exposure on which detections are to be performed",
77 name=
"{inputCoaddName}Coadd",
78 storageClass=
"ExposureF",
79 dimensions=(
"tract",
"patch",
"abstract_filter",
"skymap")
81 outputBackgrounds = cT.Output(
82 doc=
"Output Backgrounds used in detection",
83 name=
"{outputCoaddName}Coadd_calexp_background",
84 storageClass=
"Background",
85 dimensions=(
"tract",
"patch",
"abstract_filter",
"skymap")
87 outputSources = cT.Output(
88 doc=
"Detected sources catalog",
89 name=
"{outputCoaddName}Coadd_det",
90 storageClass=
"SourceCatalog",
91 dimensions=(
"tract",
"patch",
"abstract_filter",
"skymap")
93 outputExposure = cT.Output(
94 doc=
"Exposure post detection",
95 name=
"{outputCoaddName}Coadd_calexp",
96 storageClass=
"ExposureF",
97 dimensions=(
"tract",
"patch",
"abstract_filter",
"skymap")
101 class DetectCoaddSourcesConfig(PipelineTaskConfig, pipelineConnections=DetectCoaddSourcesConnections):
103 @anchor DetectCoaddSourcesConfig_ 105 @brief Configuration parameters for the DetectCoaddSourcesTask 107 doScaleVariance = Field(dtype=bool, default=
True, doc=
"Scale variance plane using empirical noise?")
108 scaleVariance = ConfigurableField(target=ScaleVarianceTask, doc=
"Variance rescaling")
109 detection = ConfigurableField(target=DynamicDetectionTask, doc=
"Source detection")
110 coaddName = Field(dtype=str, default=
"deep", doc=
"Name of coadd")
111 doInsertFakes = Field(dtype=bool, default=
False,
112 doc=
"Run fake sources injection task")
113 insertFakes = ConfigurableField(target=BaseFakeSourcesTask,
114 doc=
"Injection of fake sources for testing " 115 "purposes (must be retargeted)")
119 doc=
"Should be set to True if fake sources have been inserted into the input data." 124 self.detection.thresholdType =
"pixel_stdev" 125 self.detection.isotropicGrow =
True 127 self.detection.reEstimateBackground =
False 128 self.detection.background.useApprox =
False 129 self.detection.background.binSize = 4096
130 self.detection.background.undersampleStyle =
'REDUCE_INTERP_ORDER' 131 self.detection.doTempWideBackground =
True 141 class DetectCoaddSourcesTask(PipelineTask, CmdLineTask):
143 @anchor DetectCoaddSourcesTask_ 145 @brief Detect sources on a coadd 147 @section pipe_tasks_multiBand_Contents Contents 149 - @ref pipe_tasks_multiBand_DetectCoaddSourcesTask_Purpose 150 - @ref pipe_tasks_multiBand_DetectCoaddSourcesTask_Initialize 151 - @ref pipe_tasks_multiBand_DetectCoaddSourcesTask_Run 152 - @ref pipe_tasks_multiBand_DetectCoaddSourcesTask_Config 153 - @ref pipe_tasks_multiBand_DetectCoaddSourcesTask_Debug 154 - @ref pipe_tasks_multiband_DetectCoaddSourcesTask_Example 156 @section pipe_tasks_multiBand_DetectCoaddSourcesTask_Purpose Description 158 Command-line task that detects sources on a coadd of exposures obtained with a single filter. 160 Coadding individual visits requires each exposure to be warped. This introduces covariance in the noise 161 properties across pixels. Before detection, we correct the coadd variance by scaling the variance plane 162 in the coadd to match the observed variance. This is an approximate approach -- strictly, we should 163 propagate the full covariance matrix -- but it is simple and works well in practice. 165 After scaling the variance plane, we detect sources and generate footprints by delegating to the @ref 166 SourceDetectionTask_ "detection" subtask. 169 deepCoadd{tract,patch,filter}: ExposureF 171 deepCoadd_det{tract,patch,filter}: SourceCatalog (only parent Footprints) 172 @n deepCoadd_calexp{tract,patch,filter}: Variance scaled, background-subtracted input 174 @n deepCoadd_calexp_background{tract,patch,filter}: BackgroundList 178 DetectCoaddSourcesTask delegates most of its work to the @ref SourceDetectionTask_ "detection" subtask. 179 You can retarget this subtask if you wish. 
181 @section pipe_tasks_multiBand_DetectCoaddSourcesTask_Initialize Task initialization 183 @copydoc \_\_init\_\_ 185 @section pipe_tasks_multiBand_DetectCoaddSourcesTask_Run Invoking the Task 189 @section pipe_tasks_multiBand_DetectCoaddSourcesTask_Config Configuration parameters 191 See @ref DetectCoaddSourcesConfig_ "DetectSourcesConfig" 193 @section pipe_tasks_multiBand_DetectCoaddSourcesTask_Debug Debug variables 195 The @link lsst.pipe.base.cmdLineTask.CmdLineTask command line task@endlink interface supports a 196 flag @c -d to import @b debug.py from your @c PYTHONPATH; see @ref baseDebug for more about @b debug.py 199 DetectCoaddSourcesTask has no debug variables of its own because it delegates all the work to 200 @ref SourceDetectionTask_ "SourceDetectionTask"; see the documentation for 201 @ref SourceDetectionTask_ "SourceDetectionTask" for further information. 203 @section pipe_tasks_multiband_DetectCoaddSourcesTask_Example A complete example 204 of using DetectCoaddSourcesTask 206 DetectCoaddSourcesTask is meant to be run after assembling a coadded image in a given band. The purpose of 207 the task is to update the background, detect all sources in a single band and generate a set of parent 208 footprints. Subsequent tasks in the multi-band processing procedure will merge sources across bands and, 209 eventually, perform forced photometry. Command-line usage of DetectCoaddSourcesTask expects a data 210 reference to the coadd to be processed. A list of the available optional arguments can be obtained by 211 calling detectCoaddSources.py with the `--help` command line argument: 213 detectCoaddSources.py --help 216 To demonstrate usage of the DetectCoaddSourcesTask in the larger context of multi-band processing, we 217 will process HSC data in the [ci_hsc](https://github.com/lsst/ci_hsc) package. 
Assuming one has followed 218 steps 1 - 4 at @ref pipeTasks_multiBand, one may detect all the sources in each coadd as follows: 220 detectCoaddSources.py $CI_HSC_DIR/DATA --id patch=5,4 tract=0 filter=HSC-I 222 that will process the HSC-I band data. The results are written to 223 `$CI_HSC_DIR/DATA/deepCoadd-results/HSC-I`. 225 It is also necessary to run: 227 detectCoaddSources.py $CI_HSC_DIR/DATA --id patch=5,4 tract=0 filter=HSC-R 229 to generate the sources catalogs for the HSC-R band required by the next step in the multi-band 230 processing procedure: @ref MergeDetectionsTask_ "MergeDetectionsTask". 232 _DefaultName =
"detectCoaddSources" 233 ConfigClass = DetectCoaddSourcesConfig
234 getSchemaCatalogs = _makeGetSchemaCatalogs(
"det")
235 makeIdFactory = _makeMakeIdFactory(
"CoaddId")
@classmethod
def _makeArgumentParser(cls):
    """!
    @brief Build the command-line argument parser for this task.

    Registers a single "--id" data ID argument for the "deepCoadd" dataset, using
    ExistingCoaddDataIdContainer as the container class.

    @return the ArgumentParser, named after cls._DefaultName
    """
    parser = ArgumentParser(name=cls._DefaultName)
    parser.add_id_argument("--id", "deepCoadd", help="data ID, e.g. --id tract=12345 patch=1,2 filter=r",
                           ContainerClass=ExistingCoaddDataIdContainer)
    # The parser must be handed back to the caller; building it and
    # dropping it leaves the command line without any arguments.
    return parser
def __init__(self, schema=None, **kwargs):
    """!
    @brief Initialize the task. Create the @ref SourceDetectionTask_ "detection" subtask.

    The "insertFakes" and "scaleVariance" subtasks are only created when the corresponding
    doInsertFakes / doScaleVariance configuration flags are set.

    Keyword arguments (in addition to those forwarded to CmdLineTask.__init__):

    @param[in] schema: initial schema for the output catalog, modified-in place to include all
                       fields set by this task.  If None, the source minimal schema will be used.
    @param[in] **kwargs: keyword arguments to be passed to lsst.pipe.base.task.Task.__init__
    """
    # The base initializer must run first: self.config and makeSubtask
    # are used immediately below.
    super().__init__(**kwargs)
    # Only fall back to the minimal schema when the caller gave none;
    # a supplied schema must not be discarded.
    if schema is None:
        schema = afwTable.SourceTable.makeMinimalSchema()
    if self.config.doInsertFakes:
        self.makeSubtask("insertFakes")
    # Bind the schema before the detection subtask reads self.schema.
    self.schema = schema
    self.makeSubtask("detection", schema=self.schema)
    if self.config.doScaleVariance:
        self.makeSubtask("scaleVariance")
def runDataRef(self, patchRef):
    """!
    @brief Run detection on a coadd.

    Reads the coadd (the "fakes_"-prefixed dataset when config.hasFakes is set), invokes
    @ref run and then uses @ref write to output the results.

    @param[in] patchRef: data reference for patch
    """
    # Exactly one of the two datasets is read; without the else branch the
    # fakes exposure would be fetched and then immediately replaced.
    if self.config.hasFakes:
        exposure = patchRef.get("fakes_" + self.config.coaddName + "Coadd", immediate=True)
    else:
        exposure = patchRef.get(self.config.coaddName + "Coadd", immediate=True)
    expId = int(patchRef.get(self.config.coaddName + "CoaddId"))
    results = self.run(exposure, self.makeIdFactory(patchRef), expId=expId)
    self.write(results, patchRef)
def runQuantum(self, butlerQC, inputRefs, outputRefs):
    """!
    @brief Fetch the quantum inputs, run detection and store the outputs.

    The quantum data ID is packed with the "tract_patch_abstract_filter" packer; the packed
    value is passed to @ref run as expId and is also used to build the source IdFactory.

    @param[in] butlerQC: quantum context used to get the inputs and put the outputs
    @param[in] inputRefs: references to the input datasets
    @param[in] outputRefs: references to the output datasets
    """
    runArgs = butlerQC.get(inputRefs)
    quantumDataId = butlerQC.quantum.dataId
    packedId, maxBits = quantumDataId.pack("tract_patch_abstract_filter", returnMaxBits=True)
    # Bits of the 64-bit identifier not consumed by the packed data ID.
    spareBits = 64 - maxBits
    runArgs["idFactory"] = afwTable.IdFactory.makeSource(packedId, spareBits)
    runArgs["expId"] = packedId
    butlerQC.put(self.run(**runArgs), outputRefs)
def run(self, exposure, idFactory, expId):
    """!
    @brief Run detection on an exposure.

    First scale the variance plane to match the observed variance
    using @ref ScaleVarianceTask. Then invoke the @ref SourceDetectionTask_ "detection" subtask to
    detect sources.

    @param[in,out] exposure: Exposure on which to detect (may be background-subtracted and scaled,
                             depending on configuration).
    @param[in] idFactory: IdFactory to set source identifiers
    @param[in] expId: Exposure identifier (integer) for RNG seed

    @return a pipe.base.Struct with fields
    - outputSources: catalog of detections
    - outputBackgrounds: list of backgrounds
    - outputExposure: the input exposure, after any variance scaling / fake insertion / detection
    """
    # Imported locally so this method does not depend on a module-level alias.
    import lsst.afw.math as afwMath

    if self.config.doScaleVariance:
        varScale = self.scaleVariance.run(exposure.maskedImage)
        exposure.getMetadata().add("variance_scale", varScale)
    # Accumulator for every background produced below; it has to exist
    # before insertFakes (which receives it) and before the append loop.
    backgrounds = afwMath.BackgroundList()
    if self.config.doInsertFakes:
        self.insertFakes.run(exposure, background=backgrounds)
    table = afwTable.SourceTable.make(self.schema, idFactory)
    detections = self.detection.makeSourceCatalog(table, exposure, expId=expId)
    sources = detections.sources
    fpSets = detections.fpSets
    # The detection result may carry no background attribute, or an empty one.
    detectionBackgrounds = getattr(fpSets, "background", None)
    if detectionBackgrounds:
        for bg in detectionBackgrounds:
            backgrounds.append(bg)
    return Struct(outputSources=sources, outputBackgrounds=backgrounds, outputExposure=exposure)
def write(self, results, patchRef):
    """!
    @brief Write out results from @ref run.

    The backgrounds and the detection catalog are always written under the plain coadd name;
    the exposure is written to the "fakes_"-prefixed calexp dataset when config.hasFakes is
    set, and to the plain calexp dataset otherwise.

    @param[in] results: Struct returned from @ref run (fields outputBackgrounds, outputSources,
                        outputExposure)
    @param[in] patchRef: data reference for patch
    """
    coaddName = self.config.coaddName + "Coadd"
    patchRef.put(results.outputBackgrounds, coaddName + "_calexp_background")
    patchRef.put(results.outputSources, coaddName + "_det")
    # Write the exposure exactly once, to the dataset matching its provenance.
    if self.config.hasFakes:
        patchRef.put(results.outputExposure, "fakes_" + coaddName + "_calexp")
    else:
        patchRef.put(results.outputExposure, coaddName + "_calexp")
345 class DeblendCoaddSourcesConfig(Config):
346 """DeblendCoaddSourcesConfig 348 Configuration parameters for the `DeblendCoaddSourcesTask`. 350 singleBandDeblend = ConfigurableField(target=SourceDeblendTask,
351 doc=
"Deblend sources separately in each band")
352 multiBandDeblend = ConfigurableField(target=ScarletDeblendTask,
353 doc=
"Deblend sources simultaneously across bands")
354 simultaneous = Field(dtype=bool, default=
False, doc=
"Simultaneously deblend all bands?")
355 coaddName = Field(dtype=str, default=
"deep", doc=
"Name of coadd")
356 hasFakes = Field(dtype=bool,
358 doc=
"Should be set to True if fake sources have been inserted into the input data.")
361 Config.setDefaults(self)
362 self.singleBandDeblend.propagateAllPeaks =
True 366 """Task runner for the `MergeSourcesTask` 368 Required because the run method requires a list of 369 dataRefs rather than a single dataRef. 372 def getTargetList(parsedCmd, **kwargs):
373 """Provide a list of patch references for each patch, tract, filter combo. 380 Keyword arguments passed to the task 385 List of tuples, where each tuple is a (dataRef, kwargs) pair. 387 refDict = MergeSourcesRunner.buildRefDict(parsedCmd)
388 kwargs[
"psfCache"] = parsedCmd.psfCache
389 return [(
list(p.values()), kwargs)
for t
in refDict.values()
for p
in t.values()]
392 class DeblendCoaddSourcesTask(CmdLineTask):
393 """Deblend the sources in a merged catalog 395 Deblend sources from master catalog in each coadd. 396 This can either be done separately in each band using the HSC-SDSS deblender 397 (`DeblendCoaddSourcesTask.config.simultaneous==False`) 398 or use SCARLET to simultaneously fit the blend in all bands 399 (`DeblendCoaddSourcesTask.config.simultaneous==True`). 400 The task will set its own `self.schema` atribute to the `Schema` of the 401 output deblended catalog. 402 This will include all fields from the input `Schema`, as well as additional fields 405 `pipe.tasks.multiband.DeblendCoaddSourcesTask Description 406 --------------------------------------------------------- 412 Butler used to read the input schemas from disk or 413 construct the reference catalog loader, if `schema` or `peakSchema` or 415 The schema of the merged detection catalog as an input to this task. 417 The schema of the `PeakRecord`s in the `Footprint`s in the merged detection catalog 419 ConfigClass = DeblendCoaddSourcesConfig
420 RunnerClass = DeblendCoaddSourcesRunner
421 _DefaultName =
"deblendCoaddSources" 422 makeIdFactory = _makeMakeIdFactory(
"MergedCoaddId")
@classmethod
def _makeArgumentParser(cls):
    """Build the command-line argument parser for this task.

    Registers the ``--id`` data ID argument for the ``deepCoadd_calexp``
    dataset and an integer ``--psfCache`` option (default 100).

    Returns
    -------
    parser : `ArgumentParser`
        Parser named after ``cls._DefaultName``.
    """
    parser = ArgumentParser(name=cls._DefaultName)
    parser.add_id_argument("--id", "deepCoadd_calexp",
                           help="data ID, e.g. --id tract=12345 patch=1,2 filter=g^r^i",
                           ContainerClass=ExistingCoaddDataIdContainer)
    parser.add_argument("--psfCache", type=int, default=100, help="Size of CoaddPsf cache")
    # The parser must be handed back to the caller; building it and
    # dropping it leaves the command line without any arguments.
    return parser
def __init__(self, butler=None, schema=None, peakSchema=None, **kwargs):
    """Initialize the task and create the configured deblender subtask.

    Parameters
    ----------
    butler : optional
        Used to read ``<coaddName>Coadd_mergeDet_schema`` and
        ``<coaddName>Coadd_peak_schema`` when ``schema`` or
        ``peakSchema`` respectively is `None`.
    schema : optional
        The schema of the merged detection catalog as an input to this task.
    peakSchema : optional
        The schema of the `PeakRecord`s in the `Footprint`s in the merged
        detection catalog.
    **kwargs
        Forwarded to `CmdLineTask.__init__`.
    """
    CmdLineTask.__init__(self, **kwargs)
    # The butler is only required for whichever schema was not supplied.
    if schema is None:
        assert butler is not None, "Neither butler nor schema is defined"
        schema = butler.get(self.config.coaddName + "Coadd_mergeDet_schema", immediate=True).schema
    # The mapper has to exist before fields are added to it; its output
    # schema becomes the schema of the deblended catalog.
    self.schemaMapper = afwTable.SchemaMapper(schema)
    self.schemaMapper.addMinimalSchema(schema)
    self.schema = self.schemaMapper.getOutputSchema()
    if peakSchema is None:
        assert butler is not None, "Neither butler nor peakSchema is defined"
        peakSchema = butler.get(self.config.coaddName + "Coadd_peak_schema", immediate=True).schema

    # Build only the deblender selected by the configuration.
    if self.config.simultaneous:
        self.makeSubtask("multiBandDeblend", schema=self.schema, peakSchema=peakSchema)
    else:
        self.makeSubtask("singleBandDeblend", schema=self.schema, peakSchema=peakSchema)
def getSchemaCatalogs(self):
    """Return a dict of empty catalogs for each catalog dataset produced by this task.

    Returns
    -------
    result : `dict`
        Dictionary of empty catalogs, with catalog names as keys. Both the
        ``Coadd_deblendedFlux`` and ``Coadd_deblendedModel`` entries refer
        to the same catalog object.
    """
    # A record-free catalog built on the output schema; it carries the
    # schema only.
    catalog = afwTable.SourceCatalog(self.schema)
    return {self.config.coaddName + "Coadd_deblendedFlux": catalog,
            self.config.coaddName + "Coadd_deblendedModel": catalog}
def runDataRef(self, patchRefList, psfCache=100):
    """Deblend the patch in every band and write the results.

    Deblend each source simultaneously or separately
    (depending on `DeblendCoaddSourcesTask.config.simultaneous`) and
    write the deblended sources out.

    Parameters
    ----------
    patchRefList : `list`
        List of data references for each filter
    psfCache : `int`, optional
        Cache capacity set on each exposure's PSF in the single-band path.
    """
    # Select the calexp flavour once; the two cases are mutually exclusive.
    if self.config.hasFakes:
        coaddType = "fakes_" + self.config.coaddName
    else:
        coaddType = self.config.coaddName

    if self.config.simultaneous:
        # Collect every band's exposure, then deblend them together.
        filters = []
        exposures = []
        for patchRef in patchRefList:
            exposure = patchRef.get(coaddType + "Coadd_calexp", immediate=True)
            filters.append(patchRef.dataId["filter"])
            exposures.append(exposure)
        # The merged catalog is read once, through the last reference visited.
        sources = self.readSources(patchRef)
        exposure = afwImage.MultibandExposure.fromExposures(filters, exposures)
        fluxCatalogs, templateCatalogs = self.multiBandDeblend.run(exposure, sources)
        for bandRef, bandName in zip(patchRefList, filters):
            # The flux catalogs may be absent altogether; pass None through.
            fluxCat = fluxCatalogs if fluxCatalogs is None else fluxCatalogs[bandName]
            self.write(bandRef, fluxCat, templateCatalogs[bandName])
    else:
        # Deblend each band independently.
        for patchRef in patchRefList:
            exposure = patchRef.get(coaddType + "Coadd_calexp", immediate=True)
            exposure.getPsf().setCacheCapacity(psfCache)
            sources = self.readSources(patchRef)
            self.singleBandDeblend.run(exposure, sources)
            self.write(patchRef, sources)
def readSources(self, dataRef):
    """Read merged catalog

    Read the catalog of merged detections and copy it into a new catalog
    built on this task's output schema.

    Parameters
    ----------
    dataRef: data reference
        Data reference for catalog of merged detections

    Returns
    -------
    sources: `SourceCatalog`
        List of sources in merged catalog

    We also need to add columns to hold the measurements we're about to make
    so we can measure in-place.
    """
    merged = dataRef.get(self.config.coaddName + "Coadd_mergeDet", immediate=True)
    self.log.info("Read %d detections: %s", len(merged), dataRef.dataId)
    idFactory = self.makeIdFactory(dataRef)
    # Tell the factory about every identifier already present in the
    # merged catalog.
    for s in merged:
        idFactory.notify(s.getId())
    table = afwTable.SourceTable.make(self.schema, idFactory)
    sources = afwTable.SourceCatalog(table)
    sources.extend(merged, self.schemaMapper)
    return sources
def write(self, dataRef, flux_sources, template_sources=None):
    """Write the source catalog(s)

    Parameters
    ----------
    dataRef: Data Reference
        Reference to the output catalog.
    flux_sources: `SourceCatalog`
        Flux conserved sources to write to file.
        If using the single band deblender, this is the catalog
        generated. Skipped when `None`.
    template_sources: `SourceCatalog`
        Source catalog using the multiband template models.
        Skipped when `None`.
    """
    # A flux catalog is only expected from the single-band deblender, or
    # from the multiband one when it is configured to conserve flux.
    if flux_sources is not None:
        assert not self.config.simultaneous or self.config.multiBandDeblend.conserveFlux, \
            "flux catalog supplied but the multiband deblender is not configured to conserve flux"
        dataRef.put(flux_sources, self.config.coaddName + "Coadd_deblendedFlux")
        self.log.info("Wrote %d sources: %s", len(flux_sources), dataRef.dataId)
    # A template catalog is only expected when the multiband deblender is
    # configured to save templates.
    if template_sources is not None:
        assert self.config.multiBandDeblend.saveTemplates, \
            "template catalog supplied but multiBandDeblend.saveTemplates is not set"
        dataRef.put(template_sources, self.config.coaddName + "Coadd_deblendedModel")
        self.log.info("Wrote %d sources: %s", len(template_sources), dataRef.dataId)
565 """Write the metadata produced from processing the data. 569 List of Butler data references used to write the metadata. 570 The metadata is written to dataset type `CmdLineTask._getMetadataName`. 572 for dataRef
in dataRefList:
574 metadataName = self._getMetadataName()
575 if metadataName
is not None:
576 dataRef.put(self.getFullMetadata(), metadataName)
577 except Exception
as e:
578 self.log.
warn(
"Could not persist metadata for dataId=%s: %s", dataRef.dataId, e)
def getExposureId(self, dataRef):
    """Return the integer exposure identifier for a data reference.

    The value is read from the ``<coaddName>CoaddId`` dataset and
    converted with `int`.
    """
    idDatasetName = self.config.coaddName + "CoaddId"
    rawId = dataRef.get(idDatasetName)
    return int(rawId)
586 class MeasureMergedCoaddSourcesConnections(PipelineTaskConnections, dimensions=(
"tract",
"patch",
"abstract_filter",
"skymap"),
587 defaultTemplates={
"inputCoaddName":
"deep",
588 "outputCoaddName":
"deep"}):
589 inputSchema = cT.InitInput(
590 doc=
"Input schema for measure merged task produced by a deblender or detection task",
591 name=
"{inputCoaddName}Coadd_deblendedFlux_schema",
592 storageClass=
"SourceCatalog" 594 outputSchema = cT.InitOutput(
595 doc=
"Output schema after all new fields are added by task",
596 name=
"{inputCoaddName}Coadd_meas_schema",
597 storageClass=
"SourceCatalog" 599 refCat = cT.PrerequisiteInput(
600 doc=
"Reference catalog used to match measured sources against known sources",
602 storageClass=
"SimpleCatalog",
603 dimensions=(
"skypix",),
608 doc=
"Input coadd image",
609 name=
"{inputCoaddName}Coadd_calexp",
610 storageClass=
"ExposureF",
611 dimensions=(
"tract",
"patch",
"abstract_filter",
"skymap")
614 doc=
"SkyMap to use in processing",
615 name=
"{inputCoaddName}Coadd_skyMap",
616 storageClass=
"SkyMap",
617 dimensions=(
"skymap",),
619 visitCatalogs = cT.Input(
620 doc=
"Source catalogs for visits which overlap input tract, patch, abstract_filter. Will be " 621 "further filtered in the task for the purpose of propagating flags from image calibration " 622 "and characterization to codd objects",
624 dimensions=(
"instrument",
"visit",
"detector"),
625 storageClass=
"SourceCatalog",
628 inputCatalog = cT.Input(
629 doc=(
"Name of the input catalog to use." 630 "If the single band deblender was used this should be 'deblendedFlux." 631 "If the multi-band deblender was used this should be 'deblendedModel, " 632 "or deblendedFlux if the multiband deblender was configured to output " 633 "deblended flux catalogs. If no deblending was performed this should " 635 name=
"{inputCoaddName}Coadd_deblendedFlux",
636 storageClass=
"SourceCatalog",
637 dimensions=(
"tract",
"patch",
"abstract_filter",
"skymap"),
639 outputSources = cT.Output(
640 doc=
"Source catalog containing all the measurement information generated in this task",
641 name=
"{outputCoaddName}Coadd_meas",
642 dimensions=(
"tract",
"patch",
"abstract_filter",
"skymap"),
643 storageClass=
"SourceCatalog",
645 matchResult = cT.Output(
646 doc=
"Match catalog produced by configured matcher, optional on doMatchSources",
647 name=
"{outputCoaddName}Coadd_measMatch",
648 dimensions=(
"tract",
"patch",
"abstract_filter",
"skymap"),
649 storageClass=
"Catalog",
651 denormMatches = cT.Output(
652 doc=
"Denormalized Match catalog produced by configured matcher, optional on " 653 "doWriteMatchesDenormalized",
654 name=
"{outputCoaddName}Coadd_measMatchFull",
655 dimensions=(
"tract",
"patch",
"abstract_filter",
"skymap"),
656 storageClass=
"Catalog",
661 if config.doPropagateFlags
is False:
662 self.inputs -=
set((
"visitCatalogs",))
664 if config.doMatchSources
is False:
665 self.outputs -=
set((
"matchResult",))
667 if config.doWriteMatchesDenormalized
is False:
668 self.outputs -=
set((
"denormMatches",))
671 class MeasureMergedCoaddSourcesConfig(PipelineTaskConfig,
672 pipelineConnections=MeasureMergedCoaddSourcesConnections):
674 @anchor MeasureMergedCoaddSourcesConfig_ 676 @brief Configuration parameters for the MeasureMergedCoaddSourcesTask 678 inputCatalog = Field(dtype=str, default=
"deblendedFlux",
679 doc=(
"Name of the input catalog to use." 680 "If the single band deblender was used this should be 'deblendedFlux." 681 "If the multi-band deblender was used this should be 'deblendedModel." 682 "If no deblending was performed this should be 'mergeDet'"))
683 measurement = ConfigurableField(target=SingleFrameMeasurementTask, doc=
"Source measurement")
684 setPrimaryFlags = ConfigurableField(target=SetPrimaryFlagsTask, doc=
"Set flags for primary tract/patch")
685 doPropagateFlags = Field(
686 dtype=bool, default=
True,
687 doc=
"Whether to match sources to CCD catalogs to propagate flags (to e.g. identify PSF stars)" 689 propagateFlags = ConfigurableField(target=PropagateVisitFlagsTask, doc=
"Propagate visit flags to coadd")
690 doMatchSources = Field(dtype=bool, default=
True, doc=
"Match sources to reference catalog?")
691 match = ConfigurableField(target=DirectMatchTask, doc=
"Matching to reference catalog")
692 doWriteMatchesDenormalized = Field(
695 doc=(
"Write reference matches in denormalized format? " 696 "This format uses more disk space, but is more convenient to read."),
698 coaddName = Field(dtype=str, default=
"deep", doc=
"Name of coadd")
699 psfCache = Field(dtype=int, default=100, doc=
"Size of psfCache")
700 checkUnitsParseStrict = Field(
701 doc=
"Strictness of Astropy unit compatibility check, can be 'raise', 'warn' or 'silent'",
708 doc=
"Apply aperture corrections" 710 applyApCorr = ConfigurableField(
711 target=ApplyApCorrTask,
712 doc=
"Subtask to apply aperture corrections" 714 doRunCatalogCalculation = Field(
717 doc=
'Run catalogCalculation task' 719 catalogCalculation = ConfigurableField(
720 target=CatalogCalculationTask,
721 doc=
"Subtask to run catalogCalculation plugins on catalog" 727 doc=
"Should be set to True if fake sources have been inserted into the input data." 731 def refObjLoader(self):
732 return self.match.refObjLoader
736 self.measurement.plugins.names |= [
'base_InputCount',
'base_Variance']
737 self.measurement.plugins[
'base_PixelFlags'].masksFpAnywhere = [
'CLIPPED',
'SENSOR_EDGE',
739 self.measurement.plugins[
'base_PixelFlags'].masksFpCenter = [
'CLIPPED',
'SENSOR_EDGE',
744 refCatGen2 = getattr(self.refObjLoader,
"ref_dataset_name",
None)
745 if refCatGen2
is not None and refCatGen2 != self.connections.refCat:
747 f
"Gen2 ({refCatGen2}) and Gen3 ({self.connections.refCat}) reference catalogs " 748 f
"are different. These options must be kept in sync until Gen2 is retired." 760 class MeasureMergedCoaddSourcesRunner(ButlerInitializedTaskRunner):
761 """Get the psfCache setting into MeasureMergedCoaddSourcesTask""" 763 def getTargetList(parsedCmd, **kwargs):
764 return ButlerInitializedTaskRunner.getTargetList(parsedCmd, psfCache=parsedCmd.psfCache)
767 class MeasureMergedCoaddSourcesTask(PipelineTask, CmdLineTask):
769 @anchor MeasureMergedCoaddSourcesTask_ 771 @brief Deblend sources from master catalog in each coadd seperately and measure. 773 @section pipe_tasks_multiBand_Contents Contents 775 - @ref pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Purpose 776 - @ref pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Initialize 777 - @ref pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Run 778 - @ref pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Config 779 - @ref pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Debug 780 - @ref pipe_tasks_multiband_MeasureMergedCoaddSourcesTask_Example 782 @section pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Purpose Description 784 Command-line task that uses peaks and footprints from a master catalog to perform deblending and 785 measurement in each coadd. 787 Given a master input catalog of sources (peaks and footprints) or deblender outputs 788 (including a HeavyFootprint in each band), measure each source on the 789 coadd. Repeating this procedure with the same master catalog across multiple coadds will generate a 790 consistent set of child sources. 792 The deblender retains all peaks and deblends any missing peaks (dropouts in that band) as PSFs. Source 793 properties are measured and the @c is-primary flag (indicating sources with no children) is set. Visit 794 flags are propagated to the coadd sources. 796 Optionally, we can match the coadd sources to an external reference catalog. 799 deepCoadd_mergeDet{tract,patch} or deepCoadd_deblend{tract,patch}: SourceCatalog 800 @n deepCoadd_calexp{tract,patch,filter}: ExposureF 802 deepCoadd_meas{tract,patch,filter}: SourceCatalog 806 MeasureMergedCoaddSourcesTask delegates most of its work to a set of sub-tasks: 809 <DT> @ref SingleFrameMeasurementTask_ "measurement" 810 <DD> Measure source properties of deblended sources.</DD> 811 <DT> @ref SetPrimaryFlagsTask_ "setPrimaryFlags" 812 <DD> Set flag 'is-primary' as well as related flags on sources. 
'is-primary' is set for sources that are 813 not at the edge of the field and that have either not been deblended or are the children of deblended 815 <DT> @ref PropagateVisitFlagsTask_ "propagateFlags" 816 <DD> Propagate flags set in individual visits to the coadd.</DD> 817 <DT> @ref DirectMatchTask_ "match" 818 <DD> Match input sources to a reference catalog (optional). 821 These subtasks may be retargeted as required. 823 @section pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Initialize Task initialization 825 @copydoc \_\_init\_\_ 827 @section pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Run Invoking the Task 831 @section pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Config Configuration parameters 833 See @ref MeasureMergedCoaddSourcesConfig_ 835 @section pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Debug Debug variables 837 The @link lsst.pipe.base.cmdLineTask.CmdLineTask command line task@endlink interface supports a 838 flag @c -d to import @b debug.py from your @c PYTHONPATH; see @ref baseDebug for more about @b debug.py 841 MeasureMergedCoaddSourcesTask has no debug variables of its own because it delegates all the work to 842 the various sub-tasks. See the documetation for individual sub-tasks for more information. 844 @section pipe_tasks_multiband_MeasureMergedCoaddSourcesTask_Example A complete example of using 845 MeasureMergedCoaddSourcesTask 847 After MeasureMergedCoaddSourcesTask has been run on multiple coadds, we have a set of per-band catalogs. 848 The next stage in the multi-band processing procedure will merge these measurements into a suitable 849 catalog for driving forced photometry. 
851 Command-line usage of MeasureMergedCoaddSourcesTask expects a data reference to the coadds 853 A list of the available optional arguments can be obtained by calling measureCoaddSources.py with the 854 `--help` command line argument: 856 measureCoaddSources.py --help 859 To demonstrate usage of the DetectCoaddSourcesTask in the larger context of multi-band processing, we 860 will process HSC data in the [ci_hsc](https://github.com/lsst/ci_hsc) package. Assuming one has finished 861 step 6 at @ref pipeTasks_multiBand, one may perform deblending and measure sources in the HSC-I band 864 measureCoaddSources.py $CI_HSC_DIR/DATA --id patch=5,4 tract=0 filter=HSC-I 866 This will process the HSC-I band data. The results are written in 867 `$CI_HSC_DIR/DATA/deepCoadd-results/HSC-I/0/5,4/meas-HSC-I-0-5,4.fits 869 It is also necessary to run 871 measureCoaddSources.py $CI_HSC_DIR/DATA --id patch=5,4 tract=0 filter=HSC-R 873 to generate the sources catalogs for the HSC-R band required by the next step in the multi-band 874 procedure: @ref MergeMeasurementsTask_ "MergeMeasurementsTask". 876 _DefaultName =
"measureCoaddSources" 877 ConfigClass = MeasureMergedCoaddSourcesConfig
878 RunnerClass = MeasureMergedCoaddSourcesRunner
879 getSchemaCatalogs = _makeGetSchemaCatalogs(
"meas")
880 makeIdFactory = _makeMakeIdFactory(
"MergedCoaddId")
@classmethod
def _makeArgumentParser(cls):
    """!
    @brief Build the command-line argument parser for this task.

    Registers the "--id" data ID argument for the "deepCoadd_calexp" dataset and an integer
    "--psfCache" option (default 100).

    @return the ArgumentParser, named after cls._DefaultName
    """
    parser = ArgumentParser(name=cls._DefaultName)
    parser.add_id_argument("--id", "deepCoadd_calexp",
                           help="data ID, e.g. --id tract=12345 patch=1,2 filter=r",
                           ContainerClass=ExistingCoaddDataIdContainer)
    parser.add_argument("--psfCache", type=int, default=100, help="Size of CoaddPsf cache")
    # The parser must be handed back to the caller; building it and
    # dropping it leaves the command line without any arguments.
    return parser
891 def __init__(self, butler=None, schema=None, peakSchema=None, refObjLoader=None, initInputs=None,
894 @brief Initialize the task. 896 Keyword arguments (in addition to those forwarded to CmdLineTask.__init__): 897 @param[in] schema: the schema of the merged detection catalog used as input to this one 898 @param[in] peakSchema: the schema of the PeakRecords in the Footprints in the merged detection catalog 899 @param[in] refObjLoader: an instance of LoadReferenceObjectsTasks that supplies an external reference 900 catalog. May be None if the loader can be constructed from the butler argument or all steps 901 requiring a reference catalog are disabled. 902 @param[in] butler: a butler used to read the input schemas from disk or construct the reference 903 catalog loader, if schema or peakSchema or refObjLoader is None 905 The task will set its own self.schema attribute to the schema of the output measurement catalog. 906 This will include all fields from the input schema, as well as additional fields for all the 910 self.deblended = self.config.inputCatalog.startswith(
"deblended")
911 self.inputCatalog =
"Coadd_" + self.config.inputCatalog
912 if initInputs
is not None:
913 schema = initInputs[
'inputSchema'].schema
915 assert butler
is not None,
"Neither butler nor schema is defined" 916 schema = butler.get(self.config.coaddName + self.inputCatalog +
"_schema", immediate=
True).schema
918 self.schemaMapper.addMinimalSchema(schema)
919 self.schema = self.schemaMapper.getOutputSchema()
921 self.makeSubtask(
"measurement", schema=self.schema, algMetadata=self.algMetadata)
922 self.makeSubtask(
"setPrimaryFlags", schema=self.schema)
923 if self.config.doMatchSources:
924 self.makeSubtask(
"match", butler=butler, refObjLoader=refObjLoader)
925 if self.config.doPropagateFlags:
926 self.makeSubtask(
"propagateFlags", schema=self.schema)
927 self.schema.checkUnits(parse_strict=self.config.checkUnitsParseStrict)
928 if self.config.doApCorr:
929 self.makeSubtask(
"applyApCorr", schema=self.schema)
930 if self.config.doRunCatalogCalculation:
931 self.makeSubtask(
"catalogCalculation", schema=self.schema)
935 def runQuantum(self, butlerQC, inputRefs, outputRefs):
936 inputs = butlerQC.get(inputRefs)
939 inputs.pop(
'refCat'), config=self.config.refObjLoader,
941 self.match.setRefObjLoader(refObjLoader)
945 inputs[
'exposure'].getPsf().setCacheCapacity(self.config.psfCache)
948 packedId, maxBits = butlerQC.quantum.dataId.pack(
"tract_patch", returnMaxBits=
True)
949 inputs[
'exposureId'] = packedId
950 idFactory = afwTable.IdFactory.makeSource(packedId, 64 - maxBits)
952 table = afwTable.SourceTable.make(self.schema, idFactory)
954 sources.extend(inputs.pop(
'inputCatalog'), self.schemaMapper)
955 table = sources.getTable()
956 table.setMetadata(self.algMetadata)
957 inputs[
'sources'] = sources
959 skyMap = inputs.pop(
'skyMap')
960 tractNumber = inputRefs.inputCatalog.dataId[
'tract']
961 tractInfo = skyMap[tractNumber]
962 patchInfo = tractInfo.getPatchInfo(inputRefs.inputCatalog.dataId[
'patch'])
967 wcs=tractInfo.getWcs(),
968 bbox=patchInfo.getOuterBBox()
970 inputs[
'skyInfo'] = skyInfo
972 if self.config.doPropagateFlags:
974 ccdInputs = inputs[
'exposure'].
getInfo().getCoaddInputs().ccds
975 visitKey = ccdInputs.schema.find(
"visit").key
976 ccdKey = ccdInputs.schema.find(
"ccd").key
977 inputVisitIds =
set()
979 for ccdRecord
in ccdInputs:
980 visit = ccdRecord.get(visitKey)
981 ccd = ccdRecord.get(ccdKey)
982 inputVisitIds.add((visit, ccd))
983 ccdRecordsWcs[(visit, ccd)] = ccdRecord.getWcs()
985 inputCatalogsToKeep = []
986 inputCatalogWcsUpdate = []
987 for i, dataRef
in enumerate(inputRefs.visitCatalogs):
988 key = (dataRef.dataId[
'visit'], dataRef.dataId[
'detector'])
989 if key
in inputVisitIds:
990 inputCatalogsToKeep.append(inputs[
'visitCatalogs'][i])
991 inputCatalogWcsUpdate.append(ccdRecordsWcs[key])
992 inputs[
'visitCatalogs'] = inputCatalogsToKeep
993 inputs[
'wcsUpdates'] = inputCatalogWcsUpdate
994 inputs[
'ccdInputs'] = ccdInputs
996 outputs = self.run(**inputs)
997 butlerQC.put(outputs, outputRefs)
def runDataRef(self, patchRef, psfCache=100):
    """!
    @brief Deblend and measure.

    @param[in] patchRef: Patch reference.
    @param[in] psfCache: cache capacity applied to the PSF of the coadd exposure

    Set 'is-primary' and related flags. Propagate flags
    from individual visits. Optionally match the sources to a reference catalog and write the matches.
    Finally, write the deblended sources and measurements out.
    """
    # Coadds with injected fakes are stored under a "fakes_"-prefixed dataset name.
    if self.config.hasFakes:
        coaddType = "fakes_" + self.config.coaddName
    else:
        coaddType = self.config.coaddName
    exposure = patchRef.get(coaddType + "Coadd_calexp", immediate=True)
    exposure.getPsf().setCacheCapacity(psfCache)
    sources = self.readSources(patchRef)
    table = sources.getTable()
    table.setMetadata(self.algMetadata)
    skyInfo = getSkyInfo(coaddName=self.config.coaddName, patchRef=patchRef)

    # run() accepts ccdInputs=None, so only look the inputs up when flags are propagated.
    ccdInputs = None
    if self.config.doPropagateFlags:
        ccdInputs = self.propagateFlags.getCcdInputs(exposure)

    results = self.run(exposure=exposure, sources=sources,
                       ccdInputs=ccdInputs,
                       skyInfo=skyInfo, butler=patchRef.getButler(),
                       exposureId=self.getExposureId(patchRef))

    if self.config.doMatchSources:
        self.writeMatches(patchRef, results)
    self.write(patchRef, results.outputSources)
1034 def run(self, exposure, sources, skyInfo, exposureId, ccdInputs=None, visitCatalogs=None, wcsUpdates=None,
1036 """Run measurement algorithms on the input exposure, and optionally populate the 1037 resulting catalog with extra information. 1041 exposure : `lsst.afw.exposure.Exposure` 1042 The input exposure on which measurements are to be performed 1043 sources : `lsst.afw.table.SourceCatalog` 1044 A catalog built from the results of merged detections, or 1046 skyInfo : `lsst.pipe.base.Struct` 1047 A struct containing information about the position of the input exposure within 1048 a `SkyMap`, the `SkyMap`, its `Wcs`, and its bounding box 1049 exposureId : `int` or `bytes` 1050 packed unique number or bytes unique to the input exposure 1051 ccdInputs : `lsst.afw.table.ExposureCatalog` 1052 Catalog containing information on the individual visits which went into making 1054 visitCatalogs : list of `lsst.afw.table.SourceCatalogs` or `None` 1055 A list of source catalogs corresponding to measurements made on the individual 1056 visits which went into the input exposure. If None and butler is `None` then 1057 the task cannot propagate visit flags to the output catalog. 1058 wcsUpdates : list of `lsst.afw.geom.SkyWcs` or `None` 1059 If visitCatalogs is not `None` this should be a list of wcs objects which correspond 1060 to the input visits. Used to put all coordinates to common system. If `None` and 1061 butler is `None` then the task cannot propagate visit flags to the output catalog. 1062 butler : `lsst.daf.butler.Butler` or `lsst.daf.persistence.Butler` 1063 Either a gen2 or gen3 butler used to load visit catalogs 1067 results : `lsst.pipe.base.Struct` 1068 Results of running measurement task. Will contain the catalog in the 1069 sources attribute. Optionally will have results of matching to a 1070 reference catalog in the matchResults attribute, and denormalized 1071 matches in the denormMatches attribute. 1073 self.measurement.
run(sources, exposure, exposureId=exposureId)
1075 if self.config.doApCorr:
1076 self.applyApCorr.
run(
1078 apCorrMap=exposure.getInfo().getApCorrMap()
1085 if not sources.isContiguous():
1086 sources = sources.copy(deep=
True)
1088 if self.config.doRunCatalogCalculation:
1089 self.catalogCalculation.
run(sources)
1091 self.setPrimaryFlags.
run(sources, skyInfo.skyMap, skyInfo.tractInfo, skyInfo.patchInfo,
1092 includeDeblend=self.deblended)
1093 if self.config.doPropagateFlags:
1094 self.propagateFlags.
run(butler, sources, ccdInputs, exposure.getWcs(), visitCatalogs, wcsUpdates)
1098 if self.config.doMatchSources:
1099 matchResult = self.match.
run(sources, exposure.getInfo().getFilter().getName())
1101 matches.table.setMetadata(matchResult.matchMeta)
1102 results.matchResult = matches
1103 if self.config.doWriteMatchesDenormalized:
1104 if matchResult.matches:
1107 self.log.
warn(
"No matches, so generating dummy denormalized matches file")
1110 denormMatches.getMetadata().add(
"COMMENT",
1111 "This catalog is empty because no matches were found.")
1112 results.denormMatches = denormMatches
1113 results.denormMatches = denormMatches
1115 results.outputSources = sources
def readSources(self, dataRef):
    """!
    @brief Read input sources.

    @param[in] dataRef: Data reference for catalog of merged detections
    @return List of sources in merged catalog

    We also need to add columns to hold the measurements we're about to make
    so we can measure in-place.
    """
    merged = dataRef.get(self.config.coaddName + self.inputCatalog, immediate=True)
    self.log.info("Read %d detections: %s", len(merged), dataRef.dataId)
    idFactory = self.makeIdFactory(dataRef)
    # Report every ID already present in the input catalog to the factory
    # (presumably so newly issued IDs cannot collide -- confirm against IdFactory).
    for s in merged:
        idFactory.notify(s.getId())
    # Build an empty catalog on the output schema, then copy the merged records
    # into it through the schema mapper.
    table = afwTable.SourceTable.make(self.schema, idFactory)
    sources = afwTable.SourceCatalog(table)
    sources.extend(merged, self.schemaMapper)
    return sources
def writeMatches(self, dataRef, results):
    """!
    @brief Write matches of the sources to the astrometric reference catalog.

    @param[in] dataRef: data reference
    @param[in] results: results struct from run method

    Each product is written only if the corresponding attribute is present on
    `results`; nothing is written otherwise.
    """
    if hasattr(results, "matchResult"):
        dataRef.put(results.matchResult, self.config.coaddName + "Coadd_measMatch")
    if hasattr(results, "denormMatches"):
        dataRef.put(results.denormMatches, self.config.coaddName + "Coadd_measMatchFull")
def write(self, dataRef, sources):
    """!
    @brief Write the source catalog.

    @param[in] dataRef: data reference
    @param[in] sources: source catalog
    """
    dataRef.put(sources, self.config.coaddName + "Coadd_meas")
    # Pass the values as logger arguments so formatting is left to the logger.
    self.log.info("Wrote %d sources: %s", len(sources), dataRef.dataId)
def getExposureId(self, dataRef):
    """!
    @brief Return the integer exposure ID for a data reference.

    @param[in] dataRef: data reference
    @return the value of the `<coaddName>CoaddId` dataset, converted to int
    """
    return int(dataRef.get(self.config.coaddName + "CoaddId"))
1162
def write(self, patchRef, catalog)
Write the output.
Defines the fields and offsets for a table.
Class for storing ordered metadata with comments.
A mapping between the keys of two Schemas, used to copy data between them.
def denormalizeMatches(matches, matchMeta=None)
Fit spatial kernel using approximate fluxes for candidates, and solving a linear system of equations...
daf::base::PropertySet * set
template BaseCatalog packMatches(SourceMatchVector const &)
def __init__(self, minimum, dataRange, Q)
def run(self, skyInfo, tempExpRefList, imageScalerList, weightList, altMaskList=None, mask=None, supplementaryData=None)
def getSkyInfo(coaddName, patchRef)
Return the SkyMap, tract and patch information, wcs, and outer bbox of the patch to be coadded...
Backwards-compatibility support for depersisting the old Calib (FluxMag0/FluxMag0Err) objects...
def writeMetadata(self, dataRefList)
No metadata to write, and not sure how to write it for a list of dataRefs.
daf::base::PropertyList * list