import lsst.afw.image as afwImage
import lsst.afw.math as afwMath
import lsst.afw.table as afwTable
from lsst.daf.base import PropertyList
from lsst.pipe.base import (CmdLineTask, Struct, ArgumentParser, ButlerInitializedTaskRunner,
                            PipelineTask, PipelineTaskConfig, PipelineTaskConnections)
import lsst.pipe.base.connectionTypes as cT
from lsst.pex.config import Config, Field, ConfigurableField
from lsst.meas.algorithms import ReferenceObjectLoader
from lsst.meas.astrom import denormalizeMatches
from lsst.meas.base import SingleFrameMeasurementTask, ApplyApCorrTask, CatalogCalculationTask
from lsst.meas.extensions.scarlet.deblend import ScarletDeblendTask
from .mergeDetections import MergeDetectionsConfig, MergeDetectionsTask
from .mergeMeasurements import MergeMeasurementsConfig, MergeMeasurementsTask
from .multiBandUtils import MergeSourcesRunner, CullPeaksConfig, _makeGetSchemaCatalogs
from .multiBandUtils import getInputSchema, getShortFilterName, readCatalog, _makeMakeIdFactory
from .deblendCoaddSourcesPipeline import DeblendCoaddSourcesSingleConfig
from .deblendCoaddSourcesPipeline import DeblendCoaddSourcesSingleTask
from .deblendCoaddSourcesPipeline import DeblendCoaddSourcesMultiConfig
from .deblendCoaddSourcesPipeline import DeblendCoaddSourcesMultiTask
 
"""
* deepCoadd_det: detections from what used to be processCoadd (tract, patch, filter)
* deepCoadd_mergeDet: merged detections (tract, patch)
* deepCoadd_meas: measurements of merged detections (tract, patch, filter)
* deepCoadd_ref: reference sources (tract, patch)
All of these have associated *_schema catalogs that require no data ID and hold no records.

In addition, we have a schema-only dataset, which saves the schema for the PeakRecords in
the mergeDet, meas, and ref dataset Footprints:
* deepCoadd_peak_schema
"""
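
# The dataset types listed above are composed from the configurable coadd name
# ("deep" by default) plus a fixed suffix; the tasks below build them with plain
# string concatenation, e.g. ``self.config.coaddName + "Coadd_det"``.  A minimal
# illustrative sketch (not part of the original module) of that naming convention:
def _exampleCoaddDatasetNames(coaddName="deep"):
    """Illustrative only: list the dataset names built from a coadd name."""
    suffixes = ["Coadd_det", "Coadd_mergeDet", "Coadd_meas", "Coadd_ref", "Coadd_peak_schema"]
    return [coaddName + suffix for suffix in suffixes]
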
class DetectCoaddSourcesConnections(PipelineTaskConnections,
                                    dimensions=("tract", "patch", "abstract_filter", "skymap"),
                                    defaultTemplates={"inputCoaddName": "deep", "outputCoaddName": "deep"}):
    detectionSchema = cT.InitOutput(
        doc="Schema of the detection catalog",
        name="{outputCoaddName}Coadd_det_schema",
        storageClass="SourceCatalog",
    )
    exposure = cT.Input(
        doc="Exposure on which detections are to be performed",
        name="{inputCoaddName}Coadd",
        storageClass="ExposureF",
        dimensions=("tract", "patch", "abstract_filter", "skymap")
    )
    outputBackgrounds = cT.Output(
        doc="Output Backgrounds used in detection",
        name="{outputCoaddName}Coadd_calexp_background",
        storageClass="Background",
        dimensions=("tract", "patch", "abstract_filter", "skymap")
    )
    outputSources = cT.Output(
        doc="Detected sources catalog",
        name="{outputCoaddName}Coadd_det",
        storageClass="SourceCatalog",
        dimensions=("tract", "patch", "abstract_filter", "skymap")
    )
    outputExposure = cT.Output(
        doc="Exposure post detection",
        name="{outputCoaddName}Coadd_calexp",
        storageClass="ExposureF",
        dimensions=("tract", "patch", "abstract_filter", "skymap")
    )
 
class DetectCoaddSourcesConfig(PipelineTaskConfig, pipelineConnections=DetectCoaddSourcesConnections):
    """!
    @anchor DetectCoaddSourcesConfig_

    @brief Configuration parameters for the DetectCoaddSourcesTask
    """
    doScaleVariance = Field(dtype=bool, default=True, doc="Scale variance plane using empirical noise?")
    scaleVariance = ConfigurableField(target=ScaleVarianceTask, doc="Variance rescaling")
    detection = ConfigurableField(target=DynamicDetectionTask, doc="Source detection")
    coaddName = Field(dtype=str, default="deep", doc="Name of coadd")
    doInsertFakes = Field(dtype=bool, default=False,
                          doc="Run fake sources injection task")
    insertFakes = ConfigurableField(target=BaseFakeSourcesTask,
                                    doc="Injection of fake sources for testing "
                                        "purposes (must be retargeted)")
    hasFakes = Field(dtype=bool, default=False,
                     doc="Should be set to True if fake sources have been inserted into the input data.")

    def setDefaults(self):
        super().setDefaults()
        self.detection.thresholdType = "pixel_stdev"
        self.detection.isotropicGrow = True
        self.detection.reEstimateBackground = False
        self.detection.background.useApprox = False
        self.detection.background.binSize = 4096
        self.detection.background.undersampleStyle = 'REDUCE_INTERP_ORDER'
        self.detection.doTempWideBackground = True
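
# A minimal sketch (not part of the original module) of how the defaults above can
# be overridden by a user; only fields defined on DetectCoaddSourcesConfig and its
# ``detection`` subtask (as touched in setDefaults above) are used here.
def _exampleDetectConfigOverride(config):
    """Illustrative only: tweak a DetectCoaddSourcesConfig instance in place."""
    config.doScaleVariance = True                  # rescale the variance plane before detection
    config.coaddName = "deep"                      # operate on the deep coadds
    config.detection.doTempWideBackground = False  # flip a default set in setDefaults above
    return config
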
class DetectCoaddSourcesTask(PipelineTask, CmdLineTask):
    """!
    @anchor DetectCoaddSourcesTask_

    @brief Detect sources on a coadd

    @section pipe_tasks_multiBand_Contents Contents

      - @ref pipe_tasks_multiBand_DetectCoaddSourcesTask_Purpose
      - @ref pipe_tasks_multiBand_DetectCoaddSourcesTask_Initialize
      - @ref pipe_tasks_multiBand_DetectCoaddSourcesTask_Run
      - @ref pipe_tasks_multiBand_DetectCoaddSourcesTask_Config
      - @ref pipe_tasks_multiBand_DetectCoaddSourcesTask_Debug
      - @ref pipe_tasks_multiband_DetectCoaddSourcesTask_Example

    @section pipe_tasks_multiBand_DetectCoaddSourcesTask_Purpose   Description

    Command-line task that detects sources on a coadd of exposures obtained with a single filter.

    Coadding individual visits requires each exposure to be warped. This introduces covariance in the noise
    properties across pixels. Before detection, we correct the coadd variance by scaling the variance plane
    in the coadd to match the observed variance. This is an approximate approach -- strictly, we should
    propagate the full covariance matrix -- but it is simple and works well in practice.

    After scaling the variance plane, we detect sources and generate footprints by delegating to the @ref
    SourceDetectionTask_ "detection" subtask.

        deepCoadd{tract,patch,filter}: ExposureF

        deepCoadd_det{tract,patch,filter}: SourceCatalog (only parent Footprints)
        @n deepCoadd_calexp{tract,patch,filter}: Variance scaled, background-subtracted input exposure
        @n deepCoadd_calexp_background{tract,patch,filter}: BackgroundList

    DetectCoaddSourcesTask delegates most of its work to the @ref SourceDetectionTask_ "detection" subtask.
    You can retarget this subtask if you wish.

    @section pipe_tasks_multiBand_DetectCoaddSourcesTask_Initialize   Task initialization

    @copydoc \_\_init\_\_

    @section pipe_tasks_multiBand_DetectCoaddSourcesTask_Run   Invoking the Task

    @section pipe_tasks_multiBand_DetectCoaddSourcesTask_Config   Configuration parameters

    See @ref DetectCoaddSourcesConfig_ "DetectSourcesConfig"

    @section pipe_tasks_multiBand_DetectCoaddSourcesTask_Debug   Debug variables

    The @link lsst.pipe.base.cmdLineTask.CmdLineTask command line task@endlink interface supports a
    flag @c -d to import @b debug.py from your @c PYTHONPATH; see @ref baseDebug for more about @b debug.py

    DetectCoaddSourcesTask has no debug variables of its own because it delegates all the work to
    @ref SourceDetectionTask_ "SourceDetectionTask"; see the documentation for
    @ref SourceDetectionTask_ "SourceDetectionTask" for further information.

    @section pipe_tasks_multiband_DetectCoaddSourcesTask_Example   A complete example
    of using DetectCoaddSourcesTask

    DetectCoaddSourcesTask is meant to be run after assembling a coadded image in a given band. The purpose of
    the task is to update the background, detect all sources in a single band and generate a set of parent
    footprints. Subsequent tasks in the multi-band processing procedure will merge sources across bands and,
    eventually, perform forced photometry. Command-line usage of DetectCoaddSourcesTask expects a data
    reference to the coadd to be processed. A list of the available optional arguments can be obtained by
    calling detectCoaddSources.py with the `--help` command line argument:

        detectCoaddSources.py --help

    To demonstrate usage of the DetectCoaddSourcesTask in the larger context of multi-band processing, we
    will process HSC data in the [ci_hsc](https://github.com/lsst/ci_hsc) package. Assuming one has followed
    steps 1 - 4 at @ref pipeTasks_multiBand, one may detect all the sources in each coadd as follows:

        detectCoaddSources.py $CI_HSC_DIR/DATA --id patch=5,4 tract=0 filter=HSC-I

    This will process the HSC-I band data. The results are written to
    `$CI_HSC_DIR/DATA/deepCoadd-results/HSC-I`.

    It is also necessary to run:

        detectCoaddSources.py $CI_HSC_DIR/DATA --id patch=5,4 tract=0 filter=HSC-R

    to generate the source catalogs for the HSC-R band required by the next step in the multi-band
    processing procedure: @ref MergeDetectionsTask_ "MergeDetectionsTask".
    """
    _DefaultName = "detectCoaddSources"
    ConfigClass = DetectCoaddSourcesConfig
    getSchemaCatalogs = _makeGetSchemaCatalogs("det")
    makeIdFactory = _makeMakeIdFactory("CoaddId")
 
    @classmethod
    def _makeArgumentParser(cls):
        parser = super()._makeArgumentParser()
        parser.add_id_argument("--id", "deepCoadd", help="data ID, e.g. --id tract=12345 patch=1,2 filter=r",
                               ContainerClass=ExistingCoaddDataIdContainer)
        return parser
 
    def __init__(self, schema=None, **kwargs):
        """!
        @brief Initialize the task. Create the @ref SourceDetectionTask_ "detection" subtask.

        Keyword arguments (in addition to those forwarded to CmdLineTask.__init__):

        @param[in] schema:   initial schema for the output catalog, modified in place to include all
                             fields set by this task.  If None, the source minimal schema will be used.
        @param[in] **kwargs: keyword arguments to be passed to lsst.pipe.base.task.Task.__init__
        """
        super().__init__(**kwargs)
        if schema is None:
            schema = afwTable.SourceTable.makeMinimalSchema()
        self.schema = schema
        if self.config.doInsertFakes:
            self.makeSubtask("insertFakes")
        self.makeSubtask("detection", schema=self.schema)
        if self.config.doScaleVariance:
            self.makeSubtask("scaleVariance")
 
    def runDataRef(self, patchRef):
        """!
        @brief Run detection on a coadd.

        Invokes @ref run and then uses @ref write to output the results.

        @param[in] patchRef: data reference for patch
        """
        if self.config.hasFakes:
            exposure = patchRef.get("fakes_" + self.config.coaddName + "Coadd", immediate=True)
        else:
            exposure = patchRef.get(self.config.coaddName + "Coadd", immediate=True)
        expId = int(patchRef.get(self.config.coaddName + "CoaddId"))
        results = self.run(exposure, self.makeIdFactory(patchRef), expId=expId)
        self.write(results, patchRef)
 
    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        packedId, maxBits = butlerQC.quantum.dataId.pack("tract_patch_abstract_filter", returnMaxBits=True)
        inputs["idFactory"] = afwTable.IdFactory.makeSource(packedId, 64 - maxBits)
        inputs["expId"] = packedId
        outputs = self.run(**inputs)
        butlerQC.put(outputs, outputRefs)
 
    def run(self, exposure, idFactory, expId):
        """!
        @brief Run detection on an exposure.

        First scale the variance plane to match the observed variance
        using @ref ScaleVarianceTask. Then invoke the @ref SourceDetectionTask_ "detection" subtask to
        detect sources and generate footprints.

        @param[in,out] exposure: Exposure on which to detect (may be background-subtracted and scaled,
                                 depending on configuration).
        @param[in] idFactory: IdFactory to set source identifiers
        @param[in] expId: Exposure identifier (integer) for RNG seed

        @return a pipe.base.Struct with fields
        - sources: catalog of detections
        - backgrounds: list of backgrounds
        """
        if self.config.doScaleVariance:
            varScale = self.scaleVariance.run(exposure.maskedImage)
            exposure.getMetadata().add("VARIANCE_SCALE", varScale)
        backgrounds = afwMath.BackgroundList()
        if self.config.doInsertFakes:
            self.insertFakes.run(exposure, background=backgrounds)
        table = afwTable.SourceTable.make(self.schema, idFactory)
        detections = self.detection.run(table, exposure, expId=expId)
        sources = detections.sources
        fpSets = detections.fpSets
        if hasattr(fpSets, "background") and fpSets.background:
            for bg in fpSets.background:
                backgrounds.append(bg)
        return Struct(outputSources=sources, outputBackgrounds=backgrounds, outputExposure=exposure)
 
    def write(self, results, patchRef):
        """!
        @brief Write out results from runDetection.

        @param[in] results: Struct returned from runDetection
        @param[in] patchRef: data reference for patch
        """
        coaddName = self.config.coaddName + "Coadd"
        patchRef.put(results.outputBackgrounds, coaddName + "_calexp_background")
        patchRef.put(results.outputSources, coaddName + "_det")
        if self.config.hasFakes:
            patchRef.put(results.outputExposure, "fakes_" + coaddName + "_calexp")
        else:
            patchRef.put(results.outputExposure, coaddName + "_calexp")
 
class DeblendCoaddSourcesConfig(Config):
    """DeblendCoaddSourcesConfig

    Configuration parameters for the `DeblendCoaddSourcesTask`.
    """
    singleBandDeblend = ConfigurableField(target=SourceDeblendTask,
                                          doc="Deblend sources separately in each band")
    multiBandDeblend = ConfigurableField(target=ScarletDeblendTask,
                                         doc="Deblend sources simultaneously across bands")
    simultaneous = Field(dtype=bool, default=False, doc="Simultaneously deblend all bands?")
    coaddName = Field(dtype=str, default="deep", doc="Name of coadd")
    hasFakes = Field(dtype=bool,
                     default=False,
                     doc="Should be set to True if fake sources have been inserted into the input data.")

    def setDefaults(self):
        Config.setDefaults(self)
        self.singleBandDeblend.propagateAllPeaks = True
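
# A minimal sketch (not part of the original module) of selecting the deblending
# mode on a DeblendCoaddSourcesConfig: ``simultaneous=True`` routes work to the
# scarlet-based ``multiBandDeblend`` subtask, ``False`` to ``singleBandDeblend``
# (see ``DeblendCoaddSourcesTask.__init__`` below).
def _exampleDeblendModeOverride(config, useScarlet=True):
    """Illustrative only: choose single-band or multi-band deblending."""
    config.simultaneous = useScarlet   # True: scarlet fits all bands at once
    config.coaddName = "deep"
    return config
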
class DeblendCoaddSourcesRunner(MergeSourcesRunner):
    """Task runner for `DeblendCoaddSourcesTask`

    Required because the run method requires a list of
    dataRefs rather than a single dataRef.
    """
    @staticmethod
    def getTargetList(parsedCmd, **kwargs):
        """Provide a list of patch references for each patch, tract, filter combo.

        Parameters
        ----------
        kwargs:
            Keyword arguments passed to the task

        Returns
        -------
        targetList : `list`
            List of tuples, where each tuple is a (dataRef, kwargs) pair.
        """
        refDict = MergeSourcesRunner.buildRefDict(parsedCmd)
        kwargs["psfCache"] = parsedCmd.psfCache
        return [(list(p.values()), kwargs) for t in refDict.values() for p in t.values()]
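
# Illustrative sketch (not part of the original module) of the flattening performed
# by ``getTargetList`` above: ``buildRefDict`` is assumed to return a nested dict
# keyed by tract, then patch, then filter, and the comprehension turns it into one
# (list-of-dataRefs, kwargs) target per patch.
def _exampleFlattenRefDict(refDict, **kwargs):
    """Illustrative only, with plain dicts standing in for data references."""
    return [(list(patchDict.values()), kwargs)
            for tractDict in refDict.values()
            for patchDict in tractDict.values()]
# e.g. _exampleFlattenRefDict({0: {"5,4": {"HSC-I": "refI", "HSC-R": "refR"}}}, psfCache=100)
#   -> [(["refI", "refR"], {"psfCache": 100})]
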
 
class DeblendCoaddSourcesTask(CmdLineTask):
    """Deblend the sources in a merged catalog

    Deblend sources from master catalog in each coadd.
    This can either be done separately in each band using the HSC-SDSS deblender
    (`DeblendCoaddSourcesTask.config.simultaneous==False`)
    or use SCARLET to simultaneously fit the blend in all bands
    (`DeblendCoaddSourcesTask.config.simultaneous==True`).
    The task will set its own `self.schema` attribute to the `Schema` of the
    output deblended catalog.
    This will include all fields from the input `Schema`, as well as additional fields
    from the deblender.

    Parameters
    ----------
    butler: `Butler`
        Butler used to read the input schemas from disk or
        construct the reference catalog loader, if `schema` or `peakSchema`
        is `None`.
    schema: `Schema`
        The schema of the merged detection catalog as an input to this task.
    peakSchema: `Schema`
        The schema of the `PeakRecord`s in the `Footprint`s in the merged detection catalog.
    """
    ConfigClass = DeblendCoaddSourcesConfig
    RunnerClass = DeblendCoaddSourcesRunner
    _DefaultName = "deblendCoaddSources"
    makeIdFactory = _makeMakeIdFactory("MergedCoaddId")
 
    @classmethod
    def _makeArgumentParser(cls):
        parser = ArgumentParser(name=cls._DefaultName)
        parser.add_id_argument("--id", "deepCoadd_calexp",
                               help="data ID, e.g. --id tract=12345 patch=1,2 filter=g^r^i",
                               ContainerClass=ExistingCoaddDataIdContainer)
        parser.add_argument("--psfCache", type=int, default=100, help="Size of CoaddPsf cache")
        return parser
 
    def __init__(self, butler=None, schema=None, peakSchema=None, **kwargs):
        CmdLineTask.__init__(self, **kwargs)
        if schema is None:
            assert butler is not None, "Neither butler nor schema is defined"
            schema = butler.get(self.config.coaddName + "Coadd_mergeDet_schema", immediate=True).schema
        self.schemaMapper = afwTable.SchemaMapper(schema)
        self.schemaMapper.addMinimalSchema(schema)
        self.schema = self.schemaMapper.getOutputSchema()
        if peakSchema is None:
            assert butler is not None, "Neither butler nor peakSchema is defined"
            peakSchema = butler.get(self.config.coaddName + "Coadd_peak_schema", immediate=True).schema
        if self.config.simultaneous:
            self.makeSubtask("multiBandDeblend", schema=self.schema, peakSchema=peakSchema)
        else:
            self.makeSubtask("singleBandDeblend", schema=self.schema, peakSchema=peakSchema)
 
    def getSchemaCatalogs(self):
        """Return a dict of empty catalogs for each catalog dataset produced by this task.

        Returns
        -------
        result : `dict`
            Dictionary of empty catalogs, with catalog names as keys.
        """
        catalog = afwTable.SourceCatalog(self.schema)
        return {self.config.coaddName + "Coadd_deblendedFlux": catalog,
                self.config.coaddName + "Coadd_deblendedModel": catalog}
 
    def runDataRef(self, patchRefList, psfCache=100):
        """Deblend each source simultaneously or separately
        (depending on `DeblendCoaddSourcesTask.config.simultaneous`).
        Set `is-primary` and related flags.
        Propagate flags from individual visits.
        Write the deblended sources out.

        Parameters
        ----------
        patchRefList : `list`
            List of data references for each filter
        """
        if self.config.hasFakes:
            coaddType = "fakes_" + self.config.coaddName
        else:
            coaddType = self.config.coaddName

        if self.config.simultaneous:
            # Use the multiband deblender: load the coadd in every band
            filters = []
            exposures = []
            for patchRef in patchRefList:
                exposure = patchRef.get(coaddType + "Coadd_calexp", immediate=True)
                filters.append(patchRef.dataId["filter"])
                exposures.append(exposure)
            # The merged detections are the same in every band, so any patchRef will do
            sources = self.readSources(patchRef)
            exposure = afwImage.MultibandExposure.fromExposures(filters, exposures)
            fluxCatalogs, templateCatalogs = self.multiBandDeblend.run(exposure, sources)
            for n in range(len(patchRefList)):
                fluxCat = fluxCatalogs if fluxCatalogs is None else fluxCatalogs[filters[n]]
                self.write(patchRefList[n], fluxCat, templateCatalogs[filters[n]])
        else:
            # Use the single-band deblender separately in each band
            for patchRef in patchRefList:
                exposure = patchRef.get(coaddType + "Coadd_calexp", immediate=True)
                exposure.getPsf().setCacheCapacity(psfCache)
                sources = self.readSources(patchRef)
                self.singleBandDeblend.run(exposure, sources)
                self.write(patchRef, sources)
 
    def readSources(self, dataRef):
        """Read merged catalog

        Read the catalog of merged detections and create a catalog
        suitable for deblending.

        Parameters
        ----------
        dataRef: data reference
            Data reference for catalog of merged detections

        Returns
        -------
        sources: `SourceCatalog`
            List of sources in merged catalog

        We also need to add columns to hold the measurements we're about to make
        so we can measure in-place.
        """
        merged = dataRef.get(self.config.coaddName + "Coadd_mergeDet", immediate=True)
        self.log.info("Read %d detections: %s" % (len(merged), dataRef.dataId))
        idFactory = self.makeIdFactory(dataRef)
        for s in merged:
            idFactory.notify(s.getId())
        table = afwTable.SourceTable.make(self.schema, idFactory)
        sources = afwTable.SourceCatalog(table)
        sources.extend(merged, self.schemaMapper)
        return sources
 
    def write(self, dataRef, flux_sources, template_sources=None):
        """Write the source catalog(s)

        Parameters
        ----------
        dataRef: Data Reference
            Reference to the output catalog.
        flux_sources: `SourceCatalog`
            Flux conserved sources to write to file.
            If using the single band deblender, this is the catalog
            generated.
        template_sources: `SourceCatalog`
            Source catalog using the multiband template models.
        """
        if flux_sources is not None:
            assert not self.config.simultaneous or self.config.multiBandDeblend.conserveFlux
            dataRef.put(flux_sources, self.config.coaddName + "Coadd_deblendedFlux")
            self.log.info("Wrote %d sources: %s" % (len(flux_sources), dataRef.dataId))
        if template_sources is not None:
            assert self.config.multiBandDeblend.saveTemplates
            dataRef.put(template_sources, self.config.coaddName + "Coadd_deblendedModel")
            self.log.info("Wrote %d sources: %s" % (len(template_sources), dataRef.dataId))
 
    def writeMetadata(self, dataRefList):
        """Write the metadata produced from processing the data.

        Parameters
        ----------
        dataRefList : `list`
            List of Butler data references used to write the metadata.
            The metadata is written to dataset type `CmdLineTask._getMetadataName`.
        """
        for dataRef in dataRefList:
            try:
                metadataName = self._getMetadataName()
                if metadataName is not None:
                    dataRef.put(self.getFullMetadata(), metadataName)
            except Exception as e:
                self.log.warn("Could not persist metadata for dataId=%s: %s", dataRef.dataId, e)
 
    def getExposureId(self, dataRef):
        """Get the ExposureId from a data reference
        """
        return int(dataRef.get(self.config.coaddName + "CoaddId"))
 
class MeasureMergedCoaddSourcesConnections(PipelineTaskConnections,
                                           dimensions=("tract", "patch", "abstract_filter", "skymap"),
                                           defaultTemplates={"inputCoaddName": "deep",
                                                             "outputCoaddName": "deep"}):
    inputSchema = cT.InitInput(
        doc="Input schema for measure merged task produced by a deblender or detection task",
        name="{inputCoaddName}Coadd_deblendedFlux_schema",
        storageClass="SourceCatalog"
    )
    outputSchema = cT.InitOutput(
        doc="Output schema after all new fields are added by task",
        name="{inputCoaddName}Coadd_meas_schema",
        storageClass="SourceCatalog"
    )
    refCat = cT.PrerequisiteInput(
        doc="Reference catalog used to match measured sources against known sources",
        storageClass="SimpleCatalog",
        dimensions=("skypix",),
    )
    exposure = cT.Input(
        doc="Input coadd image",
        name="{inputCoaddName}Coadd_calexp",
        storageClass="ExposureF",
        dimensions=("tract", "patch", "abstract_filter", "skymap")
    )
    skyMap = cT.Input(
        doc="SkyMap to use in processing",
        name="{inputCoaddName}Coadd_skyMap",
        storageClass="SkyMap",
        dimensions=("skymap",),
    )
    visitCatalogs = cT.Input(
        doc="Source catalogs for visits which overlap input tract, patch, abstract_filter. Will be "
            "further filtered in the task for the purpose of propagating flags from image calibration "
            "and characterization to coadd objects",
        dimensions=("instrument", "visit", "detector"),
        storageClass="SourceCatalog",
    )
    inputCatalog = cT.Input(
        doc=("Name of the input catalog to use. "
             "If the single band deblender was used this should be 'deblendedFlux'. "
             "If the multi-band deblender was used this should be 'deblendedModel', "
             "or deblendedFlux if the multiband deblender was configured to output "
             "deblended flux catalogs. If no deblending was performed this should "
             "be 'mergeDet'."),
        name="{inputCoaddName}Coadd_deblendedFlux",
        storageClass="SourceCatalog",
        dimensions=("tract", "patch", "abstract_filter", "skymap"),
    )
    outputSources = cT.Output(
        doc="Source catalog containing all the measurement information generated in this task",
        name="{outputCoaddName}Coadd_meas",
        dimensions=("tract", "patch", "abstract_filter", "skymap"),
        storageClass="SourceCatalog",
    )
    matchResult = cT.Output(
        doc="Match catalog produced by configured matcher, optional on doMatchSources",
        name="{outputCoaddName}Coadd_measMatch",
        dimensions=("tract", "patch", "abstract_filter", "skymap"),
        storageClass="Catalog",
    )
    denormMatches = cT.Output(
        doc="Denormalized Match catalog produced by configured matcher, optional on "
            "doWriteMatchesDenormalized",
        name="{outputCoaddName}Coadd_measMatchFull",
        dimensions=("tract", "patch", "abstract_filter", "skymap"),
        storageClass="Catalog",
    )
 
    def __init__(self, *, config=None):
        super().__init__(config=config)
        if config.doPropagateFlags is False:
            self.inputs -= set(("visitCatalogs",))

        if config.doMatchSources is False:
            self.outputs -= set(("matchResult",))

        if config.doWriteMatchesDenormalized is False:
            self.outputs -= set(("denormMatches",))
 
class MeasureMergedCoaddSourcesConfig(PipelineTaskConfig,
                                      pipelineConnections=MeasureMergedCoaddSourcesConnections):
    """!
    @anchor MeasureMergedCoaddSourcesConfig_

    @brief Configuration parameters for the MeasureMergedCoaddSourcesTask
    """
    inputCatalog = Field(dtype=str, default="deblendedFlux",
                         doc=("Name of the input catalog to use. "
                              "If the single band deblender was used this should be 'deblendedFlux'. "
                              "If the multi-band deblender was used this should be 'deblendedModel'. "
                              "If no deblending was performed this should be 'mergeDet'"))
    measurement = ConfigurableField(target=SingleFrameMeasurementTask, doc="Source measurement")
    setPrimaryFlags = ConfigurableField(target=SetPrimaryFlagsTask, doc="Set flags for primary tract/patch")
    doPropagateFlags = Field(
        dtype=bool, default=True,
        doc="Whether to match sources to CCD catalogs to propagate flags (to e.g. identify PSF stars)"
    )
    propagateFlags = ConfigurableField(target=PropagateVisitFlagsTask, doc="Propagate visit flags to coadd")
    doMatchSources = Field(dtype=bool, default=True, doc="Match sources to reference catalog?")
    match = ConfigurableField(target=DirectMatchTask, doc="Matching to reference catalog")
    doWriteMatchesDenormalized = Field(
        dtype=bool,
        default=False,
        doc=("Write reference matches in denormalized format? "
             "This format uses more disk space, but is more convenient to read."),
    )
    coaddName = Field(dtype=str, default="deep", doc="Name of coadd")
    psfCache = Field(dtype=int, default=100, doc="Size of psfCache")
    checkUnitsParseStrict = Field(
        doc="Strictness of Astropy unit compatibility check, can be 'raise', 'warn' or 'silent'",
        dtype=str,
        default="raise",
    )
    doApCorr = Field(
        dtype=bool,
        default=True,
        doc="Apply aperture corrections"
    )
    applyApCorr = ConfigurableField(
        target=ApplyApCorrTask,
        doc="Subtask to apply aperture corrections"
    )
    doRunCatalogCalculation = Field(
        dtype=bool,
        default=True,
        doc='Run catalogCalculation task'
    )
    catalogCalculation = ConfigurableField(
        target=CatalogCalculationTask,
        doc="Subtask to run catalogCalculation plugins on catalog"
    )
    hasFakes = Field(
        dtype=bool,
        default=False,
        doc="Should be set to True if fake sources have been inserted into the input data."
    )
    @property
    def refObjLoader(self):
        return self.match.refObjLoader
 
    def setDefaults(self):
        super().setDefaults()
        self.measurement.plugins.names |= ['base_InputCount',
                                           'base_Variance',
                                           'base_LocalPhotoCalib',
                                           'base_LocalWcs']
        self.measurement.plugins['base_PixelFlags'].masksFpAnywhere = ['CLIPPED', 'SENSOR_EDGE',
                                                                       'INEXACT_PSF']
        self.measurement.plugins['base_PixelFlags'].masksFpCenter = ['CLIPPED', 'SENSOR_EDGE',
                                                                     'INEXACT_PSF']
 
    def validate(self):
        super().validate()
        refCatGen2 = getattr(self.refObjLoader, "ref_dataset_name", None)
        if refCatGen2 is not None and refCatGen2 != self.connections.refCat:
            raise ValueError(
                f"Gen2 ({refCatGen2}) and Gen3 ({self.connections.refCat}) reference catalogs "
                f"are different.  These options must be kept in sync until Gen2 is retired."
            )

class MeasureMergedCoaddSourcesRunner(ButlerInitializedTaskRunner):
    """Get the psfCache setting into MeasureMergedCoaddSourcesTask"""
    @staticmethod
    def getTargetList(parsedCmd, **kwargs):
        return ButlerInitializedTaskRunner.getTargetList(parsedCmd, psfCache=parsedCmd.psfCache)
 
class MeasureMergedCoaddSourcesTask(PipelineTask, CmdLineTask):
    """!
    @anchor MeasureMergedCoaddSourcesTask_

    @brief Deblend sources from master catalog in each coadd separately and measure.

    @section pipe_tasks_multiBand_Contents Contents

      - @ref pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Purpose
      - @ref pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Initialize
      - @ref pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Run
      - @ref pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Config
      - @ref pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Debug
      - @ref pipe_tasks_multiband_MeasureMergedCoaddSourcesTask_Example

    @section pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Purpose Description

    Command-line task that uses peaks and footprints from a master catalog to perform deblending and
    measurement in each coadd.

    Given a master input catalog of sources (peaks and footprints) or deblender outputs
    (including a HeavyFootprint in each band), measure each source on the
    coadd. Repeating this procedure with the same master catalog across multiple coadds will generate a
    consistent set of child sources.

    The deblender retains all peaks and deblends any missing peaks (dropouts in that band) as PSFs. Source
    properties are measured and the @c is-primary flag (indicating sources with no children) is set. Visit
    flags are propagated to the coadd sources.

    Optionally, we can match the coadd sources to an external reference catalog.

        deepCoadd_mergeDet{tract,patch} or deepCoadd_deblend{tract,patch}: SourceCatalog
        @n deepCoadd_calexp{tract,patch,filter}: ExposureF

        deepCoadd_meas{tract,patch,filter}: SourceCatalog

    MeasureMergedCoaddSourcesTask delegates most of its work to a set of sub-tasks:

    <DL>
      <DT> @ref SingleFrameMeasurementTask_ "measurement"
      <DD> Measure source properties of deblended sources.</DD>
      <DT> @ref SetPrimaryFlagsTask_ "setPrimaryFlags"
      <DD> Set flag 'is-primary' as well as related flags on sources. 'is-primary' is set for sources that are
      not at the edge of the field and that have either not been deblended or are the children of deblended
      sources.</DD>
      <DT> @ref PropagateVisitFlagsTask_ "propagateFlags"
      <DD> Propagate flags set in individual visits to the coadd.</DD>
      <DT> @ref DirectMatchTask_ "match"
      <DD> Match input sources to a reference catalog (optional).</DD>
    </DL>

    These subtasks may be retargeted as required.

    @section pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Initialize Task initialization

    @copydoc \_\_init\_\_

    @section pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Run Invoking the Task

    @section pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Config Configuration parameters

    See @ref MeasureMergedCoaddSourcesConfig_

    @section pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Debug Debug variables

    The @link lsst.pipe.base.cmdLineTask.CmdLineTask command line task@endlink interface supports a
    flag @c -d to import @b debug.py from your @c PYTHONPATH; see @ref baseDebug for more about @b debug.py

    MeasureMergedCoaddSourcesTask has no debug variables of its own because it delegates all the work to
    the various sub-tasks. See the documentation for individual sub-tasks for more information.

    @section pipe_tasks_multiband_MeasureMergedCoaddSourcesTask_Example A complete example of using
    MeasureMergedCoaddSourcesTask

    After MeasureMergedCoaddSourcesTask has been run on multiple coadds, we have a set of per-band catalogs.
    The next stage in the multi-band processing procedure will merge these measurements into a suitable
    catalog for driving forced photometry.

    Command-line usage of MeasureMergedCoaddSourcesTask expects a data reference to the coadds
    to be processed.
    A list of the available optional arguments can be obtained by calling measureCoaddSources.py with the
    `--help` command line argument:

        measureCoaddSources.py --help

    To demonstrate usage of the MeasureMergedCoaddSourcesTask in the larger context of multi-band processing,
    we will process HSC data in the [ci_hsc](https://github.com/lsst/ci_hsc) package. Assuming one has
    finished step 6 at @ref pipeTasks_multiBand, one may perform deblending and measure sources in the HSC-I
    band coadd as follows:

        measureCoaddSources.py $CI_HSC_DIR/DATA --id patch=5,4 tract=0 filter=HSC-I

    This will process the HSC-I band data. The results are written in
    `$CI_HSC_DIR/DATA/deepCoadd-results/HSC-I/0/5,4/meas-HSC-I-0-5,4.fits`.

    It is also necessary to run

        measureCoaddSources.py $CI_HSC_DIR/DATA --id patch=5,4 tract=0 filter=HSC-R

    to generate the source catalogs for the HSC-R band required by the next step in the multi-band
    procedure: @ref MergeMeasurementsTask_ "MergeMeasurementsTask".
    """
    _DefaultName = "measureCoaddSources"
    ConfigClass = MeasureMergedCoaddSourcesConfig
    RunnerClass = MeasureMergedCoaddSourcesRunner
    getSchemaCatalogs = _makeGetSchemaCatalogs("meas")
    makeIdFactory = _makeMakeIdFactory("MergedCoaddId")
 
    @classmethod
    def _makeArgumentParser(cls):
        parser = ArgumentParser(name=cls._DefaultName)
        parser.add_id_argument("--id", "deepCoadd_calexp",
                               help="data ID, e.g. --id tract=12345 patch=1,2 filter=r",
                               ContainerClass=ExistingCoaddDataIdContainer)
        parser.add_argument("--psfCache", type=int, default=100, help="Size of CoaddPsf cache")
        return parser
 
    def __init__(self, butler=None, schema=None, peakSchema=None, refObjLoader=None, initInputs=None,
                 **kwargs):
        """!
        @brief Initialize the task.

        Keyword arguments (in addition to those forwarded to CmdLineTask.__init__):
        @param[in] schema: the schema of the merged detection catalog used as input to this one
        @param[in] peakSchema: the schema of the PeakRecords in the Footprints in the merged detection catalog
        @param[in] refObjLoader: an instance of LoadReferenceObjectsTasks that supplies an external reference
            catalog. May be None if the loader can be constructed from the butler argument or all steps
            requiring a reference catalog are disabled.
        @param[in] butler: a butler used to read the input schemas from disk or construct the reference
            catalog loader, if schema or peakSchema or refObjLoader is None

        The task will set its own self.schema attribute to the schema of the output measurement catalog.
        This will include all fields from the input schema, as well as additional fields for all the
        measurements.
        """
        super().__init__(**kwargs)
        self.deblended = self.config.inputCatalog.startswith("deblended")
        self.inputCatalog = "Coadd_" + self.config.inputCatalog
        if initInputs is not None:
            schema = initInputs['inputSchema'].schema
        if schema is None:
            assert butler is not None, "Neither butler nor schema is defined"
            schema = butler.get(self.config.coaddName + self.inputCatalog + "_schema", immediate=True).schema
        self.schemaMapper = afwTable.SchemaMapper(schema)
        self.schemaMapper.addMinimalSchema(schema)
        self.schema = self.schemaMapper.getOutputSchema()
        self.algMetadata = PropertyList()
        self.makeSubtask("measurement", schema=self.schema, algMetadata=self.algMetadata)
        self.makeSubtask("setPrimaryFlags", schema=self.schema)
        if self.config.doMatchSources:
            self.makeSubtask("match", butler=butler, refObjLoader=refObjLoader)
        if self.config.doPropagateFlags:
            self.makeSubtask("propagateFlags", schema=self.schema)
        self.schema.checkUnits(parse_strict=self.config.checkUnitsParseStrict)
        if self.config.doApCorr:
            self.makeSubtask("applyApCorr", schema=self.schema)
        if self.config.doRunCatalogCalculation:
            self.makeSubtask("catalogCalculation", schema=self.schema)
 
    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        refObjLoader = ReferenceObjectLoader([ref.datasetRef.dataId for ref in inputRefs.refCat],
                                             inputs.pop('refCat'), config=self.config.refObjLoader,
                                             log=self.log)
        self.match.setRefObjLoader(refObjLoader)
 
        inputs['exposure'].getPsf().setCacheCapacity(self.config.psfCache)

        packedId, maxBits = butlerQC.quantum.dataId.pack("tract_patch", returnMaxBits=True)
        inputs['exposureId'] = packedId
        idFactory = afwTable.IdFactory.makeSource(packedId, 64 - maxBits)

        table = afwTable.SourceTable.make(self.schema, idFactory)
        sources = afwTable.SourceCatalog(table)
        sources.extend(inputs.pop('inputCatalog'), self.schemaMapper)
        table = sources.getTable()
        table.setMetadata(self.algMetadata)  # Capture the algorithm metadata in the catalog header
        inputs['sources'] = sources

        skyMap = inputs.pop('skyMap')
        tractNumber = inputRefs.inputCatalog.dataId['tract']
        tractInfo = skyMap[tractNumber]
        patchInfo = tractInfo.getPatchInfo(inputRefs.inputCatalog.dataId['patch'])
        skyInfo = Struct(
            skyMap=skyMap,
            tractInfo=tractInfo,
            patchInfo=patchInfo,
            wcs=tractInfo.getWcs(),
            bbox=patchInfo.getOuterBBox()
        )
        inputs['skyInfo'] = skyInfo

        if self.config.doPropagateFlags:
            # Keep only the visit catalogs whose (visit, ccd) contributed to this coadd
            ccdInputs = inputs['exposure'].getInfo().getCoaddInputs().ccds
            visitKey = ccdInputs.schema.find("visit").key
            ccdKey = ccdInputs.schema.find("ccd").key
            inputVisitIds = set()
            ccdRecordsWcs = {}
            for ccdRecord in ccdInputs:
                visit = ccdRecord.get(visitKey)
                ccd = ccdRecord.get(ccdKey)
                inputVisitIds.add((visit, ccd))
                ccdRecordsWcs[(visit, ccd)] = ccdRecord.getWcs()

            inputCatalogsToKeep = []
            inputCatalogWcsUpdate = []
            for i, dataRef in enumerate(inputRefs.visitCatalogs):
                key = (dataRef.dataId['visit'], dataRef.dataId['detector'])
                if key in inputVisitIds:
                    inputCatalogsToKeep.append(inputs['visitCatalogs'][i])
                    inputCatalogWcsUpdate.append(ccdRecordsWcs[key])
            inputs['visitCatalogs'] = inputCatalogsToKeep
            inputs['wcsUpdates'] = inputCatalogWcsUpdate
            inputs['ccdInputs'] = ccdInputs

        outputs = self.run(**inputs)
        butlerQC.put(outputs, outputRefs)
 
    def runDataRef(self, patchRef, psfCache=100):
        """!
        @brief Deblend and measure.

        @param[in] patchRef: Patch reference.

        Set 'is-primary' and related flags. Propagate flags
        from individual visits. Optionally match the sources to a reference catalog and write the matches.
        Finally, write the deblended sources and measurements out.
        """
        if self.config.hasFakes:
            coaddType = "fakes_" + self.config.coaddName
        else:
            coaddType = self.config.coaddName
        exposure = patchRef.get(coaddType + "Coadd_calexp", immediate=True)
        exposure.getPsf().setCacheCapacity(psfCache)
        sources = self.readSources(patchRef)
        table = sources.getTable()
        table.setMetadata(self.algMetadata)  # Capture the algorithm metadata in the catalog header
        skyInfo = getSkyInfo(coaddName=self.config.coaddName, patchRef=patchRef)

        if self.config.doPropagateFlags:
            ccdInputs = self.propagateFlags.getCcdInputs(exposure)
        else:
            ccdInputs = None

        results = self.run(exposure=exposure, sources=sources,
                           ccdInputs=ccdInputs,
                           skyInfo=skyInfo, butler=patchRef.getButler(),
                           exposureId=self.getExposureId(patchRef))

        if self.config.doMatchSources:
            self.writeMatches(patchRef, results)
        self.write(patchRef, results.outputSources)
 
    def run(self, exposure, sources, skyInfo, exposureId, ccdInputs=None, visitCatalogs=None, wcsUpdates=None,
            butler=None):
        """Run measurement algorithms on the input exposure, and optionally populate the
        resulting catalog with extra information.

        Parameters
        ----------
        exposure : `lsst.afw.image.Exposure`
            The input exposure on which measurements are to be performed
        sources :  `lsst.afw.table.SourceCatalog`
            A catalog built from the results of merged detections, or
            deblender outputs.
        skyInfo : `lsst.pipe.base.Struct`
            A struct containing information about the position of the input exposure within
            a `SkyMap`, the `SkyMap`, its `Wcs`, and its bounding box
        exposureId : `int` or `bytes`
            packed unique number or bytes unique to the input exposure
        ccdInputs : `lsst.afw.table.ExposureCatalog`
            Catalog containing information on the individual visits which went into making
            the input exposure
        visitCatalogs : list of `lsst.afw.table.SourceCatalogs` or `None`
            A list of source catalogs corresponding to measurements made on the individual
            visits which went into the input exposure. If None and butler is `None` then
            the task cannot propagate visit flags to the output catalog.
        wcsUpdates : list of `lsst.afw.geom.SkyWcs` or `None`
            If visitCatalogs is not `None` this should be a list of wcs objects which correspond
            to the input visits. Used to put all coordinates to common system. If `None` and
            butler is `None` then the task cannot propagate visit flags to the output catalog.
        butler : `lsst.daf.butler.Butler` or `lsst.daf.persistence.Butler`
            Either a gen2 or gen3 butler used to load visit catalogs

        Returns
        -------
        results : `lsst.pipe.base.Struct`
            Results of running measurement task. Will contain the catalog in the
            sources attribute. Optionally will have results of matching to a
            reference catalog in the matchResults attribute, and denormalized
            matches in the denormMatches attribute.
        """
        self.measurement.run(sources, exposure, exposureId=exposureId)

        if self.config.doApCorr:
            self.applyApCorr.run(
                catalog=sources,
                apCorrMap=exposure.getInfo().getApCorrMap()
            )

        # The catalog must be contiguous for the plugins run below
        if not sources.isContiguous():
            sources = sources.copy(deep=True)

        if self.config.doRunCatalogCalculation:
            self.catalogCalculation.run(sources)

        self.setPrimaryFlags.run(sources, skyInfo.skyMap, skyInfo.tractInfo, skyInfo.patchInfo,
                                 includeDeblend=self.deblended)
        if self.config.doPropagateFlags:
            self.propagateFlags.run(butler, sources, ccdInputs, exposure.getWcs(), visitCatalogs, wcsUpdates)

        results = Struct()

        if self.config.doMatchSources:
            matchResult = self.match.run(sources, exposure.getInfo().getFilter().getName())
            matches = afwTable.packMatches(matchResult.matches)
            matches.table.setMetadata(matchResult.matchMeta)
            results.matchResult = matches
            if self.config.doWriteMatchesDenormalized:
                if matchResult.matches:
                    denormMatches = denormalizeMatches(matchResult.matches, matchResult.matchMeta)
                else:
                    self.log.warn("No matches, so generating dummy denormalized matches file")
                    denormMatches = afwTable.BaseCatalog(afwTable.Schema())
                    denormMatches.setMetadata(PropertyList())
                    denormMatches.getMetadata().add("COMMENT",
                                                    "This catalog is empty because no matches were found.")
                    results.denormMatches = denormMatches
                results.denormMatches = denormMatches

        results.outputSources = sources
        return results
 
    def readSources(self, dataRef):
        """!
        @brief Read input sources.

        @param[in] dataRef: Data reference for catalog of merged detections
        @return List of sources in merged catalog

        We also need to add columns to hold the measurements we're about to make
        so we can measure in-place.
        """
        merged = dataRef.get(self.config.coaddName + self.inputCatalog, immediate=True)
        self.log.info("Read %d detections: %s" % (len(merged), dataRef.dataId))
        idFactory = self.makeIdFactory(dataRef)
        for s in merged:
            idFactory.notify(s.getId())
        table = afwTable.SourceTable.make(self.schema, idFactory)
        sources = afwTable.SourceCatalog(table)
        sources.extend(merged, self.schemaMapper)
        return sources
 
    def writeMatches(self, dataRef, results):
        """!
        @brief Write matches of the sources to the astrometric reference catalog.

        @param[in] dataRef: data reference
        @param[in] results: results struct from run method
        """
        if hasattr(results, "matchResult"):
            dataRef.put(results.matchResult, self.config.coaddName + "Coadd_measMatch")
        if hasattr(results, "denormMatches"):
            dataRef.put(results.denormMatches, self.config.coaddName + "Coadd_measMatchFull")
 
    def write(self, dataRef, sources):
        """!
        @brief Write the source catalog.

        @param[in] dataRef: data reference
        @param[in] sources: source catalog
        """
        dataRef.put(sources, self.config.coaddName + "Coadd_meas")
        self.log.info("Wrote %d sources: %s" % (len(sources), dataRef.dataId))
 
    def getExposureId(self, dataRef):
        return int(dataRef.get(self.config.coaddName + "CoaddId"))