LSST Applications 28.0.0,g1653933729+a8ce1bb630,g1a997c3884+a8ce1bb630,g28da252d5a+5bd70b7e6d,g2bbee38e9b+638fca75ac,g2bc492864f+638fca75ac,g3156d2b45e+07302053f8,g347aa1857d+638fca75ac,g35bb328faa+a8ce1bb630,g3a166c0a6a+638fca75ac,g3e281a1b8c+7bbb0b2507,g4005a62e65+17cd334064,g414038480c+5b5cd4fff3,g41af890bb2+4ffae9de63,g4e1a3235cc+0f1912dca3,g6249c6f860+3c3976f90c,g80478fca09+46aba80bd6,g82479be7b0+77990446f6,g858d7b2824+78ba4d1ce1,g89c8672015+f667a5183b,g9125e01d80+a8ce1bb630,ga5288a1d22+2a6264e9ca,gae0086650b+a8ce1bb630,gb58c049af0+d64f4d3760,gc22bb204ba+78ba4d1ce1,gc28159a63d+638fca75ac,gcf0d15dbbd+32ddb6096f,gd6b7c0dfd1+3e339405e9,gda3e153d99+78ba4d1ce1,gda6a2b7d83+32ddb6096f,gdaeeff99f8+1711a396fd,gdd5a9049c5+b18c39e5e3,ge2409df99d+a5e4577cdc,ge33fd446bb+78ba4d1ce1,ge79ae78c31+638fca75ac,gf0baf85859+64e8883e75,gf5289d68f6+e1b046a8d7,gfa443fc69c+91d9ed1ecf,gfda6b12a05+8419469a56
LSST Data Management Base Package
Loading...
Searching...
No Matches
fit_coadd_multiband.py
Go to the documentation of this file.
1# This file is part of pipe_tasks.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21
# Names exported by ``from <this module> import *``.
__all__ = [
    "CoaddMultibandFitConfig",
    "CoaddMultibandFitConnections",
    "CoaddMultibandFitSubConfig",
    "CoaddMultibandFitSubTask",
    "CoaddMultibandFitTask",
]
26
27from .fit_multiband import CatalogExposure, CatalogExposureConfig
28
29import lsst.afw.table as afwTable
30from lsst.meas.base import SkyMapIdGeneratorConfig
31from lsst.meas.extensions.scarlet.io import updateCatalogFootprints
32import lsst.pex.config as pexConfig
33import lsst.pipe.base as pipeBase
34import lsst.pipe.base.connectionTypes as cT
35
36import astropy.table
37from abc import ABC, abstractmethod
38from pydantic import Field
39from pydantic.dataclasses import dataclass
40from typing import Iterable
41
# Default values for the ``{name_coadd}``, ``{name_method}`` and
# ``{name_table}`` placeholders used in the connection dataset names; passed
# as ``defaultTemplates`` to the connections class in this file.
CoaddMultibandFitBaseTemplates = {
    "name_coadd": "deep",
    "name_method": "multiprofit",
    "name_table": "objects",
}
47
48
49@dataclass(frozen=True, kw_only=True, config=CatalogExposureConfig)
51 table_psf_fits: astropy.table.Table = Field(title="A table of PSF fit parameters for each source")
52
def get_catalog(self):
    """Return the catalog held by this instance (``self.catalog``)."""
    return self.catalog
