25 from .multiBandUtils 
import (MergeSourcesRunner, _makeGetSchemaCatalogs, makeMergeArgumentParser,
 
   26                              getInputSchema, getShortFilterName, readCatalog)
 
   30 import lsst.pex.config 
as pexConfig
 
   33 from lsst.pipe.base import PipelineTaskConnections, PipelineTaskConfig
 
   38                                    dimensions=(
"skymap", 
"tract", 
"patch"),
 
   39                                    defaultTemplates={
"inputCoaddName": 
"deep",
 
   40                                                      "outputCoaddName": 
"deep"}):
 
   41     inputSchema = cT.InitInput(
 
   42         doc=
"Schema for the output merged measurement catalog.",
 
   43         name=
"{inputCoaddName}Coadd_meas_schema",
 
   44         storageClass=
"SourceCatalog",
 
   46     outputSchema = cT.InitOutput(
 
   47         doc=
"Schema for the output merged measurement catalog.",
 
   48         name=
"{outputCoaddName}Coadd_ref_schema",
 
   49         storageClass=
"SourceCatalog",
 
   52         doc=
"Input catalogs to merge.",
 
   53         name=
"{inputCoaddName}Coadd_meas",
 
   55         storageClass=
"SourceCatalog",
 
   56         dimensions=[
"abstract_filter", 
"skymap", 
"tract", 
"patch"],
 
   58     mergedCatalog = cT.Output(
 
   59         doc=
"Output merged catalog.",
 
   60         name=
"{outputCoaddName}Coadd_ref",
 
   61         storageClass=
"SourceCatalog",
 
   62         dimensions=[
"skymap", 
"tract", 
"patch"],
 
   66 class MergeMeasurementsConfig(
PipelineTaskConfig, pipelineConnections=MergeMeasurementsConnections):
 
   68     @anchor MergeMeasurementsConfig_ 
   70     @brief Configuration parameters for the MergeMeasurementsTask 
   72     pseudoFilterList = pexConfig.ListField(
 
   75         doc=
"Names of filters which may have no associated detection\n" 
   76         "(N.b. should include MergeDetectionsConfig.skyFilterName)" 
   78     snName = pexConfig.Field(
 
   80         default=
"base_PsfFlux",
 
   81         doc=
"Name of flux measurement for calculating the S/N when choosing the reference band." 
   83     minSN = pexConfig.Field(
 
   86         doc=
"If the S/N from the priority band is below this value (and the S/N " 
   87         "is larger than minSNDiff compared to the priority band), use the band with " 
   88         "the largest S/N as the reference band." 
   90     minSNDiff = pexConfig.Field(
 
   93         doc=
"If the difference in S/N between another band and the priority band is larger " 
   94         "than this value (and the S/N in the priority band is less than minSN) " 
   95         "use the band with the largest S/N as the reference band" 
   97     flags = pexConfig.ListField(
 
   99         doc=
"Require that these flags, if available, are not set",
 
  100         default=[
"base_PixelFlags_flag_interpolatedCenter", 
"base_PsfFlux_flag",
 
  101                  "ext_photometryKron_KronFlux_flag", 
"modelfit_CModel_flag", ]
 
  103     priorityList = pexConfig.ListField(
 
  106         doc=
"Priority-ordered list of bands for the merge." 
  108     coaddName = pexConfig.Field(
 
  116         if len(self.priorityList) == 0:
 
  117             raise RuntimeError(
"No priority list provided")
 
  128 class MergeMeasurementsTask(pipeBase.PipelineTask, pipeBase.CmdLineTask):
 
  130     @anchor MergeMeasurementsTask_ 
  132     @brief Merge measurements from multiple bands 
  134     @section pipe_tasks_multiBand_Contents Contents 
  136       - @ref pipe_tasks_multiBand_MergeMeasurementsTask_Purpose 
  137       - @ref pipe_tasks_multiBand_MergeMeasurementsTask_Initialize 
  138       - @ref pipe_tasks_multiBand_MergeMeasurementsTask_Run 
  139       - @ref pipe_tasks_multiBand_MergeMeasurementsTask_Config 
  140       - @ref pipe_tasks_multiBand_MergeMeasurementsTask_Debug 
  141       - @ref pipe_tasks_multiband_MergeMeasurementsTask_Example 
  143     @section pipe_tasks_multiBand_MergeMeasurementsTask_Purpose Description 
  145     Command-line task that merges measurements from multiple bands. 
  147     Combines consistent (i.e. with the same peaks and footprints) catalogs of sources from multiple filter 
  148     bands to construct a unified catalog that is suitable for driving forced photometry. Every source is 
  149     required to have centroid, shape and flux measurements in each band. 
  152         deepCoadd_meas{tract,patch,filter}: SourceCatalog 
  154         deepCoadd_ref{tract,patch}: SourceCatalog 
  158     MergeMeasurementsTask subclasses @ref CmdLineTask_ "CmdLineTask". 
  160     @section pipe_tasks_multiBand_MergeMeasurementsTask_Initialize       Task initialization 
  162     @copydoc \_\_init\_\_ 
  164     @section pipe_tasks_multiBand_MergeMeasurementsTask_Run       Invoking the Task 
  168     @section pipe_tasks_multiBand_MergeMeasurementsTask_Config       Configuration parameters 
  170     See @ref MergeMeasurementsConfig_ 
  172     @section pipe_tasks_multiBand_MergeMeasurementsTask_Debug           Debug variables 
  174     The @link lsst.pipe.base.cmdLineTask.CmdLineTask command line task@endlink interface supports a 
  175     flag @c -d to import @b debug.py from your @c PYTHONPATH; see @ref baseDebug for more about @b debug.py 
  178     MergeMeasurementsTask has no debug variables. 
  180     @section pipe_tasks_multiband_MergeMeasurementsTask_Example A complete example 
  181     of using MergeMeasurementsTask 
  183     MergeMeasurementsTask is meant to be run after deblending & measuring sources in every band. 
  184     The purpose of the task is to generate a catalog of sources suitable for driving forced photometry in 
  185     coadds and individual exposures. 
  186     Command-line usage of MergeMeasurementsTask expects a data reference to the coadds to be processed. A list 
  187     of the available optional arguments can be obtained by calling mergeCoaddMeasurements.py with the `--help` 
  188     command line argument: 
  190     mergeCoaddMeasurements.py --help 
  193     To demonstrate usage of the DetectCoaddSourcesTask in the larger context of multi-band processing, we 
  194     will process HSC data in the [ci_hsc](https://github.com/lsst/ci_hsc) package. Assuming one has finished 
  195     step 7 at @ref pipeTasks_multiBand, one may merge the catalogs generated after deblending and measuring 
  198     mergeCoaddMeasurements.py $CI_HSC_DIR/DATA --id patch=5,4 tract=0 filter=HSC-I^HSC-R 
  200     This will merge the HSC-I & HSC-R band catalogs. The results are written in 
  201     `$CI_HSC_DIR/DATA/deepCoadd-results/`. 
  203     _DefaultName = 
"mergeCoaddMeasurements" 
  204     ConfigClass = MergeMeasurementsConfig
 
  205     RunnerClass = MergeSourcesRunner
 
  206     inputDataset = 
"meas" 
  207     outputDataset = 
"ref" 
  208     getSchemaCatalogs = _makeGetSchemaCatalogs(
"ref")
 
  211     def _makeArgumentParser(cls):
 
  217     def runQuantum(self, butlerQC, inputRefs, outputRefs):
 
  218         inputs = butlerQC.get(inputRefs)
 
  219         dataIds = (ref.dataId 
for ref 
in inputRefs.catalogs)
 
  220         catalogDict = {dataId[
'abstract_filter']: cat 
for dataId, cat 
in zip(dataIds,
 
  222         inputs[
'catalogs'] = catalogDict
 
  223         outputs = self.run(**inputs)
 
  224         butlerQC.put(outputs, outputRefs)
 
  226     def __init__(self, butler=None, schema=None, initInputs=None, **kwargs):
 
  230         @param[in] schema: the schema of the detection catalogs used as input to this one 
  231         @param[in] butler: a butler used to read the input schema from disk, if schema is None 
  233         The task will set its own self.schema attribute to the schema of the output merged catalog. 
  235         super().__init__(**kwargs)
 
  237         if initInputs 
is not None:
 
  238             inputSchema = initInputs[
'inputSchema'].schema
 
  240             inputSchema = self.getInputSchema(butler=butler, schema=schema)
 
  242         self.schemaMapper.addMinimalSchema(inputSchema, 
True)
 
  243         self.instFluxKey = inputSchema.find(self.config.snName + 
"_instFlux").getKey()
 
  244         self.instFluxErrKey = inputSchema.find(self.config.snName + 
"_instFluxErr").getKey()
 
  245         self.fluxFlagKey = inputSchema.find(self.config.snName + 
"_flag").getKey()
 
  248         for band 
in self.config.priorityList:
 
  250             outputKey = self.schemaMapper.editOutputSchema().addField(
 
  251                 "merge_measurement_%s" % short,
 
  253                 doc=
"Flag field set if the measurements here are from the %s filter" % band
 
  255             peakKey = inputSchema.find(
"merge_peak_%s" % short).key
 
  256             footprintKey = inputSchema.find(
"merge_footprint_%s" % short).key
 
  257             self.flagKeys[band] = pipeBase.Struct(peak=peakKey, footprint=footprintKey, output=outputKey)
 
  258         self.schema = self.schemaMapper.getOutputSchema()
 
  260         self.pseudoFilterKeys = []
 
  261         for filt 
in self.config.pseudoFilterList:
 
  263                 self.pseudoFilterKeys.
append(self.schema.find(
"merge_peak_%s" % filt).getKey())
 
  264             except Exception 
as e:
 
  265                 self.log.
warn(
"merge_peak is not set for pseudo-filter %s: %s" % (filt, e))
 
  268         for flag 
in self.config.flags:
 
  270                 self.badFlags[flag] = self.schema.find(flag).getKey()
 
  271             except KeyError 
as exc:
 
  272                 self.log.
warn(
"Can't find flag %s in schema: %s" % (flag, exc,))
 
  275     def runDataRef(self, patchRefList):
 
  277         @brief Merge coadd sources from multiple bands. Calls @ref `run`. 
  278         @param[in] patchRefList list of data references for each filter 
  280         catalogs = dict(
readCatalog(self, patchRef) 
for patchRef 
in patchRefList)
 
  281         mergedCatalog = self.run(catalogs).mergedCatalog
 
  282         self.write(patchRefList[0], mergedCatalog)
 
  284     def run(self, catalogs):
 
  286         Merge measurement catalogs to create a single reference catalog for forced photometry 
  288         @param[in] catalogs: the catalogs to be merged 
  290         For parent sources, we choose the first band in config.priorityList for which the 
  291         merge_footprint flag for that band is is True. 
  293         For child sources, the logic is the same, except that we use the merge_peak flags. 
  296         orderedCatalogs = [catalogs[band] 
for band 
in self.config.priorityList 
if band 
in catalogs.keys()]
 
  297         orderedKeys = [self.flagKeys[band] 
for band 
in self.config.priorityList 
if band 
in catalogs.keys()]
 
  300         mergedCatalog.reserve(len(orderedCatalogs[0]))
 
  302         idKey = orderedCatalogs[0].table.getIdKey()
 
  303         for catalog 
in orderedCatalogs[1:]:
 
  304             if numpy.any(orderedCatalogs[0].get(idKey) != catalog.get(idKey)):
 
  305                 raise ValueError(
"Error in inputs to MergeCoaddMeasurements: source IDs do not match")
 
  309         for orderedRecords 
in zip(*orderedCatalogs):
 
  314             priorityRecord = 
None 
  315             priorityFlagKeys = 
None 
  317             hasPseudoFilter = 
False 
  321             for inputRecord, flagKeys 
in zip(orderedRecords, orderedKeys):
 
  322                 parent = (inputRecord.getParent() == 0 
and inputRecord.get(flagKeys.footprint))
 
  323                 child = (inputRecord.getParent() != 0 
and inputRecord.get(flagKeys.peak))
 
  325                 if not (parent 
or child):
 
  326                     for pseudoFilterKey 
in self.pseudoFilterKeys:
 
  327                         if inputRecord.get(pseudoFilterKey):
 
  328                             hasPseudoFilter = 
True 
  329                             priorityRecord = inputRecord
 
  330                             priorityFlagKeys = flagKeys
 
  335                 isBad = 
any(inputRecord.get(flag) 
for flag 
in self.badFlags)
 
  336                 if isBad 
or inputRecord.get(self.fluxFlagKey) 
or inputRecord.get(self.instFluxErrKey) == 0:
 
  339                     sn = inputRecord.get(self.instFluxKey)/inputRecord.get(self.instFluxErrKey)
 
  340                 if numpy.isnan(sn) 
or sn < 0.:
 
  342                 if (parent 
or child) 
and priorityRecord 
is None:
 
  343                     priorityRecord = inputRecord
 
  344                     priorityFlagKeys = flagKeys
 
  347                     maxSNRecord = inputRecord
 
  348                     maxSNFlagKeys = flagKeys
 
  361                 bestRecord = priorityRecord
 
  362                 bestFlagKeys = priorityFlagKeys
 
  363             elif (prioritySN < self.config.minSN 
and (maxSN - prioritySN) > self.config.minSNDiff
 
  364                   and maxSNRecord 
is not None):
 
  365                 bestRecord = maxSNRecord
 
  366                 bestFlagKeys = maxSNFlagKeys
 
  367             elif priorityRecord 
is not None:
 
  368                 bestRecord = priorityRecord
 
  369                 bestFlagKeys = priorityFlagKeys
 
  371             if bestRecord 
is not None and bestFlagKeys 
is not None:
 
  372                 outputRecord = mergedCatalog.addNew()
 
  373                 outputRecord.assign(bestRecord, self.schemaMapper)
 
  374                 outputRecord.set(bestFlagKeys.output, 
True)
 
  376                 raise ValueError(
"Error in inputs to MergeCoaddMeasurements: no valid reference for %s" %
 
  380         for inputCatalog 
in orderedCatalogs:
 
  381             if len(mergedCatalog) != len(inputCatalog):
 
  382                 raise ValueError(
"Mismatch between catalog sizes: %s != %s" %
 
  383                                  (len(mergedCatalog), len(orderedCatalogs)))
 
  385         return pipeBase.Struct(
 
  386             mergedCatalog=mergedCatalog
 
  389     def write(self, patchRef, catalog):
 
  391         @brief Write the output. 
  393         @param[in]  patchRef   data reference for patch 
  394         @param[in]  catalog    catalog 
  396         We write as the dataset provided by the 'outputDataset' 
  399         patchRef.put(catalog, self.config.coaddName + 
"Coadd_" + self.outputDataset)
 
  402         mergeDataId = patchRef.dataId.copy()
 
  403         del mergeDataId[
"filter"]
 
  404         self.log.
info(
"Wrote merged catalog: %s" % (mergeDataId,))
 
  408         @brief No metadata to write, and not sure how to write it for a list of dataRefs.