LSST Applications g042eb84c57+730a74494b,g04e9c324dd+8c5ae1fdc5,g134cb467dc+1f1e3e7524,g199a45376c+0ba108daf9,g1fd858c14a+fa7d31856b,g210f2d0738+f66ac109ec,g262e1987ae+83a3acc0e5,g29ae962dfc+d856a2cb1f,g2cef7863aa+aef1011c0b,g35bb328faa+8c5ae1fdc5,g3fd5ace14f+a1e0c9f713,g47891489e3+0d594cb711,g4d44eb3520+c57ec8f3ed,g4d7b6aa1c5+f66ac109ec,g53246c7159+8c5ae1fdc5,g56a1a4eaf3+fd7ad03fde,g64539dfbff+f66ac109ec,g67b6fd64d1+0d594cb711,g67fd3c3899+f66ac109ec,g6985122a63+0d594cb711,g74acd417e5+3098891321,g786e29fd12+668abc6043,g81db2e9a8d+98e2ab9f28,g87389fa792+8856018cbb,g89139ef638+0d594cb711,g8d7436a09f+80fda9ce03,g8ea07a8fe4+760ca7c3fc,g90f42f885a+033b1d468d,g97be763408+a8a29bda4b,g99822b682c+e3ec3c61f9,g9d5c6a246b+0d5dac0c3d,ga41d0fce20+9243b26dd2,gbf99507273+8c5ae1fdc5,gd7ef33dd92+0d594cb711,gdab6d2f7ff+3098891321,ge410e46f29+0d594cb711,geaed405ab2+c4bbc419c6,gf9a733ac38+8c5ae1fdc5,w.2025.38
LSST Data Management Base Package
Loading...
Searching...
No Matches
match_tract_catalog.py
Go to the documentation of this file.
1# This file is part of pipe_tasks.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21
22__all__ = [
23 'MatchTractCatalogSubConfig', 'MatchTractCatalogSubTask',
24 'MatchTractCatalogConfig', 'MatchTractCatalogTask'
25]
26
27import lsst.afw.geom as afwGeom
28import lsst.pex.config as pexConfig
29import lsst.pipe.base as pipeBase
30import lsst.pipe.base.connectionTypes as cT
31from lsst.skymap import BaseSkyMap
32
33from abc import ABC, abstractmethod
34
35import astropy.table
36import pandas as pd
37from typing import Tuple, Set
38
39
40MatchTractCatalogBaseTemplates = {
41 "name_input_cat_ref": "truth_summary",
42 "name_input_cat_target": "objectTable_tract",
43}
44
45
47 pipeBase.PipelineTaskConnections,
48 dimensions=("tract", "skymap"),
49 defaultTemplates=MatchTractCatalogBaseTemplates,
50):
51 cat_ref = cT.Input(
52 doc="Reference object catalog to match from",
53 name="{name_input_cat_ref}",
54 storageClass="ArrowAstropy",
55 dimensions=("tract", "skymap"),
56 deferLoad=True,
57 )
58 cat_target = cT.Input(
59 doc="Target object catalog to match",
60 name="{name_input_cat_target}",
61 storageClass="ArrowAstropy",
62 dimensions=("tract", "skymap"),
63 deferLoad=True,
64 )
65 skymap = cT.Input(
66 doc="Input definition of geometry/bbox and projection/wcs for coadded exposures",
67 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
68 storageClass="SkyMap",
69 dimensions=("skymap",),
70 )
71 cat_output_ref = cT.Output(
72 doc="Reference matched catalog with indices of target matches",
73 name="match_ref_{name_input_cat_ref}_{name_input_cat_target}",
74 storageClass="ArrowAstropy",
75 dimensions=("tract", "skymap"),
76 )
77 cat_output_target = cT.Output(
78 doc="Target matched catalog with indices of reference matches",
79 name="match_target_{name_input_cat_ref}_{name_input_cat_target}",
80 storageClass="ArrowAstropy",
81 dimensions=("tract", "skymap"),
82 )
83
84 def __init__(self, *, config=None):
85 if config.refcat_sharding_type != "tract":
86 if config.refcat_sharding_type == "none":
87 old = self.cat_ref
88 self.cat_ref = cT.Input(
89 doc=old.doc,
90 name=old.name,
91 storageClass=old.storageClass,
92 dimensions=(),
93 deferLoad=old.deferLoad,
94 )
95 else:
96 raise NotImplementedError(f"{config.refcat_sharding_type=} not implemented")
97 if config.target_sharding_type != "tract":
98 if config.target_sharding_type == "none":
99 old = self.cat_target
100 self.cat_target = cT.Input(
101 doc=old.doc,
102 name=old.name,
103 storageClass=old.storageClass,
104 dimensions=(),
105 deferLoad=old.deferLoad,
106 )
107 else:
108 raise NotImplementedError(f"{config.target_sharding_type=} not implemented")
109
110
111class MatchTractCatalogSubConfig(pexConfig.Config):
112 """Config class for the MatchTractCatalogSubTask to define methods returning
113 values that depend on multiple config settings.
114 """
115 @property
116 @abstractmethod
117 def columns_in_ref(self) -> Set[str]:
118 raise NotImplementedError()
119
120 @property
121 @abstractmethod
122 def columns_in_target(self) -> Set[str]:
123 raise NotImplementedError()
124
125
126class MatchTractCatalogSubTask(pipeBase.Task, ABC):
127 """An abstract interface for subtasks of MatchTractCatalogTask to match
128 two tract object catalogs.
129
130 Parameters
131 ----------
132 **kwargs
133 Additional arguments to be passed to the `lsst.pipe.base.Task`
134 constructor.
135 """
136 ConfigClass = MatchTractCatalogSubConfig
137
138 def __init__(self, **kwargs):
139 super().__init__(**kwargs)
140
141 @abstractmethod
142 def run(
143 self,
144 catalog_ref: pd.DataFrame | astropy.table.Table,
145 catalog_target: pd.DataFrame | astropy.table.Table,
146 wcs: afwGeom.SkyWcs = None,
147 ) -> pipeBase.Struct:
148 """Match sources in a reference tract catalog with a target catalog.
149
150 Parameters
151 ----------
152 catalog_ref : `pandas.DataFrame` | `astropy.table.Table`
153 A reference catalog to match objects/sources from.
154 catalog_target : `pandas.DataFrame` | `astropy.table.Table`
155 A target catalog to match reference objects/sources to.
156 wcs : `lsst.afw.image.SkyWcs`
157 A coordinate system to convert catalog positions to sky coordinates.
158
159 Returns
160 -------
161 retStruct : `lsst.pipe.base.Struct`
162 A struct with output_ref and output_target attribute containing the
163 output matched catalogs.
164 """
165 raise NotImplementedError()
166
167
169 pipeBase.PipelineTaskConfig,
170 pipelineConnections=MatchTractCatalogConnections,
171):
172 """Configure a MatchTractCatalogTask, including a configurable matching subtask.
173 """
174 match_tract_catalog = pexConfig.ConfigurableField(
175 target=MatchTractCatalogSubTask,
176 doc="Task to match sources in a reference tract catalog with a target catalog",
177 )
178 refcat_sharding_type = pexConfig.ChoiceField[str](
179 doc="The type of sharding (spatial splitting) for the reference catalog",
180 allowed={"tract": "Tract-based shards", "none": "No sharding at all"},
181 default="tract",
182 )
183 target_sharding_type = pexConfig.ChoiceField[str](
184 doc="The type of sharding (spatial splitting) for the target catalog",
185 allowed={"tract": "Tract-based shards", "none": "No sharding at all"},
186 default="tract",
187 )
188
189 def get_columns_in(self) -> Tuple[Set, Set]:
190 """Get the set of input columns required for matching.
191
192 Returns
193 -------
194 columns_ref : `set` [`str`]
195 The set of required input catalog column names.
196 columns_target : `set` [`str`]
197 The set of required target catalog column names.
198 """
199 try:
200 columns_ref, columns_target = (self.match_tract_catalog.columns_in_ref,
201 self.match_tract_catalog.columns_in_target)
202 except AttributeError as err:
203 raise RuntimeError(f'{__class__}.match_tract_catalog must have columns_in_ref and'
204 f' columns_in_target attributes: {err}') from None
205 return set(columns_ref), set(columns_target)
206
207
208class MatchTractCatalogTask(pipeBase.PipelineTask):
209 """Match sources in a reference tract catalog with those in a target catalog.
210 """
211 ConfigClass = MatchTractCatalogConfig
212 _DefaultName = "MatchTractCatalog"
213
214 def __init__(self, initInputs, **kwargs):
215 super().__init__(initInputs=initInputs, **kwargs)
216 self.makeSubtask("match_tract_catalog")
217
218 def runQuantum(self, butlerQC, inputRefs, outputRefs):
219 inputs = butlerQC.get(inputRefs)
220 columns_ref, columns_target = self.config.get_columns_in()
221 skymap = inputs.pop("skymap")
222
223 outputs = self.run(
224 catalog_ref=inputs['cat_ref'].get(parameters={'columns': columns_ref}),
225 catalog_target=inputs['cat_target'].get(parameters={'columns': columns_target}),
226 wcs=skymap[butlerQC.quantum.dataId["tract"]].wcs,
227 )
228 butlerQC.put(outputs, outputRefs)
229
230 def run(
231 self,
232 catalog_ref: pd.DataFrame | astropy.table.Table,
233 catalog_target: pd.DataFrame | astropy.table.Table,
234 wcs: afwGeom.SkyWcs = None,
235 ) -> pipeBase.Struct:
236 """Match sources in a reference tract catalog with a target catalog.
237
238 Parameters
239 ----------
240 catalog_ref : `pandas.DataFrame` | `astropy.table.Table`
241 A reference catalog to match objects/sources from.
242 catalog_target : `pandas.DataFrame` | `astropy.table.Table`
243 A target catalog to match reference objects/sources to.
244 wcs : `lsst.afw.image.SkyWcs`
245 A coordinate system to convert catalog positions to sky coordinates,
246 if necessary.
247
248 Returns
249 -------
250 retStruct : `lsst.pipe.base.Struct`
251 A struct with output_ref and output_target attribute containing the
252 output matched catalogs.
253 """
254 output = self.match_tract_catalog.run(catalog_ref, catalog_target, wcs=wcs)
255 if output.exceptions:
256 self.log.warning('Exceptions: %s', output.exceptions)
257 retStruct = pipeBase.Struct(cat_output_ref=output.cat_output_ref,
258 cat_output_target=output.cat_output_target)
259 return retStruct
A 2-dimensional celestial WCS that transform pixels to ICRS RA/Dec, using the LSST standard for pixel...
Definition SkyWcs.h:118
pipeBase.Struct run(self, pd.DataFrame|astropy.table.Table catalog_ref, pd.DataFrame|astropy.table.Table catalog_target, afwGeom.SkyWcs wcs=None)
pipeBase.Struct run(self, pd.DataFrame|astropy.table.Table catalog_ref, pd.DataFrame|astropy.table.Table catalog_target, afwGeom.SkyWcs wcs=None)