LSSTApplications  18.1.0
LSSTDataManagementBasePackage
deblendCoaddSourcesPipeline.py
Go to the documentation of this file.
1 # This file is part of pipe_tasks.
2 #
3 # Developed for the LSST Data Management System.
4 # This product includes software developed by the LSST Project
5 # (https://www.lsst.org).
6 # See the COPYRIGHT file at the top-level directory of this distribution
7 # for details of code ownership.
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation, either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the GNU General Public License
20 # along with this program. If not, see <https://www.gnu.org/licenses/>.
21 
22 from lsst.pipe.base import (Struct, PipelineTask, InitInputDatasetField, InitOutputDatasetField,
23  InputDatasetField, OutputDatasetField, PipelineTaskConfig)
24 
25 from lsst.pex.config import ConfigurableField
26 from lsst.meas.deblender import SourceDeblendTask, MultibandDeblendTask
27 
28 import lsst.afw.image as afwImage
29 import lsst.afw.table as afwTable
30 
31 __all__ = ("DeblendCoaddSourcesSingleConfig", "DeblendCoaddSourcesSingleTask",
32  "DeblendCoaddSourcesMultiConfig", "DeblendCoaddSourcesMultiTask")
33 
34 
class DeblendCoaddSourcesBaseConfig(PipelineTaskConfig):
    """Dataset configuration shared by the coadd deblending pipeline tasks.

    Declares the two init-input schema datasets and the merged detection
    catalog input, and installs the default quantum dimensions and coadd
    name templates.
    """

    # Init-input: schema of the merged detection catalog.
    inputSchema = InitInputDatasetField(
        doc="Input schema to use in the deblend catalog",
        nameTemplate="{inputCoaddName}Coadd_mergeDet_schema",
        storageClass="SourceCatalog",
    )
    # Init-input: schema of the footprint peak catalogs.
    peakSchema = InitInputDatasetField(
        doc="Schema of the footprint peak catalogs",
        nameTemplate="{inputCoaddName}Coadd_peak_schema",
        storageClass="PeakCatalog",
    )
    # Input: one merged detection catalog per (tract, patch, skymap).
    mergedDetections = InputDatasetField(
        doc="Detection catalog merged across bands",
        nameTemplate="{inputCoaddName}Coadd_mergeDet",
        storageClass="SourceCatalog",
        scalar=True,
        dimensions=("tract", "patch", "skymap"),
    )

    def setDefaults(self):
        """Set the default quantum dimensions and coadd name templates.

        The quantum defaults to one unit of work per tract, patch,
        abstract filter and skymap; both the input and output coadd name
        templates default to ``"deep"``.
        """
        super().setDefaults()
        self.quantum.dimensions = ("tract", "patch", "abstract_filter", "skymap")
        templateDefaults = dict(inputCoaddName="deep", outputCoaddName="deep")
        self.formatTemplateNames(templateDefaults)
58 
59 
class DeblendCoaddSourcesSingleConfig(DeblendCoaddSourcesBaseConfig):
    """Configuration for deblending coadd sources in a single band.

    Adds the single-band deblender subtask, the per-band coadd exposure
    input, and the deblended catalog and schema outputs to the datasets
    inherited from `DeblendCoaddSourcesBaseConfig`.
    """

    # Subtask that performs the actual deblending.
    singleBandDeblend = ConfigurableField(
        target=SourceDeblendTask,
        doc="Task to deblend an image in one band"
    )
    # Input: the single coadd exposure to deblend; the name matches the
    # ``coadd`` argument of the task's ``run`` method.
    coadd = InputDatasetField(
        doc="Exposure on which to run deblending",
        nameTemplate="{inputCoaddName}Coadd_calexp",
        storageClass="ExposureF",
        scalar=True,
        dimensions=("tract", "patch", "abstract_filter", "skymap")
    )
    # Output: the catalog returned by the task as ``measureCatalog``.
    measureCatalog = OutputDatasetField(
        doc="The output measurement catalog of deblended sources",
        nameTemplate="{outputCoaddName}Coadd_deblendedFlux",
        scalar=True,
        storageClass="SourceCatalog",
        dimensions=("tract", "patch", "abstract_filter", "skymap")
    )
    # Init-output: schema of the deblended catalog.
    outputSchema = InitOutputDatasetField(
        doc="Output of the schema used in deblending task",
        nameTemplate="{outputCoaddName}Coadd_deblendedFlux_schema",
        storageClass="SourceCatalog"
    )
