25 from .multiBandUtils 
import (MergeSourcesRunner, _makeGetSchemaCatalogs, makeMergeArgumentParser,
 
   26                              getInputSchema, readCatalog)
 
   33 from lsst.pipe.base import PipelineTaskConnections, PipelineTaskConfig
 
   34 import lsst.pipe.base.connectionTypes 
as cT
 
   38                                    dimensions=(
"skymap", 
"tract", 
"patch"),
 
   39                                    defaultTemplates={
"inputCoaddName": 
"deep",
 
   40                                                      "outputCoaddName": 
"deep"}):
 
   41     inputSchema = cT.InitInput(
 
   42         doc=
"Schema for the output merged measurement catalog.",
 
   43         name=
"{inputCoaddName}Coadd_meas_schema",
 
   44         storageClass=
"SourceCatalog",
 
   46     outputSchema = cT.InitOutput(
 
   47         doc=
"Schema for the output merged measurement catalog.",
 
   48         name=
"{outputCoaddName}Coadd_ref_schema",
 
   49         storageClass=
"SourceCatalog",
 
   52         doc=
"Input catalogs to merge.",
 
   53         name=
"{inputCoaddName}Coadd_meas",
 
   55         storageClass=
"SourceCatalog",
 
   56         dimensions=[
"band", 
"skymap", 
"tract", 
"patch"],
 
   58     mergedCatalog = cT.Output(
 
   59         doc=
"Output merged catalog.",
 
   60         name=
"{outputCoaddName}Coadd_ref",
 
   61         storageClass=
"SourceCatalog",
 
   62         dimensions=[
"skymap", 
"tract", 
"patch"],
 
   66 class MergeMeasurementsConfig(PipelineTaskConfig, pipelineConnections=MergeMeasurementsConnections):
 
   68     @anchor MergeMeasurementsConfig_ 
   70     @brief Configuration parameters for the MergeMeasurementsTask 
   72     pseudoFilterList = pexConfig.ListField(
 
   75         doc=
"Names of filters which may have no associated detection\n" 
   76         "(N.b. should include MergeDetectionsConfig.skyFilterName)" 
   78     snName = pexConfig.Field(
 
   80         default=
"base_PsfFlux",
 
   81         doc=
"Name of flux measurement for calculating the S/N when choosing the reference band." 
   83     minSN = pexConfig.Field(
 
   86         doc=
"If the S/N from the priority band is below this value (and the S/N " 
   87         "is larger than minSNDiff compared to the priority band), use the band with " 
   88         "the largest S/N as the reference band." 
   90     minSNDiff = pexConfig.Field(
 
   93         doc=
"If the difference in S/N between another band and the priority band is larger " 
   94         "than this value (and the S/N in the priority band is less than minSN) " 
   95         "use the band with the largest S/N as the reference band" 
   97     flags = pexConfig.ListField(
 
   99         doc=
"Require that these flags, if available, are not set",
 
  100         default=[
"base_PixelFlags_flag_interpolatedCenter", 
"base_PsfFlux_flag",
 
  101                  "ext_photometryKron_KronFlux_flag", 
"modelfit_CModel_flag", ]
 
  103     priorityList = pexConfig.ListField(
 
  106         doc=
"Priority-ordered list of filter bands for the merge." 
  108     coaddName = pexConfig.Field(
 
  116         if len(self.priorityList) == 0:
 
  117             raise RuntimeError(
"No priority list provided")
 
  128 class MergeMeasurementsTask(pipeBase.PipelineTask, pipeBase.CmdLineTask):
 
  130     @anchor MergeMeasurementsTask_ 
  132     @brief Merge measurements from multiple bands 
  134     @section pipe_tasks_multiBand_Contents Contents 
  136       - @ref pipe_tasks_multiBand_MergeMeasurementsTask_Purpose 
  137       - @ref pipe_tasks_multiBand_MergeMeasurementsTask_Initialize 
  138       - @ref pipe_tasks_multiBand_MergeMeasurementsTask_Run 
  139       - @ref pipe_tasks_multiBand_MergeMeasurementsTask_Config 
  140       - @ref pipe_tasks_multiBand_MergeMeasurementsTask_Debug 
  141       - @ref pipe_tasks_multiband_MergeMeasurementsTask_Example 
  143     @section pipe_tasks_multiBand_MergeMeasurementsTask_Purpose Description 
  145     Command-line task that merges measurements from multiple bands. 
  147     Combines consistent (i.e. with the same peaks and footprints) catalogs of sources from multiple filter 
  148     bands to construct a unified catalog that is suitable for driving forced photometry. Every source is 
  149     required to have centroid, shape and flux measurements in each band. 
  152         deepCoadd_meas{tract,patch,filter}: SourceCatalog 
  154         deepCoadd_ref{tract,patch}: SourceCatalog 
  158     MergeMeasurementsTask subclasses @ref CmdLineTask_ "CmdLineTask". 
  160     @section pipe_tasks_multiBand_MergeMeasurementsTask_Initialize       Task initialization 
  162     @copydoc \_\_init\_\_ 
  164     @section pipe_tasks_multiBand_MergeMeasurementsTask_Run       Invoking the Task 
  168     @section pipe_tasks_multiBand_MergeMeasurementsTask_Config       Configuration parameters 
  170     See @ref MergeMeasurementsConfig_ 
  172     @section pipe_tasks_multiBand_MergeMeasurementsTask_Debug           Debug variables 
  174     The @link lsst.pipe.base.cmdLineTask.CmdLineTask command line task@endlink interface supports a 
  175     flag @c -d to import @b debug.py from your @c PYTHONPATH; see @ref baseDebug for more about @b debug.py 
  178     MergeMeasurementsTask has no debug variables. 
  180     @section pipe_tasks_multiband_MergeMeasurementsTask_Example A complete example 
  181     of using MergeMeasurementsTask 
  183     MergeMeasurementsTask is meant to be run after deblending & measuring sources in every band. 
  184     The purpose of the task is to generate a catalog of sources suitable for driving forced photometry in 
  185     coadds and individual exposures. 
  186     Command-line usage of MergeMeasurementsTask expects a data reference to the coadds to be processed. A list 
  187     of the available optional arguments can be obtained by calling mergeCoaddMeasurements.py with the `--help` 
  188     command line argument: 
  190     mergeCoaddMeasurements.py --help 
  193     To demonstrate usage of the DetectCoaddSourcesTask in the larger context of multi-band processing, we 
  194     will process HSC data in the [ci_hsc](https://github.com/lsst/ci_hsc) package. Assuming one has finished 
  195     step 7 at @ref pipeTasks_multiBand, one may merge the catalogs generated after deblending and measuring 
  198     mergeCoaddMeasurements.py $CI_HSC_DIR/DATA --id patch=5,4 tract=0 filter=HSC-I^HSC-R 
  200     This will merge the HSC-I & HSC-R band catalogs. The results are written in 
  201     `$CI_HSC_DIR/DATA/deepCoadd-results/`. 
  203     _DefaultName = 
"mergeCoaddMeasurements" 
  204     ConfigClass = MergeMeasurementsConfig
 
  205     RunnerClass = MergeSourcesRunner
 
  206     inputDataset = 
"meas" 
  207     outputDataset = 
"ref" 
  208     getSchemaCatalogs = _makeGetSchemaCatalogs(
"ref")
 
  211     def _makeArgumentParser(cls):
 
  217     def runQuantum(self, butlerQC, inputRefs, outputRefs):
 
  218         inputs = butlerQC.get(inputRefs)
 
  219         dataIds = (ref.dataId 
for ref 
in inputRefs.catalogs)
 
  220         catalogDict = {dataId[
'band']: cat 
for dataId, cat 
in zip(dataIds, inputs[
'catalogs'])}
 
  221         inputs[
'catalogs'] = catalogDict
 
  222         outputs = self.run(**inputs)
 
  223         butlerQC.put(outputs, outputRefs)
 
  225     def __init__(self, butler=None, schema=None, initInputs=None, **kwargs):
 
  229         @param[in] schema: the schema of the detection catalogs used as input to this one 
  230         @param[in] butler: a butler used to read the input schema from disk, if schema is None 
  232         The task will set its own self.schema attribute to the schema of the output merged catalog. 
  234         super().__init__(**kwargs)
 
  236         if initInputs 
is not None:
 
  237             inputSchema = initInputs[
'inputSchema'].schema
 
  239             inputSchema = self.getInputSchema(butler=butler, schema=schema)
 
  241         self.schemaMapper.addMinimalSchema(inputSchema, 
True)
 
  242         self.instFluxKey = inputSchema.find(self.config.snName + 
"_instFlux").getKey()
 
  243         self.instFluxErrKey = inputSchema.find(self.config.snName + 
"_instFluxErr").getKey()
 
  244         self.fluxFlagKey = inputSchema.find(self.config.snName + 
"_flag").getKey()
 
  247         for band 
in self.config.priorityList:
 
  248             outputKey = self.schemaMapper.editOutputSchema().addField(
 
  249                 "merge_measurement_%s" % band,
 
  251                 doc=
"Flag field set if the measurements here are from the %s filter" % band
 
  253             peakKey = inputSchema.find(
"merge_peak_%s" % band).key
 
  254             footprintKey = inputSchema.find(
"merge_footprint_%s" % band).key
 
  255             self.flagKeys[band] = pipeBase.Struct(peak=peakKey, footprint=footprintKey, output=outputKey)
 
  256         self.schema = self.schemaMapper.getOutputSchema()
 
  258         self.pseudoFilterKeys = []
 
  259         for filt 
in self.config.pseudoFilterList:
 
  261                 self.pseudoFilterKeys.
append(self.schema.find(
"merge_peak_%s" % filt).getKey())
 
  262             except Exception 
as e:
 
  263                 self.log.
warning(
"merge_peak is not set for pseudo-filter %s: %s", filt, e)
 
  266         for flag 
in self.config.flags:
 
  268                 self.badFlags[flag] = self.schema.find(flag).getKey()
 
  269             except KeyError 
as exc:
 
  270                 self.log.
warning(
"Can't find flag %s in schema: %s", flag, exc)
 
  273     def runDataRef(self, patchRefList):
 
  275         @brief Merge coadd sources from multiple bands. Calls @ref `run`. 
  276         @param[in] patchRefList list of data references for each filter 
  278         catalogs = dict(
readCatalog(self, patchRef) 
for patchRef 
in patchRefList)
 
  279         mergedCatalog = self.run(catalogs).mergedCatalog
 
  280         self.write(patchRefList[0], mergedCatalog)
 
  282     def run(self, catalogs):
 
  284         Merge measurement catalogs to create a single reference catalog for forced photometry 
  286         @param[in] catalogs: the catalogs to be merged 
  288         For parent sources, we choose the first band in config.priorityList for which the 
  289         merge_footprint flag for that band is is True. 
  291         For child sources, the logic is the same, except that we use the merge_peak flags. 
  294         orderedCatalogs = [catalogs[band] 
for band 
in self.config.priorityList 
if band 
in catalogs.keys()]
 
  295         orderedKeys = [self.flagKeys[band] 
for band 
in self.config.priorityList 
if band 
in catalogs.keys()]
 
  298         mergedCatalog.reserve(len(orderedCatalogs[0]))
 
  300         idKey = orderedCatalogs[0].table.getIdKey()
 
  301         for catalog 
in orderedCatalogs[1:]:
 
  302             if numpy.any(orderedCatalogs[0].get(idKey) != catalog.get(idKey)):
 
  303                 raise ValueError(
"Error in inputs to MergeCoaddMeasurements: source IDs do not match")
 
  307         for orderedRecords 
in zip(*orderedCatalogs):
 
  312             priorityRecord = 
None 
  313             priorityFlagKeys = 
None 
  315             hasPseudoFilter = 
False 
  319             for inputRecord, flagKeys 
in zip(orderedRecords, orderedKeys):
 
  320                 parent = (inputRecord.getParent() == 0 
and inputRecord.get(flagKeys.footprint))
 
  321                 child = (inputRecord.getParent() != 0 
and inputRecord.get(flagKeys.peak))
 
  323                 if not (parent 
or child):
 
  324                     for pseudoFilterKey 
in self.pseudoFilterKeys:
 
  325                         if inputRecord.get(pseudoFilterKey):
 
  326                             hasPseudoFilter = 
True 
  327                             priorityRecord = inputRecord
 
  328                             priorityFlagKeys = flagKeys
 
  333                 isBad = 
any(inputRecord.get(flag) 
for flag 
in self.badFlags)
 
  334                 if isBad 
or inputRecord.get(self.fluxFlagKey) 
or inputRecord.get(self.instFluxErrKey) == 0:
 
  337                     sn = inputRecord.get(self.instFluxKey)/inputRecord.get(self.instFluxErrKey)
 
  338                 if numpy.isnan(sn) 
or sn < 0.:
 
  340                 if (parent 
or child) 
and priorityRecord 
is None:
 
  341                     priorityRecord = inputRecord
 
  342                     priorityFlagKeys = flagKeys
 
  345                     maxSNRecord = inputRecord
 
  346                     maxSNFlagKeys = flagKeys
 
  359                 bestRecord = priorityRecord
 
  360                 bestFlagKeys = priorityFlagKeys
 
  361             elif (prioritySN < self.config.minSN 
and (maxSN - prioritySN) > self.config.minSNDiff
 
  362                   and maxSNRecord 
is not None):
 
  363                 bestRecord = maxSNRecord
 
  364                 bestFlagKeys = maxSNFlagKeys
 
  365             elif priorityRecord 
is not None:
 
  366                 bestRecord = priorityRecord
 
  367                 bestFlagKeys = priorityFlagKeys
 
  369             if bestRecord 
is not None and bestFlagKeys 
is not None:
 
  370                 outputRecord = mergedCatalog.addNew()
 
  371                 outputRecord.assign(bestRecord, self.schemaMapper)
 
  372                 outputRecord.set(bestFlagKeys.output, 
True)
 
  374                 raise ValueError(
"Error in inputs to MergeCoaddMeasurements: no valid reference for %s" %
 
  378         for inputCatalog 
in orderedCatalogs:
 
  379             if len(mergedCatalog) != len(inputCatalog):
 
  380                 raise ValueError(
"Mismatch between catalog sizes: %s != %s" %
 
  381                                  (len(mergedCatalog), len(orderedCatalogs)))
 
  383         return pipeBase.Struct(
 
  384             mergedCatalog=mergedCatalog
 
  387     def write(self, patchRef, catalog):
 
  389         @brief Write the output. 
  391         @param[in]  patchRef   data reference for patch 
  392         @param[in]  catalog    catalog 
  394         We write as the dataset provided by the 'outputDataset' 
  397         patchRef.put(catalog, self.config.coaddName + 
"Coadd_" + self.outputDataset)
 
  400         mergeDataId = patchRef.dataId.copy()
 
  401         del mergeDataId[
"filter"]
 
  402         self.log.
info(
"Wrote merged catalog: %s", mergeDataId)
 
  406         @brief No metadata to write, and not sure how to write it for a list of dataRefs. 
A mapping between the keys of two Schemas, used to copy data between them.
std::shared_ptr< FrameSet > append(FrameSet const &first, FrameSet const &second)
Construct a FrameSet that performs two transformations in series.
void write(OutputArchiveHandle &handle) const override
bool any(CoordinateExpr< N > const &expr) noexcept
Return true if any elements are true.
def run(self, coaddExposures, bbox, wcs)
def writeMetadata(self, dataRefList)
No metadata to write, and not sure how to write it for a list of dataRefs.
def makeMergeArgumentParser(name, dataset)
Create a suitable ArgumentParser.
def getInputSchema(task, butler=None, schema=None)
Obtain the input schema either directly or froma butler reference.
def readCatalog(task, patchRef)
Read input catalog.