LSST Applications g04e9c324dd+8c5ae1fdc5,g134cb467dc+1b3060144d,g18429d2f64+f642bf4753,g199a45376c+0ba108daf9,g1fd858c14a+2dcf163641,g262e1987ae+7b8c96d2ca,g29ae962dfc+3bd6ecb08a,g2cef7863aa+aef1011c0b,g35bb328faa+8c5ae1fdc5,g3fd5ace14f+53e1a9e7c5,g4595892280+fef73a337f,g47891489e3+2efcf17695,g4d44eb3520+642b70b07e,g53246c7159+8c5ae1fdc5,g67b6fd64d1+2efcf17695,g67fd3c3899+b70e05ef52,g74acd417e5+317eb4c7d4,g786e29fd12+668abc6043,g87389fa792+8856018cbb,g89139ef638+2efcf17695,g8d7436a09f+3be3c13596,g8ea07a8fe4+9f5ccc88ac,g90f42f885a+a4e7b16d9b,g97be763408+ad77d7208f,g9dd6db0277+b70e05ef52,ga681d05dcb+a3f46e7fff,gabf8522325+735880ea63,gac2eed3f23+2efcf17695,gb89ab40317+2efcf17695,gbf99507273+8c5ae1fdc5,gd8ff7fe66e+b70e05ef52,gdab6d2f7ff+317eb4c7d4,gdc713202bf+b70e05ef52,gdfd2d52018+b10e285e0f,ge365c994fd+310e8507c4,ge410e46f29+2efcf17695,geaed405ab2+562b3308c0,gffca2db377+8c5ae1fdc5,w.2025.35
LSST Data Management Base Package
Loading...
Searching...
No Matches
fit_coadd_multiband.py
Go to the documentation of this file.
1# This file is part of pipe_tasks.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21
22__all__ = [
23 "CoaddMultibandFitConfig", "CoaddMultibandFitConnections", "CoaddMultibandFitSubConfig",
24 "CoaddMultibandFitSubTask", "CoaddMultibandFitTask",
25]
26
27from .fit_multiband import CatalogExposure, CatalogExposureConfig
28
29import lsst.afw.table as afwTable
30from lsst.meas.base import SkyMapIdGeneratorConfig
31from lsst.meas.extensions.scarlet.io import updateCatalogFootprints
32import lsst.pex.config as pexConfig
33import lsst.pipe.base as pipeBase
34import lsst.pipe.base.connectionTypes as cT
35
36import astropy.table
37from abc import ABC, abstractmethod
38from pydantic import Field
39from pydantic.dataclasses import dataclass
40from typing import Iterable
41
42CoaddMultibandFitBaseTemplates = {
43 "name_coadd": "deep",
44 "name_method": "multiprofit",
45 "name_table": "objects",
46}
47
48
49@dataclass(frozen=True, kw_only=True, config=CatalogExposureConfig)
51 table_psf_fits: astropy.table.Table = Field(title="A table of PSF fit parameters for each source")
52
53 def get_catalog(self):
54 return self.catalog
55
56
58 pipeBase.PipelineTaskConnections,
59 dimensions=("tract", "patch", "skymap"),
60 defaultTemplates=CoaddMultibandFitBaseTemplates,
61):
62 cat_ref = cT.Input(
63 doc="Reference multiband source catalog",
64 name="{name_coadd}Coadd_ref",
65 storageClass="SourceCatalog",
66 dimensions=("tract", "patch", "skymap"),
67 )
68 cats_meas = cT.Input(
69 doc="Deblended single-band source catalogs",
70 name="{name_coadd}Coadd_meas",
71 storageClass="SourceCatalog",
72 dimensions=("tract", "patch", "band", "skymap"),
73 multiple=True,
74 )
75 coadds = cT.Input(
76 doc="Exposures on which to run fits",
77 name="{name_coadd}Coadd_calexp",
78 storageClass="ExposureF",
79 dimensions=("tract", "patch", "band", "skymap"),
80 multiple=True,
81 )
82 coadds_cell = cT.Input(
83 doc="Cell-coadd exposures on which to run fits",
84 name="{name_coadd}CoaddCell",
85 storageClass="MultipleCellCoadd",
86 dimensions=("tract", "patch", "band", "skymap"),
87 multiple=True,
88 )
89 backgrounds = cT.Input(
90 doc="Background models to subtract from the coadds_cell",
91 name="{name_coadd}Coadd_calexp_background",
92 storageClass="Background",
93 dimensions=("tract", "patch", "band", "skymap"),
94 multiple=True,
95 )
96 models_psf = cT.Input(
97 doc="Input PSF model parameter catalog",
98 # Consider allowing independent psf fit method
99 name="{name_coadd}Coadd_psfs_{name_method}",
100 storageClass="ArrowAstropy",
101 dimensions=("tract", "patch", "band", "skymap"),
102 multiple=True,
103 )
104 models_scarlet = pipeBase.connectionTypes.Input(
105 doc="Multiband scarlet models produced by the deblender",
106 name="{name_coadd}Coadd_scarletModelData",
107 storageClass="ScarletModelData",
108 dimensions=("tract", "patch", "skymap"),
109 )
110
111 def adjustQuantum(self, inputs, outputs, label, data_id):
112 """Validates the `lsst.daf.butler.DatasetRef` bands against the
113 subtask's list of bands to fit and drops unnecessary bands.
114
115 Parameters
116 ----------
117 inputs : `dict`
118 Dictionary whose keys are an input (regular or prerequisite)
119 connection name and whose values are a tuple of the connection
120 instance and a collection of associated `DatasetRef` objects.
121 The exact type of the nested collections is unspecified; it can be
122 assumed to be multi-pass iterable and support `len` and ``in``, but
123 it should not be mutated in place. In contrast, the outer
124 dictionaries are guaranteed to be temporary copies that are true
125 `dict` instances, and hence may be modified and even returned; this
126 is especially useful for delegating to `super` (see notes below).
127 outputs : `Mapping`
128 Mapping of output datasets, with the same structure as ``inputs``.
129 label : `str`
130 Label for this task in the pipeline (should be used in all
131 diagnostic messages).
132 data_id : `lsst.daf.butler.DataCoordinate`
133 Data ID for this quantum in the pipeline (should be used in all
134 diagnostic messages).
135
136 Returns
137 -------
138 adjusted_inputs : `Mapping`
139 Mapping of the same form as ``inputs`` with updated containers of
140 input `DatasetRef` objects. All inputs involving the 'band'
141 dimension are adjusted to put them in consistent order and remove
142 unneeded bands.
143 adjusted_outputs : `Mapping`
144 Mapping of updated output datasets; always empty for this task.
145
146 Raises
147 ------
148 lsst.pipe.base.NoWorkFound
149 Raised if there are not enough of the right bands to run the task
150 on this quantum.
151 """
152 # Check which bands are going to be fit
153 bands_fit, bands_read_only = self.config.get_band_sets()
154 bands_needed = bands_fit + [band for band in bands_read_only if band not in bands_fit]
155 bands_needed_set = set(bands_needed)
156
157 adjusted_inputs = {}
158 bands_found, connection_first = None, None
159 for connection_name, (connection, dataset_refs) in inputs.items():
160 # Datasets without bands in their dimensions should be fine
161 if 'band' in connection.dimensions:
162 datasets_by_band = {dref.dataId['band']: dref for dref in dataset_refs}
163 bands_set = set(datasets_by_band.keys())
164 if self.config.allow_missing_bands:
165 # Use the first dataset found as the reference since all
166 # dataset types with band should have the same bands
167 # This will only break if one of the calexp/meas datasets
168 # is missing from a given band, which would surely be an
169 # upstream problem anyway
170 if bands_found is None:
171 bands_found, connection_first = bands_set, connection_name
172 if len(bands_found) == 0:
173 raise pipeBase.NoWorkFound(
174 f'DatasetRefs={dataset_refs} for {connection_name=} is empty'
175 )
176 elif not set(bands_read_only).issubset(bands_set):
177 raise pipeBase.NoWorkFound(
178 f'DatasetRefs={dataset_refs} has {bands_set=} which is missing at least one'
179 f' of {bands_read_only=}'
180 )
181 # Put the bands to fit first, then any other bands
182 # needed for initialization/priors only last
183 bands_needed = [band for band in bands_fit if band in bands_found] + [
184 band for band in bands_read_only if band not in bands_found
185 ]
186 elif bands_found != bands_set:
187 raise RuntimeError(
188 f'DatasetRefs={dataset_refs} with {connection_name=} has {bands_set=} !='
189 f' {bands_found=} from {connection_first=}'
190 )
191 # All configured bands are treated as necessary
192 elif not bands_needed_set.issubset(bands_set):
193 raise pipeBase.NoWorkFound(
194 f'DatasetRefs={dataset_refs} have data with bands in the'
195 f' set={set(datasets_by_band.keys())},'
196 f' which is not a superset of the required bands={bands_needed} defined by'
197 f' {self.config.__class__}.fit_coadd_multiband='
198 f'{self.config.fit_coadd_multiband._value.__class__}\'s attributes'
199 f' bands_fit={bands_fit} and bands_read_only()={bands_read_only}.'
200 f' Add the required bands={set(bands_needed).difference(datasets_by_band.keys())}.'
201 )
202 # Adjust all datasets with band dimensions to include just
203 # the needed bands, in consistent order.
204 adjusted_inputs[connection_name] = (
205 connection,
206 [datasets_by_band[band] for band in bands_needed]
207 )
208
209 # Delegate to super for more checks.
210 inputs.update(adjusted_inputs)
211 super().adjustQuantum(inputs, outputs, label, data_id)
212 return adjusted_inputs, {}
213
214 def __init__(self, *, config=None):
215 super().__init__(config=config)
216 assert isinstance(config, CoaddMultibandFitBaseConfig)
217
218 if config.drop_psf_connection:
219 del self.models_psf
220
221 if config.use_cell_coadds:
222 del self.coadds
223 else:
224 del self.coadds_cell
225 del self.backgrounds
226
227
229 cat_output = cT.Output(
230 doc="Output source model fit parameter catalog",
231 name="{name_coadd}Coadd_{name_table}_{name_method}",
232 storageClass="ArrowTable",
233 dimensions=("tract", "patch", "skymap"),
234 )
235
236
237class CoaddMultibandFitSubConfig(pexConfig.Config):
238 """Configuration for implementing fitter subtasks.
239 """
240
241 bands_fit = pexConfig.ListField[str](
242 default=[],
243 doc="list of bandpass filters to fit",
244 listCheck=lambda x: (len(x) > 0) and (len(set(x)) == len(x)),
245 )
246
247 @abstractmethod
248 def bands_read_only(self) -> set:
249 """Return the set of bands that the Task needs to read (e.g. for
250 defining priors) but not necessarily fit.
251
252 Returns
253 -------
254 The set of such bands.
255 """
256
257
258class CoaddMultibandFitSubTask(pipeBase.Task, ABC):
259 """Subtask interface for multiband fitting of deblended sources.
260
261 Parameters
262 ----------
263 **kwargs
264 Additional arguments to be passed to the `lsst.pipe.base.Task`
265 constructor.
266 """
267 ConfigClass = CoaddMultibandFitSubConfig
268
269 def __init__(self, **kwargs):
270 super().__init__(**kwargs)
271
272 @abstractmethod
273 def run(
274 self, catexps: Iterable[CatalogExposureInputs], cat_ref: afwTable.SourceCatalog
275 ) -> pipeBase.Struct:
276 """Fit models to deblended sources from multi-band inputs.
277
278 Parameters
279 ----------
280 catexps : `typing.List [CatalogExposureInputs]`
281 A list of catalog-exposure pairs with metadata in a given band.
282 cat_ref : `lsst.afw.table.SourceCatalog`
283 A reference source catalog to fit.
284
285 Returns
286 -------
287 retStruct : `lsst.pipe.base.Struct`
288 A struct with a cat_output attribute containing the output
289 measurement catalog.
290
291 Notes
292 -----
293 Subclasses may have further requirements on the input parameters,
294 including:
295 - Passing only one catexp per band;
296 - Catalogs containing HeavyFootprints with deblended images;
297 - Fitting only a subset of the sources.
298 If any requirements are not met, the subtask should fail as soon as
299 possible.
300 """
301
302
304 pipeBase.PipelineTaskConfig,
305 pipelineConnections=CoaddMultibandFitInputConnections,
306):
307 """Base class for multiband fitting."""
308
309 allow_missing_bands = pexConfig.Field[bool](
310 doc="Whether to still fit even if some bands are missing",
311 default=True,
312 )
313 drop_psf_connection = pexConfig.Field[bool](
314 doc="Whether to drop the PSF model connection, e.g. because PSF parameters are in the input catalog",
315 default=False,
316 )
317 fit_coadd_multiband = pexConfig.ConfigurableField(
318 target=CoaddMultibandFitSubTask,
319 doc="Task to fit sources using multiple bands",
320 )
321 use_cell_coadds = pexConfig.Field[bool](
322 doc="Use cell coadd images for object fitting?",
323 default=False,
324 )
325 idGenerator = SkyMapIdGeneratorConfig.make_field()
326
327 def get_band_sets(self):
328 """Get the set of bands required by the fit_coadd_multiband subtask.
329
330 Returns
331 -------
332 bands_fit : `set`
333 The set of bands that the subtask will fit.
334 bands_read_only : `set`
335 The set of bands that the subtask will only read data
336 (measurement catalog and exposure) for.
337 """
338 try:
339 bands_fit = self.fit_coadd_multiband.bands_fit
340 except AttributeError:
341 raise RuntimeError(f'{__class__}.fit_coadd_multiband must have bands_fit attribute') from None
342 bands_read_only = self.fit_coadd_multiband.bands_read_only()
343 return tuple(list({band: None for band in bands}.keys()) for bands in (bands_fit, bands_read_only))
344
345
347 CoaddMultibandFitBaseConfig,
348 pipelineConnections=CoaddMultibandFitConnections,
349):
350 """Configuration for a CoaddMultibandFitTask."""
351
352
354 """Base class for tasks that fit or rebuild multiband models.
355
356 This class only implements data reconstruction.
357 """
358
359 def build_catexps(self, butlerQC, inputRefs, inputs) -> list[CatalogExposureInputs]:
360 id_tp = self.config.idGenerator.apply(butlerQC.quantum.dataId).catalog_id
361 # This is a roundabout way of ensuring all inputs get sorted and matched
362 if self.config.use_cell_coadds:
363 keys = ["cats_meas", "coadds_cell", "backgrounds"]
364 else:
365 keys = ["cats_meas", "coadds"]
366 has_psf_models = "models_psf" in inputs
367 if has_psf_models:
368 keys.append("models_psf")
369 input_refs_objs = {key: (getattr(inputRefs, key), inputs[key]) for key in keys}
370 inputs_sorted = {
371 key: {dRef.dataId: obj for dRef, obj in zip(refs, objs, strict=True)}
372 for key, (refs, objs) in input_refs_objs.items()
373 }
374 cats = inputs_sorted["cats_meas"]
375 if self.config.use_cell_coadds:
376 exps = {}
377 for data_id, background in inputs_sorted["backgrounds"].items():
378 mcc = inputs_sorted["coadds_cell"][data_id]
379 stitched_coadd = mcc.stitch()
380 exposure = stitched_coadd.asExposure()
381 exposure.image -= background.getImage()
382 exps[data_id] = exposure
383 else:
384 exps = inputs_sorted["coadds"]
385 models_psf = inputs_sorted["models_psf"] if has_psf_models else None
386 dataIds = set(cats).union(set(exps))
387 models_scarlet = inputs["models_scarlet"]
388 catexp_dict = {}
389 dataId = None
390 for dataId in dataIds:
391 catalog = cats[dataId]
392 exposure = exps[dataId]
393 updateCatalogFootprints(
394 modelData=models_scarlet,
395 catalog=catalog,
396 band=dataId['band'],
397 imageForRedistribution=exposure,
398 removeScarletData=False,
399 updateFluxColumns=False,
400 )
401 catexp_dict[dataId['band']] = CatalogExposureInputs(
402 catalog=catalog,
403 exposure=exposure,
404 table_psf_fits=models_psf[dataId] if has_psf_models else astropy.table.Table(),
405 dataId=dataId,
406 id_tract_patch=id_tp,
407 )
408 # This shouldn't happen unless this is called with no inputs, but check anyway
409 if dataId is None:
410 raise RuntimeError(f"Did not build any catexps for {inputRefs=}")
411 catexps = []
412 for band in self.config.get_band_sets()[0]:
413 if band in catexp_dict:
414 catexp = catexp_dict[band]
415 else:
416 # Make a dummy catexp with a dataId if there's no data
417 # This should be handled by any subtasks
418 dataId_band = dataId.to_simple(minimal=True)
419 dataId_band.dataId["band"] = band
420 catexp = CatalogExposureInputs(
421 catalog=afwTable.SourceCatalog(),
422 exposure=None,
423 table_psf_fits=astropy.table.Table(),
424 dataId=dataId.from_simple(dataId_band, universe=dataId.universe),
425 id_tract_patch=id_tp,
426 )
427 catexps.append(catexp)
428 return catexps
429
430
431class CoaddMultibandFitTask(CoaddMultibandFitBase, pipeBase.PipelineTask):
432 """Fit deblended exposures in multiple bands simultaneously.
433
434 It is generally assumed but not enforced (except optionally by the
435 configurable `fit_coadd_multiband` subtask) that there is only one exposure
436 per band, presumably a coadd.
437 """
438
439 ConfigClass = CoaddMultibandFitConfig
440 _DefaultName = "coaddMultibandFit"
441
442 def __init__(self, initInputs, **kwargs):
443 super().__init__(initInputs=initInputs, **kwargs)
444 self.makeSubtask("fit_coadd_multiband")
445
446 def make_kwargs(self, butlerQC, inputRefs, inputs):
447 """Make any kwargs needed to be passed to run.
448
449 This method should be overloaded by subclasses that are configured to
450 use a specific subtask that needs additional arguments derived from
451 the inputs but do not otherwise need to overload runQuantum."""
452 return {}
453
454 def runQuantum(self, butlerQC, inputRefs, outputRefs):
455 inputs = butlerQC.get(inputRefs)
456 catexps = self.build_catexps(butlerQC, inputRefs, inputs)
457 if not self.config.allow_missing_bands and any([catexp is None for catexp in catexps]):
458 raise RuntimeError(
459 f"Got a None catexp with {self.config.allow_missing_band=}; NoWorkFound should have been"
460 f" raised earlier"
461 )
462 kwargs = self.make_kwargs(butlerQC, inputRefs, inputs)
463 outputs = self.run(catexps=catexps, cat_ref=inputs['cat_ref'], **kwargs)
464 butlerQC.put(outputs, outputRefs)
465
466 def run(
467 self,
468 catexps: list[CatalogExposure],
469 cat_ref: afwTable.SourceCatalog,
470 **kwargs
471 ) -> pipeBase.Struct:
472 """Fit sources from a reference catalog using data from multiple
473 exposures in the same region (patch).
474
475 Parameters
476 ----------
477 catexps : `typing.List [CatalogExposure]`
478 A list of catalog-exposure pairs in a given band.
479 cat_ref : `lsst.afw.table.SourceCatalog`
480 A reference source catalog to fit.
481
482 Returns
483 -------
484 retStruct : `lsst.pipe.base.Struct`
485 A struct with a cat_output attribute containing the output
486 measurement catalog.
487
488 Notes
489 -----
490 Subtasks may have further requirements; see `CoaddMultibandFitSubTask.run`.
491 """
492 cat_output = self.fit_coadd_multiband.run(catalog_multi=cat_ref, catexps=catexps, **kwargs).output
493 retStruct = pipeBase.Struct(cat_output=cat_output)
494 return retStruct
list[CatalogExposureInputs] build_catexps(self, butlerQC, inputRefs, inputs)
pipeBase.Struct run(self, Iterable[CatalogExposureInputs] catexps, afwTable.SourceCatalog cat_ref)
pipeBase.Struct run(self, list[CatalogExposure] catexps, afwTable.SourceCatalog cat_ref, **kwargs)