24 from lsst.pipe.base import (CmdLineTask, Struct, ArgumentParser, ButlerInitializedTaskRunner,
25 PipelineTask, PipelineTaskConfig, PipelineTaskConnections)
29 from lsst.meas.base import SingleFrameMeasurementTask, ApplyApCorrTask, CatalogCalculationTask
31 from lsst.meas.extensions.scarlet
import ScarletDeblendTask
43 from .mergeDetections
import MergeDetectionsConfig, MergeDetectionsTask
44 from .mergeMeasurements
import MergeMeasurementsConfig, MergeMeasurementsTask
45 from .multiBandUtils
import MergeSourcesRunner, CullPeaksConfig, _makeGetSchemaCatalogs
46 from .multiBandUtils
import getInputSchema, getShortFilterName, readCatalog, _makeMakeIdFactory
47 from .deblendCoaddSourcesPipeline
import DeblendCoaddSourcesSingleConfig
48 from .deblendCoaddSourcesPipeline
import DeblendCoaddSourcesSingleTask
49 from .deblendCoaddSourcesPipeline
import DeblendCoaddSourcesMultiConfig
50 from .deblendCoaddSourcesPipeline
import DeblendCoaddSourcesMultiTask
55 * deepCoadd_det: detections from what used to be processCoadd (tract, patch, filter)
56 * deepCoadd_mergeDet: merged detections (tract, patch)
57 * deepCoadd_meas: measurements of merged detections (tract, patch, filter)
58 * deepCoadd_ref: reference sources (tract, patch)
59 All of these have associated *_schema catalogs that require no data ID and hold no records.
61 In addition, we have a schema-only dataset, which saves the schema for the PeakRecords in
62 the mergeDet, meas, and ref dataset Footprints:
63 * deepCoadd_peak_schema
69 dimensions=(
"tract",
"patch",
"band",
"skymap"),
70 defaultTemplates={
"inputCoaddName":
"deep",
"outputCoaddName":
"deep"}):
71 detectionSchema = cT.InitOutput(
72 doc=
"Schema of the detection catalog",
73 name=
"{outputCoaddName}Coadd_det_schema",
74 storageClass=
"SourceCatalog",
77 doc=
"Exposure on which detections are to be performed",
78 name=
"{inputCoaddName}Coadd",
79 storageClass=
"ExposureF",
80 dimensions=(
"tract",
"patch",
"band",
"skymap")
82 outputBackgrounds = cT.Output(
83 doc=
"Output Backgrounds used in detection",
84 name=
"{outputCoaddName}Coadd_calexp_background",
85 storageClass=
"Background",
86 dimensions=(
"tract",
"patch",
"band",
"skymap")
88 outputSources = cT.Output(
89 doc=
"Detected sources catalog",
90 name=
"{outputCoaddName}Coadd_det",
91 storageClass=
"SourceCatalog",
92 dimensions=(
"tract",
"patch",
"band",
"skymap")
94 outputExposure = cT.Output(
95 doc=
"Exposure post detection",
96 name=
"{outputCoaddName}Coadd_calexp",
97 storageClass=
"ExposureF",
98 dimensions=(
"tract",
"patch",
"band",
"skymap")
102 class DetectCoaddSourcesConfig(
PipelineTaskConfig, pipelineConnections=DetectCoaddSourcesConnections):
104 @anchor DetectCoaddSourcesConfig_
106 @brief Configuration parameters for the DetectCoaddSourcesTask
108 doScaleVariance =
Field(dtype=bool, default=
True, doc=
"Scale variance plane using empirical noise?")
109 scaleVariance =
ConfigurableField(target=ScaleVarianceTask, doc=
"Variance rescaling")
110 detection =
ConfigurableField(target=DynamicDetectionTask, doc=
"Source detection")
111 coaddName =
Field(dtype=str, default=
"deep", doc=
"Name of coadd")
112 doInsertFakes =
Field(dtype=bool, default=
False,
113 doc=
"Run fake sources injection task")
115 doc=
"Injection of fake sources for testing "
116 "purposes (must be retargeted)")
120 doc=
"Should be set to True if fake sources have been inserted into the input data."
125 self.detection.thresholdType =
"pixel_stdev"
126 self.detection.isotropicGrow =
True
128 self.detection.reEstimateBackground =
False
129 self.detection.background.useApprox =
False
130 self.detection.background.binSize = 4096
131 self.detection.background.undersampleStyle =
'REDUCE_INTERP_ORDER'
132 self.detection.doTempWideBackground =
True
144 @anchor DetectCoaddSourcesTask_
146 @brief Detect sources on a coadd
148 @section pipe_tasks_multiBand_Contents Contents
150 - @ref pipe_tasks_multiBand_DetectCoaddSourcesTask_Purpose
151 - @ref pipe_tasks_multiBand_DetectCoaddSourcesTask_Initialize
152 - @ref pipe_tasks_multiBand_DetectCoaddSourcesTask_Run
153 - @ref pipe_tasks_multiBand_DetectCoaddSourcesTask_Config
154 - @ref pipe_tasks_multiBand_DetectCoaddSourcesTask_Debug
155 - @ref pipe_tasks_multiband_DetectCoaddSourcesTask_Example
157 @section pipe_tasks_multiBand_DetectCoaddSourcesTask_Purpose Description
159 Command-line task that detects sources on a coadd of exposures obtained with a single filter.
161 Coadding individual visits requires each exposure to be warped. This introduces covariance in the noise
162 properties across pixels. Before detection, we correct the coadd variance by scaling the variance plane
163 in the coadd to match the observed variance. This is an approximate approach -- strictly, we should
164 propagate the full covariance matrix -- but it is simple and works well in practice.
166 After scaling the variance plane, we detect sources and generate footprints by delegating to the @ref
167 SourceDetectionTask_ "detection" subtask.
170 deepCoadd{tract,patch,filter}: ExposureF
172 deepCoadd_det{tract,patch,filter}: SourceCatalog (only parent Footprints)
173 @n deepCoadd_calexp{tract,patch,filter}: Variance scaled, background-subtracted input
175 @n deepCoadd_calexp_background{tract,patch,filter}: BackgroundList
179 DetectCoaddSourcesTask delegates most of its work to the @ref SourceDetectionTask_ "detection" subtask.
180 You can retarget this subtask if you wish.
182 @section pipe_tasks_multiBand_DetectCoaddSourcesTask_Initialize Task initialization
184 @copydoc \_\_init\_\_
186 @section pipe_tasks_multiBand_DetectCoaddSourcesTask_Run Invoking the Task
190 @section pipe_tasks_multiBand_DetectCoaddSourcesTask_Config Configuration parameters
192 See @ref DetectCoaddSourcesConfig_ "DetectSourcesConfig"
194 @section pipe_tasks_multiBand_DetectCoaddSourcesTask_Debug Debug variables
196 The @link lsst.pipe.base.cmdLineTask.CmdLineTask command line task@endlink interface supports a
197 flag @c -d to import @b debug.py from your @c PYTHONPATH; see @ref baseDebug for more about @b debug.py
200 DetectCoaddSourcesTask has no debug variables of its own because it relegates all the work to
201 @ref SourceDetectionTask_ "SourceDetectionTask"; see the documetation for
202 @ref SourceDetectionTask_ "SourceDetectionTask" for further information.
204 @section pipe_tasks_multiband_DetectCoaddSourcesTask_Example A complete example
205 of using DetectCoaddSourcesTask
207 DetectCoaddSourcesTask is meant to be run after assembling a coadded image in a given band. The purpose of
208 the task is to update the background, detect all sources in a single band and generate a set of parent
209 footprints. Subsequent tasks in the multi-band processing procedure will merge sources across bands and,
210 eventually, perform forced photometry. Command-line usage of DetectCoaddSourcesTask expects a data
211 reference to the coadd to be processed. A list of the available optional arguments can be obtained by
212 calling detectCoaddSources.py with the `--help` command line argument:
214 detectCoaddSources.py --help
217 To demonstrate usage of the DetectCoaddSourcesTask in the larger context of multi-band processing, we
218 will process HSC data in the [ci_hsc](https://github.com/lsst/ci_hsc) package. Assuming one has followed
219 steps 1 - 4 at @ref pipeTasks_multiBand, one may detect all the sources in each coadd as follows:
221 detectCoaddSources.py $CI_HSC_DIR/DATA --id patch=5,4 tract=0 filter=HSC-I
223 that will process the HSC-I band data. The results are written to
224 `$CI_HSC_DIR/DATA/deepCoadd-results/HSC-I`.
226 It is also necessary to run:
228 detectCoaddSources.py $CI_HSC_DIR/DATA --id patch=5,4 tract=0 filter=HSC-R
230 to generate the sources catalogs for the HSC-R band required by the next step in the multi-band
231 processing procedure: @ref MergeDetectionsTask_ "MergeDetectionsTask".
233 _DefaultName =
"detectCoaddSources"
234 ConfigClass = DetectCoaddSourcesConfig
235 getSchemaCatalogs = _makeGetSchemaCatalogs(
"det")
236 makeIdFactory = _makeMakeIdFactory(
"CoaddId")
239 def _makeArgumentParser(cls):
241 parser.add_id_argument(
"--id",
"deepCoadd", help=
"data ID, e.g. --id tract=12345 patch=1,2 filter=r",
242 ContainerClass=ExistingCoaddDataIdContainer)
245 def __init__(self, schema=None, **kwargs):
247 @brief Initialize the task. Create the @ref SourceDetectionTask_ "detection" subtask.
249 Keyword arguments (in addition to those forwarded to CmdLineTask.__init__):
251 @param[in] schema: initial schema for the output catalog, modified-in place to include all
252 fields set by this task. If None, the source minimal schema will be used.
253 @param[in] **kwargs: keyword arguments to be passed to lsst.pipe.base.task.Task.__init__
257 super().__init__(**kwargs)
259 schema = afwTable.SourceTable.makeMinimalSchema()
260 if self.config.doInsertFakes:
261 self.makeSubtask(
"insertFakes")
263 self.makeSubtask(
"detection", schema=self.schema)
264 if self.config.doScaleVariance:
265 self.makeSubtask(
"scaleVariance")
269 def runDataRef(self, patchRef):
271 @brief Run detection on a coadd.
273 Invokes @ref run and then uses @ref write to output the
276 @param[in] patchRef: data reference for patch
278 if self.config.hasFakes:
279 exposure = patchRef.get(
"fakes_" + self.config.coaddName +
"Coadd", immediate=
True)
281 exposure = patchRef.get(self.config.coaddName +
"Coadd", immediate=
True)
282 expId = int(patchRef.get(self.config.coaddName +
"CoaddId"))
283 results = self.run(exposure, self.makeIdFactory(patchRef), expId=expId)
284 self.write(results, patchRef)
287 def runQuantum(self, butlerQC, inputRefs, outputRefs):
288 inputs = butlerQC.get(inputRefs)
289 packedId, maxBits = butlerQC.quantum.dataId.pack(
"tract_patch_band", returnMaxBits=
True)
290 inputs[
"idFactory"] = afwTable.IdFactory.makeSource(packedId, 64 - maxBits)
291 inputs[
"expId"] = packedId
292 outputs = self.run(**inputs)
293 butlerQC.put(outputs, outputRefs)
295 def run(self, exposure, idFactory, expId):
297 @brief Run detection on an exposure.
299 First scale the variance plane to match the observed variance
300 using @ref ScaleVarianceTask. Then invoke the @ref SourceDetectionTask_ "detection" subtask to
303 @param[in,out] exposure: Exposure on which to detect (may be backround-subtracted and scaled,
304 depending on configuration).
305 @param[in] idFactory: IdFactory to set source identifiers
306 @param[in] expId: Exposure identifier (integer) for RNG seed
308 @return a pipe.base.Struct with fields
309 - sources: catalog of detections
310 - backgrounds: list of backgrounds
312 if self.config.doScaleVariance:
313 varScale = self.scaleVariance.
run(exposure.maskedImage)
314 exposure.getMetadata().add(
"VARIANCE_SCALE", varScale)
316 if self.config.doInsertFakes:
317 self.insertFakes.
run(exposure, background=backgrounds)
318 table = afwTable.SourceTable.make(self.schema, idFactory)
319 detections = self.detection.
run(table, exposure, expId=expId)
320 sources = detections.sources
321 fpSets = detections.fpSets
322 if hasattr(fpSets,
"background")
and fpSets.background:
323 for bg
in fpSets.background:
324 backgrounds.append(bg)
325 return Struct(outputSources=sources, outputBackgrounds=backgrounds, outputExposure=exposure)
327 def write(self, results, patchRef):
329 @brief Write out results from runDetection.
331 @param[in] exposure: Exposure to write out
332 @param[in] results: Struct returned from runDetection
333 @param[in] patchRef: data reference for patch
335 coaddName = self.config.coaddName +
"Coadd"
336 patchRef.put(results.outputBackgrounds, coaddName +
"_calexp_background")
337 patchRef.put(results.outputSources, coaddName +
"_det")
338 if self.config.hasFakes:
339 patchRef.put(results.outputExposure,
"fakes_" + coaddName +
"_calexp")
341 patchRef.put(results.outputExposure, coaddName +
"_calexp")
346 class DeblendCoaddSourcesConfig(
Config):
347 """DeblendCoaddSourcesConfig
349 Configuration parameters for the `DeblendCoaddSourcesTask`.
352 doc=
"Deblend sources separately in each band")
354 doc=
"Deblend sources simultaneously across bands")
355 simultaneous =
Field(dtype=bool, default=
False, doc=
"Simultaneously deblend all bands?")
356 coaddName =
Field(dtype=str, default=
"deep", doc=
"Name of coadd")
357 hasFakes =
Field(dtype=bool,
359 doc=
"Should be set to True if fake sources have been inserted into the input data.")
362 Config.setDefaults(self)
363 self.singleBandDeblend.propagateAllPeaks =
True
367 """Task runner for the `MergeSourcesTask`
369 Required because the run method requires a list of
370 dataRefs rather than a single dataRef.
373 def getTargetList(parsedCmd, **kwargs):
374 """Provide a list of patch references for each patch, tract, filter combo.
381 Keyword arguments passed to the task
386 List of tuples, where each tuple is a (dataRef, kwargs) pair.
388 refDict = MergeSourcesRunner.buildRefDict(parsedCmd)
389 kwargs[
"psfCache"] = parsedCmd.psfCache
390 return [(
list(p.values()), kwargs)
for t
in refDict.values()
for p
in t.values()]
394 """Deblend the sources in a merged catalog
396 Deblend sources from master catalog in each coadd.
397 This can either be done separately in each band using the HSC-SDSS deblender
398 (`DeblendCoaddSourcesTask.config.simultaneous==False`)
399 or use SCARLET to simultaneously fit the blend in all bands
400 (`DeblendCoaddSourcesTask.config.simultaneous==True`).
401 The task will set its own `self.schema` atribute to the `Schema` of the
402 output deblended catalog.
403 This will include all fields from the input `Schema`, as well as additional fields
406 `pipe.tasks.multiband.DeblendCoaddSourcesTask Description
407 ---------------------------------------------------------
413 Butler used to read the input schemas from disk or
414 construct the reference catalog loader, if `schema` or `peakSchema` or
416 The schema of the merged detection catalog as an input to this task.
418 The schema of the `PeakRecord`s in the `Footprint`s in the merged detection catalog
420 ConfigClass = DeblendCoaddSourcesConfig
421 RunnerClass = DeblendCoaddSourcesRunner
422 _DefaultName =
"deblendCoaddSources"
423 makeIdFactory = _makeMakeIdFactory(
"MergedCoaddId")
426 def _makeArgumentParser(cls):
428 parser.add_id_argument(
"--id",
"deepCoadd_calexp",
429 help=
"data ID, e.g. --id tract=12345 patch=1,2 filter=g^r^i",
430 ContainerClass=ExistingCoaddDataIdContainer)
431 parser.add_argument(
"--psfCache", type=int, default=100, help=
"Size of CoaddPsf cache")
434 def __init__(self, butler=None, schema=None, peakSchema=None, **kwargs):
435 CmdLineTask.__init__(self, **kwargs)
437 assert butler
is not None,
"Neither butler nor schema is defined"
438 schema = butler.get(self.config.coaddName +
"Coadd_mergeDet_schema", immediate=
True).schema
440 self.schemaMapper.addMinimalSchema(schema)
441 self.schema = self.schemaMapper.getOutputSchema()
442 if peakSchema
is None:
443 assert butler
is not None,
"Neither butler nor peakSchema is defined"
444 peakSchema = butler.get(self.config.coaddName +
"Coadd_peak_schema", immediate=
True).schema
446 if self.config.simultaneous:
447 self.makeSubtask(
"multiBandDeblend", schema=self.schema, peakSchema=peakSchema)
449 self.makeSubtask(
"singleBandDeblend", schema=self.schema, peakSchema=peakSchema)
451 def getSchemaCatalogs(self):
452 """Return a dict of empty catalogs for each catalog dataset produced by this task.
457 Dictionary of empty catalogs, with catalog names as keys.
460 return {self.config.coaddName +
"Coadd_deblendedFlux": catalog,
461 self.config.coaddName +
"Coadd_deblendedModel": catalog}
463 def runDataRef(self, patchRefList, psfCache=100):
466 Deblend each source simultaneously or separately
467 (depending on `DeblendCoaddSourcesTask.config.simultaneous`).
468 Set `is-primary` and related flags.
469 Propagate flags from individual visits.
470 Write the deblended sources out.
475 List of data references for each filter
478 if self.config.hasFakes:
479 coaddType =
"fakes_" + self.config.coaddName
481 coaddType = self.config.coaddName
483 if self.config.simultaneous:
487 for patchRef
in patchRefList:
488 exposure = patchRef.get(coaddType +
"Coadd_calexp", immediate=
True)
489 filters.append(patchRef.dataId[
"filter"])
490 exposures.append(exposure)
492 sources = self.readSources(patchRef)
493 exposure = afwImage.MultibandExposure.fromExposures(filters, exposures)
494 fluxCatalogs, templateCatalogs = self.multiBandDeblend.
run(exposure, sources)
495 for n
in range(len(patchRefList)):
496 fluxCat = fluxCatalogs
if fluxCatalogs
is None else fluxCatalogs[filters[n]]
497 self.write(patchRefList[n], fluxCat, templateCatalogs[filters[n]])
500 for patchRef
in patchRefList:
501 exposure = patchRef.get(coaddType +
"Coadd_calexp", immediate=
True)
502 exposure.getPsf().setCacheCapacity(psfCache)
503 sources = self.readSources(patchRef)
504 self.singleBandDeblend.
run(exposure, sources)
505 self.write(patchRef, sources)
507 def readSources(self, dataRef):
508 """Read merged catalog
510 Read the catalog of merged detections and create a catalog
515 dataRef: data reference
516 Data reference for catalog of merged detections
520 sources: `SourceCatalog`
521 List of sources in merged catalog
523 We also need to add columns to hold the measurements we're about to make
524 so we can measure in-place.
526 merged = dataRef.get(self.config.coaddName +
"Coadd_mergeDet", immediate=
True)
527 self.log.
info(
"Read %d detections: %s" % (len(merged), dataRef.dataId))
528 idFactory = self.makeIdFactory(dataRef)
530 idFactory.notify(s.getId())
531 table = afwTable.SourceTable.make(self.schema, idFactory)
533 sources.extend(merged, self.schemaMapper)
536 def write(self, dataRef, flux_sources, template_sources=None):
537 """Write the source catalog(s)
541 dataRef: Data Reference
542 Reference to the output catalog.
543 flux_sources: `SourceCatalog`
544 Flux conserved sources to write to file.
545 If using the single band deblender, this is the catalog
547 template_sources: `SourceCatalog`
548 Source catalog using the multiband template models
553 if flux_sources
is not None:
554 assert not self.config.simultaneous
or self.config.multiBandDeblend.conserveFlux
555 dataRef.put(flux_sources, self.config.coaddName +
"Coadd_deblendedFlux")
556 self.log.
info(
"Wrote %d sources: %s" % (len(flux_sources), dataRef.dataId))
560 if template_sources
is not None:
561 assert self.config.multiBandDeblend.saveTemplates
562 dataRef.put(template_sources, self.config.coaddName +
"Coadd_deblendedModel")
563 self.log.
info(
"Wrote %d sources: %s" % (len(template_sources), dataRef.dataId))
566 """Write the metadata produced from processing the data.
570 List of Butler data references used to write the metadata.
571 The metadata is written to dataset type `CmdLineTask._getMetadataName`.
573 for dataRef
in dataRefList:
575 metadataName = self._getMetadataName()
576 if metadataName
is not None:
577 dataRef.put(self.getFullMetadata(), metadataName)
578 except Exception
as e:
579 self.log.
warn(
"Could not persist metadata for dataId=%s: %s", dataRef.dataId, e)
581 def getExposureId(self, dataRef):
582 """Get the ExposureId from a data reference
584 return int(dataRef.get(self.config.coaddName +
"CoaddId"))
587 class MeasureMergedCoaddSourcesConnections(
PipelineTaskConnections, dimensions=(
"tract",
"patch",
"band",
"skymap"),
588 defaultTemplates={
"inputCoaddName":
"deep",
589 "outputCoaddName":
"deep"}):
590 inputSchema = cT.InitInput(
591 doc=
"Input schema for measure merged task produced by a deblender or detection task",
592 name=
"{inputCoaddName}Coadd_deblendedFlux_schema",
593 storageClass=
"SourceCatalog"
595 outputSchema = cT.InitOutput(
596 doc=
"Output schema after all new fields are added by task",
597 name=
"{inputCoaddName}Coadd_meas_schema",
598 storageClass=
"SourceCatalog"
600 refCat = cT.PrerequisiteInput(
601 doc=
"Reference catalog used to match measured sources against known sources",
603 storageClass=
"SimpleCatalog",
604 dimensions=(
"skypix",),
609 doc=
"Input coadd image",
610 name=
"{inputCoaddName}Coadd_calexp",
611 storageClass=
"ExposureF",
612 dimensions=(
"tract",
"patch",
"band",
"skymap")
615 doc=
"SkyMap to use in processing",
616 name=
"{inputCoaddName}Coadd_skyMap",
617 storageClass=
"SkyMap",
618 dimensions=(
"skymap",),
620 visitCatalogs = cT.Input(
621 doc=
"Source catalogs for visits which overlap input tract, patch, band. Will be "
622 "further filtered in the task for the purpose of propagating flags from image calibration "
623 "and characterization to codd objects",
625 dimensions=(
"instrument",
"visit",
"detector"),
626 storageClass=
"SourceCatalog",
629 inputCatalog = cT.Input(
630 doc=(
"Name of the input catalog to use."
631 "If the single band deblender was used this should be 'deblendedFlux."
632 "If the multi-band deblender was used this should be 'deblendedModel, "
633 "or deblendedFlux if the multiband deblender was configured to output "
634 "deblended flux catalogs. If no deblending was performed this should "
636 name=
"{inputCoaddName}Coadd_deblendedFlux",
637 storageClass=
"SourceCatalog",
638 dimensions=(
"tract",
"patch",
"band",
"skymap"),
640 outputSources = cT.Output(
641 doc=
"Source catalog containing all the measurement information generated in this task",
642 name=
"{outputCoaddName}Coadd_meas",
643 dimensions=(
"tract",
"patch",
"band",
"skymap"),
644 storageClass=
"SourceCatalog",
646 matchResult = cT.Output(
647 doc=
"Match catalog produced by configured matcher, optional on doMatchSources",
648 name=
"{outputCoaddName}Coadd_measMatch",
649 dimensions=(
"tract",
"patch",
"band",
"skymap"),
650 storageClass=
"Catalog",
652 denormMatches = cT.Output(
653 doc=
"Denormalized Match catalog produced by configured matcher, optional on "
654 "doWriteMatchesDenormalized",
655 name=
"{outputCoaddName}Coadd_measMatchFull",
656 dimensions=(
"tract",
"patch",
"band",
"skymap"),
657 storageClass=
"Catalog",
660 def __init__(self, *, config=None):
661 super().__init__(config=config)
662 if config.doPropagateFlags
is False:
663 self.inputs -=
set((
"visitCatalogs",))
665 if config.doMatchSources
is False:
666 self.outputs -=
set((
"matchResult",))
668 if config.doWriteMatchesDenormalized
is False:
669 self.outputs -=
set((
"denormMatches",))
673 pipelineConnections=MeasureMergedCoaddSourcesConnections):
675 @anchor MeasureMergedCoaddSourcesConfig_
677 @brief Configuration parameters for the MeasureMergedCoaddSourcesTask
679 inputCatalog =
Field(dtype=str, default=
"deblendedFlux",
680 doc=(
"Name of the input catalog to use."
681 "If the single band deblender was used this should be 'deblendedFlux."
682 "If the multi-band deblender was used this should be 'deblendedModel."
683 "If no deblending was performed this should be 'mergeDet'"))
684 measurement =
ConfigurableField(target=SingleFrameMeasurementTask, doc=
"Source measurement")
685 setPrimaryFlags =
ConfigurableField(target=SetPrimaryFlagsTask, doc=
"Set flags for primary tract/patch")
686 doPropagateFlags =
Field(
687 dtype=bool, default=
True,
688 doc=
"Whether to match sources to CCD catalogs to propagate flags (to e.g. identify PSF stars)"
690 propagateFlags =
ConfigurableField(target=PropagateVisitFlagsTask, doc=
"Propagate visit flags to coadd")
691 doMatchSources =
Field(dtype=bool, default=
True, doc=
"Match sources to reference catalog?")
692 match =
ConfigurableField(target=DirectMatchTask, doc=
"Matching to reference catalog")
693 doWriteMatchesDenormalized =
Field(
696 doc=(
"Write reference matches in denormalized format? "
697 "This format uses more disk space, but is more convenient to read."),
699 coaddName =
Field(dtype=str, default=
"deep", doc=
"Name of coadd")
700 psfCache =
Field(dtype=int, default=100, doc=
"Size of psfCache")
701 checkUnitsParseStrict =
Field(
702 doc=
"Strictness of Astropy unit compatibility check, can be 'raise', 'warn' or 'silent'",
709 doc=
"Apply aperture corrections"
712 target=ApplyApCorrTask,
713 doc=
"Subtask to apply aperture corrections"
715 doRunCatalogCalculation =
Field(
718 doc=
'Run catalogCalculation task'
721 target=CatalogCalculationTask,
722 doc=
"Subtask to run catalogCalculation plugins on catalog"
728 doc=
"Should be set to True if fake sources have been inserted into the input data."
732 def refObjLoader(self):
733 return self.match.refObjLoader
737 self.measurement.plugins.names |= [
'base_InputCount',
739 'base_LocalPhotoCalib',
741 self.measurement.plugins[
'base_PixelFlags'].masksFpAnywhere = [
'CLIPPED',
'SENSOR_EDGE',
743 self.measurement.plugins[
'base_PixelFlags'].masksFpCenter = [
'CLIPPED',
'SENSOR_EDGE',
748 refCatGen2 = getattr(self.refObjLoader,
"ref_dataset_name",
None)
749 if refCatGen2
is not None and refCatGen2 != self.connections.refCat:
751 f
"Gen2 ({refCatGen2}) and Gen3 ({self.connections.refCat}) reference catalogs "
752 f
"are different. These options must be kept in sync until Gen2 is retired."
765 """Get the psfCache setting into MeasureMergedCoaddSourcesTask"""
767 def getTargetList(parsedCmd, **kwargs):
768 return ButlerInitializedTaskRunner.getTargetList(parsedCmd, psfCache=parsedCmd.psfCache)
773 @anchor MeasureMergedCoaddSourcesTask_
775 @brief Deblend sources from master catalog in each coadd seperately and measure.
777 @section pipe_tasks_multiBand_Contents Contents
779 - @ref pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Purpose
780 - @ref pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Initialize
781 - @ref pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Run
782 - @ref pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Config
783 - @ref pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Debug
784 - @ref pipe_tasks_multiband_MeasureMergedCoaddSourcesTask_Example
786 @section pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Purpose Description
788 Command-line task that uses peaks and footprints from a master catalog to perform deblending and
789 measurement in each coadd.
791 Given a master input catalog of sources (peaks and footprints) or deblender outputs
792 (including a HeavyFootprint in each band), measure each source on the
793 coadd. Repeating this procedure with the same master catalog across multiple coadds will generate a
794 consistent set of child sources.
796 The deblender retains all peaks and deblends any missing peaks (dropouts in that band) as PSFs. Source
797 properties are measured and the @c is-primary flag (indicating sources with no children) is set. Visit
798 flags are propagated to the coadd sources.
800 Optionally, we can match the coadd sources to an external reference catalog.
803 deepCoadd_mergeDet{tract,patch} or deepCoadd_deblend{tract,patch}: SourceCatalog
804 @n deepCoadd_calexp{tract,patch,filter}: ExposureF
806 deepCoadd_meas{tract,patch,filter}: SourceCatalog
810 MeasureMergedCoaddSourcesTask delegates most of its work to a set of sub-tasks:
813 <DT> @ref SingleFrameMeasurementTask_ "measurement"
814 <DD> Measure source properties of deblended sources.</DD>
815 <DT> @ref SetPrimaryFlagsTask_ "setPrimaryFlags"
816 <DD> Set flag 'is-primary' as well as related flags on sources. 'is-primary' is set for sources that are
817 not at the edge of the field and that have either not been deblended or are the children of deblended
819 <DT> @ref PropagateVisitFlagsTask_ "propagateFlags"
820 <DD> Propagate flags set in individual visits to the coadd.</DD>
821 <DT> @ref DirectMatchTask_ "match"
822 <DD> Match input sources to a reference catalog (optional).
825 These subtasks may be retargeted as required.
827 @section pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Initialize Task initialization
829 @copydoc \_\_init\_\_
831 @section pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Run Invoking the Task
835 @section pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Config Configuration parameters
837 See @ref MeasureMergedCoaddSourcesConfig_
839 @section pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Debug Debug variables
841 The @link lsst.pipe.base.cmdLineTask.CmdLineTask command line task@endlink interface supports a
842 flag @c -d to import @b debug.py from your @c PYTHONPATH; see @ref baseDebug for more about @b debug.py
845 MeasureMergedCoaddSourcesTask has no debug variables of its own because it delegates all the work to
846 the various sub-tasks. See the documetation for individual sub-tasks for more information.
848 @section pipe_tasks_multiband_MeasureMergedCoaddSourcesTask_Example A complete example of using
849 MeasureMergedCoaddSourcesTask
851 After MeasureMergedCoaddSourcesTask has been run on multiple coadds, we have a set of per-band catalogs.
852 The next stage in the multi-band processing procedure will merge these measurements into a suitable
853 catalog for driving forced photometry.
855 Command-line usage of MeasureMergedCoaddSourcesTask expects a data reference to the coadds
857 A list of the available optional arguments can be obtained by calling measureCoaddSources.py with the
858 `--help` command line argument:
860 measureCoaddSources.py --help
863 To demonstrate usage of the DetectCoaddSourcesTask in the larger context of multi-band processing, we
864 will process HSC data in the [ci_hsc](https://github.com/lsst/ci_hsc) package. Assuming one has finished
865 step 6 at @ref pipeTasks_multiBand, one may perform deblending and measure sources in the HSC-I band
868 measureCoaddSources.py $CI_HSC_DIR/DATA --id patch=5,4 tract=0 filter=HSC-I
870 This will process the HSC-I band data. The results are written in
871 `$CI_HSC_DIR/DATA/deepCoadd-results/HSC-I/0/5,4/meas-HSC-I-0-5,4.fits
873 It is also necessary to run
875 measureCoaddSources.py $CI_HSC_DIR/DATA --id patch=5,4 tract=0 filter=HSC-R
877 to generate the sources catalogs for the HSC-R band required by the next step in the multi-band
878 procedure: @ref MergeMeasurementsTask_ "MergeMeasurementsTask".
880 _DefaultName =
"measureCoaddSources"
881 ConfigClass = MeasureMergedCoaddSourcesConfig
882 RunnerClass = MeasureMergedCoaddSourcesRunner
883 getSchemaCatalogs = _makeGetSchemaCatalogs(
"meas")
884 makeIdFactory = _makeMakeIdFactory(
"MergedCoaddId")
887 def _makeArgumentParser(cls):
889 parser.add_id_argument(
"--id",
"deepCoadd_calexp",
890 help=
"data ID, e.g. --id tract=12345 patch=1,2 filter=r",
891 ContainerClass=ExistingCoaddDataIdContainer)
892 parser.add_argument(
"--psfCache", type=int, default=100, help=
"Size of CoaddPsf cache")
895 def __init__(self, butler=None, schema=None, peakSchema=None, refObjLoader=None, initInputs=None,
898 @brief Initialize the task.
900 Keyword arguments (in addition to those forwarded to CmdLineTask.__init__):
901 @param[in] schema: the schema of the merged detection catalog used as input to this one
902 @param[in] peakSchema: the schema of the PeakRecords in the Footprints in the merged detection catalog
903 @param[in] refObjLoader: an instance of LoadReferenceObjectsTasks that supplies an external reference
904 catalog. May be None if the loader can be constructed from the butler argument or all steps
905 requiring a reference catalog are disabled.
906 @param[in] butler: a butler used to read the input schemas from disk or construct the reference
907 catalog loader, if schema or peakSchema or refObjLoader is None
909 The task will set its own self.schema attribute to the schema of the output measurement catalog.
910 This will include all fields from the input schema, as well as additional fields for all the
913 super().__init__(**kwargs)
914 self.deblended = self.config.inputCatalog.startswith(
"deblended")
915 self.inputCatalog =
"Coadd_" + self.config.inputCatalog
916 if initInputs
is not None:
917 schema = initInputs[
'inputSchema'].schema
919 assert butler
is not None,
"Neither butler nor schema is defined"
920 schema = butler.get(self.config.coaddName + self.inputCatalog +
"_schema", immediate=
True).schema
922 self.schemaMapper.addMinimalSchema(schema)
923 self.schema = self.schemaMapper.getOutputSchema()
925 self.makeSubtask(
"measurement", schema=self.schema, algMetadata=self.algMetadata)
926 self.makeSubtask(
"setPrimaryFlags", schema=self.schema)
927 if self.config.doMatchSources:
928 self.makeSubtask(
"match", butler=butler, refObjLoader=refObjLoader)
929 if self.config.doPropagateFlags:
930 self.makeSubtask(
"propagateFlags", schema=self.schema)
931 self.schema.checkUnits(parse_strict=self.config.checkUnitsParseStrict)
932 if self.config.doApCorr:
933 self.makeSubtask(
"applyApCorr", schema=self.schema)
934 if self.config.doRunCatalogCalculation:
935 self.makeSubtask(
"catalogCalculation", schema=self.schema)
939 def runQuantum(self, butlerQC, inputRefs, outputRefs):
940 inputs = butlerQC.get(inputRefs)
943 inputs.pop(
'refCat'), config=self.config.refObjLoader,
945 self.match.setRefObjLoader(refObjLoader)
949 inputs[
'exposure'].getPsf().setCacheCapacity(self.config.psfCache)
952 packedId, maxBits = butlerQC.quantum.dataId.pack(
"tract_patch", returnMaxBits=
True)
953 inputs[
'exposureId'] = packedId
954 idFactory = afwTable.IdFactory.makeSource(packedId, 64 - maxBits)
956 table = afwTable.SourceTable.make(self.schema, idFactory)
958 sources.extend(inputs.pop(
'inputCatalog'), self.schemaMapper)
959 table = sources.getTable()
960 table.setMetadata(self.algMetadata)
961 inputs[
'sources'] = sources
963 skyMap = inputs.pop(
'skyMap')
964 tractNumber = inputRefs.inputCatalog.dataId[
'tract']
965 tractInfo = skyMap[tractNumber]
966 patchInfo = tractInfo.getPatchInfo(inputRefs.inputCatalog.dataId[
'patch'])
971 wcs=tractInfo.getWcs(),
972 bbox=patchInfo.getOuterBBox()
974 inputs[
'skyInfo'] = skyInfo
976 if self.config.doPropagateFlags:
978 ccdInputs = inputs[
'exposure'].
getInfo().getCoaddInputs().ccds
979 visitKey = ccdInputs.schema.find(
"visit").key
980 ccdKey = ccdInputs.schema.find(
"ccd").key
981 inputVisitIds =
set()
983 for ccdRecord
in ccdInputs:
984 visit = ccdRecord.get(visitKey)
985 ccd = ccdRecord.get(ccdKey)
986 inputVisitIds.add((visit, ccd))
987 ccdRecordsWcs[(visit, ccd)] = ccdRecord.getWcs()
989 inputCatalogsToKeep = []
990 inputCatalogWcsUpdate = []
991 for i, dataRef
in enumerate(inputRefs.visitCatalogs):
992 key = (dataRef.dataId[
'visit'], dataRef.dataId[
'detector'])
993 if key
in inputVisitIds:
994 inputCatalogsToKeep.append(inputs[
'visitCatalogs'][i])
995 inputCatalogWcsUpdate.append(ccdRecordsWcs[key])
996 inputs[
'visitCatalogs'] = inputCatalogsToKeep
997 inputs[
'wcsUpdates'] = inputCatalogWcsUpdate
998 inputs[
'ccdInputs'] = ccdInputs
1000 outputs = self.run(**inputs)
1001 butlerQC.put(outputs, outputRefs)
1003 def runDataRef(self, patchRef, psfCache=100):
1005 @brief Deblend and measure.
1007 @param[in] patchRef: Patch reference.
1009 Set 'is-primary' and related flags. Propagate flags
1010 from individual visits. Optionally match the sources to a reference catalog and write the matches.
1011 Finally, write the deblended sources and measurements out.
1013 if self.config.hasFakes:
1014 coaddType =
"fakes_" + self.config.coaddName
1016 coaddType = self.config.coaddName
1017 exposure = patchRef.get(coaddType +
"Coadd_calexp", immediate=
True)
1018 exposure.getPsf().setCacheCapacity(psfCache)
1019 sources = self.readSources(patchRef)
1020 table = sources.getTable()
1021 table.setMetadata(self.algMetadata)
1022 skyInfo =
getSkyInfo(coaddName=self.config.coaddName, patchRef=patchRef)
1024 if self.config.doPropagateFlags:
1025 ccdInputs = self.propagateFlags.getCcdInputs(exposure)
1029 results = self.run(exposure=exposure, sources=sources,
1030 ccdInputs=ccdInputs,
1031 skyInfo=skyInfo, butler=patchRef.getButler(),
1032 exposureId=self.getExposureId(patchRef))
1034 if self.config.doMatchSources:
1035 self.writeMatches(patchRef, results)
1036 self.write(patchRef, results.outputSources)
1038 def run(self, exposure, sources, skyInfo, exposureId, ccdInputs=None, visitCatalogs=None, wcsUpdates=None,
1040 """Run measurement algorithms on the input exposure, and optionally populate the
1041 resulting catalog with extra information.
1045 exposure : `lsst.afw.exposure.Exposure`
1046 The input exposure on which measurements are to be performed
1047 sources : `lsst.afw.table.SourceCatalog`
1048 A catalog built from the results of merged detections, or
1050 skyInfo : `lsst.pipe.base.Struct`
1051 A struct containing information about the position of the input exposure within
1052 a `SkyMap`, the `SkyMap`, its `Wcs`, and its bounding box
1053 exposureId : `int` or `bytes`
1054 packed unique number or bytes unique to the input exposure
1055 ccdInputs : `lsst.afw.table.ExposureCatalog`
1056 Catalog containing information on the individual visits which went into making
1058 visitCatalogs : list of `lsst.afw.table.SourceCatalogs` or `None`
1059 A list of source catalogs corresponding to measurements made on the individual
1060 visits which went into the input exposure. If None and butler is `None` then
1061 the task cannot propagate visit flags to the output catalog.
1062 wcsUpdates : list of `lsst.afw.geom.SkyWcs` or `None`
1063 If visitCatalogs is not `None` this should be a list of wcs objects which correspond
1064 to the input visits. Used to put all coordinates to common system. If `None` and
1065 butler is `None` then the task cannot propagate visit flags to the output catalog.
1066 butler : `lsst.daf.butler.Butler` or `lsst.daf.persistence.Butler`
1067 Either a gen2 or gen3 butler used to load visit catalogs
1071 results : `lsst.pipe.base.Struct`
1072 Results of running measurement task. Will contain the catalog in the
1073 sources attribute. Optionally will have results of matching to a
1074 reference catalog in the matchResults attribute, and denormalized
1075 matches in the denormMatches attribute.
1077 self.measurement.
run(sources, exposure, exposureId=exposureId)
1079 if self.config.doApCorr:
1080 self.applyApCorr.
run(
1082 apCorrMap=exposure.getInfo().getApCorrMap()
1089 if not sources.isContiguous():
1090 sources = sources.copy(deep=
True)
1092 if self.config.doRunCatalogCalculation:
1093 self.catalogCalculation.
run(sources)
1095 self.setPrimaryFlags.
run(sources, skyMap=skyInfo.skyMap, tractInfo=skyInfo.tractInfo,
1096 patchInfo=skyInfo.patchInfo, includeDeblend=self.deblended)
1097 if self.config.doPropagateFlags:
1098 self.propagateFlags.
run(butler, sources, ccdInputs, exposure.getWcs(), visitCatalogs, wcsUpdates)
1102 if self.config.doMatchSources:
1103 matchResult = self.match.
run(sources, exposure.getInfo().getFilter().getName())
1105 matches.table.setMetadata(matchResult.matchMeta)
1106 results.matchResult = matches
1107 if self.config.doWriteMatchesDenormalized:
1108 if matchResult.matches:
1111 self.log.
warn(
"No matches, so generating dummy denormalized matches file")
1114 denormMatches.getMetadata().add(
"COMMENT",
1115 "This catalog is empty because no matches were found.")
1116 results.denormMatches = denormMatches
1117 results.denormMatches = denormMatches
1119 results.outputSources = sources
1122 def readSources(self, dataRef):
1124 @brief Read input sources.
1126 @param[in] dataRef: Data reference for catalog of merged detections
1127 @return List of sources in merged catalog
1129 We also need to add columns to hold the measurements we're about to make
1130 so we can measure in-place.
1132 merged = dataRef.get(self.config.coaddName + self.inputCatalog, immediate=
True)
1133 self.log.
info(
"Read %d detections: %s" % (len(merged), dataRef.dataId))
1134 idFactory = self.makeIdFactory(dataRef)
1136 idFactory.notify(s.getId())
1137 table = afwTable.SourceTable.make(self.schema, idFactory)
1139 sources.extend(merged, self.schemaMapper)
1142 def writeMatches(self, dataRef, results):
1144 @brief Write matches of the sources to the astrometric reference catalog.
1146 @param[in] dataRef: data reference
1147 @param[in] results: results struct from run method
1149 if hasattr(results,
"matchResult"):
1150 dataRef.put(results.matchResult, self.config.coaddName +
"Coadd_measMatch")
1151 if hasattr(results,
"denormMatches"):
1152 dataRef.put(results.denormMatches, self.config.coaddName +
"Coadd_measMatchFull")
1154 def write(self, dataRef, sources):
1156 @brief Write the source catalog.
1158 @param[in] dataRef: data reference
1159 @param[in] sources: source catalog
1161 dataRef.put(sources, self.config.coaddName +
"Coadd_meas")
1162 self.log.
info(
"Wrote %d sources: %s" % (len(sources), dataRef.dataId))
1164 def getExposureId(self, dataRef):
1165 return int(dataRef.get(self.config.coaddName +
"CoaddId"))