LSST Applications 27.0.0,g0265f82a02+469cd937ee,g02d81e74bb+21ad69e7e1,g1470d8bcf6+cbe83ee85a,g2079a07aa2+e67c6346a6,g212a7c68fe+04a9158687,g2305ad1205+94392ce272,g295015adf3+81dd352a9d,g2bbee38e9b+469cd937ee,g337abbeb29+469cd937ee,g3939d97d7f+72a9f7b576,g487adcacf7+71499e7cba,g50ff169b8f+5929b3527e,g52b1c1532d+a6fc98d2e7,g591dd9f2cf+df404f777f,g5a732f18d5+be83d3ecdb,g64a986408d+21ad69e7e1,g858d7b2824+21ad69e7e1,g8a8a8dda67+a6fc98d2e7,g99cad8db69+f62e5b0af5,g9ddcbc5298+d4bad12328,ga1e77700b3+9c366c4306,ga8c6da7877+71e4819109,gb0e22166c9+25ba2f69a1,gb6a65358fc+469cd937ee,gbb8dafda3b+69d3c0e320,gc07e1c2157+a98bf949bb,gc120e1dc64+615ec43309,gc28159a63d+469cd937ee,gcf0d15dbbd+72a9f7b576,gdaeeff99f8+a38ce5ea23,ge6526c86ff+3a7c1ac5f1,ge79ae78c31+469cd937ee,gee10cc3b42+a6fc98d2e7,gf1cff7945b+21ad69e7e1,gfbcc870c63+9a11dc8c8f
LSST Data Management Base Package
Loading...
Searching...
No Matches
fit_coadd_multiband.py
Go to the documentation of this file.
1# This file is part of pipe_tasks.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21
22__all__ = [
23 "CoaddMultibandFitConfig", "CoaddMultibandFitSubConfig", "CoaddMultibandFitSubTask",
24 "CoaddMultibandFitTask",
25]
26
27from .fit_multiband import CatalogExposure, CatalogExposureConfig
28
29import lsst.afw.table as afwTable
30from lsst.meas.base import SkyMapIdGeneratorConfig
31from lsst.meas.extensions.scarlet.io import updateCatalogFootprints
32import lsst.pex.config as pexConfig
33import lsst.pipe.base as pipeBase
34import lsst.pipe.base.connectionTypes as cT
35
36import astropy.table
37from abc import ABC, abstractmethod
38from pydantic import Field
39from pydantic.dataclasses import dataclass
40from typing import Iterable
41
42CoaddMultibandFitBaseTemplates = {
43 "name_coadd": "deep",
44 "name_method": "multiprofit",
45}
46
47
48@dataclass(frozen=True, kw_only=True, config=CatalogExposureConfig)
50 table_psf_fits: astropy.table.Table = Field(title="A table of PSF fit parameters for each source")
51
52 def get_catalog(self):
53 return self.catalog
54
55
57 pipeBase.PipelineTaskConnections,
58 dimensions=("tract", "patch", "skymap"),
59 defaultTemplates=CoaddMultibandFitBaseTemplates,
60):
61 cat_ref = cT.Input(
62 doc="Reference multiband source catalog",
63 name="{name_coadd}Coadd_ref",
64 storageClass="SourceCatalog",
65 dimensions=("tract", "patch", "skymap"),
66 )
67 cats_meas = cT.Input(
68 doc="Deblended single-band source catalogs",
69 name="{name_coadd}Coadd_meas",
70 storageClass="SourceCatalog",
71 dimensions=("tract", "patch", "band", "skymap"),
72 multiple=True,
73 )
74 coadds = cT.Input(
75 doc="Exposures on which to run fits",
76 name="{name_coadd}Coadd_calexp",
77 storageClass="ExposureF",
78 dimensions=("tract", "patch", "band", "skymap"),
79 multiple=True,
80 )
81 models_psf = cT.Input(
82 doc="Input PSF model parameter catalog",
83 # Consider allowing independent psf fit method
84 name="{name_coadd}Coadd_psfs_{name_method}",
85 storageClass="ArrowAstropy",
86 dimensions=("tract", "patch", "band", "skymap"),
87 multiple=True,
88 )
89 models_scarlet = pipeBase.connectionTypes.Input(
90 doc="Multiband scarlet models produced by the deblender",
91 name="{name_coadd}Coadd_scarletModelData",
92 storageClass="ScarletModelData",
93 dimensions=("tract", "patch", "skymap"),
94 )
95
96 def adjustQuantum(self, inputs, outputs, label, data_id):
97 """Validates the `lsst.daf.butler.DatasetRef` bands against the
98 subtask's list of bands to fit and drops unnecessary bands.
99
100 Parameters
101 ----------
102 inputs : `dict`
103 Dictionary whose keys are an input (regular or prerequisite)
104 connection name and whose values are a tuple of the connection
105 instance and a collection of associated `DatasetRef` objects.
106 The exact type of the nested collections is unspecified; it can be
107 assumed to be multi-pass iterable and support `len` and ``in``, but
108 it should not be mutated in place. In contrast, the outer
109 dictionaries are guaranteed to be temporary copies that are true
110 `dict` instances, and hence may be modified and even returned; this
111 is especially useful for delegating to `super` (see notes below).
112 outputs : `Mapping`
113 Mapping of output datasets, with the same structure as ``inputs``.
114 label : `str`
115 Label for this task in the pipeline (should be used in all
116 diagnostic messages).
117 data_id : `lsst.daf.butler.DataCoordinate`
118 Data ID for this quantum in the pipeline (should be used in all
119 diagnostic messages).
120
121 Returns
122 -------
123 adjusted_inputs : `Mapping`
124 Mapping of the same form as ``inputs`` with updated containers of
125 input `DatasetRef` objects. All inputs involving the 'band'
126 dimension are adjusted to put them in consistent order and remove
127 unneeded bands.
128 adjusted_outputs : `Mapping`
129 Mapping of updated output datasets; always empty for this task.
130
131 Raises
132 ------
133 lsst.pipe.base.NoWorkFound
134 Raised if there are not enough of the right bands to run the task
135 on this quantum.
136 """
137 # Check which bands are going to be fit
138 bands_fit, bands_read_only = self.config.get_band_sets()
139 bands_needed = bands_fit + [band for band in bands_read_only if band not in bands_fit]
140
141 adjusted_inputs = {}
142 for connection_name, (connection, dataset_refs) in inputs.items():
143 # Datasets without bands in their dimensions should be fine
144 if 'band' in connection.dimensions:
145 datasets_by_band = {dref.dataId['band']: dref for dref in dataset_refs}
146 if not set(bands_needed).issubset(datasets_by_band.keys()):
147 raise pipeBase.NoWorkFound(
148 f'DatasetRefs={dataset_refs} have data with bands in the'
149 f' set={set(datasets_by_band.keys())},'
150 f' which is not a superset of the required bands={bands_needed} defined by'
151 f' {self.config.__class__}.fit_coadd_multiband='
152 f'{self.config.fit_coadd_multiband._value.__class__}\'s attributes'
153 f' bands_fit={bands_fit} and bands_read_only()={bands_read_only}.'
154 f' Add the required bands={set(bands_needed).difference(datasets_by_band.keys())}.'
155 )
156 # Adjust all datasets with band dimensions to include just
157 # the needed bands, in consistent order.
158 adjusted_inputs[connection_name] = (
159 connection,
160 [datasets_by_band[band] for band in bands_needed]
161 )
162
163 # Delegate to super for more checks.
164 inputs.update(adjusted_inputs)
165 super().adjustQuantum(inputs, outputs, label, data_id)
166 return adjusted_inputs, {}
167
168
170 cat_output = cT.Output(
171 doc="Output source model fit parameter catalog",
172 name="{name_coadd}Coadd_objects_{name_method}",
173 storageClass="ArrowTable",
174 dimensions=("tract", "patch", "skymap"),
175 )
176
177
178class CoaddMultibandFitSubConfig(pexConfig.Config):
179 """Configuration for implementing fitter subtasks.
180 """
181 @abstractmethod
182 def bands_read_only(self) -> set:
183 """Return the set of bands that the Task needs to read (e.g. for
184 defining priors) but not necessarily fit.
185
186 Returns
187 -------
188 The set of such bands.
189 """
190
191
192class CoaddMultibandFitSubTask(pipeBase.Task, ABC):
193 """Subtask interface for multiband fitting of deblended sources.
194
195 Parameters
196 ----------
197 **kwargs
198 Additional arguments to be passed to the `lsst.pipe.base.Task`
199 constructor.
200 """
201 ConfigClass = CoaddMultibandFitSubConfig
202
203 def __init__(self, **kwargs):
204 super().__init__(**kwargs)
205
206 @abstractmethod
207 def run(
208 self, catexps: Iterable[CatalogExposureInputs], cat_ref: afwTable.SourceCatalog
209 ) -> pipeBase.Struct:
210 """Fit models to deblended sources from multi-band inputs.
211
212 Parameters
213 ----------
214 catexps : `typing.List [CatalogExposureInputs]`
215 A list of catalog-exposure pairs with metadata in a given band.
216 cat_ref : `lsst.afw.table.SourceCatalog`
217 A reference source catalog to fit.
218
219 Returns
220 -------
221 retStruct : `lsst.pipe.base.Struct`
222 A struct with a cat_output attribute containing the output
223 measurement catalog.
224
225 Notes
226 -----
227 Subclasses may have further requirements on the input parameters,
228 including:
229 - Passing only one catexp per band;
230 - Catalogs containing HeavyFootprints with deblended images;
231 - Fitting only a subset of the sources.
232 If any requirements are not met, the subtask should fail as soon as
233 possible.
234 """
235
236
238 pipeBase.PipelineTaskConfig,
239 pipelineConnections=CoaddMultibandFitInputConnections,
240):
241 """Base class for multiband fitting."""
242
243 fit_coadd_multiband = pexConfig.ConfigurableField(
244 target=CoaddMultibandFitSubTask,
245 doc="Task to fit sources using multiple bands",
246 )
247 idGenerator = SkyMapIdGeneratorConfig.make_field()
248
249 def get_band_sets(self):
250 """Get the set of bands required by the fit_coadd_multiband subtask.
251
252 Returns
253 -------
254 bands_fit : `set`
255 The set of bands that the subtask will fit.
256 bands_read_only : `set`
257 The set of bands that the subtask will only read data
258 (measurement catalog and exposure) for.
259 """
260 try:
261 bands_fit = self.fit_coadd_multiband.bands_fit
262 except AttributeError:
263 raise RuntimeError(f'{__class__}.fit_coadd_multiband must have bands_fit attribute') from None
264 bands_read_only = self.fit_coadd_multiband.bands_read_only()
265 return tuple(list({band: None for band in bands}.keys()) for bands in (bands_fit, bands_read_only))
266
267
269 CoaddMultibandFitBaseConfig,
270 pipelineConnections=CoaddMultibandFitConnections,
271):
272 """Configuration for a CoaddMultibandFitTask."""
273
274
276 """Base class for tasks that fit or rebuild multiband models.
277
278 This class only implements data reconstruction.
279 """
280
281 def build_catexps(self, butlerQC, inputRefs, inputs) -> list[CatalogExposureInputs]:
282 id_tp = self.config.idGenerator.apply(butlerQC.quantum.dataId).catalog_id
283 # This is a roundabout way of ensuring all inputs get sorted and matched
284 input_refs_objs = [(getattr(inputRefs, key), inputs[key])
285 for key in ("cats_meas", "coadds", "models_psf")]
286 cats, exps, models_psf = [
287 {dRef.dataId: obj for dRef, obj in zip(refs, objs)}
288 for refs, objs in input_refs_objs
289 ]
290 dataIds = set(cats).union(set(exps))
291 models_scarlet = inputs["models_scarlet"]
292 catexps = {}
293 for dataId in dataIds:
294 catalog = cats[dataId]
295 exposure = exps[dataId]
296 updateCatalogFootprints(
297 modelData=models_scarlet,
298 catalog=catalog,
299 band=dataId['band'],
300 imageForRedistribution=exposure,
301 removeScarletData=True,
302 updateFluxColumns=False,
303 )
304 catexps[dataId['band']] = CatalogExposureInputs(
305 catalog=catalog, exposure=exposure, table_psf_fits=models_psf[dataId],
306 dataId=dataId, id_tract_patch=id_tp,
307 )
308 catexps = [catexps[band] for band in self.config.get_band_sets()[0]]
309 return catexps
310
311
312class CoaddMultibandFitTask(CoaddMultibandFitBase, pipeBase.PipelineTask):
313 """Fit deblended exposures in multiple bands simultaneously.
314
315 It is generally assumed but not enforced (except optionally by the
316 configurable `fit_coadd_multiband` subtask) that there is only one exposure
317 per band, presumably a coadd.
318 """
319
320 ConfigClass = CoaddMultibandFitConfig
321 _DefaultName = "CoaddMultibandFit"
322
323 def __init__(self, initInputs, **kwargs):
324 super().__init__(initInputs=initInputs, **kwargs)
325 self.makeSubtask("fit_coadd_multiband")
326
327 def runQuantum(self, butlerQC, inputRefs, outputRefs):
328 inputs = butlerQC.get(inputRefs)
329 catexps = self.build_catexps(butlerQC, inputRefs, inputs)
330 outputs = self.run(catexps=catexps, cat_ref=inputs['cat_ref'])
331 butlerQC.put(outputs, outputRefs)
332
333 def run(self, catexps: list[CatalogExposure], cat_ref: afwTable.SourceCatalog) -> pipeBase.Struct:
334 """Fit sources from a reference catalog using data from multiple
335 exposures in the same region (patch).
336
337 Parameters
338 ----------
339 catexps : `typing.List [CatalogExposure]`
340 A list of catalog-exposure pairs in a given band.
341 cat_ref : `lsst.afw.table.SourceCatalog`
342 A reference source catalog to fit.
343
344 Returns
345 -------
346 retStruct : `lsst.pipe.base.Struct`
347 A struct with a cat_output attribute containing the output
348 measurement catalog.
349
350 Notes
351 -----
352 Subtasks may have further requirements; see `CoaddMultibandFitSubTask.run`.
353 """
354 cat_output = self.fit_coadd_multiband.run(catalog_multi=cat_ref, catexps=catexps).output
355 retStruct = pipeBase.Struct(cat_output=cat_output)
356 return retStruct
list[CatalogExposureInputs] build_catexps(self, butlerQC, inputRefs, inputs)
pipeBase.Struct run(self, Iterable[CatalogExposureInputs] catexps, afwTable.SourceCatalog cat_ref)
pipeBase.Struct run(self, list[CatalogExposure] catexps, afwTable.SourceCatalog cat_ref)
daf::base::PropertySet * set
Definition fits.cc:931