Loading [MathJax]/extensions/tex2jax.js
LSST Applications g0fba68d861+05816baf74,g1ec0fe41b4+f536777771,g1fd858c14a+a9301854fb,g35bb328faa+fcb1d3bbc8,g4af146b050+a5c07d5b1d,g4d2262a081+6e5fcc2a4e,g53246c7159+fcb1d3bbc8,g56a49b3a55+9c12191793,g5a012ec0e7+3632fc3ff3,g60b5630c4e+ded28b650d,g67b6fd64d1+ed4b5058f4,g78460c75b0+2f9a1b4bcd,g786e29fd12+cf7ec2a62a,g8352419a5c+fcb1d3bbc8,g87b7deb4dc+7b42cf88bf,g8852436030+e5453db6e6,g89139ef638+ed4b5058f4,g8e3bb8577d+d38d73bdbd,g9125e01d80+fcb1d3bbc8,g94187f82dc+ded28b650d,g989de1cb63+ed4b5058f4,g9d31334357+ded28b650d,g9f33ca652e+50a8019d8c,gabe3b4be73+1e0a283bba,gabf8522325+fa80ff7197,gb1101e3267+d9fb1f8026,gb58c049af0+f03b321e39,gb665e3612d+2a0c9e9e84,gb89ab40317+ed4b5058f4,gcf25f946ba+e5453db6e6,gd6cbbdb0b4+bb83cc51f8,gdd1046aedd+ded28b650d,gde0f65d7ad+941d412827,ge278dab8ac+d65b3c2b70,ge410e46f29+ed4b5058f4,gf23fb2af72+b7cae620c0,gf5e32f922b+fcb1d3bbc8,gf67bdafdda+ed4b5058f4,w.2025.16
LSST Data Management Base Package
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
finalizeCharacterization.py
Go to the documentation of this file.
1# This file is part of pipe_tasks.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21
22"""Task to run a finalized image characterization, using additional data.
23"""
24
25__all__ = [
26 'FinalizeCharacterizationConnections',
27 'FinalizeCharacterizationConfig',
28 'FinalizeCharacterizationTask',
29 'FinalizeCharacterizationDetectorConnections',
30 'FinalizeCharacterizationDetectorConfig',
31 'FinalizeCharacterizationDetectorTask',
32 'ConsolidateFinalizeCharacterizationDetectorConnections',
33 'ConsolidateFinalizeCharacterizationDetectorConfig',
34 'ConsolidateFinalizeCharacterizationDetectorTask',
35]
36
37import astropy.table
38import numpy as np
39import esutil
40
41
42import lsst.pex.config as pexConfig
43import lsst.pipe.base as pipeBase
44import lsst.daf.base as dafBase
45import lsst.afw.table as afwTable
46import lsst.meas.algorithms as measAlg
48from lsst.meas.algorithms import MeasureApCorrTask
49from lsst.meas.base import SingleFrameMeasurementTask, ApplyApCorrTask
50from lsst.meas.algorithms.sourceSelector import sourceSelectorRegistry
51
52from .reserveIsolatedStars import ReserveIsolatedStarsTask
53from .postprocess import TableVStack
54
55
57 pipeBase.PipelineTaskConnections,
58 dimensions=('instrument', 'visit',),
59 defaultTemplates={},
60):
61 src_schema = pipeBase.connectionTypes.InitInput(
62 doc='Input schema used for src catalogs.',
63 name='src_schema',
64 storageClass='SourceCatalog',
65 )
66 isolated_star_cats = pipeBase.connectionTypes.Input(
67 doc=('Catalog of isolated stars with average positions, number of associated '
68 'sources, and indexes to the isolated_star_sources catalogs.'),
69 name='isolated_star_presource_associations',
70 storageClass='ArrowAstropy',
71 dimensions=('instrument', 'tract', 'skymap'),
72 deferLoad=True,
73 multiple=True,
74 )
75 isolated_star_sources = pipeBase.connectionTypes.Input(
76 doc=('Catalog of isolated star sources with sourceIds, and indexes to the '
77 'isolated_star_cats catalogs.'),
78 name='isolated_star_presources',
79 storageClass='ArrowAstropy',
80 dimensions=('instrument', 'tract', 'skymap'),
81 deferLoad=True,
82 multiple=True,
83 )
84
85
87 FinalizeCharacterizationConnectionsBase,
88 dimensions=('instrument', 'visit',),
89 defaultTemplates={},
90):
91 srcs = pipeBase.connectionTypes.Input(
92 doc='Source catalogs for the visit',
93 name='src',
94 storageClass='SourceCatalog',
95 dimensions=('instrument', 'visit', 'detector'),
96 deferLoad=True,
97 multiple=True,
98 deferGraphConstraint=True,
99 )
100 calexps = pipeBase.connectionTypes.Input(
101 doc='Calexps for the visit',
102 name='calexp',
103 storageClass='ExposureF',
104 dimensions=('instrument', 'visit', 'detector'),
105 deferLoad=True,
106 multiple=True,
107 )
108 finalized_psf_ap_corr_cat = pipeBase.connectionTypes.Output(
109 doc=('Per-visit finalized psf models and aperture corrections. This '
110 'catalog uses detector id for the id and are sorted for fast '
111 'lookups of a detector.'),
112 name='finalized_psf_ap_corr_catalog',
113 storageClass='ExposureCatalog',
114 dimensions=('instrument', 'visit'),
115 )
116 finalized_src_table = pipeBase.connectionTypes.Output(
117 doc=('Per-visit catalog of measurements for psf/flag/etc.'),
118 name='finalized_src_table',
119 storageClass='ArrowAstropy',
120 dimensions=('instrument', 'visit'),
121 )
122
123
125 FinalizeCharacterizationConnectionsBase,
126 dimensions=('instrument', 'visit', 'detector',),
127 defaultTemplates={},
128):
129 src = pipeBase.connectionTypes.Input(
130 doc='Source catalog for the visit/detector.',
131 name='src',
132 storageClass='SourceCatalog',
133 dimensions=('instrument', 'visit', 'detector'),
134 )
135 calexp = pipeBase.connectionTypes.Input(
136 doc='Calibrated exposure for the visit/detector.',
137 name='calexp',
138 storageClass='ExposureF',
139 dimensions=('instrument', 'visit', 'detector'),
140 )
141 finalized_psf_ap_corr_detector_cat = pipeBase.connectionTypes.Output(
142 doc=('Per-visit/per-detector finalized psf models and aperture corrections. This '
143 'catalog uses detector id for the id.'),
144 name='finalized_psf_ap_corr_detector_catalog',
145 storageClass='ExposureCatalog',
146 dimensions=('instrument', 'visit', 'detector'),
147 )
148 finalized_src_detector_table = pipeBase.connectionTypes.Output(
149 doc=('Per-visit/per-detector catalog of measurements for psf/flag/etc.'),
150 name='finalized_src_detector_table',
151 storageClass='ArrowAstropy',
152 dimensions=('instrument', 'visit', 'detector'),
153 )
154
155
157 pipeBase.PipelineTaskConfig,
158 pipelineConnections=FinalizeCharacterizationConnectionsBase,
159):
160 """Configuration for FinalizeCharacterizationBaseTask."""
161 source_selector = sourceSelectorRegistry.makeField(
162 doc="How to select sources",
163 default="science"
164 )
165 id_column = pexConfig.Field(
166 doc='Name of column in isolated_star_sources with source id.',
167 dtype=str,
168 default='sourceId',
169 )
170 reserve_selection = pexConfig.ConfigurableField(
171 target=ReserveIsolatedStarsTask,
172 doc='Task to select reserved stars',
173 )
174 make_psf_candidates = pexConfig.ConfigurableField(
175 target=measAlg.MakePsfCandidatesTask,
176 doc='Task to make psf candidates from selected stars.',
177 )
178 psf_determiner = measAlg.psfDeterminerRegistry.makeField(
179 'PSF Determination algorithm',
180 default='piff'
181 )
182 measurement = pexConfig.ConfigurableField(
183 target=SingleFrameMeasurementTask,
184 doc='Measure sources for aperture corrections'
185 )
186 measure_ap_corr = pexConfig.ConfigurableField(
187 target=MeasureApCorrTask,
188 doc="Subtask to measure aperture corrections"
189 )
190 apply_ap_corr = pexConfig.ConfigurableField(
191 target=ApplyApCorrTask,
192 doc="Subtask to apply aperture corrections"
193 )
194
    def setDefaults(self):
        """Set configuration defaults: source selection for PSF candidates,
        aperture-correction source selection, and the measurement plugin set.
        """
        super().setDefaults()

        source_selector = self.source_selector['science']
        source_selector.setDefaults()

        # We use the source selector only to select out flagged objects
        # and signal-to-noise. Isolated, unresolved sources are handled
        # by the isolated star catalog.

        source_selector.doFlags = True
        source_selector.doSignalToNoise = True
        source_selector.doFluxLimit = False
        source_selector.doUnresolved = False
        source_selector.doIsolated = False

        # s/n window for PSF candidate selection.
        source_selector.signalToNoise.minimum = 50.0
        source_selector.signalToNoise.maximum = 1000.0

        source_selector.signalToNoise.fluxField = 'base_GaussianFlux_instFlux'
        source_selector.signalToNoise.errField = 'base_GaussianFlux_instFluxErr'

        source_selector.flags.bad = ['base_PixelFlags_flag_edge',
                                     'base_PixelFlags_flag_nodata',
                                     'base_PixelFlags_flag_interpolatedCenter',
                                     'base_PixelFlags_flag_saturatedCenter',
                                     'base_PixelFlags_flag_crCenter',
                                     'base_PixelFlags_flag_bad',
                                     'base_PixelFlags_flag_interpolated',
                                     'base_PixelFlags_flag_saturated',
                                     'slot_Centroid_flag',
                                     'base_GaussianFlux_flag']

        # Configure aperture correction to select only high s/n sources (that
        # were used in the psf modeling) to avoid background problems when
        # computing the aperture correction map.
        self.measure_ap_corr.sourceSelector = 'science'

        ap_selector = self.measure_ap_corr.sourceSelector['science']
        # We do not need to filter flags or unresolved because we have used
        # the filtered isolated stars as an input
        ap_selector.doFlags = False
        ap_selector.doUnresolved = False

        # These imports register the measurement plugins named below.
        import lsst.meas.modelfit  # noqa: F401
        import lsst.meas.extensions.photometryKron  # noqa: F401
        import lsst.meas.extensions.convolved  # noqa: F401
        import lsst.meas.extensions.gaap  # noqa: F401
        import lsst.meas.extensions.shapeHSM  # noqa: F401

        # Set up measurement defaults
        self.measurement.plugins.names = [
            'base_FPPosition',
            'base_PsfFlux',
            'base_GaussianFlux',
            'modelfit_DoubleShapeletPsfApprox',
            'modelfit_CModel',
            'ext_photometryKron_KronFlux',
            'ext_convolved_ConvolvedFlux',
            'ext_gaap_GaapFlux',
            'ext_shapeHSM_HsmShapeRegauss',
            'ext_shapeHSM_HsmSourceMoments',
            'ext_shapeHSM_HsmPsfMoments',
            'ext_shapeHSM_HsmSourceMomentsRound',
            'ext_shapeHSM_HigherOrderMomentsSource',
            'ext_shapeHSM_HigherOrderMomentsPSF',
        ]
        self.measurement.slots.modelFlux = 'modelfit_CModel'
        self.measurement.plugins['ext_convolved_ConvolvedFlux'].seeing.append(8.0)
        self.measurement.plugins['ext_gaap_GaapFlux'].sigmas = [
            0.5,
            0.7,
            1.0,
            1.5,
            2.5,
            3.0
        ]
        self.measurement.plugins['ext_gaap_GaapFlux'].doPsfPhotometry = True
        self.measurement.slots.shape = 'ext_shapeHSM_HsmSourceMoments'
        self.measurement.slots.psfShape = 'ext_shapeHSM_HsmPsfMoments'
        self.measurement.plugins['ext_shapeHSM_HsmShapeRegauss'].deblendNChild = ""

        # TODO: Remove in DM-44658, streak masking to happen only in ip_diffim
        # Keep track of which footprints contain streaks
        self.measurement.plugins['base_PixelFlags'].masksFpAnywhere = ['STREAK']
        self.measurement.plugins['base_PixelFlags'].masksFpCenter = ['STREAK']

        # Turn off slot setting for measurement for centroid and shape
        # (for which we use the input src catalog measurements)
        self.measurement.slots.centroid = None
        self.measurement.slots.apFlux = None
        self.measurement.slots.calibFlux = None

        # Convolved and GAAP sub-fluxes may individually fail aperture
        # correction; allow that rather than failing the whole task.
        names = self.measurement.plugins['ext_convolved_ConvolvedFlux'].getAllResultNames()
        self.measure_ap_corr.allowFailure += names
        names = self.measurement.plugins["ext_gaap_GaapFlux"].getAllGaapResultNames()
        self.measure_ap_corr.allowFailure += names