55
56
58 pipeBase.PipelineTaskConnections,
59 dimensions=("tract", "patch", "skymap"),
60 defaultTemplates=CoaddMultibandFitBaseTemplates,
61):
62 cat_ref = cT.Input(
63 doc="Reference multiband source catalog",
64 name="{name_coadd}Coadd_ref",
65 storageClass="SourceCatalog",
66 dimensions=("tract", "patch", "skymap"),
67 )
68 cats_meas = cT.Input(
69 doc="Deblended single-band source catalogs",
70 name="{name_coadd}Coadd_meas",
71 storageClass="SourceCatalog",
72 dimensions=("tract", "patch", "band", "skymap"),
73 multiple=True,
74 )
75 coadds = cT.Input(
76 doc="Exposures on which to run fits",
77 name="{name_coadd}Coadd_calexp",
78 storageClass="ExposureF",
79 dimensions=("tract", "patch", "band", "skymap"),
80 multiple=True,
81 )
82 models_psf = cT.Input(
83 doc="Input PSF model parameter catalog",
84 # Consider allowing independent psf fit method
85 name="{name_coadd}Coadd_psfs_{name_method}",
86 storageClass="ArrowAstropy",
87 dimensions=("tract", "patch", "band", "skymap"),
88 multiple=True,
89 )
90 models_scarlet = pipeBase.connectionTypes.Input(
91 doc="Multiband scarlet models produced by the deblender",
92 name="{name_coadd}Coadd_scarletModelData",
93 storageClass="ScarletModelData",
94 dimensions=("tract", "patch", "skymap"),
95 )
96
def adjustQuantum(self, inputs, outputs, label, data_id):
    """Validate the bands of the input `lsst.daf.butler.DatasetRef` objects
    against the bands required by the config, and drop unneeded bands.

    Parameters
    ----------
    inputs : `dict`
        Dictionary whose keys are an input (regular or prerequisite)
        connection name and whose values are a tuple of the connection
        instance and a collection of associated `DatasetRef` objects.
        The exact type of the nested collections is unspecified; it can be
        assumed to be multi-pass iterable and support `len` and ``in``, but
        it should not be mutated in place. In contrast, the outer
        dictionaries are guaranteed to be temporary copies that are true
        `dict` instances, and hence may be modified and even returned; this
        is especially useful for delegating to `super`.
    outputs : `Mapping`
        Mapping of output datasets, with the same structure as ``inputs``.
    label : `str`
        Label for this task in the pipeline (should be used in all
        diagnostic messages).
    data_id : `lsst.daf.butler.DataCoordinate`
        Data ID for this quantum in the pipeline (should be used in all
        diagnostic messages).

    Returns
    -------
    adjusted_inputs : `Mapping`
        Mapping of the same form as ``inputs`` containing only the
        connections that have a 'band' dimension. Their `DatasetRef`
        objects are reordered to follow the required bands and references
        for other bands are removed.
    adjusted_outputs : `Mapping`
        Mapping of updated output datasets; always empty for this task.

    Raises
    ------
    lsst.pipe.base.NoWorkFound
        Raised if any connection with a 'band' dimension lacks a dataset
        for one of the required bands.
    """
    # Bands to fit come first, followed by read-only bands not already
    # listed; ``get_band_sets`` is expected to return two lists here since
    # they are concatenated with ``+``.
    bands_fit, bands_read_only = self.config.get_band_sets()
    bands_needed = bands_fit + [band for band in bands_read_only if band not in bands_fit]

    adjusted_inputs = {}
    for connection_name, (connection, dataset_refs) in inputs.items():
        # Connections without a band dimension need no adjustment.
        if 'band' not in connection.dimensions:
            continue

        # If several refs share a band, the last one wins.
        datasets_by_band = {dref.dataId['band']: dref for dref in dataset_refs}
        bands_missing = set(bands_needed).difference(datasets_by_band)
        if bands_missing:
            raise pipeBase.NoWorkFound(
                f'DatasetRefs={dataset_refs} have data with bands in the'
                f' set={set(datasets_by_band)},'
                f' which is not a superset of the required bands={bands_needed} defined by'
                f' {self.config.__class__}.fit_coadd_multiband='
                f'{self.config.fit_coadd_multiband._value.__class__}\'s attributes'
                f' bands_fit={bands_fit} and bands_read_only()={bands_read_only}.'
                f' Add the required bands={bands_missing}.'
            )

        # Keep just the needed bands, in the same order for every
        # band-dependent connection.
        adjusted_inputs[connection_name] = (
            connection,
            [datasets_by_band[band] for band in bands_needed],
        )

    # Delegate to super for more checks; its return value is not used, only
    # the band-adjusted connections are reported back.
    inputs.update(adjusted_inputs)
    super().adjustQuantum(inputs, outputs, label, data_id)
    return adjusted_inputs, {}
168
def __init__(self, *, config=None):
    """Remove the ``models_psf`` connection if the config requests it.

    Parameters
    ----------
    config
        Task configuration; its ``drop_psf_connection`` attribute decides
        whether the ``models_psf`` connection is deleted.

    Notes
    -----
    NOTE(review): ``config`` defaults to `None` but is dereferenced
    unconditionally, so calling this without a config raises
    `AttributeError` — confirm that the framework always supplies one.
    """
    if config.drop_psf_connection:
        del self.models_psf
172
173
175 cat_output = cT.Output(
176 doc="Output source model fit parameter catalog",
177 name="{name_coadd}Coadd_{name_table}_{name_method}",
178 storageClass="ArrowTable",
179 dimensions=("tract", "patch", "skymap"),
180 )
181
182
class CoaddMultibandFitSubConfig(pexConfig.Config):
    """Configuration for implementing fitter subtasks.

    Notes
    -----
    Code in this module also reads a ``bands_fit`` attribute from the
    subtask config (see the config's ``get_band_sets`` method), so concrete
    subclasses are expected to provide one in addition to implementing
    `bands_read_only`.
    """

    # NOTE(review): `abstractmethod` is only enforced when the class's
    # metaclass derives from `abc.ABCMeta`; confirm whether that holds for
    # `pexConfig.Config`. If it does not, a subclass that fails to override
    # this method inherits a body that returns `None`.
    @abstractmethod
    def bands_read_only(self) -> set:
        """Return the set of bands that the Task needs to read (e.g. for
        defining priors) but not necessarily fit.

        Returns
        -------
        bands_read_only : `set`
            The set of such bands.
        """
