LSST Applications g04e9c324dd+8c5ae1fdc5,g134cb467dc+1b3060144d,g18429d2f64+f642bf4753,g199a45376c+0ba108daf9,g1fd858c14a+2dcf163641,g262e1987ae+7b8c96d2ca,g29ae962dfc+3bd6ecb08a,g2cef7863aa+aef1011c0b,g35bb328faa+8c5ae1fdc5,g3fd5ace14f+53e1a9e7c5,g4595892280+fef73a337f,g47891489e3+2efcf17695,g4d44eb3520+642b70b07e,g53246c7159+8c5ae1fdc5,g67b6fd64d1+2efcf17695,g67fd3c3899+b70e05ef52,g74acd417e5+317eb4c7d4,g786e29fd12+668abc6043,g87389fa792+8856018cbb,g89139ef638+2efcf17695,g8d7436a09f+3be3c13596,g8ea07a8fe4+9f5ccc88ac,g90f42f885a+a4e7b16d9b,g97be763408+ad77d7208f,g9dd6db0277+b70e05ef52,ga681d05dcb+a3f46e7fff,gabf8522325+735880ea63,gac2eed3f23+2efcf17695,gb89ab40317+2efcf17695,gbf99507273+8c5ae1fdc5,gd8ff7fe66e+b70e05ef52,gdab6d2f7ff+317eb4c7d4,gdc713202bf+b70e05ef52,gdfd2d52018+b10e285e0f,ge365c994fd+310e8507c4,ge410e46f29+2efcf17695,geaed405ab2+562b3308c0,gffca2db377+8c5ae1fdc5,w.2025.35
LSST Data Management Base Package
Loading...
Searching...
No Matches
diff_matched_tract_catalog.py
Go to the documentation of this file.
1# This file is part of pipe_tasks.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21
22__all__ = [
23 'DiffMatchedTractCatalogConfig', 'DiffMatchedTractCatalogTask', 'MatchedCatalogFluxesConfig',
24]
25
26import lsst.afw.geom as afwGeom
27from lsst.meas.astrom.matcher_probabilistic import ConvertCatalogCoordinatesConfig
29import lsst.pex.config as pexConfig
30import lsst.pipe.base as pipeBase
31import lsst.pipe.base.connectionTypes as cT
32from lsst.skymap import BaseSkyMap
33from lsst.daf.butler import DatasetProvenance
34
35import astropy.table
36import astropy.units as u
37import numpy as np
38from smatch.matcher import sphdist
39from typing import Sequence
40
41
42def is_sequence_set(x: Sequence):
43 return len(x) == len(set(x))
44
45
46DiffMatchedTractCatalogBaseTemplates = {
47 "name_input_cat_ref": "truth_summary",
48 "name_input_cat_target": "objectTable_tract",
49 "name_skymap": BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
50}
51
52
54 pipeBase.PipelineTaskConnections,
55 dimensions=("tract", "skymap"),
56 defaultTemplates=DiffMatchedTractCatalogBaseTemplates,
57):
58 cat_ref = cT.Input(
59 doc="Reference object catalog to match from",
60 name="{name_input_cat_ref}",
61 storageClass="ArrowAstropy",
62 dimensions=("tract", "skymap"),
63 deferLoad=True,
64 )
65 cat_target = cT.Input(
66 doc="Target object catalog to match",
67 name="{name_input_cat_target}",
68 storageClass="ArrowAstropy",
69 dimensions=("tract", "skymap"),
70 deferLoad=True,
71 )
72 skymap = cT.Input(
73 doc="Input definition of geometry/bbox and projection/wcs for coadded exposures",
74 name="{name_skymap}",
75 storageClass="SkyMap",
76 dimensions=("skymap",),
77 )
78 cat_match_ref = cT.Input(
79 doc="Reference match catalog with indices of target matches",
80 name="match_ref_{name_input_cat_ref}_{name_input_cat_target}",
81 storageClass="ArrowAstropy",
82 dimensions=("tract", "skymap"),
83 deferLoad=True,
84 )
85 cat_match_target = cT.Input(
86 doc="Target match catalog with indices of references matches",
87 name="match_target_{name_input_cat_ref}_{name_input_cat_target}",
88 storageClass="ArrowAstropy",
89 dimensions=("tract", "skymap"),
90 deferLoad=True,
91 )
92 columns_match_target = cT.Input(
93 doc="Target match catalog columns",
94 name="match_target_{name_input_cat_ref}_{name_input_cat_target}.columns",
95 storageClass="ArrowColumnList",
96 dimensions=("tract", "skymap"),
97 )
98 cat_matched = cT.Output(
99 doc="Catalog with reference and target columns for joined sources",
100 name="matched_{name_input_cat_ref}_{name_input_cat_target}",
101 storageClass="ArrowAstropy",
102 dimensions=("tract", "skymap"),
103 )
104
105 def __init__(self, *, config=None):
106 if config.refcat_sharding_type != "tract":
107 if config.refcat_sharding_type == "none":
108 old = self.cat_ref
109 del self.cat_ref
110 self.cat_ref = cT.Input(
111 doc=old.doc,
112 name=old.name,
113 storageClass=old.storageClass,
114 dimensions=(),
115 deferLoad=old.deferLoad,
116 )
117
118
119class MatchedCatalogFluxesConfig(pexConfig.Config):
120 column_ref_flux = pexConfig.Field(
121 dtype=str,
122 doc='Reference catalog flux column name',
123 )
124 columns_target_flux = pexConfig.ListField(
125 dtype=str,
126 listCheck=is_sequence_set,
127 doc="List of target catalog flux column names",
128 )
129 columns_target_flux_err = pexConfig.ListField(
130 dtype=str,
131 listCheck=is_sequence_set,
132 doc="List of target catalog flux error column names",
133 )
134
135 # this should be an orderedset
136 @property
137 def columns_in_ref(self) -> list[str]:
138 return [self.column_ref_flux]
139
140 # this should also be an orderedset
141 @property
142 def columns_in_target(self) -> list[str]:
143 columns = [col for col in self.columns_target_flux]
144 columns.extend(col for col in self.columns_target_flux_err if col not in columns)
145 return columns
146
147
149 pipeBase.PipelineTaskConfig,
150 pipelineConnections=DiffMatchedTractCatalogConnections,
151):
152 column_matched_prefix_ref = pexConfig.Field[str](
153 default='refcat_',
154 doc='The prefix for matched columns copied from the reference catalog',
155 )
156 include_unmatched = pexConfig.Field[bool](
157 default=False,
158 doc='Whether to include unmatched rows in the matched table',
159 )
160 prefix_best_coord = pexConfig.Field[str](
161 default=None,
162 doc="A string prefix for ra/dec coordinate columns generated from the reference coordinate if "
163 "available, and target otherwise. Ignored if None or include_unmatched is False.",
164 optional=True,
165 )
166
167 @property
168 def columns_in_ref(self) -> list[str]:
169 columns_all = [self.coord_format.column_ref_coord1, self.coord_format.column_ref_coord2]
170 for column_lists in (
171 (
172 self.columns_ref_copy,
173 ),
174 (x.columns_in_ref for x in self.columns_flux.values()),
175 ):
176 for column_list in column_lists:
177 columns_all.extend(column_list)
178
179 return list({column: None for column in columns_all}.keys())
180
181 @property
182 def columns_in_target(self) -> list[str]:
183 columns_all = [self.coord_format.column_target_coord1, self.coord_format.column_target_coord2]
184 if self.coord_format.coords_ref_to_convert is not None:
185 columns_all.extend(col for col in self.coord_format.coords_ref_to_convert.values()
186 if col not in columns_all)
187 for column_lists in (
188 (
193 ),
194 (x.columns_in_target for x in self.columns_flux.values()),
195 ):
196 for column_list in column_lists:
197 columns_all.extend(col for col in column_list if col not in columns_all)
198 return columns_all
199
200 columns_flux = pexConfig.ConfigDictField(
201 doc="Configs for flux columns for each band",
202 keytype=str,
203 itemtype=MatchedCatalogFluxesConfig,
204 default={},
205 )
206 columns_ref_mag_to_nJy = pexConfig.DictField[str, str](
207 doc='Reference table AB mag columns to convert to nJy flux columns with new names',
208 default={},
209 )
210 columns_ref_copy = pexConfig.ListField[str](
211 doc='Reference table columns to copy into cat_matched',
212 default=[],
213 listCheck=is_sequence_set,
214 )
215 columns_target_coord_err = pexConfig.ListField[str](
216 doc='Target table coordinate columns with standard errors (sigma)',
217 listCheck=lambda x: (len(x) == 2) and (x[0] != x[1]),
218 )
219 columns_target_copy = pexConfig.ListField[str](
220 doc='Target table columns to copy into cat_matched',
221 default=('patch',),
222 listCheck=is_sequence_set,
223 )
224 columns_target_mag_to_nJy = pexConfig.DictField[str, str](
225 doc='Target table AB mag columns to convert to nJy flux columns with new names',
226 default={},
227 )
228 columns_target_select_true = pexConfig.ListField[str](
229 doc='Target table columns to require to be True for selecting sources',
230 default=('detect_isPrimary',),
231 listCheck=is_sequence_set,
232 )
233 columns_target_select_false = pexConfig.ListField[str](
234 doc='Target table columns to require to be False for selecting sources',
235 default=('merge_peak_sky',),
236 listCheck=is_sequence_set,
237 )
238 coord_format = pexConfig.ConfigField[ConvertCatalogCoordinatesConfig](
239 doc="Configuration for coordinate conversion",
240 )
241 refcat_sharding_type = pexConfig.ChoiceField[str](
242 doc="The type of sharding (spatial splitting) for the reference catalog",
243 allowed={"tract": "Tract-based shards", "none": "No sharding at all"},
244 default="tract",
245 )
246
247 def validate(self):
248 super().validate()
249
250 errors = []
251
252 for columns_mag, columns_in, name_columns_copy in (
253 (self.columns_ref_mag_to_nJy, self.columns_in_ref, "columns_ref_copy"),
254 (self.columns_target_mag_to_nJy, self.columns_in_target, "columns_target_copy"),
255 ):
256 columns_copy = getattr(self, name_columns_copy)
257 for column_old, column_new in columns_mag.items():
258 if column_old not in columns_in:
259 errors.append(
260 f"{column_old=} key in self.columns_mag_to_nJy not found in {columns_in=}; did you"
261 f" forget to add it to self.{name_columns_copy}={columns_copy}?"
262 )
263 if column_new in columns_copy:
264 errors.append(
265 f"{column_new=} value found in self.{name_columns_copy}={columns_copy}"
266 f" this will cause a collision. Please choose a different name."
267 )
268 if errors:
269 raise ValueError("\n".join(errors))
270
271
272class DiffMatchedTractCatalogTask(pipeBase.PipelineTask):
273 """Load subsets of matched catalogs and output a merged catalog of matched sources.
274 """
275 ConfigClass = DiffMatchedTractCatalogConfig
276 _DefaultName = "DiffMatchedTractCatalog"
277
278 def runQuantum(self, butlerQC, inputRefs, outputRefs):
279 inputs = butlerQC.get(inputRefs)
280 skymap = inputs.pop("skymap")
281
282 columns_match_target = ['match_row']
283 if 'match_candidate' in inputs['columns_match_target']:
284 columns_match_target.append('match_candidate')
285
286 outputs = self.run(
287 catalog_ref=inputs['cat_ref'].get(parameters={'columns': self.config.columns_in_ref}),
288 catalog_target=inputs['cat_target'].get(parameters={'columns': self.config.columns_in_target}),
289 catalog_match_ref=inputs['cat_match_ref'].get(
290 parameters={'columns': ['match_candidate', 'match_row']},
291 ),
292 catalog_match_target=inputs['cat_match_target'].get(
293 parameters={'columns': columns_match_target},
294 ),
295 wcs=skymap[butlerQC.quantum.dataId["tract"]].wcs,
296 )
297 butlerQC.put(outputs, outputRefs)
298
299 def run(
300 self,
301 catalog_ref: astropy.table.Table,
302 catalog_target: astropy.table.Table,
303 catalog_match_ref: astropy.table.Table,
304 catalog_match_target: astropy.table.Table,
305 wcs: afwGeom.SkyWcs = None,
306 ) -> pipeBase.Struct:
307 """Load matched reference and target (measured) catalogs, measure summary statistics, and output
308 a combined matched catalog with columns from both inputs.
309
310 Parameters
311 ----------
312 catalog_ref : `astropy.table.Table`
313 A reference catalog to diff objects/sources from.
314 catalog_target : `astropy.table.Table`
315 A target catalog to diff reference objects/sources to.
316 catalog_match_ref : `astropy.table.Table`
317 A catalog with match indices of target sources and selection flags
318 for each reference source.
319 catalog_match_target : `astropy.table.Table`
320 A catalog with selection flags for each target source.
321 wcs : `lsst.afw.image.SkyWcs`
322 A coordinate system to convert catalog positions to sky coordinates,
323 if necessary.
324
325 Returns
326 -------
327 retStruct : `lsst.pipe.base.Struct`
328 A struct with output_ref and output_target attribute containing the
329 output matched catalogs.
330 """
331 # Would be nice if this could refer directly to ConfigClass
332 config: DiffMatchedTractCatalogConfig = self.config
333
334 # Strip any provenance from tables before merging to prevent
335 # warnings from conflicts being issued by astropy.utils.merge during
336 # vstack or hstack calls.
337 DatasetProvenance.strip_provenance_from_flat_dict(catalog_ref.meta)
338 DatasetProvenance.strip_provenance_from_flat_dict(catalog_target.meta)
339 DatasetProvenance.strip_provenance_from_flat_dict(catalog_match_ref.meta)
340 DatasetProvenance.strip_provenance_from_flat_dict(catalog_match_target.meta)
341
342 select_ref = catalog_match_ref['match_candidate']
343 # Add additional selection criteria for target sources beyond those for matching
344 # (not recommended, but can be done anyway)
345 select_target = (catalog_match_target['match_candidate']
346 if 'match_candidate' in catalog_match_target.columns
347 else np.ones(len(catalog_match_target), dtype=bool))
348 for column in config.columns_target_select_true:
349 select_target &= catalog_target[column]
350 for column in config.columns_target_select_false:
351 select_target &= ~catalog_target[column]
352
353 ref, target = config.coord_format.format_catalogs(
354 catalog_ref=catalog_ref, catalog_target=catalog_target,
355 select_ref=None, select_target=select_target, wcs=wcs, radec_to_xy_func=radec_to_xy,
356 )
357 cat_ref = ref.catalog
358 cat_target = target.catalog
359 n_target = len(cat_target)
360
361 if config.include_unmatched:
362 for cat_add, cat_match in ((cat_ref, catalog_match_ref), (cat_target, catalog_match_target)):
363 cat_add['match_candidate'] = cat_match['match_candidate']
364
365 match_row = catalog_match_ref['match_row']
366 matched_ref = match_row >= 0
367 matched_row = match_row[matched_ref]
368 matched_target = np.zeros(n_target, dtype=bool)
369 matched_target[matched_row] = True
370
371 # Add/compute distance columns
372 coord1_target_err, coord2_target_err = config.columns_target_coord_err
373 column_dist, column_dist_err = 'match_distance', 'match_distanceErr'
374 dist = np.full(n_target, np.nan)
375
376 target_match_c1, target_match_c2 = (coord[matched_row] for coord in (target.coord1, target.coord2))
377 target_ref_c1, target_ref_c2 = (coord[matched_ref] for coord in (ref.coord1, ref.coord2))
378
379 dist_err = np.full(n_target, np.nan)
380 dist[matched_row] = sphdist(
381 target_match_c1, target_match_c2, target_ref_c1, target_ref_c2
382 ) if config.coord_format.coords_spherical else np.hypot(
383 target_match_c1 - target_ref_c1, target_match_c2 - target_ref_c2,
384 )
385 cat_target_matched = cat_target[matched_row]
386 # This will convert a masked array to an array filled with nans
387 # wherever there are bad values (otherwise sphdist can raise)
388 c1_err, c2_err = (
389 np.ma.getdata(cat_target_matched[c_err]) for c_err in (coord1_target_err, coord2_target_err)
390 )
391 # Should probably explicitly add cosine terms if ref has errors too
392 dist_err[matched_row] = sphdist(
393 target_match_c1, target_match_c2, target_match_c1 + c1_err, target_match_c2 + c2_err
394 ) if config.coord_format.coords_spherical else np.hypot(c1_err, c2_err)
395 cat_target[column_dist], cat_target[column_dist_err] = dist, dist_err
396
397 # Create a matched table, preserving the target catalog's named index (if it has one)
398 cat_left = cat_target[matched_row]
399 cat_right = cat_ref[matched_ref]
400 cat_right.rename_columns(
401 list(cat_right.columns),
402 new_names=[f'{config.column_matched_prefix_ref}{col}' for col in cat_right.columns],
403 )
404 cat_matched = astropy.table.hstack((cat_left, cat_right))
405
406 if config.include_unmatched:
407 # Create an unmatched table with the same schema as the matched one
408 # ... but only for objects with no matches (for completeness/purity)
409 # and that were selected for matching (or inclusion via config)
410 cat_right = astropy.table.Table(
411 cat_ref[~matched_ref & select_ref]
412 )
413 cat_right.rename_columns(
414 cat_right.colnames,
415 [f"{config.column_matched_prefix_ref}{col}" for col in cat_right.colnames],
416 )
417 match_row_target = catalog_match_target['match_row']
418 cat_left = cat_target[~(match_row_target >= 0) & select_target]
419 # This may be slower than pandas but will, for example, create
420 # masked columns for booleans, which pandas does not support.
421 # See https://github.com/pandas-dev/pandas/issues/46662
422 cat_unmatched = astropy.table.vstack([cat_left, cat_right])
423
424 for columns_convert_base, prefix in (
425 (config.columns_ref_mag_to_nJy, config.column_matched_prefix_ref),
426 (config.columns_target_mag_to_nJy, ""),
427 ):
428 if columns_convert_base:
429 columns_convert = {
430 f"{prefix}{k}": f"{prefix}{v}" for k, v in columns_convert_base.items()
431 } if prefix else columns_convert_base
432 to_convert = [cat_matched]
433 if config.include_unmatched:
434 to_convert.append(cat_unmatched)
435 for cat_convert in to_convert:
436 cat_convert.rename_columns(
437 tuple(columns_convert.keys()),
438 tuple(columns_convert.values()),
439 )
440 for column_flux in columns_convert.values():
441 cat_convert[column_flux] = u.ABmag.to(u.nJy, cat_convert[column_flux])
442
443 if config.include_unmatched:
444 # This is probably less efficient than just doing an outer join originally; worth checking
445 cat_matched = astropy.table.vstack([cat_matched, cat_unmatched])
446 if (prefix_coord := config.prefix_best_coord) is not None:
447 columns_coord_best = (
448 f"{prefix_coord}{col_coord}" for col_coord in (
449 ("ra", "dec") if config.coord_format.coords_spherical else ("coord1", "coord2")
450 )
451 )
452 for column_coord_best, column_coord_ref, column_coord_target in zip(
453 columns_coord_best,
454 (config.coord_format.column_ref_coord1, config.coord_format.column_ref_coord2),
455 (config.coord_format.column_target_coord1, config.coord_format.column_target_coord2),
456 ):
457 column_full_ref = f'{config.column_matched_prefix_ref}{column_coord_ref}'
458 values = cat_matched[column_full_ref]
459 unit = values.unit
460 values_bad = np.ma.masked_invalid(values).mask
461 # Cast to an unmasked array - there will be no bad values
462 values = np.array(values)
463 values[values_bad] = cat_matched[column_coord_target][values_bad]
464 cat_matched[column_coord_best] = values
465 cat_matched[column_coord_best].unit = unit
466 cat_matched[column_coord_best].description = (
467 f"Best {columns_coord_best} value from {column_full_ref} if available"
468 f" else {column_coord_target}"
469 )
470
471 retStruct = pipeBase.Struct(cat_matched=cat_matched)
472 return retStruct
A 2-dimensional celestial WCS that transform pixels to ICRS RA/Dec, using the LSST standard for pixel...
Definition SkyWcs.h:118
pipeBase.Struct run(self, astropy.table.Table catalog_ref, astropy.table.Table catalog_target, astropy.table.Table catalog_match_ref, astropy.table.Table catalog_match_target, afwGeom.SkyWcs wcs=None)