292
293
295 FinalizeCharacterizationConfigBase,
296 pipelineConnections=FinalizeCharacterizationConnections,
297):
298 pass
299
300
302 FinalizeCharacterizationConfigBase,
303 pipelineConnections=FinalizeCharacterizationDetectorConnections,
304):
305 pass
306
307
308class FinalizeCharacterizationTaskBase(pipeBase.PipelineTask):
309 """Run final characterization on exposures."""
310 ConfigClass = FinalizeCharacterizationConfigBase
311 _DefaultName = 'finalize_characterization_base'
312
313 def __init__(self, initInputs=None, **kwargs):
314 super().__init__(initInputs=initInputs, **kwargs)
315
317 initInputs['src_schema'].schema
318 )
319
320 self.makeSubtask('reserve_selection')
321 self.makeSubtask('source_selector')
322 self.makeSubtask('make_psf_candidates')
323 self.makeSubtask('psf_determiner')
324 self.makeSubtask('measurement', schema=self.schema)
325 self.makeSubtask('measure_ap_corr', schema=self.schema)
326 self.makeSubtask('apply_ap_corr', schema=self.schema)
327
328 # Only log warning and fatal errors from the source_selector
329 self.source_selector.log.setLevel(self.source_selector.log.WARN)
330
    def _make_output_schema_mapper(self, input_schema):
        """Make the schema mapper from the input schema to the output schema.

        Parameters
        ----------
        input_schema : `lsst.afw.table.Schema`
            Input schema.

        Returns
        -------
        mapper : `lsst.afw.table.SchemaMapper`
            Schema mapper
        output_schema : `lsst.afw.table.Schema`
            Output schema (with alias map)
        """
        mapper = afwTable.SchemaMapper(input_schema)
        mapper.addMinimalSchema(afwTable.SourceTable.makeMinimalSchema())
        # Centroids are copied from the input catalog rather than re-measured
        # (the centroid measurement slot is disabled in setDefaults).
        mapper.addMapping(input_schema['slot_Centroid_x'].asKey())
        mapper.addMapping(input_schema['slot_Centroid_y'].asKey())

        # The aperture fields may be used by the psf determiner.
        aper_fields = input_schema.extract('base_CircularApertureFlux_*')
        for field, item in aper_fields.items():
            mapper.addMapping(item.key)

        # The following two may be redundant, but then the mapping is a no-op.
        # Note that the slot_CalibFlux mapping will copy over any
        # normalized compensated fluxes that are used for calibration.
        apflux_fields = input_schema.extract('slot_ApFlux_*')
        for field, item in apflux_fields.items():
            mapper.addMapping(item.key)

        calibflux_fields = input_schema.extract('slot_CalibFlux_*')
        for field, item in calibflux_fields.items():
            mapper.addMapping(item.key)

        # Record the flux/err used by the s/n selection under stable names so
        # downstream consumers do not depend on the configured field names.
        mapper.addMapping(
            input_schema[self.config.source_selector.active.signalToNoise.fluxField].asKey(),
            'calib_psf_selection_flux')
        mapper.addMapping(
            input_schema[self.config.source_selector.active.signalToNoise.errField].asKey(),
            'calib_psf_selection_flux_err')

        output_schema = mapper.getOutputSchema()

        # Flags recording each source's role in PSF determination.
        output_schema.addField(
            'calib_psf_candidate',
            type='Flag',
            doc=('set if the source was a candidate for PSF determination, '
                 'as determined from FinalizeCharacterizationTask.'),
        )
        output_schema.addField(
            'calib_psf_reserved',
            type='Flag',
            doc=('set if source was reserved from PSF determination by '
                 'FinalizeCharacterizationTask.'),
        )
        output_schema.addField(
            'calib_psf_used',
            type='Flag',
            doc=('set if source was used in the PSF determination by '
                 'FinalizeCharacterizationTask.'),
        )
        # Provenance columns filled in by run() when stacking per-detector
        # catalogs into the per-visit output table.
        output_schema.addField(
            'visit',
            type=np.int64,
            doc='Visit number for the sources.',
        )
        output_schema.addField(
            'detector',
            type=np.int32,
            doc='Detector number for the sources.',
        )

        # Carry over only the slot aliases that remain meaningful in the
        # output catalog.
        alias_map = input_schema.getAliasMap()
        alias_map_output = afwTable.AliasMap()
        alias_map_output.set('slot_Centroid', alias_map.get('slot_Centroid'))
        alias_map_output.set('slot_ApFlux', alias_map.get('slot_ApFlux'))
        alias_map_output.set('slot_CalibFlux', alias_map.get('slot_CalibFlux'))

        output_schema.setAliasMap(alias_map_output)

        return mapper, output_schema