84 
85 
class DeblendCoaddSourcesMultiConfig(DeblendCoaddSourcesBaseConfig):
    """Configuration for deblending coadd sources in several bands at once.

    Adds the multiband deblender subtask, the per-band coadd exposure
    inputs, and the flux/template catalog and schema outputs to the
    datasets inherited from `DeblendCoaddSourcesBaseConfig`.
    """

    # Subtask that performs the actual deblending.
    multibandDeblend = ConfigurableField(
        target=MultibandDeblendTask,
        doc="Task to deblend an images in multiple bands"
    )
    # Input: the coadd exposures to deblend (not scalar); the name matches
    # the ``coadds`` key the task reads from its input data IDs.
    coadds = InputDatasetField(
        doc="Exposure on which to run deblending",
        nameTemplate="{inputCoaddName}Coadd_calexp",
        storageClass="ExposureF",
        dimensions=("tract", "patch", "abstract_filter", "skymap")
    )
    # Init-output: schema of the deblended catalogs.
    outputSchema = InitOutputDatasetField(
        doc="Output of the schema used in deblending task",
        nameTemplate="{outputCoaddName}Coadd_deblendedModel_schema",
        storageClass="SourceCatalog"
    )
    # Output: only produced when the deblender's ``conserveFlux`` is set.
    fluxCatalogs = OutputDatasetField(
        doc="Flux catalogs produced by multiband deblending, not written "
            "if conserve flux is turned off",
        nameTemplate="{outputCoaddName}Coadd_deblendedFlux",
        storageClass="SourceCatalog",
        dimensions=("tract", "patch", "abstract_filter", "skymap")
    )
    # Output: template catalogs, always produced.
    templateCatalogs = OutputDatasetField(
        doc="Template catalogs produced by multiband deblending",
        nameTemplate="{outputCoaddName}Coadd_deblendedModel",
        storageClass="SourceCatalog",
        dimensions=("tract", "patch", "abstract_filter", "skymap")
    )

    def setDefaults(self):
        """Set the default quantum dimensions for multiband deblending.

        Overrides the inherited default by dropping ``abstract_filter``
        from the quantum dimensions.
        """
        super().setDefaults()
        self.quantum.dimensions = ("tract", "patch", "skymap")
119 
120 
class DeblendCoaddSourcesBaseTask(PipelineTask):
    """Shared machinery for the coadd deblending pipeline tasks.

    Builds the output schema from the init-input schemas, exposes it as an
    init-output, and provides helpers for creating the ID factory and the
    source catalog that subclasses pass to their deblender subtask.
    Subclasses are expected to implement ``run``.
    """

    def __init__(self, initInputs, **kwargs):
        """Construct the task and derive the output schema.

        Parameters
        ----------
        initInputs : `dict`
            Init-input datasets; must contain ``"inputSchema"`` and
            ``"peakSchema"`` entries, each exposing a ``schema`` attribute.
        **kwargs
            Forwarded unchanged to the `PipelineTask` constructor.
        """
        super().__init__(initInputs=initInputs, **kwargs)
        schema = initInputs["inputSchema"].schema
        self.peakSchema = initInputs["peakSchema"].schema
        # The mapper must exist before the minimal schema is added to it;
        # it is reused later when copying the merged detections.
        self.schemaMapper = afwTable.SchemaMapper(schema)
        self.schemaMapper.addMinimalSchema(schema)
        self.schema = self.schemaMapper.getOutputSchema()

    def getInitOutputDatasets(self):
        """Return the init-output datasets of this task.

        Returns
        -------
        datasets : `dict`
            Maps ``"outputSchema"`` to an empty source catalog built on
            the task's output schema.
        """
        return {"outputSchema": afwTable.SourceCatalog(self.schema)}

    def adaptArgsAndRun(self, inputData, inputDataIds, outputDataIds, butler):
        """Add an ID factory to the inputs and delegate to ``run``.

        Parameters
        ----------
        inputData : `dict`
            Input datasets keyed by config field name; an ``"idFactory"``
            entry is added in place.
        inputDataIds : `dict`
            Data IDs keyed by config field name; the
            ``"mergedDetections"`` entry is packed into the base ID.
        outputDataIds : `dict`
            Output data IDs; not used here.
        butler : object
            Butler whose ``registry`` packs the data ID.

        Returns
        -------
        result
            Whatever ``run`` returns.
        """
        packedId, maxBits = butler.registry.packDataId("tract_patch", inputDataIds["mergedDetections"],
                                                       returnMaxBits=True)
        # NOTE(review): 64 - maxBits is presumably the number of bits left
        # for per-source counters in a 64-bit ID -- confirm against
        # IdFactory.makeSource.
        inputData["idFactory"] = afwTable.IdFactory.makeSource(packedId, 64 - maxBits)
        return self.run(**inputData)

    def _makeSourceCatalog(self, mergedDetections, idFactory):
        """Copy the merged detections into a new catalog on the output schema.

        Parameters
        ----------
        mergedDetections
            Catalog of merged detections to copy.
        idFactory
            ID factory used by the new source table.

        Returns
        -------
        sources
            New source catalog holding the merged detections, mapped
            through the task's schema mapper.
        """
        table = afwTable.SourceTable.make(self.schema, idFactory)
        sources = afwTable.SourceCatalog(table)
        sources.extend(mergedDetections, self.schemaMapper)
        return sources
