LSST Applications g0f08755f38+82efc23009,g12f32b3c4e+e7bdf1200e,g1653933729+a8ce1bb630,g1a0ca8cf93+50eff2b06f,g28da252d5a+52db39f6a5,g2bbee38e9b+37c5a29d61,g2bc492864f+37c5a29d61,g2cdde0e794+c05ff076ad,g3156d2b45e+41e33cbcdc,g347aa1857d+37c5a29d61,g35bb328faa+a8ce1bb630,g3a166c0a6a+37c5a29d61,g3e281a1b8c+fb992f5633,g414038480c+7f03dfc1b0,g41af890bb2+11b950c980,g5fbc88fb19+17cd334064,g6b1c1869cb+12dd639c9a,g781aacb6e4+a8ce1bb630,g80478fca09+72e9651da0,g82479be7b0+04c31367b4,g858d7b2824+82efc23009,g9125e01d80+a8ce1bb630,g9726552aa6+8047e3811d,ga5288a1d22+e532dc0a0b,gae0086650b+a8ce1bb630,gb58c049af0+d64f4d3760,gc28159a63d+37c5a29d61,gcf0d15dbbd+2acd6d4d48,gd7358e8bfb+778a810b6e,gda3e153d99+82efc23009,gda6a2b7d83+2acd6d4d48,gdaeeff99f8+1711a396fd,ge2409df99d+6b12de1076,ge79ae78c31+37c5a29d61,gf0baf85859+d0a5978c5a,gf3967379c6+4954f8c433,gfb92a5be7c+82efc23009,gfec2e1e490+2aaed99252,w.2024.46
LSST Data Management Base Package
Loading...
Searching...
No Matches
fit_coadd_multiband.py
Go to the documentation of this file.
1# This file is part of pipe_tasks.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21
# Names exported by a star-import of this module.
__all__ = [
    "CoaddMultibandFitConfig",
    "CoaddMultibandFitConnections",
    "CoaddMultibandFitSubConfig",
    "CoaddMultibandFitSubTask",
    "CoaddMultibandFitTask",
]
26
27from .fit_multiband import CatalogExposure, CatalogExposureConfig
28
29import lsst.afw.table as afwTable
30from lsst.meas.base import SkyMapIdGeneratorConfig
31from lsst.meas.extensions.scarlet.io import updateCatalogFootprints
32import lsst.pex.config as pexConfig
33import lsst.pipe.base as pipeBase
34import lsst.pipe.base.connectionTypes as cT
35
36import astropy.table
37from abc import ABC, abstractmethod
38from pydantic import Field
39from pydantic.dataclasses import dataclass
40from typing import Iterable
41
# Default values for the ``{name_coadd}``, ``{name_method}`` and
# ``{name_table}`` placeholders that appear in the connection dataset names
# declared in this module.
CoaddMultibandFitBaseTemplates = dict(
    name_coadd="deep",
    name_method="multiprofit",
    name_table="objects",
)
47
48
# NOTE(review): the ``class`` statement was absent from the listing this block
# was recovered from. The class name is taken from its uses elsewhere in this
# module; the base class is presumed to be ``CatalogExposure`` (imported
# alongside ``CatalogExposureConfig``) because instances are constructed with
# ``catalog``, ``exposure``, ``dataId`` and ``id_tract_patch`` keywords that
# are not declared here -- confirm against the upstream source.
@dataclass(frozen=True, kw_only=True, config=CatalogExposureConfig)
class CatalogExposureInputs(CatalogExposure):
    """A catalog-exposure pair bundled with a table of PSF fit parameters."""

    table_psf_fits: astropy.table.Table = Field(title="A table of PSF fit parameters for each source")

    def get_catalog(self):
        """Return the ``catalog`` attribute of this instance."""
        return self.catalog