414
415 def _make_selection_schema_mapper(self, input_schema):
416 """Make the schema mapper from the input schema to the selection schema.
417
418 Parameters
419 ----------
420 input_schema : `lsst.afw.table.Schema`
421 Input schema.
422
423 Returns
424 -------
425 mapper : `lsst.afw.table.SchemaMapper`
426 Schema mapper
427 selection_schema : `lsst.afw.table.Schema`
428 Selection schema (with alias map)
429 """
430 mapper = afwTable.SchemaMapper(input_schema)
431 mapper.addMinimalSchema(input_schema)
432
433 selection_schema = mapper.getOutputSchema()
434
435 selection_schema.setAliasMap(input_schema.getAliasMap())
436
437 return mapper, selection_schema
438
    def concat_isolated_star_cats(self, band, isolated_star_cat_dict, isolated_star_source_dict):
        """
        Concatenate isolated star catalogs and make reserve selection.

        Parameters
        ----------
        band : `str`
            Band name. Used to select reserved stars.
        isolated_star_cat_dict : `dict`
            Per-tract dict of isolated star catalog handles.
        isolated_star_source_dict : `dict`
            Per-tract dict of isolated star source catalog handles.

        Returns
        -------
        isolated_table : `np.ndarray` (N,)
            Table of isolated stars, with indexes to isolated sources.
            Returns None if there are no usable isolated catalogs.
        isolated_source_table : `np.ndarray` (M,)
            Table of isolated sources, with indexes to isolated stars.
            Returns None if there are no usable isolated catalogs.
        """
        isolated_tables = []
        isolated_sources = []
        # Running totals used to offset per-tract indexes so they remain
        # valid after the tract tables are concatenated.
        merge_cat_counter = 0
        merge_source_counter = 0

        for tract in isolated_star_cat_dict:
            astropy_cat = isolated_star_cat_dict[tract].get()
            table_cat = np.asarray(astropy_cat)

            # Only the id and obj_index columns are needed from the sources.
            astropy_source = isolated_star_source_dict[tract].get(
                parameters={'columns': [self.config.id_column, 'obj_index']}
            )
            table_source = np.asarray(astropy_source)

            # Cut isolated star table to those observed in this band, and adjust indexes
            (use_band,) = (table_cat[f'nsource_{band}'] > 0).nonzero()

            if len(use_band) == 0:
                # There are no sources in this band in this tract.
                self.log.info("No sources found in %s band in tract %d.", band, tract)
                continue

            # With the following matching:
            # table_source[b] <-> table_cat[use_band[a]]
            obj_index = table_source['obj_index'][:]
            a, b = esutil.numpy_util.match(use_band, obj_index)

            # Update indexes and cut to band-selected stars/sources
            table_source['obj_index'][b] = a
            _, index_new = np.unique(a, return_index=True)
            table_cat[f'source_cat_index_{band}'][use_band] = index_new

            # After the following cuts, the catalogs have the following properties:
            # - table_cat only contains isolated stars that have at least one source
            #   in ``band``.
            # - table_source only contains ``band`` sources.
            # - The slice table_cat["source_cat_index_{band}"]: table_cat["source_cat_index_{band}"]
            #   + table_cat["nsource_{band}]
            #   applied to table_source will give all the sources associated with the star.
            # - For each source, table_source["obj_index"] points to the index of the associated
            #   isolated star.
            table_source = table_source[b]
            table_cat = table_cat[use_band]

            # Add reserved flag column to tables
            table_cat = np.lib.recfunctions.append_fields(
                table_cat,
                'reserved',
                np.zeros(table_cat.size, dtype=bool),
                usemask=False
            )
            table_source = np.lib.recfunctions.append_fields(
                table_source,
                'reserved',
                np.zeros(table_source.size, dtype=bool),
                usemask=False
            )

            # Get reserve star flags; the extra string makes the reservation
            # deterministic per band/tract.
            table_cat['reserved'][:] = self.reserve_selection.run(
                len(table_cat),
                extra=f'{band}_{tract}',
            )
            # Propagate each star's reserved flag to its sources.
            table_source['reserved'][:] = table_cat['reserved'][table_source['obj_index']]

            # Offset indexes to account for tract merging
            table_cat[f'source_cat_index_{band}'] += merge_source_counter
            table_source['obj_index'] += merge_cat_counter

            isolated_tables.append(table_cat)
            isolated_sources.append(table_source)

            merge_cat_counter += len(table_cat)
            merge_source_counter += len(table_source)

        if len(isolated_tables) > 0:
            isolated_table = np.concatenate(isolated_tables)
            isolated_source_table = np.concatenate(isolated_sources)
        else:
            isolated_table = None
            isolated_source_table = None

        return isolated_table, isolated_source_table
