23 "CoaddMultibandFitConfig",
"CoaddMultibandFitSubConfig",
"CoaddMultibandFitSubTask",
24 "CoaddMultibandFitTask",
27from .fit_multiband
import CatalogExposure, CatalogExposureConfig
33import lsst.pipe.base.connectionTypes
as cT
36from abc
import ABC, abstractmethod
37from pydantic
import Field
38from pydantic.dataclasses
import dataclass
39from typing
import Iterable
41CoaddMultibandFitBaseTemplates = {
43 "name_method":
"multiprofit",
47@dataclass(frozen=True, kw_only=True, config=CatalogExposureConfig)
49 table_psf_fits: astropy.table.Table = Field(title=
"A table of PSF fit parameters for each source")
56 pipeBase.PipelineTaskConnections,
57 dimensions=(
"tract",
"patch",
"skymap"),
58 defaultTemplates=CoaddMultibandFitBaseTemplates,
61 doc=
"Reference multiband source catalog",
62 name=
"{name_coadd}Coadd_ref",
63 storageClass=
"SourceCatalog",
64 dimensions=(
"tract",
"patch",
"skymap"),
67 doc=
"Deblended single-band source catalogs",
68 name=
"{name_coadd}Coadd_meas",
69 storageClass=
"SourceCatalog",
70 dimensions=(
"tract",
"patch",
"band",
"skymap"),
74 doc=
"Exposures on which to run fits",
75 name=
"{name_coadd}Coadd_calexp",
76 storageClass=
"ExposureF",
77 dimensions=(
"tract",
"patch",
"band",
"skymap"),
80 models_psf = cT.Input(
81 doc=
"Input PSF model parameter catalog",
83 name=
"{name_coadd}Coadd_psfs_{name_method}",
84 storageClass=
"ArrowAstropy",
85 dimensions=(
"tract",
"patch",
"band",
"skymap"),
# Multiband scarlet deblender models; this connection has no "band"
# dimension. Declared through the ``cT`` alias for consistency with the
# neighbouring connection definitions.
models_scarlet = cT.Input(
    doc="Multiband scarlet models produced by the deblender",
    name="{name_coadd}Coadd_scarletModelData",
    storageClass="ScarletModelData",
    dimensions=("tract", "patch", "skymap"),
)
# Output catalog of fitted source model parameters; its dataset name is
# built from the ``name_coadd`` and ``name_method`` templates.
cat_output = cT.Output(
    doc="Output source model fit parameter catalog",
    name="{name_coadd}Coadd_objects_{name_method}",
    storageClass="ArrowTable",
    dimensions=("tract", "patch", "skymap"),
)
102 """Validates the `lsst.daf.butler.DatasetRef` bands against the
103 subtask's list of bands to fit and drops unnecessary bands.
    Dictionary whose keys are an input (regular or prerequisite)
    connection name and whose values are a tuple of the connection
    instance and a collection of associated `DatasetRef` objects.
    The exact type of the nested collections is unspecified; it can be
    assumed to be multi-pass iterable and support `len` and ``in``, but
    it should not be mutated in place. In contrast, the outer
    dictionaries are guaranteed to be temporary copies that are true
    `dict` instances, and hence may be modified and even returned; this
    is especially useful for delegating to `super` (see notes below).
    Mapping of output datasets, with the same structure as ``inputs``.
    Label for this task in the pipeline (should be used in all
    diagnostic messages).
data_id : `lsst.daf.butler.DataCoordinate`
    Data ID for this quantum in the pipeline (should be used in all
    diagnostic messages).
adjusted_inputs : `Mapping`
    Mapping of the same form as ``inputs`` with updated containers of
    input `DatasetRef` objects. All inputs involving the 'band'
    dimension are adjusted to put them in consistent order and remove
adjusted_outputs : `Mapping`
    Mapping of updated output datasets; always empty for this task.
lsst.pipe.base.NoWorkFound
    Raised if there are not enough of the right bands to run the task
143 bands_fit, bands_read_only = self.config.get_band_sets()
144 bands_needed = bands_fit.union(bands_read_only)
147 for connection_name, (connection, dataset_refs)
in inputs.items():
149 if 'band' in connection.dimensions:
150 datasets_by_band = {dref.dataId[
'band']: dref
for dref
in dataset_refs}
151 if not bands_needed.issubset(datasets_by_band.keys()):
152 raise pipeBase.NoWorkFound(
153 f
'DatasetRefs={dataset_refs} have data with bands in the'
154 f
' set={set(datasets_by_band.keys())},'
155 f
' which is not a superset of the required bands={bands_needed} defined by'
156 f
' {self.config.__class__}.fit_coadd_multiband='
157 f
'{self.config.fit_coadd_multiband._value.__class__}\'s attributes'
158 f
' bands_fit={bands_fit} and bands_read_only()={bands_read_only}.'
159 f
' Add the required bands={bands_needed.difference(datasets_by_band.keys())}.'
163 adjusted_inputs[connection_name] = (
165 [datasets_by_band[band]
for band
in bands_needed]
169 inputs.update(adjusted_inputs)
171 return adjusted_inputs, {}
175 """Configuration for implementing fitter subtasks.
179 """Return the set of bands that the Task needs to read (e.g. for
180 defining priors) but not necessarily fit.
184 The set of such bands.
188class CoaddMultibandFitSubTask(pipeBase.Task, ABC):
189 """Subtask interface for multiband fitting of deblended sources.
194 Additional arguments to be passed to the `lsst.pipe.base.Task`
197 ConfigClass = CoaddMultibandFitSubConfig
205 ) -> pipeBase.Struct:
206 """Fit models to deblended sources from multi-band inputs.
catexps : `typing.List [CatalogExposureInputs]`
    A list of catalog-exposure pairs with metadata in a given band.
213 A reference source catalog to fit.
retStruct : `lsst.pipe.base.Struct`
    A struct with a cat_output attribute containing the output
Subclasses may have further requirements on the input parameters,
- Passing only one catexp per band;
- Catalogs containing HeavyFootprints with deblended images;
- Fitting only a subset of the sources.
If any requirements are not met, the subtask should fail as soon as
233class CoaddMultibandFitConfig(
234 pipeBase.PipelineTaskConfig,
235 pipelineConnections=CoaddMultibandFitConnections,
237 """Configure a CoaddMultibandFitTask, including a configurable fitting subtask.
239 fit_coadd_multiband = pexConfig.ConfigurableField(
240 target=CoaddMultibandFitSubTask,
241 doc="Task to fit sources using multiple bands",
243 idGenerator = SkyMapIdGeneratorConfig.make_field()
246 """Get the set of bands required by the fit_coadd_multiband subtask.
251 The set of bands that the subtask will fit.
bands_read_only : `set`
    The set of bands that the subtask will only read data
    (measurement catalog and exposure) for.
258 except AttributeError:
259 raise RuntimeError(f
'{__class__}.fit_coadd_multiband must have bands_fit attribute')
from None
261 return set(bands_fit),
set(bands_read_only)
"""Fit deblended exposures in multiple bands simultaneously.

It is generally assumed but not enforced (except optionally by the
configurable `fit_coadd_multiband` subtask) that there is only one exposure
per band, presumably a coadd.
271 ConfigClass = CoaddMultibandFitConfig
272 _DefaultName = "CoaddMultibandFit"
def __init__(self, initInputs, **kwargs):
    """Initialize the task and create its fitting subtask.

    Parameters
    ----------
    initInputs
        Forwarded unchanged to the parent constructor as ``initInputs``.
    **kwargs
        Additional keyword arguments forwarded to the parent constructor.
    """
    super().__init__(initInputs=initInputs, **kwargs)
    # Build the subtask named by the ``fit_coadd_multiband`` config field;
    # ``run`` later calls it as ``self.fit_coadd_multiband``.
    self.makeSubtask("fit_coadd_multiband")
279 inputs = butlerQC.get(inputRefs)
280 id_tp = self.config.idGenerator.apply(butlerQC.quantum.dataId).catalog_id
282 input_refs_objs = [(getattr(inputRefs, key), inputs[key])
283 for key
in (
"cats_meas",
"coadds",
"models_psf")]
284 cats, exps, models_psf = [
285 {dRef.dataId: obj
for dRef, obj
in zip(refs, objs)}
286 for refs, objs
in input_refs_objs
288 dataIds =
set(cats).union(
set(exps))
289 models_scarlet = inputs[
"models_scarlet"]
290 catexps = [
None]*len(dataIds)
291 for idx, dataId
in enumerate(dataIds):
292 catalog = cats[dataId]
293 exposure = exps[dataId]
294 models_scarlet.updateCatalogFootprints(
297 psfModel=exposure.getPsf(),
298 redistributeImage=exposure.image,
299 removeScarletData=
True,
300 updateFluxColumns=
False,
303 catalog=catalog, exposure=exposure, table_psf_fits=models_psf[dataId],
304 dataId=dataId, id_tract_patch=id_tp,
306 outputs = self.
run(catexps=catexps, cat_ref=inputs[
'cat_ref'])
307 butlerQC.put(outputs, outputRefs)
310 """Fit sources from a reference catalog using data from multiple
311 exposures in the same region (patch).
catexps : `typing.List [CatalogExposure]`
    A list of catalog-exposure pairs in a given band.
318 A reference source catalog to fit.
retStruct : `lsst.pipe.base.Struct`
    A struct with a cat_output attribute containing the output
328 Subtasks may have further requirements; see `CoaddMultibandFitSubTask.run`.
330 cat_output = self.fit_coadd_multiband.run(catalog_multi=cat_ref, catexps=catexps).output
331 retStruct = pipeBase.Struct(cat_output=cat_output)
adjustQuantum(self, inputs, outputs, label, data_id)
set bands_read_only(self)
runQuantum(self, butlerQC, inputRefs, outputRefs)
pipeBase.Struct run(self, list[CatalogExposure] catexps, afwTable.SourceCatalog cat_ref)
daf::base::PropertySet * set