Loading [MathJax]/extensions/tex2jax.js
LSST Applications g0fba68d861+05816baf74,g1ec0fe41b4+f536777771,g1fd858c14a+a9301854fb,g35bb328faa+fcb1d3bbc8,g4af146b050+a5c07d5b1d,g4d2262a081+6e5fcc2a4e,g53246c7159+fcb1d3bbc8,g56a49b3a55+9c12191793,g5a012ec0e7+3632fc3ff3,g60b5630c4e+ded28b650d,g67b6fd64d1+ed4b5058f4,g78460c75b0+2f9a1b4bcd,g786e29fd12+cf7ec2a62a,g8352419a5c+fcb1d3bbc8,g87b7deb4dc+7b42cf88bf,g8852436030+e5453db6e6,g89139ef638+ed4b5058f4,g8e3bb8577d+d38d73bdbd,g9125e01d80+fcb1d3bbc8,g94187f82dc+ded28b650d,g989de1cb63+ed4b5058f4,g9d31334357+ded28b650d,g9f33ca652e+50a8019d8c,gabe3b4be73+1e0a283bba,gabf8522325+fa80ff7197,gb1101e3267+d9fb1f8026,gb58c049af0+f03b321e39,gb665e3612d+2a0c9e9e84,gb89ab40317+ed4b5058f4,gcf25f946ba+e5453db6e6,gd6cbbdb0b4+bb83cc51f8,gdd1046aedd+ded28b650d,gde0f65d7ad+941d412827,ge278dab8ac+d65b3c2b70,ge410e46f29+ed4b5058f4,gf23fb2af72+b7cae620c0,gf5e32f922b+fcb1d3bbc8,gf67bdafdda+ed4b5058f4,w.2025.16
LSST Data Management Base Package
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
diff_matched_tract_catalog.py
Go to the documentation of this file.
1# This file is part of pipe_tasks.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21
22__all__ = [
23 'DiffMatchedTractCatalogConfig', 'DiffMatchedTractCatalogTask', 'MatchedCatalogFluxesConfig',
24]
25
26import lsst.afw.geom as afwGeom
27from lsst.meas.astrom.matcher_probabilistic import ConvertCatalogCoordinatesConfig
29import lsst.pex.config as pexConfig
30import lsst.pipe.base as pipeBase
31import lsst.pipe.base.connectionTypes as cT
32from lsst.skymap import BaseSkyMap
33from lsst.daf.butler import DatasetProvenance
34
35import astropy.table
36import astropy.units as u
37import numpy as np
38import pandas as pd
39from smatch.matcher import sphdist
40from typing import Sequence
41import warnings
42
43
44def is_sequence_set(x: Sequence):
45 return len(x) == len(set(x))
46
47
48DiffMatchedTractCatalogBaseTemplates = {
49 "name_input_cat_ref": "truth_summary",
50 "name_input_cat_target": "objectTable_tract",
51 "name_skymap": BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
52}
53
54
56 pipeBase.PipelineTaskConnections,
57 dimensions=("tract", "skymap"),
58 defaultTemplates=DiffMatchedTractCatalogBaseTemplates,
59):
60 cat_ref = cT.Input(
61 doc="Reference object catalog to match from",
62 name="{name_input_cat_ref}",
63 storageClass="ArrowAstropy",
64 dimensions=("tract", "skymap"),
65 deferLoad=True,
66 )
67 cat_target = cT.Input(
68 doc="Target object catalog to match",
69 name="{name_input_cat_target}",
70 storageClass="ArrowAstropy",
71 dimensions=("tract", "skymap"),
72 deferLoad=True,
73 )
74 skymap = cT.Input(
75 doc="Input definition of geometry/bbox and projection/wcs for coadded exposures",
76 name="{name_skymap}",
77 storageClass="SkyMap",
78 dimensions=("skymap",),
79 )
80 cat_match_ref = cT.Input(
81 doc="Reference match catalog with indices of target matches",
82 name="match_ref_{name_input_cat_ref}_{name_input_cat_target}",
83 storageClass="ArrowAstropy",
84 dimensions=("tract", "skymap"),
85 deferLoad=True,
86 )
87 cat_match_target = cT.Input(
88 doc="Target match catalog with indices of references matches",
89 name="match_target_{name_input_cat_ref}_{name_input_cat_target}",
90 storageClass="ArrowAstropy",
91 dimensions=("tract", "skymap"),
92 deferLoad=True,
93 )
94 columns_match_target = cT.Input(
95 doc="Target match catalog columns",
96 name="match_target_{name_input_cat_ref}_{name_input_cat_target}.columns",
97 storageClass="ArrowColumnList",
98 dimensions=("tract", "skymap"),
99 )
100 cat_matched = cT.Output(
101 doc="Catalog with reference and target columns for joined sources",
102 name="matched_{name_input_cat_ref}_{name_input_cat_target}",
103 storageClass="ArrowAstropy",
104 dimensions=("tract", "skymap"),
105 )
106
107 def __init__(self, *, config=None):
108 if config.refcat_sharding_type != "tract":
109 if config.refcat_sharding_type == "none":
110 old = self.cat_ref
111 del self.cat_ref
112 self.cat_ref = cT.Input(
113 doc=old.doc,
114 name=old.name,
115 storageClass=old.storageClass,
116 dimensions=(),
117 deferLoad=old.deferLoad,
118 )
119
120
121class MatchedCatalogFluxesConfig(pexConfig.Config):
122 column_ref_flux = pexConfig.Field(
123 dtype=str,
124 doc='Reference catalog flux column name',
125 )
126 columns_target_flux = pexConfig.ListField(
127 dtype=str,
128 listCheck=is_sequence_set,
129 doc="List of target catalog flux column names",
130 )
131 columns_target_flux_err = pexConfig.ListField(
132 dtype=str,
133 listCheck=is_sequence_set,
134 doc="List of target catalog flux error column names",
135 )
136
137 # this should be an orderedset
138 @property
139 def columns_in_ref(self) -> list[str]:
140 return [self.column_ref_flux]
141
142 # this should also be an orderedset
143 @property
144 def columns_in_target(self) -> list[str]:
145 columns = [col for col in self.columns_target_flux]
146 columns.extend(col for col in self.columns_target_flux_err if col not in columns)
147 return columns
148
149
151 pipeBase.PipelineTaskConfig,
152 pipelineConnections=DiffMatchedTractCatalogConnections,
153):
154 column_matched_prefix_ref = pexConfig.Field[str](
155 default='refcat_',
156 doc='The prefix for matched columns copied from the reference catalog',
157 )
158 include_unmatched = pexConfig.Field[bool](
159 default=False,
160 doc='Whether to include unmatched rows in the matched table',
161 )
162
163 @property
164 def columns_in_ref(self) -> list[str]:
165 columns_all = [self.coord_format.column_ref_coord1, self.coord_format.column_ref_coord2]
166 for column_lists in (
167 (
168 self.columns_ref_copy,
169 ),
170 (x.columns_in_ref for x in self.columns_flux.values()),
171 ):
172 for column_list in column_lists:
173 columns_all.extend(column_list)
174
175 return list({column: None for column in columns_all}.keys())
176
177 @property
178 def columns_in_target(self) -> list[str]:
179 columns_all = [self.coord_format.column_target_coord1, self.coord_format.column_target_coord2]
180 if self.coord_format.coords_ref_to_convert is not None:
181 columns_all.extend(col for col in self.coord_format.coords_ref_to_convert.values()
182 if col not in columns_all)
183 for column_lists in (
184 (
189 ),
190 (x.columns_in_target for x in self.columns_flux.values()),
191 ):
192 for column_list in column_lists:
193 columns_all.extend(col for col in column_list if col not in columns_all)
194 return columns_all
195
196 columns_flux = pexConfig.ConfigDictField(
197 doc="Configs for flux columns for each band",
198 keytype=str,
199 itemtype=MatchedCatalogFluxesConfig,
200 default={},
201 )
202 columns_ref_mag_to_nJy = pexConfig.DictField[str, str](
203 doc='Reference table AB mag columns to convert to nJy flux columns with new names',
204 default={},
205 )
206 columns_ref_copy = pexConfig.ListField[str](
207 doc='Reference table columns to copy into cat_matched',
208 default=[],
209 listCheck=is_sequence_set,
210 )
211 columns_target_coord_err = pexConfig.ListField[str](
212 doc='Target table coordinate columns with standard errors (sigma)',
213 listCheck=lambda x: (len(x) == 2) and (x[0] != x[1]),
214 )
215 columns_target_copy = pexConfig.ListField[str](
216 doc='Target table columns to copy into cat_matched',
217 default=('patch',),
218 listCheck=is_sequence_set,
219 )
220 columns_target_mag_to_nJy = pexConfig.DictField[str, str](
221 doc='Target table AB mag columns to convert to nJy flux columns with new names',
222 default={},
223 )
224 columns_target_select_true = pexConfig.ListField[str](
225 doc='Target table columns to require to be True for selecting sources',
226 default=('detect_isPrimary',),
227 listCheck=is_sequence_set,
228 )
229 columns_target_select_false = pexConfig.ListField[str](
230 doc='Target table columns to require to be False for selecting sources',
231 default=('merge_peak_sky',),
232 listCheck=is_sequence_set,
233 )
234 coord_format = pexConfig.ConfigField[ConvertCatalogCoordinatesConfig](
235 doc="Configuration for coordinate conversion",
236 )
237 refcat_sharding_type = pexConfig.ChoiceField[str](
238 doc="The type of sharding (spatial splitting) for the reference catalog",
239 allowed={"tract": "Tract-based shards", "none": "No sharding at all"},
240 default="tract",
241 )
242
243 def validate(self):
244 super().validate()
245
246 errors = []
247
248 for columns_mag, columns_in, name_columns_copy in (
249 (self.columns_ref_mag_to_nJy, self.columns_in_ref, "columns_ref_copy"),
250 (self.columns_target_mag_to_nJy, self.columns_in_target, "columns_target_copy"),
251 ):
252 columns_copy = getattr(self, name_columns_copy)
253 for column_old, column_new in columns_mag.items():
254 if column_old not in columns_in:
255 errors.append(
256 f"{column_old=} key in self.columns_mag_to_nJy not found in {columns_in=}; did you"
257 f" forget to add it to self.{name_columns_copy}={columns_copy}?"
258 )
259 if column_new in columns_copy:
260 errors.append(
261 f"{column_new=} value found in self.{name_columns_copy}={columns_copy}"
262 f" this will cause a collision. Please choose a different name."
263 )
264 if errors:
265 raise ValueError("\n".join(errors))
266
267
268class DiffMatchedTractCatalogTask(pipeBase.PipelineTask):
269 """Load subsets of matched catalogs and output a merged catalog of matched sources.
270 """
271 ConfigClass = DiffMatchedTractCatalogConfig
272 _DefaultName = "DiffMatchedTractCatalog"
273
274 def runQuantum(self, butlerQC, inputRefs, outputRefs):
275 inputs = butlerQC.get(inputRefs)
276 skymap = inputs.pop("skymap")
277
278 columns_match_target = ['match_row']
279 if 'match_candidate' in inputs['columns_match_target']:
280 columns_match_target.append('match_candidate')
281
282 outputs = self.run(
283 catalog_ref=inputs['cat_ref'].get(parameters={'columns': self.config.columns_in_ref}),
284 catalog_target=inputs['cat_target'].get(parameters={'columns': self.config.columns_in_target}),
285 catalog_match_ref=inputs['cat_match_ref'].get(
286 parameters={'columns': ['match_candidate', 'match_row']},
287 ),
288 catalog_match_target=inputs['cat_match_target'].get(
289 parameters={'columns': columns_match_target},
290 ),
291 wcs=skymap[butlerQC.quantum.dataId["tract"]].wcs,
292 )
293 butlerQC.put(outputs, outputRefs)
294
295 def run(
296 self,
297 catalog_ref: pd.DataFrame | astropy.table.Table,
298 catalog_target: pd.DataFrame | astropy.table.Table,
299 catalog_match_ref: pd.DataFrame | astropy.table.Table,
300 catalog_match_target: pd.DataFrame | astropy.table.Table,
301 wcs: afwGeom.SkyWcs = None,
302 ) -> pipeBase.Struct:
303 """Load matched reference and target (measured) catalogs, measure summary statistics, and output
304 a combined matched catalog with columns from both inputs.
305
306 Parameters
307 ----------
308 catalog_ref : `pandas.DataFrame` | `astropy.table.Table`
309 A reference catalog to diff objects/sources from.
310 catalog_target : `pandas.DataFrame` | `astropy.table.Table`
311 A target catalog to diff reference objects/sources to.
312 catalog_match_ref : `pandas.DataFrame` | `astropy.table.Table`
313 A catalog with match indices of target sources and selection flags
314 for each reference source.
315 catalog_match_target : `pandas.DataFrame` | `astropy.table.Table`
316 A catalog with selection flags for each target source.
317 wcs : `lsst.afw.image.SkyWcs`
318 A coordinate system to convert catalog positions to sky coordinates,
319 if necessary.
320
321 Returns
322 -------
323 retStruct : `lsst.pipe.base.Struct`
324 A struct with output_ref and output_target attribute containing the
325 output matched catalogs.
326 """
327 # Would be nice if this could refer directly to ConfigClass
328 config: DiffMatchedTractCatalogConfig = self.config
329
330 is_ref_pd = isinstance(catalog_ref, pd.DataFrame)
331 is_target_pd = isinstance(catalog_target, pd.DataFrame)
332 is_match_ref_pd = isinstance(catalog_match_ref, pd.DataFrame)
333 is_match_target_pd = isinstance(catalog_match_target, pd.DataFrame)
334 if is_ref_pd:
335 catalog_ref = astropy.table.Table.from_pandas(catalog_ref)
336 if is_target_pd:
337 catalog_target = astropy.table.Table.from_pandas(catalog_target)
338 if is_match_ref_pd:
339 catalog_match_ref = astropy.table.Table.from_pandas(catalog_match_ref)
340 if is_match_target_pd:
341 catalog_match_target = astropy.table.Table.from_pandas(catalog_match_target)
342
343 # Strip any provenance from tables before merging to prevent
344 # warnings from conflicts being issued by astropy.utils.merge during
345 # vstack or hstack calls.
346 DatasetProvenance.strip_provenance_from_flat_dict(catalog_ref.meta)
347 DatasetProvenance.strip_provenance_from_flat_dict(catalog_target.meta)
348 DatasetProvenance.strip_provenance_from_flat_dict(catalog_match_ref.meta)
349 DatasetProvenance.strip_provenance_from_flat_dict(catalog_match_target.meta)
350
351 # TODO: Remove pandas support in DM-46523
352 if is_ref_pd or is_target_pd or is_match_ref_pd or is_match_target_pd:
353 warnings.warn("pandas usage in MatchProbabilisticTask is deprecated; it will be removed "
354 " in favour of astropy.table after release 28.0.0", category=FutureWarning)
355
356 select_ref = catalog_match_ref['match_candidate']
357 # Add additional selection criteria for target sources beyond those for matching
358 # (not recommended, but can be done anyway)
359 select_target = (catalog_match_target['match_candidate']
360 if 'match_candidate' in catalog_match_target.columns
361 else np.ones(len(catalog_match_target), dtype=bool))
362 for column in config.columns_target_select_true:
363 select_target &= catalog_target[column]
364 for column in config.columns_target_select_false:
365 select_target &= ~catalog_target[column]
366
367 ref, target = config.coord_format.format_catalogs(
368 catalog_ref=catalog_ref, catalog_target=catalog_target,
369 select_ref=None, select_target=select_target, wcs=wcs, radec_to_xy_func=radec_to_xy,
370 )
371 cat_ref = ref.catalog
372 cat_target = target.catalog
373 n_target = len(cat_target)
374
375 if config.include_unmatched:
376 for cat_add, cat_match in ((cat_ref, catalog_match_ref), (cat_target, catalog_match_target)):
377 cat_add['match_candidate'] = cat_match['match_candidate']
378
379 match_row = catalog_match_ref['match_row']
380 matched_ref = match_row >= 0
381 matched_row = match_row[matched_ref]
382 matched_target = np.zeros(n_target, dtype=bool)
383 matched_target[matched_row] = True
384
385 # Add/compute distance columns
386 coord1_target_err, coord2_target_err = config.columns_target_coord_err
387 column_dist, column_dist_err = 'match_distance', 'match_distanceErr'
388 dist = np.full(n_target, np.nan)
389
390 target_match_c1, target_match_c2 = (coord[matched_row] for coord in (target.coord1, target.coord2))
391 target_ref_c1, target_ref_c2 = (coord[matched_ref] for coord in (ref.coord1, ref.coord2))
392
393 dist_err = np.full(n_target, np.nan)
394 dist[matched_row] = sphdist(
395 target_match_c1, target_match_c2, target_ref_c1, target_ref_c2
396 ) if config.coord_format.coords_spherical else np.hypot(
397 target_match_c1 - target_ref_c1, target_match_c2 - target_ref_c2,
398 )
399 cat_target_matched = cat_target[matched_row]
400 # This will convert a masked array to an array filled with nans
401 # wherever there are bad values (otherwise sphdist can raise)
402 c1_err, c2_err = (
403 np.ma.getdata(cat_target_matched[c_err]) for c_err in (coord1_target_err, coord2_target_err)
404 )
405 # Should probably explicitly add cosine terms if ref has errors too
406 dist_err[matched_row] = sphdist(
407 target_match_c1, target_match_c2, target_match_c1 + c1_err, target_match_c2 + c2_err
408 ) if config.coord_format.coords_spherical else np.hypot(c1_err, c2_err)
409 cat_target[column_dist], cat_target[column_dist_err] = dist, dist_err
410
411 # Create a matched table, preserving the target catalog's named index (if it has one)
412 cat_left = cat_target[matched_row]
413 cat_right = cat_ref[matched_ref]
414 cat_right.rename_columns(
415 list(cat_right.columns),
416 new_names=[f'{config.column_matched_prefix_ref}{col}' for col in cat_right.columns],
417 )
418 cat_matched = astropy.table.hstack((cat_left, cat_right))
419
420 if config.include_unmatched:
421 # Create an unmatched table with the same schema as the matched one
422 # ... but only for objects with no matches (for completeness/purity)
423 # and that were selected for matching (or inclusion via config)
424 cat_right = astropy.table.Table(
425 cat_ref[~matched_ref & select_ref]
426 )
427 cat_right.rename_columns(
428 cat_right.colnames,
429 [f"{config.column_matched_prefix_ref}{col}" for col in cat_right.colnames],
430 )
431 match_row_target = catalog_match_target['match_row']
432 cat_left = cat_target[~(match_row_target >= 0) & select_target]
433 # This may be slower than pandas but will, for example, create
434 # masked columns for booleans, which pandas does not support.
435 # See https://github.com/pandas-dev/pandas/issues/46662
436 cat_unmatched = astropy.table.vstack([cat_left, cat_right])
437
438 for columns_convert_base, prefix in (
439 (config.columns_ref_mag_to_nJy, config.column_matched_prefix_ref),
440 (config.columns_target_mag_to_nJy, ""),
441 ):
442 if columns_convert_base:
443 columns_convert = {
444 f"{prefix}{k}": f"{prefix}{v}" for k, v in columns_convert_base.items()
445 } if prefix else columns_convert_base
446 to_convert = [cat_matched]
447 if config.include_unmatched:
448 to_convert.append(cat_unmatched)
449 for cat_convert in to_convert:
450 cat_convert.rename_columns(
451 tuple(columns_convert.keys()),
452 tuple(columns_convert.values()),
453 )
454 for column_flux in columns_convert.values():
455 cat_convert[column_flux] = u.ABmag.to(u.nJy, cat_convert[column_flux])
456
457 if config.include_unmatched:
458 # This is probably less efficient than just doing an outer join originally; worth checking
459 cat_matched = astropy.table.vstack([cat_matched, cat_unmatched])
460
461 retStruct = pipeBase.Struct(cat_matched=cat_matched)
462 return retStruct
A 2-dimensional celestial WCS that transform pixels to ICRS RA/Dec, using the LSST standard for pixel...
Definition SkyWcs.h:117
pipeBase.Struct run(self, pd.DataFrame|astropy.table.Table catalog_ref, pd.DataFrame|astropy.table.Table catalog_target, pd.DataFrame|astropy.table.Table catalog_match_ref, pd.DataFrame|astropy.table.Table catalog_match_target, afwGeom.SkyWcs wcs=None)