544
    def compute_psf_and_ap_corr_map(self, visit, detector, exposure, src, isolated_source_table):
        """Compute psf model and aperture correction map for a single exposure.

        Parameters
        ----------
        visit : `int`
            Visit number (for logging).
        detector : `int`
            Detector number (for logging).
        exposure : `lsst.afw.image.ExposureF`
            Exposure to characterize; its psf is replaced by the newly
            determined model as a side effect.
        src : `lsst.afw.table.SourceCatalog`
            Input source catalog for this exposure.
        isolated_source_table : `np.ndarray`
            Concatenated isolated-source table (see
            ``concat_isolated_star_cats``), with id and ``reserved`` columns.

        Returns
        -------
        psf : `lsst.meas.algorithms.ImagePsf`
            PSF Model; `None` if selection or psf determination failed.
        ap_corr_map : `lsst.afw.image.ApCorrMap`
            Aperture correction map; `None` if any prior step failed.
        measured_src : `lsst.afw.table.SourceCatalog`
            Updated source catalog with measurements, flags and aperture
            corrections; `None` if selection or matching failed.
        """
        # Extract footprints from the input src catalog for noise replacement.
        footprints = SingleFrameMeasurementTask.getFootprintsFromCatalog(src)

        # Apply source selector (s/n, flags, etc.)
        good_src = self.source_selector.selectSources(src)
        if sum(good_src.selected) == 0:
            self.log.warning('No good sources remain after cuts for visit %d, detector %d',
                             visit, detector)
            return None, None, None

        # Cut down input src to the selected sources
        # We use a separate schema/mapper here than for the output/measurement catalog because of
        # clashes between fields that were previously run and those that need to be rerun with
        # the new psf model. This may be slightly inefficient but keeps input
        # and output values cleanly separated.
        selection_mapper, selection_schema = self._make_selection_schema_mapper(src.schema)

        selected_src = afwTable.SourceCatalog(selection_schema)
        selected_src.reserve(good_src.selected.sum())
        selected_src.extend(src[good_src.selected], mapper=selection_mapper)

        # The calib flags have been copied from the input table,
        # and we reset them here just to ensure they aren't propagated.
        selected_src['calib_psf_candidate'] = np.zeros(len(selected_src), dtype=bool)
        selected_src['calib_psf_used'] = np.zeros(len(selected_src), dtype=bool)
        selected_src['calib_psf_reserved'] = np.zeros(len(selected_src), dtype=bool)

        # Find the isolated sources and set flags
        matched_src, matched_iso = esutil.numpy_util.match(
            selected_src['id'],
            isolated_source_table[self.config.id_column]
        )
        if len(matched_src) == 0:
            self.log.warning(
                "No candidates from matched isolate stars for visit=%s, detector=%s "
                "(this is probably the result of an earlier astrometry failure).",
                visit, detector,
            )
            return None, None, None

        matched_arr = np.zeros(len(selected_src), dtype=bool)
        matched_arr[matched_src] = True
        selected_src['calib_psf_candidate'] = matched_arr

        reserved_arr = np.zeros(len(selected_src), dtype=bool)
        reserved_arr[matched_src] = isolated_source_table['reserved'][matched_iso]
        selected_src['calib_psf_reserved'] = reserved_arr

        # Keep only the psf candidates from here on; deep copy so the
        # catalog is contiguous after the boolean selection.
        selected_src = selected_src[selected_src['calib_psf_candidate']].copy(deep=True)

        # Make the measured source catalog as well, based on the selected catalog.
        measured_src = afwTable.SourceCatalog(self.schema)
        measured_src.reserve(len(selected_src))
        measured_src.extend(selected_src, mapper=self.schema_mapper)

        # We need to copy over the calib_psf flags because they were not in the mapper
        measured_src['calib_psf_candidate'] = selected_src['calib_psf_candidate']
        measured_src['calib_psf_reserved'] = selected_src['calib_psf_reserved']

        # Select the psf candidates from the selection catalog
        try:
            psf_selection_result = self.make_psf_candidates.run(selected_src, exposure=exposure)
        except Exception as e:
            self.log.exception('Failed to make PSF candidates for visit %d, detector %d: %s',
                               visit, detector, e)
            return None, None, measured_src

        psf_cand_cat = psf_selection_result.goodStarCat

        # Make list of psf candidates to send to the determiner
        # (omitting those marked as reserved)
        psf_determiner_list = [cand for cand, use
                               in zip(psf_selection_result.psfCandidates,
                                      ~psf_cand_cat['calib_psf_reserved']) if use]
        flag_key = psf_cand_cat.schema['calib_psf_used'].asKey()
        try:
            psf, cell_set = self.psf_determiner.determinePsf(exposure,
                                                             psf_determiner_list,
                                                             self.metadata,
                                                             flagKey=flag_key)
        except Exception as e:
            self.log.exception('Failed to determine PSF for visit %d, detector %d: %s',
                               visit, detector, e)
            return None, None, measured_src
        # Verify that the PSF is usable by downstream tasks
        sigma = psf.computeShape(psf.getAveragePosition()).getDeterminantRadius()
        if np.isnan(sigma):
            self.log.warning('Failed to determine psf for visit %d, detector %d: '
                             'Computed final PSF size is NAN.',
                             visit, detector)
            return None, None, measured_src

        # Set the psf in the exposure for measurement/aperture corrections.
        exposure.setPsf(psf)

        # At this point, we need to transfer the psf used flag from the selection
        # catalog to the measurement catalog.
        matched_selected, matched_measured = esutil.numpy_util.match(
            selected_src['id'],
            measured_src['id']
        )
        measured_used = np.zeros(len(measured_src), dtype=bool)
        measured_used[matched_measured] = selected_src['calib_psf_used'][matched_selected]
        measured_src['calib_psf_used'] = measured_used

        # Next, we do the measurement on all the psf candidate, used, and reserved stars.
        # We use the full footprint list from the input src catalog for noise replacement.
        try:
            self.measurement.run(measCat=measured_src, exposure=exposure, footprints=footprints)
        except Exception as e:
            self.log.warning('Failed to make measurements for visit %d, detector %d: %s',
                             visit, detector, e)
            return psf, None, measured_src

        # And finally the ap corr map.
        try:
            ap_corr_map = self.measure_ap_corr.run(exposure=exposure,
                                                   catalog=measured_src).apCorrMap
        except Exception as e:
            self.log.warning('Failed to compute aperture corrections for visit %d, detector %d: %s',
                             visit, detector, e)
            return psf, None, measured_src

        # Need to merge the original normalization aperture correction map.
        ap_corr_map_input = exposure.apCorrMap
        for key in ap_corr_map_input:
            if key not in ap_corr_map:
                ap_corr_map[key] = ap_corr_map_input[key]

        self.apply_ap_corr.run(catalog=measured_src, apCorrMap=ap_corr_map)

        return psf, ap_corr_map, measured_src