55
56
class CoaddMultibandFitInputConnections(
    pipeBase.PipelineTaskConnections,
    dimensions=("tract", "patch", "skymap"),
    defaultTemplates=CoaddMultibandFitBaseTemplates,
):
    """Input connections for multiband coadd fitting tasks.

    Quanta are defined per (tract, patch, skymap). Connections that also have
    a ``band`` dimension are declared with ``multiple=True`` and are filtered
    and re-ordered by `adjustQuantum`.
    """

    cat_ref = cT.Input(
        doc="Reference multiband source catalog",
        name="{name_coadd}Coadd_ref",
        storageClass="SourceCatalog",
        dimensions=("tract", "patch", "skymap"),
    )
    cats_meas = cT.Input(
        doc="Deblended single-band source catalogs",
        name="{name_coadd}Coadd_meas",
        storageClass="SourceCatalog",
        dimensions=("tract", "patch", "band", "skymap"),
        multiple=True,
    )
    coadds = cT.Input(
        doc="Exposures on which to run fits",
        name="{name_coadd}Coadd_calexp",
        storageClass="ExposureF",
        dimensions=("tract", "patch", "band", "skymap"),
        multiple=True,
    )
    models_psf = cT.Input(
        doc="Input PSF model parameter catalog",
        # Consider allowing independent psf fit method
        name="{name_coadd}Coadd_psfs_{name_method}",
        storageClass="ArrowAstropy",
        dimensions=("tract", "patch", "band", "skymap"),
        multiple=True,
    )
    # ``cT`` is the module alias for ``lsst.pipe.base.connectionTypes``; it is
    # used here for consistency with the other connections.
    models_scarlet = cT.Input(
        doc="Multiband scarlet models produced by the deblender",
        name="{name_coadd}Coadd_scarletModelData",
        storageClass="ScarletModelData",
        dimensions=("tract", "patch", "skymap"),
    )

    def adjustQuantum(self, inputs, outputs, label, data_id):
        """Validates the `lsst.daf.butler.DatasetRef` bands against the
        subtask's list of bands to fit and drops unnecessary bands.

        Parameters
        ----------
        inputs : `dict`
            Dictionary whose keys are an input (regular or prerequisite)
            connection name and whose values are a tuple of the connection
            instance and a collection of associated `DatasetRef` objects.
            The exact type of the nested collections is unspecified; it can be
            assumed to be multi-pass iterable and support `len` and ``in``, but
            it should not be mutated in place.  In contrast, the outer
            dictionaries are guaranteed to be temporary copies that are true
            `dict` instances, and hence may be modified and even returned; this
            is especially useful for delegating to `super` (see notes below).
        outputs : `Mapping`
            Mapping of output datasets, with the same structure as ``inputs``.
        label : `str`
            Label for this task in the pipeline (should be used in all
            diagnostic messages).
        data_id : `lsst.daf.butler.DataCoordinate`
            Data ID for this quantum in the pipeline (should be used in all
            diagnostic messages).

        Returns
        -------
        adjusted_inputs : `Mapping`
            Mapping of the same form as ``inputs`` with updated containers of
            input `DatasetRef` objects.  All inputs involving the 'band'
            dimension are adjusted to put them in consistent order and remove
            unneeded bands.
        adjusted_outputs : `Mapping`
            Mapping of updated output datasets; always empty for this task.

        Raises
        ------
        lsst.pipe.base.NoWorkFound
            Raised if there are not enough of the right bands to run the task
            on this quantum.
        """
        # Bands to fit come first, followed by any read-only bands that are
        # not already being fit.
        bands_fit, bands_read_only = self.config.get_band_sets()
        bands_needed = bands_fit + [band for band in bands_read_only if band not in bands_fit]
        # Built once; it is the same for every connection checked below.
        bands_needed_set = set(bands_needed)

        adjusted_inputs = {}
        for connection_name, (connection, dataset_refs) in inputs.items():
            # Datasets without bands in their dimensions should be fine
            if 'band' not in connection.dimensions:
                continue
            datasets_by_band = {dref.dataId['band']: dref for dref in dataset_refs}
            bands_missing = bands_needed_set.difference(datasets_by_band.keys())
            if bands_missing:
                raise pipeBase.NoWorkFound(
                    f'DatasetRefs={dataset_refs} have data with bands in the'
                    f' set={set(datasets_by_band.keys())},'
                    f' which is not a superset of the required bands={bands_needed} defined by'
                    f' {self.config.__class__}.fit_coadd_multiband='
                    f'{self.config.fit_coadd_multiband._value.__class__}\'s attributes'
                    f' bands_fit={bands_fit} and bands_read_only()={bands_read_only}.'
                    f' Add the required bands={bands_missing}.'
                )
            # Adjust all datasets with band dimensions to include just
            # the needed bands, in consistent order.
            adjusted_inputs[connection_name] = (
                connection,
                [datasets_by_band[band] for band in bands_needed],
            )

        # Delegate to super for more checks.  Its return value is not used;
        # only the adjustments computed above are returned.
        inputs.update(adjusted_inputs)
        super().adjustQuantum(inputs, outputs, label, data_id)
        return adjusted_inputs, {}

    def __init__(self, *, config=None):
        """Drop the ``models_psf`` connection if the config requests it.

        Parameters
        ----------
        config
            Task configuration; its ``drop_psf_connection`` attribute is read
            unconditionally, so despite the `None` default a config object is
            required here.
        """
        if config.drop_psf_connection:
            del self.models_psf