195
196
class CoaddMultibandFitSubTask(pipeBase.Task, ABC):
    """Subtask interface for multiband fitting of deblended sources.

    Parameters
    ----------
    **kwargs
        Additional arguments to be passed to the `lsst.pipe.base.Task`
        constructor.
    """

    ConfigClass = CoaddMultibandFitSubConfig

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    @abstractmethod
    def run(
        self, catexps: Iterable[CatalogExposureInputs], cat_ref: afwTable.SourceCatalog
    ) -> pipeBase.Struct:
        """Fit models to deblended sources from multi-band inputs.

        Parameters
        ----------
        catexps : `typing.Iterable` [`CatalogExposureInputs`]
            Catalog-exposure pairs with metadata, each in a given band.
        cat_ref : `lsst.afw.table.SourceCatalog`
            A reference source catalog to fit.

        Returns
        -------
        retStruct : `lsst.pipe.base.Struct`
            A struct with a cat_output attribute containing the output
            measurement catalog.

        Notes
        -----
        Subclasses may have further requirements on the input parameters,
        including:

        - Passing only one catexp per band;
        - Catalogs containing HeavyFootprints with deblended images;
        - Fitting only a subset of the sources.

        If any requirements are not met, the subtask should fail as soon as
        possible.
        """
240
241
243 pipeBase.PipelineTaskConfig,
244 pipelineConnections=CoaddMultibandFitInputConnections,
245):
246 """Base class for multiband fitting."""
247
248 drop_psf_connection = pexConfig.Field[bool](
249 doc="Whether to drop the PSF model connection, e.g. because PSF parameters are in the input catalog",
250 default=False,
251 )
252 fit_coadd_multiband = pexConfig.ConfigurableField(
253 target=CoaddMultibandFitSubTask,
254 doc="Task to fit sources using multiple bands",
255 )
256 idGenerator = SkyMapIdGeneratorConfig.make_field()
257
def get_band_sets(self):
    """Get the bands required by the fit_coadd_multiband subtask.

    Returns
    -------
    bands_fit : `list`
        The bands that the subtask will fit, with duplicates removed and
        first-seen order preserved.
    bands_read_only : `list`
        The bands that the subtask will only read data (measurement
        catalog and exposure) for, de-duplicated in the same way.

    Raises
    ------
    RuntimeError
        Raised if the ``fit_coadd_multiband`` config has no ``bands_fit``
        attribute.
    """
    try:
        bands_fit = self.fit_coadd_multiband.bands_fit
    except AttributeError:
        # type(self) names the concrete config class in the message.
        raise RuntimeError(
            f'{type(self)}.fit_coadd_multiband must have bands_fit attribute'
        ) from None
    bands_read_only = self.fit_coadd_multiband.bands_read_only()
    # dict.fromkeys drops duplicates while keeping insertion order, which a
    # plain set would not guarantee.
    return tuple(list(dict.fromkeys(bands)) for bands in (bands_fit, bands_read_only))