699
700
702 """Run final characterization on full visits."""
703 ConfigClass = FinalizeCharacterizationConfig
704 _DefaultName = 'finalize_characterization'
705
706 def runQuantum(self, butlerQC, inputRefs, outputRefs):
707 input_handle_dict = butlerQC.get(inputRefs)
708
709 band = butlerQC.quantum.dataId['band']
710 visit = butlerQC.quantum.dataId['visit']
711
712 src_dict_temp = {handle.dataId['detector']: handle
713 for handle in input_handle_dict['srcs']}
714 calexp_dict_temp = {handle.dataId['detector']: handle
715 for handle in input_handle_dict['calexps']}
716 isolated_star_cat_dict_temp = {handle.dataId['tract']: handle
717 for handle in input_handle_dict['isolated_star_cats']}
718 isolated_star_source_dict_temp = {handle.dataId['tract']: handle
719 for handle in input_handle_dict['isolated_star_sources']}
720
721 src_dict = {detector: src_dict_temp[detector] for
722 detector in sorted(src_dict_temp.keys())}
723 calexp_dict = {detector: calexp_dict_temp[detector] for
724 detector in sorted(calexp_dict_temp.keys())}
725 isolated_star_cat_dict = {tract: isolated_star_cat_dict_temp[tract] for
726 tract in sorted(isolated_star_cat_dict_temp.keys())}
727 isolated_star_source_dict = {tract: isolated_star_source_dict_temp[tract] for
728 tract in sorted(isolated_star_source_dict_temp.keys())}
729
730 struct = self.run(
731 visit,
732 band,
733 isolated_star_cat_dict,
734 isolated_star_source_dict,
735 src_dict,
736 calexp_dict,
737 )
738
739 butlerQC.put(struct.psf_ap_corr_cat, outputRefs.finalized_psf_ap_corr_cat)
740 butlerQC.put(struct.output_table, outputRefs.finalized_src_table)
741
    def run(self, visit, band, isolated_star_cat_dict, isolated_star_source_dict, src_dict, calexp_dict):
        """
        Run the FinalizeCharacterizationTask.

        Parameters
        ----------
        visit : `int`
            Visit number. Used in the output catalogs.
        band : `str`
            Band name. Used to select reserved stars.
        isolated_star_cat_dict : `dict`
            Per-tract dict of isolated star catalog handles.
        isolated_star_source_dict : `dict`
            Per-tract dict of isolated star source catalog handles.
        src_dict : `dict`
            Per-detector dict of src catalog handles.
        calexp_dict : `dict`
            Per-detector dict of calibrated exposure handles.

        Returns
        -------
        struct : `lsst.pipe.base.struct`
            Struct with outputs for persistence.

        Raises
        ------
        NoWorkFound
            Raised if no good isolated sources are found for the visit, or
            if the selector returns no good sources on any detector.
        """
        # Check if we have the same inputs for each of the
        # src_dict and calexp_dict.
        src_detectors = set(src_dict.keys())
        calexp_detectors = set(calexp_dict.keys())

        if src_detectors != calexp_detectors:
            detector_keys = sorted(src_detectors.intersection(calexp_detectors))
            self.log.warning(
                "Input src and calexp have mismatched detectors; "
                "running intersection of %d detectors.",
                len(detector_keys),
            )
        else:
            detector_keys = sorted(src_detectors)

        # We do not need the isolated star table in this task.
        # However, it is used in tests to confirm consistency of indexes.
        _, isolated_source_table = self.concat_isolated_star_cats(
            band,
            isolated_star_cat_dict,
            isolated_star_source_dict
        )

        if isolated_source_table is None:
            raise pipeBase.NoWorkFound(f'No good isolated sources found for any detectors in visit {visit}')

        exposure_cat_schema = afwTable.ExposureTable.makeMinimalSchema()
        exposure_cat_schema.addField('visit', type='L', doc='Visit number')

        metadata = dafBase.PropertyList()
        metadata.add("COMMENT", "Catalog id is detector id, sorted.")
        metadata.add("COMMENT", "Only detectors with data have entries.")

        psf_ap_corr_cat = afwTable.ExposureCatalog(exposure_cat_schema)
        psf_ap_corr_cat.setMetadata(metadata)

        measured_src_tables = []
        measured_src_table = None

        self.log.info("Running finalizeCharacterization on %d detectors.", len(detector_keys))
        for detector in detector_keys:
            self.log.info("Starting finalizeCharacterization on detector ID %d.", detector)
            src = src_dict[detector].get()
            exposure = calexp_dict[detector].get()

            psf, ap_corr_map, measured_src = self.compute_psf_and_ap_corr_map(
                visit,
                detector,
                exposure,
                src,
                isolated_source_table
            )

            # And now we package it together...
            # A detector gets a record even on partial failure (psf and/or
            # ap_corr_map may be None) as long as a measured catalog exists.
            if measured_src is not None:
                record = psf_ap_corr_cat.addNew()
                record['id'] = int(detector)
                record['visit'] = visit
                if psf is not None:
                    record.setPsf(psf)
                if ap_corr_map is not None:
                    record.setApCorrMap(ap_corr_map)

                measured_src['visit'][:] = visit
                measured_src['detector'][:] = detector

                measured_src_tables.append(measured_src.asAstropy())

        if len(measured_src_tables) > 0:
            # join_type='exact' ensures all per-detector tables share an
            # identical set of columns.
            measured_src_table = astropy.table.vstack(measured_src_tables, join_type='exact')

        if measured_src_table is None:
            raise pipeBase.NoWorkFound(f'No good sources found for any detectors in visit {visit}')

        return pipeBase.Struct(
            psf_ap_corr_cat=psf_ap_corr_cat,
            output_table=measured_src_table,
        )
