24 from lsst.pipe.base import (CmdLineTask, Struct, ArgumentParser, ButlerInitializedTaskRunner,
25 PipelineTask, PipelineTaskConfig, PipelineTaskConnections)
29 from lsst.meas.base import SingleFrameMeasurementTask, ApplyApCorrTask, CatalogCalculationTask
31 from lsst.meas.extensions.scarlet
import ScarletDeblendTask
44 from .mergeDetections
import MergeDetectionsConfig, MergeDetectionsTask
45 from .mergeMeasurements
import MergeMeasurementsConfig, MergeMeasurementsTask
46 from .multiBandUtils
import MergeSourcesRunner, CullPeaksConfig, _makeGetSchemaCatalogs
47 from .multiBandUtils
import getInputSchema, getShortFilterName, readCatalog, _makeMakeIdFactory
48 from .deblendCoaddSourcesPipeline
import DeblendCoaddSourcesSingleConfig
49 from .deblendCoaddSourcesPipeline
import DeblendCoaddSourcesSingleTask
50 from .deblendCoaddSourcesPipeline
import DeblendCoaddSourcesMultiConfig
51 from .deblendCoaddSourcesPipeline
import DeblendCoaddSourcesMultiTask
56 * deepCoadd_det: detections from what used to be processCoadd (tract, patch, filter)
57 * deepCoadd_mergeDet: merged detections (tract, patch)
58 * deepCoadd_meas: measurements of merged detections (tract, patch, filter)
59 * deepCoadd_ref: reference sources (tract, patch)
60 All of these have associated *_schema catalogs that require no data ID and hold no records.
62 In addition, we have a schema-only dataset, which saves the schema for the PeakRecords in
63 the mergeDet, meas, and ref dataset Footprints:
64 * deepCoadd_peak_schema
70 dimensions=(
"tract",
"patch",
"band",
"skymap"),
71 defaultTemplates={
"inputCoaddName":
"deep",
"outputCoaddName":
"deep"}):
72 detectionSchema = cT.InitOutput(
73 doc=
"Schema of the detection catalog",
74 name=
"{outputCoaddName}Coadd_det_schema",
75 storageClass=
"SourceCatalog",
78 doc=
"Exposure on which detections are to be performed",
79 name=
"{inputCoaddName}Coadd",
80 storageClass=
"ExposureF",
81 dimensions=(
"tract",
"patch",
"band",
"skymap")
83 outputBackgrounds = cT.Output(
84 doc=
"Output Backgrounds used in detection",
85 name=
"{outputCoaddName}Coadd_calexp_background",
86 storageClass=
"Background",
87 dimensions=(
"tract",
"patch",
"band",
"skymap")
89 outputSources = cT.Output(
90 doc=
"Detected sources catalog",
91 name=
"{outputCoaddName}Coadd_det",
92 storageClass=
"SourceCatalog",
93 dimensions=(
"tract",
"patch",
"band",
"skymap")
95 outputExposure = cT.Output(
96 doc=
"Exposure post detection",
97 name=
"{outputCoaddName}Coadd_calexp",
98 storageClass=
"ExposureF",
99 dimensions=(
"tract",
"patch",
"band",
"skymap")
103 class DetectCoaddSourcesConfig(
PipelineTaskConfig, pipelineConnections=DetectCoaddSourcesConnections):
105 @anchor DetectCoaddSourcesConfig_
107 @brief Configuration parameters for the DetectCoaddSourcesTask
109 doScaleVariance =
Field(dtype=bool, default=
True, doc=
"Scale variance plane using empirical noise?")
110 scaleVariance =
ConfigurableField(target=ScaleVarianceTask, doc=
"Variance rescaling")
111 detection =
ConfigurableField(target=DynamicDetectionTask, doc=
"Source detection")
112 coaddName =
Field(dtype=str, default=
"deep", doc=
"Name of coadd")
113 doInsertFakes =
Field(dtype=bool, default=
False,
114 doc=
"Run fake sources injection task")
116 doc=
"Injection of fake sources for testing "
117 "purposes (must be retargeted)")
121 doc=
"Should be set to True if fake sources have been inserted into the input data."
126 self.detection.thresholdType =
"pixel_stdev"
127 self.detection.isotropicGrow =
True
129 self.detection.reEstimateBackground =
False
130 self.detection.background.useApprox =
False
131 self.detection.background.binSize = 4096
132 self.detection.background.undersampleStyle =
'REDUCE_INTERP_ORDER'
133 self.detection.doTempWideBackground =
True
145 @anchor DetectCoaddSourcesTask_
147 @brief Detect sources on a coadd
149 @section pipe_tasks_multiBand_Contents Contents
151 - @ref pipe_tasks_multiBand_DetectCoaddSourcesTask_Purpose
152 - @ref pipe_tasks_multiBand_DetectCoaddSourcesTask_Initialize
153 - @ref pipe_tasks_multiBand_DetectCoaddSourcesTask_Run
154 - @ref pipe_tasks_multiBand_DetectCoaddSourcesTask_Config
155 - @ref pipe_tasks_multiBand_DetectCoaddSourcesTask_Debug
156 - @ref pipe_tasks_multiband_DetectCoaddSourcesTask_Example
158 @section pipe_tasks_multiBand_DetectCoaddSourcesTask_Purpose Description
160 Command-line task that detects sources on a coadd of exposures obtained with a single filter.
162 Coadding individual visits requires each exposure to be warped. This introduces covariance in the noise
163 properties across pixels. Before detection, we correct the coadd variance by scaling the variance plane
164 in the coadd to match the observed variance. This is an approximate approach -- strictly, we should
165 propagate the full covariance matrix -- but it is simple and works well in practice.
167 After scaling the variance plane, we detect sources and generate footprints by delegating to the @ref
168 SourceDetectionTask_ "detection" subtask.
171 deepCoadd{tract,patch,filter}: ExposureF
173 deepCoadd_det{tract,patch,filter}: SourceCatalog (only parent Footprints)
174 @n deepCoadd_calexp{tract,patch,filter}: Variance scaled, background-subtracted input
176 @n deepCoadd_calexp_background{tract,patch,filter}: BackgroundList
180 DetectCoaddSourcesTask delegates most of its work to the @ref SourceDetectionTask_ "detection" subtask.
181 You can retarget this subtask if you wish.
183 @section pipe_tasks_multiBand_DetectCoaddSourcesTask_Initialize Task initialization
185 @copydoc \_\_init\_\_
187 @section pipe_tasks_multiBand_DetectCoaddSourcesTask_Run Invoking the Task
191 @section pipe_tasks_multiBand_DetectCoaddSourcesTask_Config Configuration parameters
193 See @ref DetectCoaddSourcesConfig_ "DetectSourcesConfig"
195 @section pipe_tasks_multiBand_DetectCoaddSourcesTask_Debug Debug variables
197 The @link lsst.pipe.base.cmdLineTask.CmdLineTask command line task@endlink interface supports a
198 flag @c -d to import @b debug.py from your @c PYTHONPATH; see @ref baseDebug for more about @b debug.py
201 DetectCoaddSourcesTask has no debug variables of its own because it relegates all the work to
202 @ref SourceDetectionTask_ "SourceDetectionTask"; see the documetation for
203 @ref SourceDetectionTask_ "SourceDetectionTask" for further information.
205 @section pipe_tasks_multiband_DetectCoaddSourcesTask_Example A complete example
206 of using DetectCoaddSourcesTask
208 DetectCoaddSourcesTask is meant to be run after assembling a coadded image in a given band. The purpose of
209 the task is to update the background, detect all sources in a single band and generate a set of parent
210 footprints. Subsequent tasks in the multi-band processing procedure will merge sources across bands and,
211 eventually, perform forced photometry. Command-line usage of DetectCoaddSourcesTask expects a data
212 reference to the coadd to be processed. A list of the available optional arguments can be obtained by
213 calling detectCoaddSources.py with the `--help` command line argument:
215 detectCoaddSources.py --help
218 To demonstrate usage of the DetectCoaddSourcesTask in the larger context of multi-band processing, we
219 will process HSC data in the [ci_hsc](https://github.com/lsst/ci_hsc) package. Assuming one has followed
220 steps 1 - 4 at @ref pipeTasks_multiBand, one may detect all the sources in each coadd as follows:
222 detectCoaddSources.py $CI_HSC_DIR/DATA --id patch=5,4 tract=0 filter=HSC-I
224 that will process the HSC-I band data. The results are written to
225 `$CI_HSC_DIR/DATA/deepCoadd-results/HSC-I`.
227 It is also necessary to run:
229 detectCoaddSources.py $CI_HSC_DIR/DATA --id patch=5,4 tract=0 filter=HSC-R
231 to generate the sources catalogs for the HSC-R band required by the next step in the multi-band
232 processing procedure: @ref MergeDetectionsTask_ "MergeDetectionsTask".
234 _DefaultName =
"detectCoaddSources"
235 ConfigClass = DetectCoaddSourcesConfig
236 getSchemaCatalogs = _makeGetSchemaCatalogs(
"det")
237 makeIdFactory = _makeMakeIdFactory(
"CoaddId")
240 def _makeArgumentParser(cls):
242 parser.add_id_argument(
"--id",
"deepCoadd", help=
"data ID, e.g. --id tract=12345 patch=1,2 filter=r",
243 ContainerClass=ExistingCoaddDataIdContainer)
246 def __init__(self, schema=None, **kwargs):
248 @brief Initialize the task. Create the @ref SourceDetectionTask_ "detection" subtask.
250 Keyword arguments (in addition to those forwarded to CmdLineTask.__init__):
252 @param[in] schema: initial schema for the output catalog, modified-in place to include all
253 fields set by this task. If None, the source minimal schema will be used.
254 @param[in] **kwargs: keyword arguments to be passed to lsst.pipe.base.task.Task.__init__
258 super().__init__(**kwargs)
260 schema = afwTable.SourceTable.makeMinimalSchema()
261 if self.config.doInsertFakes:
262 self.makeSubtask(
"insertFakes")
264 self.makeSubtask(
"detection", schema=self.schema)
265 if self.config.doScaleVariance:
266 self.makeSubtask(
"scaleVariance")
270 def runDataRef(self, patchRef):
272 @brief Run detection on a coadd.
274 Invokes @ref run and then uses @ref write to output the
277 @param[in] patchRef: data reference for patch
279 if self.config.hasFakes:
280 exposure = patchRef.get(
"fakes_" + self.config.coaddName +
"Coadd", immediate=
True)
282 exposure = patchRef.get(self.config.coaddName +
"Coadd", immediate=
True)
283 expId = int(patchRef.get(self.config.coaddName +
"CoaddId"))
284 results = self.run(exposure, self.makeIdFactory(patchRef), expId=expId)
285 self.write(results, patchRef)
288 def runQuantum(self, butlerQC, inputRefs, outputRefs):
289 inputs = butlerQC.get(inputRefs)
290 packedId, maxBits = butlerQC.quantum.dataId.pack(
"tract_patch_band", returnMaxBits=
True)
291 inputs[
"idFactory"] = afwTable.IdFactory.makeSource(packedId, 64 - maxBits)
292 inputs[
"expId"] = packedId
293 outputs = self.run(**inputs)
294 butlerQC.put(outputs, outputRefs)
296 def run(self, exposure, idFactory, expId):
298 @brief Run detection on an exposure.
300 First scale the variance plane to match the observed variance
301 using @ref ScaleVarianceTask. Then invoke the @ref SourceDetectionTask_ "detection" subtask to
304 @param[in,out] exposure: Exposure on which to detect (may be backround-subtracted and scaled,
305 depending on configuration).
306 @param[in] idFactory: IdFactory to set source identifiers
307 @param[in] expId: Exposure identifier (integer) for RNG seed
309 @return a pipe.base.Struct with fields
310 - sources: catalog of detections
311 - backgrounds: list of backgrounds
313 if self.config.doScaleVariance:
314 varScale = self.scaleVariance.
run(exposure.maskedImage)
315 exposure.getMetadata().add(
"VARIANCE_SCALE", varScale)
317 if self.config.doInsertFakes:
318 self.insertFakes.
run(exposure, background=backgrounds)
319 table = afwTable.SourceTable.make(self.schema, idFactory)
320 detections = self.detection.
run(table, exposure, expId=expId)
321 sources = detections.sources
322 fpSets = detections.fpSets
323 if hasattr(fpSets,
"background")
and fpSets.background:
324 for bg
in fpSets.background:
325 backgrounds.append(bg)
326 return Struct(outputSources=sources, outputBackgrounds=backgrounds, outputExposure=exposure)
328 def write(self, results, patchRef):
330 @brief Write out results from runDetection.
332 @param[in] exposure: Exposure to write out
333 @param[in] results: Struct returned from runDetection
334 @param[in] patchRef: data reference for patch
336 coaddName = self.config.coaddName +
"Coadd"
337 patchRef.put(results.outputBackgrounds, coaddName +
"_calexp_background")
338 patchRef.put(results.outputSources, coaddName +
"_det")
339 if self.config.hasFakes:
340 patchRef.put(results.outputExposure,
"fakes_" + coaddName +
"_calexp")
342 patchRef.put(results.outputExposure, coaddName +
"_calexp")
347 class DeblendCoaddSourcesConfig(
Config):
348 """DeblendCoaddSourcesConfig
350 Configuration parameters for the `DeblendCoaddSourcesTask`.
353 doc=
"Deblend sources separately in each band")
355 doc=
"Deblend sources simultaneously across bands")
356 simultaneous =
Field(dtype=bool, default=
False, doc=
"Simultaneously deblend all bands?")
357 coaddName =
Field(dtype=str, default=
"deep", doc=
"Name of coadd")
358 hasFakes =
Field(dtype=bool,
360 doc=
"Should be set to True if fake sources have been inserted into the input data.")
363 Config.setDefaults(self)
364 self.singleBandDeblend.propagateAllPeaks =
True
368 """Task runner for the `MergeSourcesTask`
370 Required because the run method requires a list of
371 dataRefs rather than a single dataRef.
374 def getTargetList(parsedCmd, **kwargs):
375 """Provide a list of patch references for each patch, tract, filter combo.
382 Keyword arguments passed to the task
387 List of tuples, where each tuple is a (dataRef, kwargs) pair.
389 refDict = MergeSourcesRunner.buildRefDict(parsedCmd)
390 kwargs[
"psfCache"] = parsedCmd.psfCache
391 return [(
list(p.values()), kwargs)
for t
in refDict.values()
for p
in t.values()]
395 """Deblend the sources in a merged catalog
397 Deblend sources from master catalog in each coadd.
398 This can either be done separately in each band using the HSC-SDSS deblender
399 (`DeblendCoaddSourcesTask.config.simultaneous==False`)
400 or use SCARLET to simultaneously fit the blend in all bands
401 (`DeblendCoaddSourcesTask.config.simultaneous==True`).
402 The task will set its own `self.schema` atribute to the `Schema` of the
403 output deblended catalog.
404 This will include all fields from the input `Schema`, as well as additional fields
407 `pipe.tasks.multiband.DeblendCoaddSourcesTask Description
408 ---------------------------------------------------------
414 Butler used to read the input schemas from disk or
415 construct the reference catalog loader, if `schema` or `peakSchema` or
417 The schema of the merged detection catalog as an input to this task.
419 The schema of the `PeakRecord`s in the `Footprint`s in the merged detection catalog
421 ConfigClass = DeblendCoaddSourcesConfig
422 RunnerClass = DeblendCoaddSourcesRunner
423 _DefaultName =
"deblendCoaddSources"
424 makeIdFactory = _makeMakeIdFactory(
"MergedCoaddId")
427 def _makeArgumentParser(cls):
429 parser.add_id_argument(
"--id",
"deepCoadd_calexp",
430 help=
"data ID, e.g. --id tract=12345 patch=1,2 filter=g^r^i",
431 ContainerClass=ExistingCoaddDataIdContainer)
432 parser.add_argument(
"--psfCache", type=int, default=100, help=
"Size of CoaddPsf cache")
435 def __init__(self, butler=None, schema=None, peakSchema=None, **kwargs):
436 CmdLineTask.__init__(self, **kwargs)
438 assert butler
is not None,
"Neither butler nor schema is defined"
439 schema = butler.get(self.config.coaddName +
"Coadd_mergeDet_schema", immediate=
True).schema
441 self.schemaMapper.addMinimalSchema(schema)
442 self.schema = self.schemaMapper.getOutputSchema()
443 if peakSchema
is None:
444 assert butler
is not None,
"Neither butler nor peakSchema is defined"
445 peakSchema = butler.get(self.config.coaddName +
"Coadd_peak_schema", immediate=
True).schema
447 if self.config.simultaneous:
448 self.makeSubtask(
"multiBandDeblend", schema=self.schema, peakSchema=peakSchema)
450 self.makeSubtask(
"singleBandDeblend", schema=self.schema, peakSchema=peakSchema)
452 def getSchemaCatalogs(self):
453 """Return a dict of empty catalogs for each catalog dataset produced by this task.
458 Dictionary of empty catalogs, with catalog names as keys.
461 return {self.config.coaddName +
"Coadd_deblendedFlux": catalog,
462 self.config.coaddName +
"Coadd_deblendedModel": catalog}
464 def runDataRef(self, patchRefList, psfCache=100):
467 Deblend each source simultaneously or separately
468 (depending on `DeblendCoaddSourcesTask.config.simultaneous`).
469 Set `is-primary` and related flags.
470 Propagate flags from individual visits.
471 Write the deblended sources out.
476 List of data references for each filter
479 if self.config.hasFakes:
480 coaddType =
"fakes_" + self.config.coaddName
482 coaddType = self.config.coaddName
484 if self.config.simultaneous:
488 for patchRef
in patchRefList:
489 exposure = patchRef.get(coaddType +
"Coadd_calexp", immediate=
True)
490 filters.append(patchRef.dataId[
"filter"])
491 exposures.append(exposure)
493 sources = self.readSources(patchRef)
494 exposure = afwImage.MultibandExposure.fromExposures(filters, exposures)
495 templateCatalogs = self.multiBandDeblend.
run(exposure, sources)
496 for n
in range(len(patchRefList)):
497 self.write(patchRefList[n], templateCatalogs[filters[n]])
500 for patchRef
in patchRefList:
501 exposure = patchRef.get(coaddType +
"Coadd_calexp", immediate=
True)
502 exposure.getPsf().setCacheCapacity(psfCache)
503 sources = self.readSources(patchRef)
504 self.singleBandDeblend.
run(exposure, sources)
505 self.write(patchRef, sources)
507 def readSources(self, dataRef):
508 """Read merged catalog
510 Read the catalog of merged detections and create a catalog
515 dataRef: data reference
516 Data reference for catalog of merged detections
520 sources: `SourceCatalog`
521 List of sources in merged catalog
523 We also need to add columns to hold the measurements we're about to make
524 so we can measure in-place.
526 merged = dataRef.get(self.config.coaddName +
"Coadd_mergeDet", immediate=
True)
527 self.log.
info(
"Read %d detections: %s" % (len(merged), dataRef.dataId))
528 idFactory = self.makeIdFactory(dataRef)
530 idFactory.notify(s.getId())
531 table = afwTable.SourceTable.make(self.schema, idFactory)
533 sources.extend(merged, self.schemaMapper)
536 def write(self, dataRef, sources):
537 """Write the source catalog(s)
541 dataRef: Data Reference
542 Reference to the output catalog.
543 sources: `SourceCatalog`
544 Flux conserved sources to write to file.
545 If using the single band deblender, this is the catalog
547 template_sources: `SourceCatalog`
548 Source catalog using the multiband template models
551 dataRef.put(sources, self.config.coaddName +
"Coadd_deblendedFlux")
552 self.log.
info(
"Wrote %d sources: %s" % (len(sources), dataRef.dataId))
555 """Write the metadata produced from processing the data.
559 List of Butler data references used to write the metadata.
560 The metadata is written to dataset type `CmdLineTask._getMetadataName`.
562 for dataRef
in dataRefList:
564 metadataName = self._getMetadataName()
565 if metadataName
is not None:
566 dataRef.put(self.getFullMetadata(), metadataName)
567 except Exception
as e:
568 self.log.
warn(
"Could not persist metadata for dataId=%s: %s", dataRef.dataId, e)
570 def getExposureId(self, dataRef):
571 """Get the ExposureId from a data reference
573 return int(dataRef.get(self.config.coaddName +
"CoaddId"))
576 class MeasureMergedCoaddSourcesConnections(
PipelineTaskConnections, dimensions=(
"tract",
"patch",
"band",
"skymap"),
577 defaultTemplates={
"inputCoaddName":
"deep",
578 "outputCoaddName":
"deep"}):
579 inputSchema = cT.InitInput(
580 doc=
"Input schema for measure merged task produced by a deblender or detection task",
581 name=
"{inputCoaddName}Coadd_deblendedFlux_schema",
582 storageClass=
"SourceCatalog"
584 outputSchema = cT.InitOutput(
585 doc=
"Output schema after all new fields are added by task",
586 name=
"{inputCoaddName}Coadd_meas_schema",
587 storageClass=
"SourceCatalog"
589 refCat = cT.PrerequisiteInput(
590 doc=
"Reference catalog used to match measured sources against known sources",
592 storageClass=
"SimpleCatalog",
593 dimensions=(
"skypix",),
598 doc=
"Input coadd image",
599 name=
"{inputCoaddName}Coadd_calexp",
600 storageClass=
"ExposureF",
601 dimensions=(
"tract",
"patch",
"band",
"skymap")
604 doc=
"SkyMap to use in processing",
605 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
606 storageClass=
"SkyMap",
607 dimensions=(
"skymap",),
609 visitCatalogs = cT.Input(
610 doc=
"Source catalogs for visits which overlap input tract, patch, band. Will be "
611 "further filtered in the task for the purpose of propagating flags from image calibration "
612 "and characterization to codd objects",
614 dimensions=(
"instrument",
"visit",
"detector"),
615 storageClass=
"SourceCatalog",
618 inputCatalog = cT.Input(
619 doc=(
"Name of the input catalog to use."
620 "If the single band deblender was used this should be 'deblendedFlux."
621 "If the multi-band deblender was used this should be 'deblendedModel, "
622 "or deblendedFlux if the multiband deblender was configured to output "
623 "deblended flux catalogs. If no deblending was performed this should "
625 name=
"{inputCoaddName}Coadd_deblendedFlux",
626 storageClass=
"SourceCatalog",
627 dimensions=(
"tract",
"patch",
"band",
"skymap"),
629 outputSources = cT.Output(
630 doc=
"Source catalog containing all the measurement information generated in this task",
631 name=
"{outputCoaddName}Coadd_meas",
632 dimensions=(
"tract",
"patch",
"band",
"skymap"),
633 storageClass=
"SourceCatalog",
635 matchResult = cT.Output(
636 doc=
"Match catalog produced by configured matcher, optional on doMatchSources",
637 name=
"{outputCoaddName}Coadd_measMatch",
638 dimensions=(
"tract",
"patch",
"band",
"skymap"),
639 storageClass=
"Catalog",
641 denormMatches = cT.Output(
642 doc=
"Denormalized Match catalog produced by configured matcher, optional on "
643 "doWriteMatchesDenormalized",
644 name=
"{outputCoaddName}Coadd_measMatchFull",
645 dimensions=(
"tract",
"patch",
"band",
"skymap"),
646 storageClass=
"Catalog",
649 def __init__(self, *, config=None):
650 super().__init__(config=config)
651 if config.doPropagateFlags
is False:
652 self.inputs -=
set((
"visitCatalogs",))
654 if config.doMatchSources
is False:
655 self.outputs -=
set((
"matchResult",))
657 if config.doWriteMatchesDenormalized
is False:
658 self.outputs -=
set((
"denormMatches",))
662 pipelineConnections=MeasureMergedCoaddSourcesConnections):
664 @anchor MeasureMergedCoaddSourcesConfig_
666 @brief Configuration parameters for the MeasureMergedCoaddSourcesTask
668 inputCatalog =
Field(dtype=str, default=
"deblendedFlux",
669 doc=(
"Name of the input catalog to use."
670 "If the single band deblender was used this should be 'deblendedFlux."
671 "If the multi-band deblender was used this should be 'deblendedModel."
672 "If no deblending was performed this should be 'mergeDet'"))
673 measurement =
ConfigurableField(target=SingleFrameMeasurementTask, doc=
"Source measurement")
674 setPrimaryFlags =
ConfigurableField(target=SetPrimaryFlagsTask, doc=
"Set flags for primary tract/patch")
675 doPropagateFlags =
Field(
676 dtype=bool, default=
True,
677 doc=
"Whether to match sources to CCD catalogs to propagate flags (to e.g. identify PSF stars)"
679 propagateFlags =
ConfigurableField(target=PropagateVisitFlagsTask, doc=
"Propagate visit flags to coadd")
680 doMatchSources =
Field(dtype=bool, default=
True, doc=
"Match sources to reference catalog?")
681 match =
ConfigurableField(target=DirectMatchTask, doc=
"Matching to reference catalog")
682 doWriteMatchesDenormalized =
Field(
685 doc=(
"Write reference matches in denormalized format? "
686 "This format uses more disk space, but is more convenient to read."),
688 coaddName =
Field(dtype=str, default=
"deep", doc=
"Name of coadd")
689 psfCache =
Field(dtype=int, default=100, doc=
"Size of psfCache")
690 checkUnitsParseStrict =
Field(
691 doc=
"Strictness of Astropy unit compatibility check, can be 'raise', 'warn' or 'silent'",
698 doc=
"Apply aperture corrections"
701 target=ApplyApCorrTask,
702 doc=
"Subtask to apply aperture corrections"
704 doRunCatalogCalculation =
Field(
707 doc=
'Run catalogCalculation task'
710 target=CatalogCalculationTask,
711 doc=
"Subtask to run catalogCalculation plugins on catalog"
717 doc=
"Should be set to True if fake sources have been inserted into the input data."
721 def refObjLoader(self):
722 return self.match.refObjLoader
726 self.measurement.plugins.names |= [
'base_InputCount',
728 'base_LocalPhotoCalib',
730 self.measurement.plugins[
'base_PixelFlags'].masksFpAnywhere = [
'CLIPPED',
'SENSOR_EDGE',
732 self.measurement.plugins[
'base_PixelFlags'].masksFpCenter = [
'CLIPPED',
'SENSOR_EDGE',
737 refCatGen2 = getattr(self.refObjLoader,
"ref_dataset_name",
None)
738 if refCatGen2
is not None and refCatGen2 != self.connections.refCat:
740 f
"Gen2 ({refCatGen2}) and Gen3 ({self.connections.refCat}) reference catalogs "
741 f
"are different. These options must be kept in sync until Gen2 is retired."
754 """Get the psfCache setting into MeasureMergedCoaddSourcesTask"""
756 def getTargetList(parsedCmd, **kwargs):
757 return ButlerInitializedTaskRunner.getTargetList(parsedCmd, psfCache=parsedCmd.psfCache)
762 @anchor MeasureMergedCoaddSourcesTask_
764 @brief Deblend sources from master catalog in each coadd seperately and measure.
766 @section pipe_tasks_multiBand_Contents Contents
768 - @ref pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Purpose
769 - @ref pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Initialize
770 - @ref pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Run
771 - @ref pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Config
772 - @ref pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Debug
773 - @ref pipe_tasks_multiband_MeasureMergedCoaddSourcesTask_Example
775 @section pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Purpose Description
777 Command-line task that uses peaks and footprints from a master catalog to perform deblending and
778 measurement in each coadd.
780 Given a master input catalog of sources (peaks and footprints) or deblender outputs
781 (including a HeavyFootprint in each band), measure each source on the
782 coadd. Repeating this procedure with the same master catalog across multiple coadds will generate a
783 consistent set of child sources.
785 The deblender retains all peaks and deblends any missing peaks (dropouts in that band) as PSFs. Source
786 properties are measured and the @c is-primary flag (indicating sources with no children) is set. Visit
787 flags are propagated to the coadd sources.
789 Optionally, we can match the coadd sources to an external reference catalog.
792 deepCoadd_mergeDet{tract,patch} or deepCoadd_deblend{tract,patch}: SourceCatalog
793 @n deepCoadd_calexp{tract,patch,filter}: ExposureF
795 deepCoadd_meas{tract,patch,filter}: SourceCatalog
799 MeasureMergedCoaddSourcesTask delegates most of its work to a set of sub-tasks:
802 <DT> @ref SingleFrameMeasurementTask_ "measurement"
803 <DD> Measure source properties of deblended sources.</DD>
804 <DT> @ref SetPrimaryFlagsTask_ "setPrimaryFlags"
805 <DD> Set flag 'is-primary' as well as related flags on sources. 'is-primary' is set for sources that are
806 not at the edge of the field and that have either not been deblended or are the children of deblended
808 <DT> @ref PropagateVisitFlagsTask_ "propagateFlags"
809 <DD> Propagate flags set in individual visits to the coadd.</DD>
810 <DT> @ref DirectMatchTask_ "match"
811 <DD> Match input sources to a reference catalog (optional).
814 These subtasks may be retargeted as required.
816 @section pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Initialize Task initialization
818 @copydoc \_\_init\_\_
820 @section pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Run Invoking the Task
824 @section pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Config Configuration parameters
826 See @ref MeasureMergedCoaddSourcesConfig_
828 @section pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Debug Debug variables
830 The @link lsst.pipe.base.cmdLineTask.CmdLineTask command line task@endlink interface supports a
831 flag @c -d to import @b debug.py from your @c PYTHONPATH; see @ref baseDebug for more about @b debug.py
834 MeasureMergedCoaddSourcesTask has no debug variables of its own because it delegates all the work to
835 the various sub-tasks. See the documetation for individual sub-tasks for more information.
837 @section pipe_tasks_multiband_MeasureMergedCoaddSourcesTask_Example A complete example of using
838 MeasureMergedCoaddSourcesTask
840 After MeasureMergedCoaddSourcesTask has been run on multiple coadds, we have a set of per-band catalogs.
841 The next stage in the multi-band processing procedure will merge these measurements into a suitable
842 catalog for driving forced photometry.
844 Command-line usage of MeasureMergedCoaddSourcesTask expects a data reference to the coadds
846 A list of the available optional arguments can be obtained by calling measureCoaddSources.py with the
847 `--help` command line argument:
849 measureCoaddSources.py --help
852 To demonstrate usage of the DetectCoaddSourcesTask in the larger context of multi-band processing, we
853 will process HSC data in the [ci_hsc](https://github.com/lsst/ci_hsc) package. Assuming one has finished
854 step 6 at @ref pipeTasks_multiBand, one may perform deblending and measure sources in the HSC-I band
857 measureCoaddSources.py $CI_HSC_DIR/DATA --id patch=5,4 tract=0 filter=HSC-I
859 This will process the HSC-I band data. The results are written in
860 `$CI_HSC_DIR/DATA/deepCoadd-results/HSC-I/0/5,4/meas-HSC-I-0-5,4.fits
862 It is also necessary to run
864 measureCoaddSources.py $CI_HSC_DIR/DATA --id patch=5,4 tract=0 filter=HSC-R
866 to generate the sources catalogs for the HSC-R band required by the next step in the multi-band
867 procedure: @ref MergeMeasurementsTask_ "MergeMeasurementsTask".
869 _DefaultName =
"measureCoaddSources"
870 ConfigClass = MeasureMergedCoaddSourcesConfig
871 RunnerClass = MeasureMergedCoaddSourcesRunner
872 getSchemaCatalogs = _makeGetSchemaCatalogs(
"meas")
873 makeIdFactory = _makeMakeIdFactory(
"MergedCoaddId")
876 def _makeArgumentParser(cls):
878 parser.add_id_argument(
"--id",
"deepCoadd_calexp",
879 help=
"data ID, e.g. --id tract=12345 patch=1,2 filter=r",
880 ContainerClass=ExistingCoaddDataIdContainer)
881 parser.add_argument(
"--psfCache", type=int, default=100, help=
"Size of CoaddPsf cache")
884 def __init__(self, butler=None, schema=None, peakSchema=None, refObjLoader=None, initInputs=None,
887 @brief Initialize the task.
889 Keyword arguments (in addition to those forwarded to CmdLineTask.__init__):
890 @param[in] schema: the schema of the merged detection catalog used as input to this one
891 @param[in] peakSchema: the schema of the PeakRecords in the Footprints in the merged detection catalog
892 @param[in] refObjLoader: an instance of LoadReferenceObjectsTasks that supplies an external reference
893 catalog. May be None if the loader can be constructed from the butler argument or all steps
894 requiring a reference catalog are disabled.
895 @param[in] butler: a butler used to read the input schemas from disk or construct the reference
896 catalog loader, if schema or peakSchema or refObjLoader is None
898 The task will set its own self.schema attribute to the schema of the output measurement catalog.
899 This will include all fields from the input schema, as well as additional fields for all the
902 super().__init__(**kwargs)
903 self.deblended = self.config.inputCatalog.startswith(
"deblended")
904 self.inputCatalog =
"Coadd_" + self.config.inputCatalog
905 if initInputs
is not None:
906 schema = initInputs[
'inputSchema'].schema
908 assert butler
is not None,
"Neither butler nor schema is defined"
909 schema = butler.get(self.config.coaddName + self.inputCatalog +
"_schema", immediate=
True).schema
911 self.schemaMapper.addMinimalSchema(schema)
912 self.schema = self.schemaMapper.getOutputSchema()
914 self.makeSubtask(
"measurement", schema=self.schema, algMetadata=self.algMetadata)
915 self.makeSubtask(
"setPrimaryFlags", schema=self.schema)
916 if self.config.doMatchSources:
917 self.makeSubtask(
"match", butler=butler, refObjLoader=refObjLoader)
918 if self.config.doPropagateFlags:
919 self.makeSubtask(
"propagateFlags", schema=self.schema)
920 self.schema.checkUnits(parse_strict=self.config.checkUnitsParseStrict)
921 if self.config.doApCorr:
922 self.makeSubtask(
"applyApCorr", schema=self.schema)
923 if self.config.doRunCatalogCalculation:
924 self.makeSubtask(
"catalogCalculation", schema=self.schema)
928 def runQuantum(self, butlerQC, inputRefs, outputRefs):
929 inputs = butlerQC.get(inputRefs)
932 inputs.pop(
'refCat'), config=self.config.refObjLoader,
934 self.match.setRefObjLoader(refObjLoader)
938 inputs[
'exposure'].getPsf().setCacheCapacity(self.config.psfCache)
941 packedId, maxBits = butlerQC.quantum.dataId.pack(
"tract_patch", returnMaxBits=
True)
942 inputs[
'exposureId'] = packedId
943 idFactory = afwTable.IdFactory.makeSource(packedId, 64 - maxBits)
945 table = afwTable.SourceTable.make(self.schema, idFactory)
947 sources.extend(inputs.pop(
'inputCatalog'), self.schemaMapper)
948 table = sources.getTable()
949 table.setMetadata(self.algMetadata)
950 inputs[
'sources'] = sources
952 skyMap = inputs.pop(
'skyMap')
953 tractNumber = inputRefs.inputCatalog.dataId[
'tract']
954 tractInfo = skyMap[tractNumber]
955 patchInfo = tractInfo.getPatchInfo(inputRefs.inputCatalog.dataId[
'patch'])
960 wcs=tractInfo.getWcs(),
961 bbox=patchInfo.getOuterBBox()
963 inputs[
'skyInfo'] = skyInfo
965 if self.config.doPropagateFlags:
967 ccdInputs = inputs[
'exposure'].
getInfo().getCoaddInputs().ccds
968 visitKey = ccdInputs.schema.find(
"visit").key
969 ccdKey = ccdInputs.schema.find(
"ccd").key
970 inputVisitIds =
set()
972 for ccdRecord
in ccdInputs:
973 visit = ccdRecord.get(visitKey)
974 ccd = ccdRecord.get(ccdKey)
975 inputVisitIds.add((visit, ccd))
976 ccdRecordsWcs[(visit, ccd)] = ccdRecord.getWcs()
978 inputCatalogsToKeep = []
979 inputCatalogWcsUpdate = []
980 for i, dataRef
in enumerate(inputRefs.visitCatalogs):
981 key = (dataRef.dataId[
'visit'], dataRef.dataId[
'detector'])
982 if key
in inputVisitIds:
983 inputCatalogsToKeep.append(inputs[
'visitCatalogs'][i])
984 inputCatalogWcsUpdate.append(ccdRecordsWcs[key])
985 inputs[
'visitCatalogs'] = inputCatalogsToKeep
986 inputs[
'wcsUpdates'] = inputCatalogWcsUpdate
987 inputs[
'ccdInputs'] = ccdInputs
989 outputs = self.run(**inputs)
990 butlerQC.put(outputs, outputRefs)
992 def runDataRef(self, patchRef, psfCache=100):
994 @brief Deblend and measure.
996 @param[in] patchRef: Patch reference.
998 Set 'is-primary' and related flags. Propagate flags
999 from individual visits. Optionally match the sources to a reference catalog and write the matches.
1000 Finally, write the deblended sources and measurements out.
1002 if self.config.hasFakes:
1003 coaddType =
"fakes_" + self.config.coaddName
1005 coaddType = self.config.coaddName
1006 exposure = patchRef.get(coaddType +
"Coadd_calexp", immediate=
True)
1007 exposure.getPsf().setCacheCapacity(psfCache)
1008 sources = self.readSources(patchRef)
1009 table = sources.getTable()
1010 table.setMetadata(self.algMetadata)
1011 skyInfo =
getSkyInfo(coaddName=self.config.coaddName, patchRef=patchRef)
1013 if self.config.doPropagateFlags:
1014 ccdInputs = self.propagateFlags.getCcdInputs(exposure)
1018 results = self.run(exposure=exposure, sources=sources,
1019 ccdInputs=ccdInputs,
1020 skyInfo=skyInfo, butler=patchRef.getButler(),
1021 exposureId=self.getExposureId(patchRef))
1023 if self.config.doMatchSources:
1024 self.writeMatches(patchRef, results)
1025 self.write(patchRef, results.outputSources)
1027 def run(self, exposure, sources, skyInfo, exposureId, ccdInputs=None, visitCatalogs=None, wcsUpdates=None,
1029 """Run measurement algorithms on the input exposure, and optionally populate the
1030 resulting catalog with extra information.
1034 exposure : `lsst.afw.exposure.Exposure`
1035 The input exposure on which measurements are to be performed
1036 sources : `lsst.afw.table.SourceCatalog`
1037 A catalog built from the results of merged detections, or
1039 skyInfo : `lsst.pipe.base.Struct`
1040 A struct containing information about the position of the input exposure within
1041 a `SkyMap`, the `SkyMap`, its `Wcs`, and its bounding box
1042 exposureId : `int` or `bytes`
1043 packed unique number or bytes unique to the input exposure
1044 ccdInputs : `lsst.afw.table.ExposureCatalog`
1045 Catalog containing information on the individual visits which went into making
1047 visitCatalogs : list of `lsst.afw.table.SourceCatalogs` or `None`
1048 A list of source catalogs corresponding to measurements made on the individual
1049 visits which went into the input exposure. If None and butler is `None` then
1050 the task cannot propagate visit flags to the output catalog.
1051 wcsUpdates : list of `lsst.afw.geom.SkyWcs` or `None`
1052 If visitCatalogs is not `None` this should be a list of wcs objects which correspond
1053 to the input visits. Used to put all coordinates to common system. If `None` and
1054 butler is `None` then the task cannot propagate visit flags to the output catalog.
1055 butler : `lsst.daf.butler.Butler` or `lsst.daf.persistence.Butler`
1056 Either a gen2 or gen3 butler used to load visit catalogs
1060 results : `lsst.pipe.base.Struct`
1061 Results of running measurement task. Will contain the catalog in the
1062 sources attribute. Optionally will have results of matching to a
1063 reference catalog in the matchResults attribute, and denormalized
1064 matches in the denormMatches attribute.
1066 self.measurement.
run(sources, exposure, exposureId=exposureId)
1068 if self.config.doApCorr:
1069 self.applyApCorr.
run(
1071 apCorrMap=exposure.getInfo().getApCorrMap()
1078 if not sources.isContiguous():
1079 sources = sources.copy(deep=
True)
1081 if self.config.doRunCatalogCalculation:
1082 self.catalogCalculation.
run(sources)
1084 self.setPrimaryFlags.
run(sources, skyMap=skyInfo.skyMap, tractInfo=skyInfo.tractInfo,
1085 patchInfo=skyInfo.patchInfo, includeDeblend=self.deblended)
1086 if self.config.doPropagateFlags:
1087 self.propagateFlags.
run(butler, sources, ccdInputs, exposure.getWcs(), visitCatalogs, wcsUpdates)
1091 if self.config.doMatchSources:
1092 matchResult = self.match.
run(sources, exposure.getInfo().getFilter().getName())
1094 matches.table.setMetadata(matchResult.matchMeta)
1095 results.matchResult = matches
1096 if self.config.doWriteMatchesDenormalized:
1097 if matchResult.matches:
1100 self.log.
warn(
"No matches, so generating dummy denormalized matches file")
1103 denormMatches.getMetadata().add(
"COMMENT",
1104 "This catalog is empty because no matches were found.")
1105 results.denormMatches = denormMatches
1106 results.denormMatches = denormMatches
1108 results.outputSources = sources
1111 def readSources(self, dataRef):
1113 @brief Read input sources.
1115 @param[in] dataRef: Data reference for catalog of merged detections
1116 @return List of sources in merged catalog
1118 We also need to add columns to hold the measurements we're about to make
1119 so we can measure in-place.
1121 merged = dataRef.get(self.config.coaddName + self.inputCatalog, immediate=
True)
1122 self.log.
info(
"Read %d detections: %s" % (len(merged), dataRef.dataId))
1123 idFactory = self.makeIdFactory(dataRef)
1125 idFactory.notify(s.getId())
1126 table = afwTable.SourceTable.make(self.schema, idFactory)
1128 sources.extend(merged, self.schemaMapper)
1131 def writeMatches(self, dataRef, results):
1133 @brief Write matches of the sources to the astrometric reference catalog.
1135 @param[in] dataRef: data reference
1136 @param[in] results: results struct from run method
1138 if hasattr(results,
"matchResult"):
1139 dataRef.put(results.matchResult, self.config.coaddName +
"Coadd_measMatch")
1140 if hasattr(results,
"denormMatches"):
1141 dataRef.put(results.denormMatches, self.config.coaddName +
"Coadd_measMatchFull")
1143 def write(self, dataRef, sources):
1145 @brief Write the source catalog.
1147 @param[in] dataRef: data reference
1148 @param[in] sources: source catalog
1150 dataRef.put(sources, self.config.coaddName +
"Coadd_meas")
1151 self.log.
info(
"Wrote %d sources: %s" % (len(sources), dataRef.dataId))
1153 def getExposureId(self, dataRef):
1154 return int(dataRef.get(self.config.coaddName +
"CoaddId"))