275
276
278 CoaddMultibandFitBaseConfig,
279 pipelineConnections=CoaddMultibandFitConnections,
280):
281 """Configuration for a CoaddMultibandFitTask."""
282
283
285 """Base class for tasks that fit or rebuild multiband models.
286
287 This class only implements data reconstruction.
288 """
289
def build_catexps(self, butlerQC, inputRefs, inputs) -> list[CatalogExposureInputs]:
    """Pair the per-band catalogs, exposures and (optionally) PSF model
    tables into `CatalogExposureInputs`, one per band to be fit.

    Parameters
    ----------
    butlerQC
        Quantum context; only ``butlerQC.quantum.dataId`` is read here, to
        generate the catalog ID.
    inputRefs
        Object with ``cats_meas``, ``coadds`` and (if present in
        ``inputs``) ``models_psf`` attributes holding the dataset
        references, in the same order as the matching ``inputs`` entries.
    inputs : `dict`
        Loaded input datasets keyed by connection name. Must contain
        ``cats_meas``, ``coadds`` and ``models_scarlet``; ``models_psf`` is
        optional.

    Returns
    -------
    catexps : `list` [`CatalogExposureInputs`]
        One entry per band in the first element of
        ``self.config.get_band_sets()``, in that order.

    Raises
    ------
    KeyError
        Raised if a data ID is present for only one of the catalogs and
        exposures (or lacks a PSF model table when those are in use), or
        if a band to fit has no inputs.
    """
    id_tp = self.config.idGenerator.apply(butlerQC.quantum.dataId).catalog_id
    has_psf_models = "models_psf" in inputs

    def by_data_id(key):
        # Key each loaded object by the data ID of its dataset reference so
        # that the different connections can be matched up.
        return {dRef.dataId: obj for dRef, obj in zip(getattr(inputRefs, key), inputs[key])}

    cats = by_data_id("cats_meas")
    exps = by_data_id("coadds")
    models_psf = by_data_id("models_psf") if has_psf_models else None
    models_scarlet = inputs["models_scarlet"]

    catexps_by_band = {}
    # The union (not intersection) is deliberate in the sense that a data ID
    # missing from either mapping fails loudly below with a KeyError.
    for dataId in set(cats).union(exps):
        catalog = cats[dataId]
        exposure = exps[dataId]
        # Called for its effect on ``catalog``, using the scarlet model data
        # for this band.
        updateCatalogFootprints(
            modelData=models_scarlet,
            catalog=catalog,
            band=dataId['band'],
            imageForRedistribution=exposure,
            removeScarletData=True,
            updateFluxColumns=False,
        )
        catexps_by_band[dataId['band']] = CatalogExposureInputs(
            catalog=catalog,
            exposure=exposure,
            # An empty table stands in when no PSF model connection exists.
            table_psf_fits=models_psf[dataId] if has_psf_models else astropy.table.Table(),
            dataId=dataId,
            id_tract_patch=id_tp,
        )

    # Iteration over a set above is unordered; impose the configured order
    # of the bands to fit. Bands that are not fit are left out.
    return [catexps_by_band[band] for band in self.config.get_band_sets()[0]]
328
329
class CoaddMultibandFitTask(CoaddMultibandFitBase, pipeBase.PipelineTask):
    """Fit deblended exposures in multiple bands simultaneously.

    It is generally assumed but not enforced (except optionally by the
    configurable `fit_coadd_multiband` subtask) that there is only one exposure
    per band, presumably a coadd.
    """

    ConfigClass = CoaddMultibandFitConfig
    _DefaultName = "coaddMultibandFit"

    def __init__(self, initInputs, **kwargs):
        """Construct the task and its ``fit_coadd_multiband`` subtask.

        Parameters
        ----------
        initInputs
            Forwarded to the base-class constructor as ``initInputs``.
        **kwargs
            Additional keyword arguments forwarded to the base-class
            constructor.
        """
        super().__init__(initInputs=initInputs, **kwargs)
        self.makeSubtask("fit_coadd_multiband")

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        """Load the inputs, build the per-band catalog-exposure inputs, run
        the fit and store the outputs.

        Parameters
        ----------
        butlerQC
            Quantum context used to get the inputs and put the outputs.
        inputRefs
            References to the input datasets.
        outputRefs
            References to the output datasets.
        """
        inputs = butlerQC.get(inputRefs)
        catexps = self.build_catexps(butlerQC, inputRefs, inputs)
        outputs = self.run(catexps=catexps, cat_ref=inputs['cat_ref'])
        butlerQC.put(outputs, outputRefs)

    def run(self, catexps: list[CatalogExposure], cat_ref: afwTable.SourceCatalog) -> pipeBase.Struct:
        """Fit sources from a reference catalog using data from multiple
        exposures in the same region (patch).

        Parameters
        ----------
        catexps : `list` [`CatalogExposure`]
            A list of catalog-exposure pairs in a given band.
        cat_ref : `lsst.afw.table.SourceCatalog`
            A reference source catalog to fit.

        Returns
        -------
        retStruct : `lsst.pipe.base.Struct`
            A struct with a cat_output attribute containing the output
            measurement catalog.

        Notes
        -----
        Subtasks may have further requirements; see `CoaddMultibandFitSubTask.run`.
        """
        # NOTE(review): the subtask is called with the keyword
        # ``catalog_multi`` and its result is read from ``.output``, whereas
        # `CoaddMultibandFitSubTask.run` declares a ``cat_ref`` parameter and
        # documents a ``cat_output`` attribute. Left as-is because existing
        # subtasks may rely on this calling convention — confirm which one
        # is intended.
        cat_output = self.fit_coadd_multiband.run(catalog_multi=cat_ref, catexps=catexps).output
        return pipeBase.Struct(cat_output=cat_output)
list[CatalogExposureInputs] build_catexps(self, butlerQC, inputRefs, inputs)
pipeBase.Struct run(self, Iterable[CatalogExposureInputs] catexps, afwTable.SourceCatalog cat_ref)
pipeBase.Struct run(self, list[CatalogExposure] catexps, afwTable.SourceCatalog cat_ref)