23 "CatalogExposure",
"MultibandFitConfig",
"MultibandFitSubConfig",
"MultibandFitSubTask",
27from abc
import ABC, abstractmethod
28from dataclasses
import dataclass, field
31import lsst.daf.butler
as dafButler
32from lsst.obs.base
import ExposureIdInfo
35import lsst.pipe.base.connectionTypes
as cT
36from typing
import Dict, Iterable, List, Optional, Set
39@dataclass(frozen=True)
41 """A class to store a catalog, exposure, and metadata for a given dataId.
43 This class is intended
to store an exposure and an associated measurement
44 catalog. There are no checks to ensure this, so repurpose responsibly.
48 return self.dataId[
'band']
def calib(self) -> Optional[afwImage.PhotoCalib]:
    """Return the photometric calibration of the stored exposure.

    Returns
    -------
    calib : `lsst.afw.image.PhotoCalib` or `None`
        The result of ``self.exposure.getPhotoCalib()``, or `None` when
        ``self.exposure`` is `None`.
    """
    exposure = self.exposure
    # Without an exposure there is nothing to take a calibration from.
    if exposure is None:
        return None
    return exposure.getPhotoCalib()
56 dataId: dafButler.DataCoordinate
57 id_tract_patch: Optional[int] = 0
58 metadata: Dict =
field(default_factory=dict)
61 if 'band' not in self.dataId:
62 raise ValueError(f
'dataId={self.dataId} must have a band')
65multibandFitBaseTemplates = {
66 "name_input_coadd":
"deep",
67 "name_output_coadd":
"deep",
68 "name_output_cat":
"fit",
73 pipeBase.PipelineTaskConnections,
74 dimensions=(
"tract",
"patch",
"skymap"),
75 defaultTemplates=multibandFitBaseTemplates,
78 doc=
"Reference multiband source catalog",
79 name=
"{name_input_coadd}Coadd_ref",
80 storageClass=
"SourceCatalog",
81 dimensions=(
"tract",
"patch",
"skymap"),
84 doc=
"Deblended single-band source catalogs",
85 name=
"{name_input_coadd}Coadd_meas",
86 storageClass=
"SourceCatalog",
88 dimensions=(
"tract",
"patch",
"band",
"skymap"),
91 doc=
"Exposures on which to run fits",
92 name=
"{name_input_coadd}Coadd_calexp",
93 storageClass=
"ExposureF",
95 dimensions=(
"tract",
"patch",
"band",
"skymap"),
97 cat_output = cT.Output(
98 doc=
"Measurement multi-band catalog",
99 name=
"{name_output_coadd}Coadd_{name_output_cat}",
100 storageClass=
"SourceCatalog",
101 dimensions=(
"tract",
"patch",
"skymap"),
103 cat_ref_schema = cT.InitInput(
104 doc=
"Schema associated with a ref source catalog",
105 storageClass=
"SourceCatalog",
106 name=
"{name_input_coadd}Coadd_ref_schema",
108 cat_output_schema = cT.InitOutput(
109 doc=
"Output of the schema used in deblending task",
110 name=
"{name_output_coadd}Coadd_{name_output_cat}_schema",
111 storageClass=
"SourceCatalog"
115 """Validates the `lsst.daf.butler.DatasetRef` bands against the
116 subtask's list of bands to fit and drops unnecessary bands.
121 Dictionary whose keys are an input (regular or prerequisite)
122 connection name
and whose values are a tuple of the connection
123 instance
and a collection of associated `DatasetRef` objects.
124 The exact type of the nested collections
is unspecified; it can be
125 assumed to be multi-
pass iterable
and support `len`
and ``
in``, but
126 it should
not be mutated
in place. In contrast, the outer
127 dictionaries are guaranteed to be temporary copies that are true
128 `dict` instances,
and hence may be modified
and even returned; this
129 is especially useful
for delegating to `super` (see notes below).
131 Mapping of output datasets,
with the same structure
as ``inputs``.
133 Label
for this task
in the pipeline (should be used
in all
134 diagnostic messages).
135 data_id : `lsst.daf.butler.DataCoordinate`
136 Data ID
for this quantum
in the pipeline (should be used
in all
137 diagnostic messages).
141 adjusted_inputs : `Mapping`
142 Mapping of the same form
as ``inputs``
with updated containers of
143 input `DatasetRef` objects. All inputs involving the
'band'
144 dimension are adjusted to put them
in consistent order
and remove
146 adjusted_outputs : `Mapping`
147 Mapping of updated output datasets; always empty
for this task.
151 lsst.pipe.base.NoWorkFound
152 Raised
if there are
not enough of the right bands to run the task
156 bands_fit, bands_read_only = self.config.get_band_sets()
157 bands_needed = bands_fit.union(bands_read_only)
160 for connection_name, (connection, dataset_refs)
in inputs.items():
162 if 'band' in connection.dimensions:
163 datasets_by_band = {dref.dataId[
'band']: dref
for dref
in dataset_refs}
164 if not bands_needed.issubset(datasets_by_band.keys()):
165 raise pipeBase.NoWorkFound(
166 f
'DatasetRefs={dataset_refs} have data with bands in the'
167 f
' set={set(datasets_by_band.keys())},'
168 f
' which is not a superset of the required bands={bands_needed} defined by'
169 f
' {self.config.__class__}.fit_multiband='
170 f
'{self.config.fit_multiband._value.__class__}\'s attributes'
171 f
' bands_fit={bands_fit} and bands_read_only()={bands_read_only}.'
172 f
' Add the required bands={bands_needed.difference(datasets_by_band.keys())}.'
176 adjusted_inputs[connection_name] = (
178 [datasets_by_band[band]
for band
in bands_needed]
182 inputs.update(adjusted_inputs)
184 return adjusted_inputs, {}
188 """Config class for the MultibandFitTask to define methods returning
189 values that depend on multiple config settings.
193 """Return the set of bands that the Task needs to read (e.g. for
194 defining priors) but not necessarily fit.
198 The set of such bands.
204 """An abstract interface for subtasks of MultibandFitTask to perform
205 multiband fitting of deblended sources.
210 The input schema for the reference source catalog, used to initialize
213 Additional arguments to be passed to the `lsst.pipe.base.Task`
216 ConfigClass = MultibandFitSubConfig
218 def __init__(self, schema: afwTable.Schema, **kwargs):
224 ) -> pipeBase.Struct:
225 """Fit sources from a reference catalog using data from multiple
226 exposures in the same patch.
230 catexps : `typing.List [CatalogExposure]`
231 A list of catalog-exposure pairs
in a given band.
233 A reference source catalog to fit.
237 retStruct : `lsst.pipe.base.Struct`
238 A struct
with a cat_output attribute containing the output
243 Subclasses may have further requirements on the input parameters,
245 - Passing only one catexp per band;
246 - Catalogs containing HeavyFootprints
with deblended images;
247 - Fitting only a subset of the sources.
248 If any requirements are
not met, the subtask should fail
as soon
as
251 raise NotImplementedError()
256 raise NotImplementedError()
260 pipeBase.PipelineTaskConfig,
261 pipelineConnections=MultibandFitConnections,
263 """Configure a MultibandFitTask, including a configurable fitting subtask.
265 fit_multiband = pexConfig.ConfigurableField(
266 target=MultibandFitSubTask,
267 doc="Task to fit sources using multiple bands",
271 """Get the set of bands required by the fit_multiband subtask.
276 The set of bands that the subtask will fit.
277 bands_read_only : `set`
278 The set of bands that the subtask will only read data
279 (measurement catalog and exposure)
for.
283 except AttributeError:
284 raise RuntimeError(f
'{__class__}.fit_multiband must have bands_fit attribute')
from None
286 return set(bands_fit),
set(bands_read_only)
290 """Fit deblended exposures in multiple bands simultaneously.
292 It is generally assumed but
not enforced (
except optionally by the
293 configurable `fit_multiband` subtask) that there
is only one exposure
294 per band, presumably a coadd.
296 ConfigClass = MultibandFitConfig
297 _DefaultName = "multibandFit"
def __init__(self, initInputs, **kwargs):
    """Initialize the task and create its ``fit_multiband`` subtask.

    Parameters
    ----------
    initInputs
        Init-input datasets, indexed by connection name. The entry
        ``"cat_ref_schema"`` must expose a ``schema`` attribute, which is
        handed to the subtask.
    **kwargs
        Additional keyword arguments forwarded to the parent constructor.
    """
    super().__init__(initInputs=initInputs, **kwargs)
    # Looked up only after the parent constructor has run, as before.
    ref_schema = initInputs["cat_ref_schema"].schema
    self.makeSubtask("fit_multiband", schema=ref_schema)
305 inputs = butlerQC.get(inputRefs)
306 id_tp = ExposureIdInfo.fromDataId(butlerQC.quantum.dataId,
"tract_patch").expId
307 input_refs_objs = [(inputRefs.cats_meas, inputs[
'cats_meas']), (inputRefs.coadds, inputs[
'coadds'])]
309 {dRef.dataId: obj
for dRef, obj
in zip(refs, objs)}
310 for refs, objs
in input_refs_objs
312 dataIds =
set(cats).union(
set(exps))
315 catalog=cats.get(dataId), exposure=exps.get(dataId), dataId=dataId, id_tract_patch=id_tp,
317 for dataId
in dataIds
319 outputs = self.
run(catexps=catexps, cat_ref=inputs[
'cat_ref'])
320 butlerQC.put(outputs, outputRefs)
323 raise RuntimeError(f
'{__class__}.config.fit_multiband.run schema != initOutput schema:'
324 f
' {outputs.cat_output.schema} vs {self.cat_output_schema.schema}')
327 """Fit sources from a reference catalog using data from multiple
328 exposures in the same region (patch).
332 catexps : `typing.List [CatalogExposure]`
333 A list of catalog-exposure pairs
in a given band.
335 A reference source catalog to fit.
339 retStruct : `lsst.pipe.base.Struct`
340 A struct
with a cat_output attribute containing the output
345 Subtasks may have further requirements; see `MultibandFitSubTask.run`.
347 cat_output = self.fit_multiband.run(catexps, cat_ref).output
348 retStruct = pipeBase.Struct(cat_output=cat_output)
A class to contain the data, WCS, and other information needed to describe an image of the sky.
Defines the fields and offsets for a table.
def adjustQuantum(self, inputs, outputs, label, data_id)
Set bands_read_only(self)
def __init__(self, afwTable.Schema schema, **kwargs)
pipeBase.Struct run(self, List[CatalogExposure] catexps, afwTable.SourceCatalog cat_ref)
def runQuantum(self, butlerQC, inputRefs, outputRefs)
daf::base::PropertySet * set