172
173
# NOTE(review): the ``class`` statement was absent from the listing this block
# was recovered from. The name is taken from ``__all__`` and from its use as
# ``pipelineConnections`` for the task config; the base class is presumed to
# be ``CoaddMultibandFitInputConnections`` because the task using these
# connections reads the inputs declared there -- confirm against upstream.
class CoaddMultibandFitConnections(CoaddMultibandFitInputConnections):
    """Connections for multiband coadd fitting: adds the output catalog."""

    cat_output = cT.Output(
        doc="Output source model fit parameter catalog",
        name="{name_coadd}Coadd_{name_table}_{name_method}",
        storageClass="ArrowTable",
        dimensions=("tract", "patch", "skymap"),
    )
181
182
class CoaddMultibandFitSubConfig(pexConfig.Config):
    """Base configuration that fitter subtasks are expected to extend."""

    bands_fit = pexConfig.ListField[str](
        doc="list of bandpass filters to fit",
        default=[],
        # Valid only when the list is non-empty and has no repeated bands.
        listCheck=lambda bands: len(bands) > 0 and len(set(bands)) == len(bands),
    )

    # NOTE(review): ``abstractmethod`` only blocks instantiation when the
    # class's metaclass derives from ``abc.ABCMeta``; whether that holds for
    # ``pexConfig.Config`` is not visible here -- confirm.
    @abstractmethod
    def bands_read_only(self) -> set:
        """Return the bands that must be read but need not be fit.

        These are bands whose data the Task needs (e.g. for defining priors)
        without necessarily fitting them.

        Returns
        -------
        The set of such bands.
        """
202
203
class CoaddMultibandFitSubTask(pipeBase.Task, ABC):
    """Abstract interface for subtasks that fit deblended sources in
    multiple bands.

    Parameters
    ----------
    **kwargs
        Forwarded unchanged to the `lsst.pipe.base.Task` constructor.
    """

    ConfigClass = CoaddMultibandFitSubConfig

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    @abstractmethod
    def run(
        self,
        catexps: Iterable[CatalogExposureInputs],
        cat_ref: afwTable.SourceCatalog,
    ) -> pipeBase.Struct:
        """Fit models to deblended sources using multi-band inputs.

        Parameters
        ----------
        catexps : iterable of `CatalogExposureInputs`
            Catalog-exposure pairs, each with metadata, in a given band.
        cat_ref : `lsst.afw.table.SourceCatalog`
            The reference source catalog to fit.

        Returns
        -------
        retStruct : `lsst.pipe.base.Struct`
            A struct whose ``cat_output`` attribute holds the output
            measurement catalog.

        Notes
        -----
        Implementations may place additional requirements on their inputs,
        for example:

        - only one catexp may be passed per band;
        - catalogs must contain HeavyFootprints with deblended images;
        - only a subset of the sources is fit.

        A subtask whose requirements are not met should fail as early as
        possible.
        """
