LSST Applications  21.0.0+04719a4bac,21.0.0-1-ga51b5d4+f5e6047307,21.0.0-11-g2b59f77+a9c1acf22d,21.0.0-11-ga42c5b2+86977b0b17,21.0.0-12-gf4ce030+76814010d2,21.0.0-13-g1721dae+760e7a6536,21.0.0-13-g3a573fe+768d78a30a,21.0.0-15-g5a7caf0+f21cbc5713,21.0.0-16-g0fb55c1+b60e2d390c,21.0.0-19-g4cded4ca+71a93a33c0,21.0.0-2-g103fe59+bb20972958,21.0.0-2-g45278ab+04719a4bac,21.0.0-2-g5242d73+3ad5d60fb1,21.0.0-2-g7f82c8f+8babb168e8,21.0.0-2-g8f08a60+06509c8b61,21.0.0-2-g8faa9b5+616205b9df,21.0.0-2-ga326454+8babb168e8,21.0.0-2-gde069b7+5e4aea9c2f,21.0.0-2-gecfae73+1d3a86e577,21.0.0-2-gfc62afb+3ad5d60fb1,21.0.0-25-g1d57be3cd+e73869a214,21.0.0-3-g357aad2+ed88757d29,21.0.0-3-g4a4ce7f+3ad5d60fb1,21.0.0-3-g4be5c26+3ad5d60fb1,21.0.0-3-g65f322c+e0b24896a3,21.0.0-3-g7d9da8d+616205b9df,21.0.0-3-ge02ed75+a9c1acf22d,21.0.0-4-g591bb35+a9c1acf22d,21.0.0-4-g65b4814+b60e2d390c,21.0.0-4-gccdca77+0de219a2bc,21.0.0-4-ge8a399c+6c55c39e83,21.0.0-5-gd00fb1e+05fce91b99,21.0.0-6-gc675373+3ad5d60fb1,21.0.0-64-g1122c245+4fb2b8f86e,21.0.0-7-g04766d7+cd19d05db2,21.0.0-7-gdf92d54+04719a4bac,21.0.0-8-g5674e7b+d1bd76f71f,master-gac4afde19b+a9c1acf22d,w.2021.13
LSST Data Management Base Package
fit_multiband.py
Go to the documentation of this file.
1 # This file is part of pipe_tasks.
2 #
3 # Developed for the LSST Data Management System.
4 # This product includes software developed by the LSST Project
5 # (https://www.lsst.org).
6 # See the COPYRIGHT file at the top-level directory of this distribution
7 # for details of code ownership.
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation, either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the GNU General Public License
20 # along with this program. If not, see <https://www.gnu.org/licenses/>.
21 
22 __all__ = [
23  "CatalogExposure", "MultibandFitConfig", "MultibandFitSubConfig", "MultibandFitSubTask",
24  "MultibandFitTask",
25 ]
26 
27 from abc import ABC, abstractmethod
28 from dataclasses import dataclass, field
29 import lsst.afw.image as afwImage
30 import lsst.afw.table as afwTable
31 import lsst.daf.butler as dafButler
32 from lsst.obs.base import ExposureIdInfo
33 import lsst.pex.config as pexConfig
34 import lsst.pipe.base as pipeBase
36 from typing import Dict, Iterable, List, Optional, Set
37 
38 
39 @dataclass(frozen=True)
41  """A class to store a catalog, exposure, and metadata for a given dataId.
42 
43  This class is intended to store an exposure and an associated measurement
44  catalog. There are no checks to ensure this, so repurpose responsibly.
45  """
46  @property
47  def band(self) -> str:
48  return self.dataId['band']
49 
50  @property
51  def calib(self) -> Optional[afwImage.PhotoCalib]:
52  return None if self.exposure is None else self.exposure.getPhotoCalib()
53 
54  catalog: Optional[afwTable.SourceCatalog]
55  exposure: Optional[afwImage.Exposure]
56  dataId: dafButler.DataCoordinate
57  id_tract_patch: Optional[int] = 0
58  metadata: Dict = field(default_factory=dict)
59 
60  def __post_init__(self):
61  if 'band' not in self.dataId:
62  raise ValueError(f'dataId={self.dataId} must have a band')
63 
64 
65 multibandFitBaseTemplates = {
66  "name_input_coadd": "deep",
67  "name_output_coadd": "deep",
68  "name_output_cat": "fit",
69 }
70 
71 
73  pipeBase.PipelineTaskConnections,
74  dimensions=("tract", "patch", "skymap"),
75  defaultTemplates=multibandFitBaseTemplates,
76 ):
77  cat_ref = cT.Input(
78  doc="Reference multiband source catalog",
79  name="{name_input_coadd}Coadd_ref",
80  storageClass="SourceCatalog",
81  dimensions=("tract", "patch", "skymap"),
82  )
83  cats_meas = cT.Input(
84  doc="Deblended single-band source catalogs",
85  name="{name_input_coadd}Coadd_meas",
86  storageClass="SourceCatalog",
87  multiple=True,
88  dimensions=("tract", "patch", "band", "skymap"),
89  )
90  coadds = cT.Input(
91  doc="Exposures on which to run fits",
92  name="{name_input_coadd}Coadd_calexp",
93  storageClass="ExposureF",
94  multiple=True,
95  dimensions=("tract", "patch", "band", "skymap"),
96  )
97  cat_output = cT.Output(
98  doc="Measurement multi-band catalog",
99  name="{name_output_coadd}Coadd_{name_output_cat}",
100  storageClass="SourceCatalog",
101  dimensions=("tract", "patch", "skymap"),
102  )
103  cat_ref_schema = cT.InitInput(
104  doc="Schema associated with a ref source catalog",
105  storageClass="SourceCatalog",
106  name="{name_input_coadd}Coadd_ref_schema",
107  )
108  cat_output_schema = cT.InitOutput(
109  doc="Output of the schema used in deblending task",
110  name="{name_output_coadd}Coadd_{name_output_cat}_schema",
111  storageClass="SourceCatalog"
112  )
113 
114  def adjustQuantum(self, datasetRefMap):
115  """Validates the `lsst.daf.butler.DatasetRef` bands against the
116  subtask's list of bands to fit and drops unnecessary bands.
117 
118  Parameters
119  ----------
120  datasetRefMap : `NamedKeyDict`
121  Mapping from dataset type to a `set` of
122  `lsst.daf.butler.DatasetRef` objects
123 
124  Returns
125  -------
126  datasetRefMap : `NamedKeyDict`
127  Modified mapping of input with possibly adjusted
128  `lsst.daf.butler.DatasetRef` objects.
129 
130  Raises
131  ------
132  ValueError
133  Raised if any of the per-band datasets have an inconsistent band
134  set, or if the band set to fit is not a subset of the data bands.
135 
136  """
137  datasetRefMap = super().adjustQuantum(datasetRefMap)
138  # Check which bands are going to be fit
139  bands_fit, bands_read_only = self.config.get_band_sets()
140  bands_needed = bands_fit.union(bands_read_only)
141 
142  bands_data = None
143  bands_extra = set()
144 
145  for type_d, ref_d in datasetRefMap.items():
146  # Datasets without bands in their dimensions should be fine
147  if 'band' in type_d.dimensions:
148  bands_set = {dref.dataId['band'] for dref in ref_d}
149  if bands_data is None:
150  bands_data = bands_set
151  if bands_needed != bands_data:
152  if not bands_needed.issubset(bands_data):
153  raise ValueError(
154  f'Datarefs={ref_d} have data with bands in the set={bands_set},'
155  f'which is not a subset of the required bands={bands_needed} defined by '
156  f'{self.config.__class__}.fit_multiband='
157  f'{self.config.fit_multiband._value.__class__}\'s attributes'
158  f' bands_fit={bands_fit} and bands_read_only()={bands_read_only}.'
159  f' Add the required bands={bands_needed.difference(bands_data)}.'
160  )
161  else:
162  bands_extra = bands_data.difference(bands_needed)
163  elif bands_set != bands_data:
164  raise ValueError(
165  f'Datarefs={ref_d} have data with bands in the set={bands_set}'
166  f' which differs from the previous={bands_data}); bandsets must be identical.'
167  )
168  if bands_extra:
169  for dref in ref_d:
170  if dref.dataId['band'] in bands_extra:
171  ref_d.remove(dref)
172  return datasetRefMap
173 
174 
175 class MultibandFitSubConfig(pexConfig.Config):
176  """Config class for the MultibandFitTask to define methods returning
177  values that depend on multiple config settings.
178 
179  """
180  def bands_read_only(self) -> Set:
181  """Return the set of bands that the Task needs to read (e.g. for
182  defining priors) but not necessarily fit.
183 
184  Returns
185  -------
186  The set of such bands.
187  """
188  return set()
189 
190 
191 class MultibandFitSubTask(pipeBase.Task, ABC):
192  """An abstract interface for subtasks of MultibandFitTask to perform
193  multiband fitting of deblended sources.
194 
195  Parameters
196  ----------
197  schema : `lsst.afw.table.Schema`
198  The input schema for the reference source catalog, used to initialize
199  the output schema.
200  **kwargs
201  Additional arguments to be passed to the `lsst.pipe.base.Task`
202  constructor.
203  """
204  ConfigClass = MultibandFitSubConfig
205 
206  def __init__(self, schema: afwTable.Schema, **kwargs):
207  super().__init__(**kwargs)
208 
209  @abstractmethod
210  def run(
211  self, catexps: Iterable[CatalogExposure], cat_ref: afwTable.SourceCatalog
212  ) -> pipeBase.Struct:
213  """Fit sources from a reference catalog using data from multiple
214  exposures in the same patch.
215 
216  Parameters
217  ----------
218  catexps : `typing.List [CatalogExposure]`
219  A list of catalog-exposure pairs in a given band.
220  cat_ref : `lsst.afw.table.SourceCatalog`
221  A reference source catalog to fit.
222 
223  Returns
224  -------
225  retStruct : `lsst.pipe.base.Struct`
226  A struct with a cat_output attribute containing the output
227  measurement catalog.
228 
229  Notes
230  -----
231  Subclasses may have further requirements on the input parameters,
232  including:
233  - Passing only one catexp per band;
234  - Catalogs containing HeavyFootprints with deblended images;
235  - Fitting only a subset of the sources.
236  If any requirements are not met, the subtask should fail as soon as
237  possible.
238  """
239  raise NotImplementedError()
240 
241  @property
242  @abstractmethod
243  def schema(self) -> afwTable.Schema:
244  raise NotImplementedError()
245 
246 
248  pipeBase.PipelineTaskConfig,
249  pipelineConnections=MultibandFitConnections,
250 ):
251  """Configure a MultibandFitTask, including a configurable fitting subtask.
252  """
253  fit_multiband = pexConfig.ConfigurableField(
254  target=MultibandFitSubTask,
255  doc="Task to fit sources using multiple bands",
256  )
257 
258  def get_band_sets(self):
259  """Get the set of bands required by the fit_multiband subtask.
260 
261  Returns
262  -------
263  bands_fit : `set`
264  The set of bands that the subtask will fit.
265  bands_read_only : `set`
266  The set of bands that the subtask will only read data
267  (measurement catalog and exposure) for.
268  """
269  try:
270  bands_fit = self.fit_multibandfit_multiband.bands_fit
271  except AttributeError:
272  raise RuntimeError(f'{__class__}.fit_multiband must have bands_fit attribute') from None
273  bands_read_only = self.fit_multibandfit_multiband.bands_read_only()
274  return set(bands_fit), set(bands_read_only)
275 
276 
277 class MultibandFitTask(pipeBase.PipelineTask):
278  """Fit deblended exposures in multiple bands simultaneously.
279 
280  It is generally assumed but not enforced (except optionally by the
281  configurable `fit_multiband` subtask) that there is only one exposure
282  per band, presumably a coadd.
283  """
284  ConfigClass = MultibandFitConfig
285  _DefaultName = "multibandFit"
286 
287  def __init__(self, initInputs, **kwargs):
288  super().__init__(initInputs=initInputs, **kwargs)
289  self.makeSubtask("fit_multiband", schema=initInputs["cat_ref_schema"].schema)
290  self.cat_output_schemacat_output_schema = afwTable.SourceCatalog(self.fit_multiband.schema)
291 
292  def runQuantum(self, butlerQC, inputRefs, outputRefs):
293  inputs = butlerQC.get(inputRefs)
294  id_tp = ExposureIdInfo.fromDataId(butlerQC.quantum.dataId, "tract_patch").expId
295  input_refs_objs = [(inputRefs.cats_meas, inputs['cats_meas']), (inputRefs.coadds, inputs['coadds'])]
296  cats, exps = [
297  {dRef.dataId: obj for dRef, obj in zip(refs, objs)}
298  for refs, objs in input_refs_objs
299  ]
300  dataIds = set(cats).union(set(exps))
301  catexps = [
303  catalog=cats.get(dataId), exposure=exps.get(dataId), dataId=dataId, id_tract_patch=id_tp,
304  )
305  for dataId in dataIds
306  ]
307  outputs = self.runrun(catexps=catexps, cat_ref=inputs['cat_ref'])
308  butlerQC.put(outputs, outputRefs)
309  # Validate the output catalog's schema and raise if inconsistent (after output to allow debugging)
310  if outputs.cat_output.schema != self.cat_output_schemacat_output_schema.schema:
311  raise RuntimeError(f'{__class__}.config.fit_multiband.run schema != initOutput schema:'
312  f' {outputs.cat_output.schema} vs {self.cat_output_schema.schema}')
313 
314  def run(self, catexps: List[CatalogExposure], cat_ref: afwTable.SourceCatalog) -> pipeBase.Struct:
315  """Fit sources from a reference catalog using data from multiple
316  exposures in the same region (patch).
317 
318  Parameters
319  ----------
320  catexps : `typing.List [CatalogExposure]`
321  A list of catalog-exposure pairs in a given band.
322  cat_ref : `lsst.afw.table.SourceCatalog`
323  A reference source catalog to fit.
324 
325  Returns
326  -------
327  retStruct : `lsst.pipe.base.Struct`
328  A struct with a cat_output attribute containing the output
329  measurement catalog.
330 
331  Notes
332  -----
333  Subtasks may have further requirements; see `MultibandFitSubTask.run`.
334  """
335  cat_output = self.fit_multiband.run(catexps, cat_ref).output
336  retStruct = pipeBase.Struct(cat_output=cat_output)
337  return retStruct
A class to contain the data, WCS, and other information needed to describe an image of the sky.
Definition: Exposure.h:72
Optional[afwImage.PhotoCalib] calib(self)
pipeBase.Struct run(self, Iterable[CatalogExposure] catexps, afwTable.SourceCatalog cat_ref)
def __init__(self, afwTable.Schema schema, **kwargs)
pipeBase.Struct run(self, List[CatalogExposure] catexps, afwTable.SourceCatalog cat_ref)
def runQuantum(self, butlerQC, inputRefs, outputRefs)
def __init__(self, initInputs, **kwargs)
daf::base::PropertySet * set
Definition: fits.cc:912
Backwards-compatibility support for depersisting the old Calib (FluxMag0/FluxMag0Err) objects.