849
850
852 """Run final characterization per detector."""
853 ConfigClass = FinalizeCharacterizationDetectorConfig
854 _DefaultName = 'finalize_characterization_detector'
855
856 def runQuantum(self, butlerQC, inputRefs, outputRefs):
857 input_handle_dict = butlerQC.get(inputRefs)
858
859 band = butlerQC.quantum.dataId['band']
860 visit = butlerQC.quantum.dataId['visit']
861 detector = butlerQC.quantum.dataId['detector']
862
863 isolated_star_cat_dict_temp = {handle.dataId['tract']: handle
864 for handle in input_handle_dict['isolated_star_cats']}
865 isolated_star_source_dict_temp = {handle.dataId['tract']: handle
866 for handle in input_handle_dict['isolated_star_sources']}
867
868 isolated_star_cat_dict = {tract: isolated_star_cat_dict_temp[tract] for
869 tract in sorted(isolated_star_cat_dict_temp.keys())}
870 isolated_star_source_dict = {tract: isolated_star_source_dict_temp[tract] for
871 tract in sorted(isolated_star_source_dict_temp.keys())}
872
873 struct = self.run(
874 visit,
875 band,
876 detector,
877 isolated_star_cat_dict,
878 isolated_star_source_dict,
879 input_handle_dict['src'],
880 input_handle_dict['calexp'],
881 )
882
883 butlerQC.put(
884 struct.psf_ap_corr_cat,
885 outputRefs.finalized_psf_ap_corr_detector_cat,
886 )
887 butlerQC.put(
888 struct.output_table,
889 outputRefs.finalized_src_detector_table,
890 )
891
892 def run(self, visit, band, detector, isolated_star_cat_dict, isolated_star_source_dict, src, exposure):
893 """
894 Run the FinalizeCharacterizationDetectorTask.
895
896 Parameters
897 ----------
898 visit : `int`
899 Visit number. Used in the output catalogs.
900 band : `str`
901 Band name. Used to select reserved stars.
902 detector : `int`
903 Detector number.
904 isolated_star_cat_dict : `dict`
905 Per-tract dict of isolated star catalog handles.
906 isolated_star_source_dict : `dict`
907 Per-tract dict of isolated star source catalog handles.
908 src : `lsst.afw.table.SourceCatalog`
909 Src catalog.
910 exposure : `lsst.afw.image.Exposure`
911 Calexp exposure.
912
913 Returns
914 -------
915 struct : `lsst.pipe.base.struct`
916 Struct with outputs for persistence.
917
918 Raises
919 ------
920 NoWorkFound
921 Raised if the selector returns no good sources.
922 """
923 # We do not need the isolated star table in this task.
924 # However, it is used in tests to confirm consistency of indexes.
925 _, isolated_source_table = self.concat_isolated_star_cats(
926 band,
927 isolated_star_cat_dict,
928 isolated_star_source_dict
929 )
930
931 if isolated_source_table is None:
932 raise pipeBase.NoWorkFound(f'No good isolated sources found for any detectors in visit {visit}')
933
934 exposure_cat_schema = afwTable.ExposureTable.makeMinimalSchema()
935 exposure_cat_schema.addField('visit', type='L', doc='Visit number')
936
937 metadata = dafBase.PropertyList()
938 metadata.add("COMMENT", "Catalog id is detector id, sorted.")
939 metadata.add("COMMENT", "Only one detector with data has an entry.")
940
941 psf_ap_corr_cat = afwTable.ExposureCatalog(exposure_cat_schema)
942 psf_ap_corr_cat.setMetadata(metadata)
943
944 self.log.info("Starting finalizeCharacterization on detector ID %d.", detector)
945
946 psf, ap_corr_map, measured_src = self.compute_psf_and_ap_corr_map(
947 visit,
948 detector,
949 exposure,
950 src,
951 isolated_source_table,
952 )
953
954 # And now we package it together...
955 measured_src_table = None
956 if measured_src is not None:
957 record = psf_ap_corr_cat.addNew()
958 record['id'] = int(detector)
959 record['visit'] = visit
960 if psf is not None:
961 record.setPsf(psf)
962 if ap_corr_map is not None:
963 record.setApCorrMap(ap_corr_map)
964
965 measured_src['visit'][:] = visit
966 measured_src['detector'][:] = detector
967
968 measured_src_table = measured_src.asAstropy()
969
970 if measured_src_table is None:
971 raise pipeBase.NoWorkFound(f'No good sources found for visit {visit} / detector {detector}')
972
973 return pipeBase.Struct(
974 psf_ap_corr_cat=psf_ap_corr_cat,
975 output_table=measured_src_table,
976 )
977
978
980 pipeBase.PipelineTaskConnections,
981 dimensions=('instrument', 'visit',),
982):
    # Per-detector inputs are multiple deferred-load handles so runQuantum
    # can order them by detector id before any catalog is actually read.
    finalized_psf_ap_corr_detector_cats = pipeBase.connectionTypes.Input(
        doc='Per-visit/per-detector finalized psf models and aperture corrections.',
        name='finalized_psf_ap_corr_detector_catalog',
        storageClass='ExposureCatalog',
        dimensions=('instrument', 'visit', 'detector'),
        multiple=True,
        deferLoad=True,
    )
    finalized_src_detector_tables = pipeBase.connectionTypes.Input(
        doc=('Per-visit/per-detector catalog of measurements for psf/flag/etc.'),
        name='finalized_src_detector_table',
        storageClass='ArrowAstropy',
        dimensions=('instrument', 'visit', 'detector'),
        multiple=True,
        deferLoad=True,
    )
    # Consolidated, visit-level outputs.
    finalized_psf_ap_corr_cat = pipeBase.connectionTypes.Output(
        doc=('Per-visit finalized psf models and aperture corrections. This '
             'catalog uses detector id for the id and are sorted for fast '
             'lookups of a detector.'),
        name='finalized_psf_ap_corr_catalog',
        storageClass='ExposureCatalog',
        dimensions=('instrument', 'visit'),
    )
    finalized_src_table = pipeBase.connectionTypes.Output(
        doc=('Per-visit catalog of measurements for psf/flag/etc.'),
        name='finalized_src_table',
        storageClass='ArrowAstropy',
        dimensions=('instrument', 'visit'),
    )