247
248
class CoaddMultibandFitBaseConfig(
    pipeBase.PipelineTaskConfig,
    pipelineConnections=CoaddMultibandFitInputConnections,
):
    """Base class for multiband fitting."""

    drop_psf_connection = pexConfig.Field[bool](
        doc="Whether to drop the PSF model connection, e.g. because PSF parameters are in the input catalog",
        default=False,
    )
    fit_coadd_multiband = pexConfig.ConfigurableField(
        target=CoaddMultibandFitSubTask,
        doc="Task to fit sources using multiple bands",
    )
    idGenerator = SkyMapIdGeneratorConfig.make_field()

    def get_band_sets(self) -> tuple[list[str], list[str]]:
        """Get the bands required by the fit_coadd_multiband subtask.

        Returns
        -------
        bands_fit : `list` [`str`]
            The bands that the subtask will fit, with duplicates removed and
            first-occurrence order preserved.
        bands_read_only : `list` [`str`]
            The bands that the subtask will only read data (measurement
            catalog and exposure) for, de-duplicated in the iteration order
            of the value returned by the subtask config's
            ``bands_read_only()``.

        Raises
        ------
        RuntimeError
            Raised if the subtask config has no ``bands_fit`` attribute.

        Notes
        -----
        Lists rather than sets are returned because callers concatenate and
        index the results in order.
        """
        try:
            bands_fit = self.fit_coadd_multiband.bands_fit
        except AttributeError:
            raise RuntimeError(f'{__class__}.fit_coadd_multiband must have bands_fit attribute') from None
        bands_read_only = self.fit_coadd_multiband.bands_read_only()
        # dict.fromkeys removes duplicates while keeping insertion order.
        return list(dict.fromkeys(bands_fit)), list(dict.fromkeys(bands_read_only))
282
283
class CoaddMultibandFitConfig(
    CoaddMultibandFitBaseConfig,
    pipelineConnections=CoaddMultibandFitConnections,
):
    """Configuration for a CoaddMultibandFitTask."""