144 
145 
class DeblendCoaddSourcesSingleTask(DeblendCoaddSourcesBaseTask):
    """Pipeline task that deblends coadd sources in a single band."""

    ConfigClass = DeblendCoaddSourcesSingleConfig
    _DefaultName = "deblendCoaddSourcesSingle"

    def __init__(self, initInputs, **kwargs):
        """Construct the task and its single-band deblender subtask.

        Parameters
        ----------
        initInputs : `dict`
            Init-input datasets, forwarded to the base class.
        **kwargs
            Forwarded unchanged to the base class constructor.
        """
        super().__init__(initInputs=initInputs, **kwargs)
        # The subtask is created after the base class has built the
        # output schema and stored the peak schema.
        self.makeSubtask("singleBandDeblend", schema=self.schema, peakSchema=self.peakSchema)

    def run(self, coadd, mergedDetections, idFactory):
        """Deblend the merged detections on one coadd exposure.

        Parameters
        ----------
        coadd
            Exposure on which to run deblending.
        mergedDetections
            Detection catalog merged across bands.
        idFactory
            ID factory used for the new source catalog.

        Returns
        -------
        result : `lsst.pipe.base.Struct`
            Struct with a ``measureCatalog`` attribute holding the source
            catalog that was passed to the deblender.
        """
        sources = self._makeSourceCatalog(mergedDetections, idFactory)
        self.singleBandDeblend.run(coadd, sources)
        return Struct(measureCatalog=sources)
158 
159 
class DeblendCoaddSourcesMultiTask(DeblendCoaddSourcesBaseTask):
    """Pipeline task that deblends coadd sources in several bands at once."""

    ConfigClass = DeblendCoaddSourcesMultiConfig
    _DefaultName = "deblendCoaddSourcesMulti"

    def __init__(self, initInputs, **kwargs):
        """Construct the task and its multiband deblender subtask.

        Parameters
        ----------
        initInputs : `dict`
            Init-input datasets, forwarded to the base class.
        **kwargs
            Forwarded unchanged to the base class constructor.
        """
        super().__init__(initInputs=initInputs, **kwargs)
        # The subtask is created after the base class has built the
        # output schema and stored the peak schema.
        self.makeSubtask("multibandDeblend", schema=self.schema, peakSchema=self.peakSchema)

    @classmethod
    def getOutputDatasetTypes(cls, config):
        """Return the output dataset types enabled by ``config``.

        Parameters
        ----------
        config : `DeblendCoaddSourcesMultiConfig`
            Task configuration.

        Returns
        -------
        outputTypeDict : `dict`
            Output dataset types from the base implementation, without
            ``"fluxCatalogs"`` when flux conservation is disabled.
        """
        outputTypeDict = super().getOutputDatasetTypes(config)
        # If conserveFlux is set to False, remove that catalog as a possible output.
        if not config.multibandDeblend.conserveFlux:
            outputTypeDict.pop("fluxCatalogs", None)
        return outputTypeDict

    def adaptArgsAndRun(self, inputData, inputDataIds, outputDataIds, butler):
        """Add the filter names to the inputs, then defer to the base class.

        Parameters
        ----------
        inputData : `dict`
            Input datasets keyed by config field name; a ``"filters"``
            entry is added in place.
        inputDataIds : `dict`
            Data IDs keyed by config field name; the ``abstract_filter``
            of every ``"coadds"`` data ID is collected, in order.
        outputDataIds : `dict`
            Output data IDs, forwarded to the base class.
        butler : object
            Butler, forwarded to the base class.

        Returns
        -------
        result
            Whatever the base class ``adaptArgsAndRun`` returns.
        """
        inputData["filters"] = [dId["abstract_filter"] for dId in inputDataIds["coadds"]]
        return super().adaptArgsAndRun(inputData, inputDataIds, outputDataIds, butler)

    def run(self, coadds, filters, mergedDetections, idFactory):
        """Deblend the merged detections on a set of coadds.

        Parameters
        ----------
        coadds
            Exposures on which to run deblending.
        filters
            Filter names matching ``coadds``.
        mergedDetections
            Detection catalog merged across bands.
        idFactory
            ID factory used for the new source catalog.

        Returns
        -------
        result : `lsst.pipe.base.Struct`
            Struct with a ``templateCatalogs`` attribute and, when the
            deblender's ``conserveFlux`` option is set, a ``fluxCatalogs``
            attribute.
        """
        sources = self._makeSourceCatalog(mergedDetections, idFactory)
        multiExposure = afwImage.MultibandExposure.fromExposures(filters, coadds)
        fluxCatalogs, templateCatalogs = self.multibandDeblend.run(multiExposure, sources)
        # Struct accepts keyword arguments only; the keyword also names
        # the output so it matches the ``templateCatalogs`` config field.
        retStruct = Struct(templateCatalogs=templateCatalogs)
        if self.config.multibandDeblend.conserveFlux:
            retStruct.fluxCatalogs = fluxCatalogs
        return retStruct
def InitOutputDatasetField
Definition: config.py:339
A mapping between the keys of two Schemas, used to copy data between them.
Definition: SchemaMapper.h:21
def adaptArgsAndRun(self, inputData, inputDataIds, outputDataIds, butler)
def adaptArgsAndRun(self, inputData, inputDataIds, outputDataIds, butler)
Backwards-compatibility support for depersisting the old Calib (FluxMag0/FluxMag0Err) objects...