1013
1014
1016 pipeBase.PipelineTaskConfig,
1017 pipelineConnections=ConsolidateFinalizeCharacterizationDetectorConnections,
1018):
1019 pass
1020
1021
1023 """Consolidate per-detector finalize characterization catalogs."""
1024 ConfigClass = ConsolidateFinalizeCharacterizationDetectorConfig
1025 _DefaultName = 'consolidate_finalize_characterization_detector'
1026
1027 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1028 input_handle_dict = butlerQC.get(inputRefs)
1029
1030 psf_ap_corr_detector_dict_temp = {
1031 handle.dataId['detector']: handle
1032 for handle in input_handle_dict['finalized_psf_ap_corr_detector_cats']
1033 }
1034 src_detector_table_dict_temp = {
1035 handle.dataId['detector']: handle
1036 for handle in input_handle_dict['finalized_src_detector_tables']
1037 }
1038
1039 psf_ap_corr_detector_dict = {
1040 detector: psf_ap_corr_detector_dict_temp[detector]
1041 for detector in sorted(psf_ap_corr_detector_dict_temp.keys())
1042 }
1043 src_detector_table_dict = {
1044 detector: src_detector_table_dict_temp[detector]
1045 for detector in sorted(src_detector_table_dict_temp.keys())
1046 }
1047
1048 result = self.run(
1049 psf_ap_corr_detector_dict=psf_ap_corr_detector_dict,
1050 src_detector_table_dict=src_detector_table_dict,
1051 )
1052
1053 butlerQC.put(result.psf_ap_corr_cat, outputRefs.finalized_psf_ap_corr_cat)
1054 butlerQC.put(result.output_table, outputRefs.finalized_src_table)
1055
1056 def run(self, *, psf_ap_corr_detector_dict, src_detector_table_dict):
1057 """
1058 Run the ConsolidateFinalizeCharacterizationDetectorTask.
1059
1060 Parameters
1061 ----------
1062 psf_ap_corr_detector_dict : `dict` [`int`, `lsst.daf.butler.DeferredDatasetHandle`]
1063 Dictionary of input exposure catalogs, keyed by detector id.
1064 src_detector_table_dict : `dict` [`int`, `lsst.daf.butler.DeferredDatasetHandle`]
1065 Dictionary of input source tables, keyed by detector id.
1066
1067 Returns
1068 -------
1069 result : `lsst.pipe.base.struct`
1070 Struct with the following outputs:
1071 ``psf_ap_corr_cat``: Consolidated exposure catalog
1072 ``src_table``: Consolidated source table.
1073 """
1074 if not len(psf_ap_corr_detector_dict):
1075 raise pipeBase.NoWorkFound("No inputs found.")
1076
1077 if not np.all(
1078 np.asarray(psf_ap_corr_detector_dict.keys())
1079 == np.asarray(src_detector_table_dict.keys())
1080 ):
1081 raise ValueError(
1082 "Input psf_ap_corr_detector_dict and src_detector_table_dict must have the same keys",
1083 )
1084
1085 psf_ap_corr_cat = None
1086 for detector_id, handle in psf_ap_corr_detector_dict.items():
1087 if psf_ap_corr_cat is None:
1088 psf_ap_corr_cat = handle.get()
1089 else:
1090 psf_ap_corr_cat.append(handle.get().find(detector_id))
1091
1092 # Make sure it is a contiguous catalog.
1093 psf_ap_corr_cat = psf_ap_corr_cat.copy(deep=True)
1094
1095 src_table = TableVStack.vstack_handles(src_detector_table_dict.values())
1096
1097 return pipeBase.Struct(
1098 psf_ap_corr_cat=psf_ap_corr_cat,
1099 output_table=src_table,
1100 )
Mapping class that holds aliases for a Schema.
Definition AliasMap.h:36
A mapping between the keys of two Schemas, used to copy data between them.
Class for storing ordered metadata with comments.
run(self, visit, band, detector, isolated_star_cat_dict, isolated_star_source_dict, src, exposure)
concat_isolated_star_cats(self, band, isolated_star_cat_dict, isolated_star_source_dict)
compute_psf_and_ap_corr_map(self, visit, detector, exposure, src, isolated_source_table)
run(self, visit, band, isolated_star_cat_dict, isolated_star_source_dict, src_dict, calexp_dict)