289
290
class CoaddMultibandFitBase:
    """Base class for tasks that fit or rebuild multiband models.

    This class only implements data reconstruction.
    """

    def build_catexps(self, butlerQC, inputRefs, inputs) -> list[CatalogExposureInputs]:
        """Assemble per-band catalog/exposure inputs for the fitter.

        Parameters
        ----------
        butlerQC
            Quantum context; only ``butlerQC.quantum.dataId`` is read, to
            generate the catalog ID.
        inputRefs
            Object whose ``cats_meas``, ``coadds`` and (optionally)
            ``models_psf`` attributes are sequences of dataset references,
            each parallel to the same-named entry in ``inputs``.
        inputs
            Mapping of loaded inputs. Must contain ``cats_meas``, ``coadds``
            and ``models_scarlet``; ``models_psf`` is optional.

        Returns
        -------
        catexps : `list` [`CatalogExposureInputs`]
            One entry per band to fit, ordered as the first element returned
            by ``self.config.get_band_sets()``.

        Notes
        -----
        Each catalog is passed to ``updateCatalogFootprints`` together with
        the scarlet model data and the matching exposure before being
        wrapped. A data ID present in only one of the catalog and exposure
        inputs results in a `KeyError`.
        """
        id_tp = self.config.idGenerator.apply(butlerQC.quantum.dataId).catalog_id
        has_psf_models = "models_psf" in inputs

        def objects_by_data_id(key):
            # Keying by data ID is a roundabout way of ensuring all inputs
            # get matched regardless of the order they were loaded in.
            return {dRef.dataId: obj for dRef, obj in zip(getattr(inputRefs, key), inputs[key])}

        cats = objects_by_data_id("cats_meas")
        exps = objects_by_data_id("coadds")
        models_psf = objects_by_data_id("models_psf") if has_psf_models else None
        models_scarlet = inputs["models_scarlet"]

        catexps_by_band = {}
        for dataId in set(cats) | set(exps):
            catalog = cats[dataId]
            exposure = exps[dataId]
            updateCatalogFootprints(
                modelData=models_scarlet,
                catalog=catalog,
                band=dataId['band'],
                imageForRedistribution=exposure,
                removeScarletData=True,
                updateFluxColumns=False,
            )
            catexps_by_band[dataId['band']] = CatalogExposureInputs(
                catalog=catalog,
                exposure=exposure,
                # An empty table stands in when no PSF models were loaded.
                table_psf_fits=models_psf[dataId] if has_psf_models else astropy.table.Table(),
                dataId=dataId,
                id_tract_patch=id_tp,
            )
        # Only the bands being fit are returned, in the configured order.
        return [catexps_by_band[band] for band in self.config.get_band_sets()[0]]
335
336
class CoaddMultibandFitTask(CoaddMultibandFitBase, pipeBase.PipelineTask):
    """Simultaneously fit deblended exposures in several bands.

    One exposure per band (presumably a coadd) is generally assumed; this is
    not enforced here, although the configurable `fit_coadd_multiband`
    subtask may optionally enforce it.
    """

    ConfigClass = CoaddMultibandFitConfig
    _DefaultName = "coaddMultibandFit"

    def __init__(self, initInputs, **kwargs):
        super().__init__(initInputs=initInputs, **kwargs)
        self.makeSubtask("fit_coadd_multiband")

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        # Load every input, assemble the per-band inputs, fit, then persist.
        loaded = butlerQC.get(inputRefs)
        result = self.run(
            catexps=self.build_catexps(butlerQC, inputRefs, loaded),
            cat_ref=loaded['cat_ref'],
        )
        butlerQC.put(result, outputRefs)

    def run(self, catexps: list[CatalogExposure], cat_ref: afwTable.SourceCatalog) -> pipeBase.Struct:
        """Fit reference-catalog sources using data from several exposures
        covering the same region (patch).

        Parameters
        ----------
        catexps : `typing.List [CatalogExposure]`
            Catalog-exposure pairs, each in a given band.
        cat_ref : `lsst.afw.table.SourceCatalog`
            The reference source catalog to fit.

        Returns
        -------
        retStruct : `lsst.pipe.base.Struct`
            A struct whose ``cat_output`` attribute holds the output
            measurement catalog.

        Notes
        -----
        Subtasks may have further requirements; see `CoaddMultibandFitSubTask.run`.
        """
        # NOTE(review): the subtask is invoked with ``catalog_multi=`` and its
        # result is read from ``.output``, whereas the abstract
        # `CoaddMultibandFitSubTask.run` in this module names the parameter
        # ``cat_ref`` and documents a ``cat_output`` attribute -- confirm
        # which contract concrete subtasks follow.
        fit_result = self.fit_coadd_multiband.run(catalog_multi=cat_ref, catexps=catexps)
        return pipeBase.Struct(cat_output=fit_result.output)
list[CatalogExposureInputs] build_catexps(self, butlerQC, inputRefs, inputs)
pipeBase.Struct run(self, Iterable[CatalogExposureInputs] catexps, afwTable.SourceCatalog cat_ref)
pipeBase.Struct run(self, list[CatalogExposure] catexps, afwTable.SourceCatalog cat_ref)