23 "CatalogExposure",
"MultibandFitConfig",
"MultibandFitSubConfig",
"MultibandFitSubTask",
27 from abc
import ABC, abstractmethod
28 from dataclasses
import dataclass, field
31 import lsst.daf.butler
as dafButler
36 from typing
import Dict, Iterable, List, Optional, Set
39 @dataclass(frozen=True)
41 """A class to store a catalog, exposure, and metadata for a given dataId.
43 This class is intended to store an exposure and an associated measurement
44 catalog. There are no checks to ensure this, so repurpose responsibly.
48 return self.dataId[
'band']
def calib(self) -> Optional[afwImage.PhotoCalib]:
    """Return the photometric calibration of the stored exposure.

    Returns
    -------
    calib : `lsst.afw.image.PhotoCalib` or `None`
        The value of ``self.exposure.getPhotoCalib()``, or `None` when
        ``self.exposure`` is `None`.
    """
    # Guard first: without an exposure there is nothing to query.
    if self.exposure is None:
        return None
    return self.exposure.getPhotoCalib()
56 dataId: dafButler.DataCoordinate
57 id_tract_patch: Optional[int] = 0
58 metadata: Dict =
field(default_factory=dict)
61 if 'band' not in self.dataId:
62 raise ValueError(f
'dataId={self.dataId} must have a band')
65 multibandFitBaseTemplates = {
66 "name_input_coadd":
"deep",
67 "name_output_coadd":
"deep",
68 "name_output_cat":
"fit",
73 pipeBase.PipelineTaskConnections,
74 dimensions=(
"tract",
"patch",
"skymap"),
75 defaultTemplates=multibandFitBaseTemplates,
78 doc=
"Reference multiband source catalog",
79 name=
"{name_input_coadd}Coadd_ref",
80 storageClass=
"SourceCatalog",
81 dimensions=(
"tract",
"patch",
"skymap"),
84 doc=
"Deblended single-band source catalogs",
85 name=
"{name_input_coadd}Coadd_meas",
86 storageClass=
"SourceCatalog",
88 dimensions=(
"tract",
"patch",
"band",
"skymap"),
91 doc=
"Exposures on which to run fits",
92 name=
"{name_input_coadd}Coadd_calexp",
93 storageClass=
"ExposureF",
95 dimensions=(
"tract",
"patch",
"band",
"skymap"),
97 cat_output = cT.Output(
98 doc=
"Measurement multi-band catalog",
99 name=
"{name_output_coadd}Coadd_{name_output_cat}",
100 storageClass=
"SourceCatalog",
101 dimensions=(
"tract",
"patch",
"skymap"),
103 cat_ref_schema = cT.InitInput(
104 doc=
"Schema associated with a ref source catalog",
105 storageClass=
"SourceCatalog",
106 name=
"{name_input_coadd}Coadd_ref_schema",
108 cat_output_schema = cT.InitOutput(
109 doc=
"Output of the schema used in deblending task",
110 name=
"{name_output_coadd}Coadd_{name_output_cat}_schema",
111 storageClass=
"SourceCatalog"
115 """Validates the `lsst.daf.butler.DatasetRef` bands against the
116 subtask's list of bands to fit and drops unnecessary bands.
120 datasetRefMap : `NamedKeyDict`
121 Mapping from dataset type to a `set` of
122 `lsst.daf.butler.DatasetRef` objects
126 datasetRefMap : `NamedKeyDict`
127 Modified mapping of input with possibly adjusted
128 `lsst.daf.butler.DatasetRef` objects.
133 Raised if any of the per-band datasets have an inconsistent band
134 set, or if the band set to fit is not a subset of the data bands.
139 bands_fit, bands_read_only = self.config.get_band_sets()
140 bands_needed = bands_fit.union(bands_read_only)
145 for type_d, ref_d
in datasetRefMap.items():
147 if 'band' in type_d.dimensions:
148 bands_set = {dref.dataId[
'band']
for dref
in ref_d}
149 if bands_data
is None:
150 bands_data = bands_set
151 if bands_needed != bands_data:
152 if not bands_needed.issubset(bands_data):
154 f
'Datarefs={ref_d} have data with bands in the set={bands_set},'
155 f
'which is not a subset of the required bands={bands_needed} defined by '
156 f
'{self.config.__class__}.fit_multiband='
157 f
'{self.config.fit_multiband._value.__class__}\'s attributes'
158 f
' bands_fit={bands_fit} and bands_read_only()={bands_read_only}.'
159 f
' Add the required bands={bands_needed.difference(bands_data)}.'
162 bands_extra = bands_data.difference(bands_needed)
163 elif bands_set != bands_data:
165 f
'Datarefs={ref_d} have data with bands in the set={bands_set}'
166 f
' which differs from the previous={bands_data}); bandsets must be identical.'
170 if dref.dataId[
'band']
in bands_extra:
176 """Config class for the MultibandFitTask to define methods returning
177 values that depend on multiple config settings.
181 """Return the set of bands that the Task needs to read (e.g. for
182 defining priors) but not necessarily fit.
186 The set of such bands.
192 """An abstract interface for subtasks of MultibandFitTask to perform
193 multiband fitting of deblended sources.
197 schema : `lsst.afw.table.Schema`
198 The input schema for the reference source catalog, used to initialize
201 Additional arguments to be passed to the `lsst.pipe.base.Task`
204 ConfigClass = MultibandFitSubConfig
206 def __init__(self, schema: afwTable.Schema, **kwargs):
212 ) -> pipeBase.Struct:
213 """Fit sources from a reference catalog using data from multiple
214 exposures in the same patch.
218 catexps : `typing.List [CatalogExposure]`
219 A list of catalog-exposure pairs in a given band.
220 cat_ref : `lsst.afw.table.SourceCatalog`
221 A reference source catalog to fit.
225 retStruct : `lsst.pipe.base.Struct`
226 A struct with a cat_output attribute containing the output
231 Subclasses may have further requirements on the input parameters,
233 - Passing only one catexp per band;
234 - Catalogs containing HeavyFootprints with deblended images;
235 - Fitting only a subset of the sources.
236 If any requirements are not met, the subtask should fail as soon as
239 raise NotImplementedError()
244 raise NotImplementedError()
248 pipeBase.PipelineTaskConfig,
249 pipelineConnections=MultibandFitConnections,
251 """Configure a MultibandFitTask, including a configurable fitting subtask.
253 fit_multiband = pexConfig.ConfigurableField(
254 target=MultibandFitSubTask,
255 doc=
"Task to fit sources using multiple bands",
259 """Get the set of bands required by the fit_multiband subtask.
264 The set of bands that the subtask will fit.
265 bands_read_only : `set`
266 The set of bands that the subtask will only read data
267 (measurement catalog and exposure) for.
271 except AttributeError:
272 raise RuntimeError(f
'{__class__}.fit_multiband must have bands_fit attribute')
from None
# Ask the configured ``fit_multiband`` subtask config for the bands it only
# needs to read (measurement catalog and exposure) but does not fit.
# The attribute is ``fit_multiband``; the doubled name found here does not
# exist on this config and would raise AttributeError.
bands_read_only = self.fit_multiband.bands_read_only()
274 return set(bands_fit),
set(bands_read_only)
278 """Fit deblended exposures in multiple bands simultaneously.
280 It is generally assumed but not enforced (except optionally by the
281 configurable `fit_multiband` subtask) that there is only one exposure
282 per band, presumably a coadd.
284 ConfigClass = MultibandFitConfig
285 _DefaultName =
"multibandFit"
288 super().
__init__(initInputs=initInputs, **kwargs)
289 self.makeSubtask(
"fit_multiband", schema=initInputs[
"cat_ref_schema"].schema)
293 inputs = butlerQC.get(inputRefs)
294 id_tp = ExposureIdInfo.fromDataId(butlerQC.quantum.dataId,
"tract_patch").expId
295 input_refs_objs = [(inputRefs.cats_meas, inputs[
'cats_meas']), (inputRefs.coadds, inputs[
'coadds'])]
297 {dRef.dataId: obj
for dRef, obj
in zip(refs, objs)}
298 for refs, objs
in input_refs_objs
300 dataIds =
set(cats).union(
set(exps))
303 catalog=cats.get(dataId), exposure=exps.get(dataId), dataId=dataId, id_tract_patch=id_tp,
305 for dataId
in dataIds
# Delegate the fit to this task's ``run`` method, passing the
# catalog-exposure pairs and the reference catalog read from the butler.
# The method is ``run``; the doubled name found here does not exist on the
# task and would raise AttributeError.
outputs = self.run(catexps=catexps, cat_ref=inputs['cat_ref'])
308 butlerQC.put(outputs, outputRefs)
311 raise RuntimeError(f
'{__class__}.config.fit_multiband.run schema != initOutput schema:'
312 f
' {outputs.cat_output.schema} vs {self.cat_output_schema.schema}')
315 """Fit sources from a reference catalog using data from multiple
316 exposures in the same region (patch).
320 catexps : `typing.List [CatalogExposure]`
321 A list of catalog-exposure pairs in a given band.
322 cat_ref : `lsst.afw.table.SourceCatalog`
323 A reference source catalog to fit.
327 retStruct : `lsst.pipe.base.Struct`
328 A struct with a cat_output attribute containing the output
333 Subtasks may have further requirements; see `MultibandFitSubTask.run`.
335 cat_output = self.fit_multiband.
run(catexps, cat_ref).output
336 retStruct = pipeBase.Struct(cat_output=cat_output)
A class to contain the data, WCS, and other information needed to describe an image of the sky.
Optional[afwImage.PhotoCalib] calib(self)
def adjustQuantum(self, datasetRefMap)
Set bands_read_only(self)
pipeBase.Struct run(self, Iterable[CatalogExposure] catexps, afwTable.SourceCatalog cat_ref)
afwTable.Schema schema(self)
def __init__(self, afwTable.Schema schema, **kwargs)
pipeBase.Struct run(self, List[CatalogExposure] catexps, afwTable.SourceCatalog cat_ref)
def runQuantum(self, butlerQC, inputRefs, outputRefs)
def __init__(self, initInputs, **kwargs)
daf::base::PropertySet * set
Backwards-compatibility support for depersisting the old Calib (FluxMag0/FluxMag0Err) objects.