LSST Applications g0fba68d861+5616995c1c,g1ebb85f214+2420ccdea7,g1fd858c14a+44c57a1f81,g21d47ad084+8e51fce9ac,g262e1987ae+1a7d68eb3b,g2cef7863aa+3bd8df3d95,g35bb328faa+fcb1d3bbc8,g36ff55ed5b+2420ccdea7,g47891489e3+5c6313fe9a,g53246c7159+fcb1d3bbc8,g646c943bdb+dbb9921566,g67b6fd64d1+5c6313fe9a,g6bd32b75b5+2420ccdea7,g74acd417e5+37fc0c974d,g786e29fd12+cf7ec2a62a,g86c591e316+6e13bcb9e9,g87389fa792+1e0a283bba,g89139ef638+5c6313fe9a,g90f42f885a+fce05a46d3,g9125e01d80+fcb1d3bbc8,g93e38de9ac+5345a64125,g95a1e89356+47d08a1cc6,g97be763408+bba861c665,ga9e4eb89a6+85210110a1,gb0b61e0e8e+1f27f70249,gb58c049af0+f03b321e39,gb89ab40317+5c6313fe9a,gc4e39d7843+4e09c98c3d,gd16ba4ae74+5402bcf54a,gd8ff7fe66e+2420ccdea7,gd9a9a58781+fcb1d3bbc8,gdab6d2f7ff+37fc0c974d,gde280f09ee+604b327636,ge278dab8ac+50e2446c94,ge410e46f29+5c6313fe9a,gef3c2e6661+6b480e0fb7,gf67bdafdda+5c6313fe9a,gffca2db377+fcb1d3bbc8,v29.2.0.rc1
LSST Data Management Base Package
Loading...
Searching...
No Matches
finalizeCharacterization.py
Go to the documentation of this file.
1# This file is part of pipe_tasks.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21
22"""Task to run a finalized image characterization, using additional data.
23"""
24
25__all__ = ['FinalizeCharacterizationConnections',
26 'FinalizeCharacterizationConfig',
27 'FinalizeCharacterizationTask']
28
29import astropy.table
30import numpy as np
31import esutil
32
33
34import lsst.pex.config as pexConfig
35import lsst.pipe.base as pipeBase
36import lsst.daf.base as dafBase
37import lsst.afw.table as afwTable
38import lsst.meas.algorithms as measAlg
40from lsst.meas.algorithms import MeasureApCorrTask
41from lsst.meas.base import SingleFrameMeasurementTask, ApplyApCorrTask
42from lsst.meas.algorithms.sourceSelector import sourceSelectorRegistry
43
44from .reserveIsolatedStars import ReserveIsolatedStarsTask
45
46
class FinalizeCharacterizationConnections(pipeBase.PipelineTaskConnections,
                                          dimensions=('instrument', 'visit',),
                                          defaultTemplates={}):
    """Connections for FinalizeCharacterizationTask.

    Inputs are the per-detector src catalogs and calexps for one visit,
    plus the per-tract isolated-star association catalogs; outputs are a
    per-visit exposure catalog of finalized psf models and aperture
    corrections, and a per-visit table of source measurements.
    """
    # Init-input: only the schema of the src catalogs, used in __init__ to
    # build the output schema mapper before any data is loaded.
    src_schema = pipeBase.connectionTypes.InitInput(
        doc='Input schema used for src catalogs.',
        name='src_schema',
        storageClass='SourceCatalog',
    )
    srcs = pipeBase.connectionTypes.Input(
        doc='Source catalogs for the visit',
        name='src',
        storageClass='SourceCatalog',
        dimensions=('instrument', 'visit', 'detector'),
        deferLoad=True,
        multiple=True,
        # NOTE(review): deferGraphConstraint presumably relaxes quantum-graph
        # pruning on this input — confirm against pipe_base documentation.
        deferGraphConstraint=True,
    )
    calexps = pipeBase.connectionTypes.Input(
        doc='Calexps for the visit',
        name='calexp',
        storageClass='ExposureF',
        dimensions=('instrument', 'visit', 'detector'),
        deferLoad=True,
        multiple=True,
    )
    # The isolated-star inputs are per-tract, so a visit overlapping several
    # tracts yields multiple handles (merged in concat_isolated_star_cats).
    isolated_star_cats = pipeBase.connectionTypes.Input(
        doc=('Catalog of isolated stars with average positions, number of associated '
             'sources, and indexes to the isolated_star_sources catalogs.'),
        name='isolated_star_presource_associations',
        storageClass='ArrowAstropy',
        dimensions=('instrument', 'tract', 'skymap'),
        deferLoad=True,
        multiple=True,
    )
    isolated_star_sources = pipeBase.connectionTypes.Input(
        doc=('Catalog of isolated star sources with sourceIds, and indexes to the '
             'isolated_star_cats catalogs.'),
        name='isolated_star_presources',
        storageClass='ArrowAstropy',
        dimensions=('instrument', 'tract', 'skymap'),
        deferLoad=True,
        multiple=True,
    )
    finalized_psf_ap_corr_cat = pipeBase.connectionTypes.Output(
        doc=('Per-visit finalized psf models and aperture corrections.  This '
             'catalog uses detector id for the id and are sorted for fast '
             'lookups of a detector.'),
        name='finalized_psf_ap_corr_catalog',
        storageClass='ExposureCatalog',
        dimensions=('instrument', 'visit'),
    )
    finalized_src_table = pipeBase.connectionTypes.Output(
        doc=('Per-visit catalog of measurements for psf/flag/etc.'),
        name='finalized_src_table',
        storageClass='ArrowAstropy',
        dimensions=('instrument', 'visit'),
    )
105
class FinalizeCharacterizationConfig(pipeBase.PipelineTaskConfig,
                                     pipelineConnections=FinalizeCharacterizationConnections):
    """Configuration for FinalizeCharacterizationTask."""
    source_selector = sourceSelectorRegistry.makeField(
        doc="How to select sources",
        default="science"
    )
    id_column = pexConfig.Field(
        doc='Name of column in isolated_star_sources with source id.',
        dtype=str,
        default='sourceId',
    )
    reserve_selection = pexConfig.ConfigurableField(
        target=ReserveIsolatedStarsTask,
        doc='Task to select reserved stars',
    )
    make_psf_candidates = pexConfig.ConfigurableField(
        target=measAlg.MakePsfCandidatesTask,
        doc='Task to make psf candidates from selected stars.',
    )
    psf_determiner = measAlg.psfDeterminerRegistry.makeField(
        'PSF Determination algorithm',
        default='piff'
    )
    measurement = pexConfig.ConfigurableField(
        target=SingleFrameMeasurementTask,
        doc='Measure sources for aperture corrections'
    )
    measure_ap_corr = pexConfig.ConfigurableField(
        target=MeasureApCorrTask,
        doc="Subtask to measure aperture corrections"
    )
    apply_ap_corr = pexConfig.ConfigurableField(
        target=ApplyApCorrTask,
        doc="Subtask to apply aperture corrections"
    )

    def setDefaults(self):
        """Set the task defaults.

        Configures the 'science' source selector for flag/signal-to-noise
        cuts only, registers the measurement plugins needed downstream,
        and configures the aperture-correction selector.
        """
        super().setDefaults()

        source_selector = self.source_selector['science']
        source_selector.setDefaults()

        # We use the source selector only to select out flagged objects
        # and signal-to-noise.  Isolated, unresolved sources are handled
        # by the isolated star catalog.

        source_selector.doFlags = True
        source_selector.doSignalToNoise = True
        source_selector.doFluxLimit = False
        source_selector.doUnresolved = False
        source_selector.doIsolated = False

        # High s/n window for psf-model candidates.
        source_selector.signalToNoise.minimum = 50.0
        source_selector.signalToNoise.maximum = 1000.0

        source_selector.signalToNoise.fluxField = 'base_GaussianFlux_instFlux'
        source_selector.signalToNoise.errField = 'base_GaussianFlux_instFluxErr'

        source_selector.flags.bad = ['base_PixelFlags_flag_edge',
                                     'base_PixelFlags_flag_nodata',
                                     'base_PixelFlags_flag_interpolatedCenter',
                                     'base_PixelFlags_flag_saturatedCenter',
                                     'base_PixelFlags_flag_crCenter',
                                     'base_PixelFlags_flag_bad',
                                     'base_PixelFlags_flag_interpolated',
                                     'base_PixelFlags_flag_saturated',
                                     'slot_Centroid_flag',
                                     'base_GaussianFlux_flag']

        # Configure aperture correction to select only high s/n sources (that
        # were used in the psf modeling) to avoid background problems when
        # computing the aperture correction map.
        self.measure_ap_corr.sourceSelector = 'science'

        ap_selector = self.measure_ap_corr.sourceSelector['science']
        # We do not need to filter flags or unresolved because we have used
        # the filtered isolated stars as an input
        ap_selector.doFlags = False
        ap_selector.doUnresolved = False

        # These imports register the measurement plugins referenced below.
        import lsst.meas.modelfit  # noqa: F401
        import lsst.meas.extensions.photometryKron  # noqa: F401
        import lsst.meas.extensions.convolved  # noqa: F401
        import lsst.meas.extensions.gaap  # noqa: F401
        import lsst.meas.extensions.shapeHSM  # noqa: F401

        # Set up measurement defaults
        self.measurement.plugins.names = [
            'base_FPPosition',
            'base_PsfFlux',
            'base_GaussianFlux',
            'modelfit_DoubleShapeletPsfApprox',
            'modelfit_CModel',
            'ext_photometryKron_KronFlux',
            'ext_convolved_ConvolvedFlux',
            'ext_gaap_GaapFlux',
            'ext_shapeHSM_HsmShapeRegauss',
            'ext_shapeHSM_HsmSourceMoments',
            'ext_shapeHSM_HsmPsfMoments',
            'ext_shapeHSM_HsmSourceMomentsRound',
            'ext_shapeHSM_HigherOrderMomentsSource',
            'ext_shapeHSM_HigherOrderMomentsPSF',
        ]
        self.measurement.slots.modelFlux = 'modelfit_CModel'
        self.measurement.plugins['ext_convolved_ConvolvedFlux'].seeing.append(8.0)
        self.measurement.plugins['ext_gaap_GaapFlux'].sigmas = [
            0.5,
            0.7,
            1.0,
            1.5,
            2.5,
            3.0
        ]
        self.measurement.plugins['ext_gaap_GaapFlux'].doPsfPhotometry = True
        self.measurement.slots.shape = 'ext_shapeHSM_HsmSourceMoments'
        self.measurement.slots.psfShape = 'ext_shapeHSM_HsmPsfMoments'
        self.measurement.plugins['ext_shapeHSM_HsmShapeRegauss'].deblendNChild = ""

        # TODO: Remove in DM-44658, streak masking to happen only in ip_diffim
        # Keep track of which footprints contain streaks
        self.measurement.plugins['base_PixelFlags'].masksFpAnywhere = ['STREAK']
        self.measurement.plugins['base_PixelFlags'].masksFpCenter = ['STREAK']

        # Turn off slot setting for measurement for centroid and shape
        # (for which we use the input src catalog measurements)
        self.measurement.slots.centroid = None
        self.measurement.slots.apFlux = None
        self.measurement.slots.calibFlux = None

        # Convolved and GAaP fluxes may legitimately fail aperture
        # correction on some detectors; do not fail the whole task.
        names = self.measurement.plugins['ext_convolved_ConvolvedFlux'].getAllResultNames()
        self.measure_ap_corr.allowFailure += names
        names = self.measurement.plugins["ext_gaap_GaapFlux"].getAllGaapResultNames()
        self.measure_ap_corr.allowFailure += names
241
class FinalizeCharacterizationTask(pipeBase.PipelineTask):
    """Run final characterization on exposures."""
    ConfigClass = FinalizeCharacterizationConfig
    _DefaultName = 'finalize_characterization'

    def __init__(self, initInputs=None, **kwargs):
        """Construct the task and its subtasks.

        Parameters
        ----------
        initInputs : `dict`, optional
            Init-input datasets; must contain ``src_schema`` (a catalog
            whose ``.schema`` describes the input src catalogs).
        **kwargs
            Forwarded to `lsst.pipe.base.PipelineTask`.
        """
        super().__init__(initInputs=initInputs, **kwargs)

        # Build the output schema and its mapper from the input src schema.
        # (self.schema_mapper and self.schema are consumed later when the
        # per-detector measurement catalogs are assembled.)
        self.schema_mapper, self.schema = self._make_output_schema_mapper(
            initInputs['src_schema'].schema
        )

        self.makeSubtask('reserve_selection')
        self.makeSubtask('source_selector')
        self.makeSubtask('make_psf_candidates')
        self.makeSubtask('psf_determiner')
        self.makeSubtask('measurement', schema=self.schema)
        self.makeSubtask('measure_ap_corr', schema=self.schema)
        self.makeSubtask('apply_ap_corr', schema=self.schema)

        # Only log warning and fatal errors from the source_selector
        self.source_selector.log.setLevel(self.source_selector.log.WARN)
264
265 def runQuantum(self, butlerQC, inputRefs, outputRefs):
266 input_handle_dict = butlerQC.get(inputRefs)
267
268 band = butlerQC.quantum.dataId['band']
269 visit = butlerQC.quantum.dataId['visit']
270
271 src_dict_temp = {handle.dataId['detector']: handle
272 for handle in input_handle_dict['srcs']}
273 calexp_dict_temp = {handle.dataId['detector']: handle
274 for handle in input_handle_dict['calexps']}
275 isolated_star_cat_dict_temp = {handle.dataId['tract']: handle
276 for handle in input_handle_dict['isolated_star_cats']}
277 isolated_star_source_dict_temp = {handle.dataId['tract']: handle
278 for handle in input_handle_dict['isolated_star_sources']}
279 # TODO: Sort until DM-31701 is done and we have deterministic
280 # dataset ordering.
281 src_dict = {detector: src_dict_temp[detector] for
282 detector in sorted(src_dict_temp.keys())}
283 calexp_dict = {detector: calexp_dict_temp[detector] for
284 detector in sorted(calexp_dict_temp.keys())}
285 isolated_star_cat_dict = {tract: isolated_star_cat_dict_temp[tract] for
286 tract in sorted(isolated_star_cat_dict_temp.keys())}
287 isolated_star_source_dict = {tract: isolated_star_source_dict_temp[tract] for
288 tract in sorted(isolated_star_source_dict_temp.keys())}
289
290 struct = self.run(visit,
291 band,
292 isolated_star_cat_dict,
293 isolated_star_source_dict,
294 src_dict,
295 calexp_dict)
296
297 butlerQC.put(struct.psf_ap_corr_cat,
298 outputRefs.finalized_psf_ap_corr_cat)
299 butlerQC.put(astropy.table.Table(struct.output_table),
300 outputRefs.finalized_src_table)
301
    def run(self, visit, band, isolated_star_cat_dict, isolated_star_source_dict, src_dict, calexp_dict):
        """
        Run the FinalizeCharacterizationTask.

        Parameters
        ----------
        visit : `int`
            Visit number.  Used in the output catalogs.
        band : `str`
            Band name.  Used to select reserved stars.
        isolated_star_cat_dict : `dict`
            Per-tract dict of isolated star catalog handles.
        isolated_star_source_dict : `dict`
            Per-tract dict of isolated star source catalog handles.
        src_dict : `dict`
            Per-detector dict of src catalog handles.
        calexp_dict : `dict`
            Per-detector dict of calibrated exposure handles.

        Returns
        -------
        struct : `lsst.pipe.base.Struct`
            Struct with ``psf_ap_corr_cat`` (exposure catalog of psf models
            and aperture corrections) and ``output_table`` (numpy array of
            per-source measurements) for persistence.

        Raises
        ------
        NoWorkFound
            Raised if the selector returns no good sources.
        """
        # Check if we have the same inputs for each of the
        # src_dict and calexp_dict.
        src_detectors = set(src_dict.keys())
        calexp_detectors = set(calexp_dict.keys())

        if src_detectors != calexp_detectors:
            # Proceed on the detectors present in both inputs.
            detector_keys = sorted(src_detectors.intersection(calexp_detectors))
            self.log.warning(
                "Input src and calexp have mismatched detectors; "
                "running intersection of %d detectors.",
                len(detector_keys),
            )
        else:
            detector_keys = sorted(src_detectors)

        # We do not need the isolated star table in this task.
        # However, it is used in tests to confirm consistency of indexes.
        _, isolated_source_table = self.concat_isolated_star_cats(
            band,
            isolated_star_cat_dict,
            isolated_star_source_dict
        )

        if isolated_source_table is None:
            raise pipeBase.NoWorkFound(f'No good isolated sources found for any detectors in visit {visit}')

        # Minimal exposure-catalog schema plus the visit number.
        exposure_cat_schema = afwTable.ExposureTable.makeMinimalSchema()
        exposure_cat_schema.addField('visit', type='L', doc='Visit number')

        metadata = dafBase.PropertyList()
        metadata.add("COMMENT", "Catalog id is detector id, sorted.")
        metadata.add("COMMENT", "Only detectors with data have entries.")

        psf_ap_corr_cat = afwTable.ExposureCatalog(exposure_cat_schema)
        psf_ap_corr_cat.setMetadata(metadata)

        measured_src_tables = []
        measured_src_table = None

        self.log.info("Running finalizeCharacterization on %d detectors.", len(detector_keys))
        for detector in detector_keys:
            self.log.info("Starting finalizeCharacterization on detector ID %d.", detector)
            src = src_dict[detector].get()
            exposure = calexp_dict[detector].get()

            # psf and/or ap_corr_map may be None on partial failure;
            # measured_src is None only on total failure for the detector.
            psf, ap_corr_map, measured_src = self.compute_psf_and_ap_corr_map(
                visit,
                detector,
                exposure,
                src,
                isolated_source_table
            )

            # And now we package it together...
            if measured_src is not None:
                record = psf_ap_corr_cat.addNew()
                record['id'] = int(detector)
                record['visit'] = visit
                if psf is not None:
                    record.setPsf(psf)
                if ap_corr_map is not None:
                    record.setApCorrMap(ap_corr_map)

                measured_src['visit'][:] = visit
                measured_src['detector'][:] = detector

                measured_src_tables.append(measured_src.asAstropy().as_array())

        if len(measured_src_tables) > 0:
            measured_src_table = np.concatenate(measured_src_tables)

        if measured_src_table is None:
            raise pipeBase.NoWorkFound(f'No good sources found for any detectors in visit {visit}')

        return pipeBase.Struct(psf_ap_corr_cat=psf_ap_corr_cat,
                               output_table=measured_src_table)
407
408 def _make_output_schema_mapper(self, input_schema):
409 """Make the schema mapper from the input schema to the output schema.
410
411 Parameters
412 ----------
413 input_schema : `lsst.afw.table.Schema`
414 Input schema.
415
416 Returns
417 -------
418 mapper : `lsst.afw.table.SchemaMapper`
419 Schema mapper
420 output_schema : `lsst.afw.table.Schema`
421 Output schema (with alias map)
422 """
423 mapper = afwTable.SchemaMapper(input_schema)
424 mapper.addMinimalSchema(afwTable.SourceTable.makeMinimalSchema())
425 mapper.addMapping(input_schema['slot_Centroid_x'].asKey())
426 mapper.addMapping(input_schema['slot_Centroid_y'].asKey())
427
428 # The aperture fields may be used by the psf determiner.
429 aper_fields = input_schema.extract('base_CircularApertureFlux_*')
430 for field, item in aper_fields.items():
431 mapper.addMapping(item.key)
432
433 # The following two may be redundant, but then the mapping is a no-op.
434 # Note that the slot_CalibFlux mapping will copy over any
435 # normalized compensated fluxes that are used for calibration.
436 apflux_fields = input_schema.extract('slot_ApFlux_*')
437 for field, item in apflux_fields.items():
438 mapper.addMapping(item.key)
439
440 calibflux_fields = input_schema.extract('slot_CalibFlux_*')
441 for field, item in calibflux_fields.items():
442 mapper.addMapping(item.key)
443
444 mapper.addMapping(
445 input_schema[self.config.source_selector.active.signalToNoise.fluxField].asKey(),
446 'calib_psf_selection_flux')
447 mapper.addMapping(
448 input_schema[self.config.source_selector.active.signalToNoise.errField].asKey(),
449 'calib_psf_selection_flux_err')
450
451 output_schema = mapper.getOutputSchema()
452
453 output_schema.addField(
454 'calib_psf_candidate',
455 type='Flag',
456 doc=('set if the source was a candidate for PSF determination, '
457 'as determined from FinalizeCharacterizationTask.'),
458 )
459 output_schema.addField(
460 'calib_psf_reserved',
461 type='Flag',
462 doc=('set if source was reserved from PSF determination by '
463 'FinalizeCharacterizationTask.'),
464 )
465 output_schema.addField(
466 'calib_psf_used',
467 type='Flag',
468 doc=('set if source was used in the PSF determination by '
469 'FinalizeCharacterizationTask.'),
470 )
471 output_schema.addField(
472 'visit',
473 type=np.int64,
474 doc='Visit number for the sources.',
475 )
476 output_schema.addField(
477 'detector',
478 type=np.int32,
479 doc='Detector number for the sources.',
480 )
481
482 alias_map = input_schema.getAliasMap()
483 alias_map_output = afwTable.AliasMap()
484 alias_map_output.set('slot_Centroid', alias_map.get('slot_Centroid'))
485 alias_map_output.set('slot_ApFlux', alias_map.get('slot_ApFlux'))
486 alias_map_output.set('slot_CalibFlux', alias_map.get('slot_CalibFlux'))
487
488 output_schema.setAliasMap(alias_map_output)
489
490 return mapper, output_schema
491
492 def _make_selection_schema_mapper(self, input_schema):
493 """Make the schema mapper from the input schema to the selection schema.
494
495 Parameters
496 ----------
497 input_schema : `lsst.afw.table.Schema`
498 Input schema.
499
500 Returns
501 -------
502 mapper : `lsst.afw.table.SchemaMapper`
503 Schema mapper
504 selection_schema : `lsst.afw.table.Schema`
505 Selection schema (with alias map)
506 """
507 mapper = afwTable.SchemaMapper(input_schema)
508 mapper.addMinimalSchema(input_schema)
509
510 selection_schema = mapper.getOutputSchema()
511
512 selection_schema.setAliasMap(input_schema.getAliasMap())
513
514 return mapper, selection_schema
515
    def concat_isolated_star_cats(self, band, isolated_star_cat_dict, isolated_star_source_dict):
        """
        Concatenate isolated star catalogs and make reserve selection.

        Per-tract catalogs are cut to stars/sources observed in ``band``,
        re-indexed, flagged with the reserve selection, and merged into
        single tables with cross-tract indexes adjusted.

        Parameters
        ----------
        band : `str`
            Band name.  Used to select reserved stars.
        isolated_star_cat_dict : `dict`
            Per-tract dict of isolated star catalog handles.
        isolated_star_source_dict : `dict`
            Per-tract dict of isolated star source catalog handles.

        Returns
        -------
        isolated_table : `np.ndarray` (N,)
            Table of isolated stars, with indexes to isolated sources.
            Returns None if there are no usable isolated catalogs.
        isolated_source_table : `np.ndarray` (M,)
            Table of isolated sources, with indexes to isolated stars.
            Returns None if there are no usable isolated catalogs.
        """
        isolated_tables = []
        isolated_sources = []
        # Running totals used to offset indexes when merging tracts.
        merge_cat_counter = 0
        merge_source_counter = 0

        for tract in isolated_star_cat_dict:
            astropy_cat = isolated_star_cat_dict[tract].get()
            table_cat = np.asarray(astropy_cat)

            # Only the id and star-index columns are needed from the
            # (potentially large) source catalog.
            astropy_source = isolated_star_source_dict[tract].get(
                parameters={'columns': [self.config.id_column, 'obj_index']}
            )
            table_source = np.asarray(astropy_source)

            # Cut isolated star table to those observed in this band, and adjust indexes
            (use_band,) = (table_cat[f'nsource_{band}'] > 0).nonzero()

            if len(use_band) == 0:
                # There are no sources in this band in this tract.
                self.log.info("No sources found in %s band in tract %d.", band, tract)
                continue

            # With the following matching:
            #   table_source[b] <-> table_cat[use_band[a]]
            obj_index = table_source['obj_index'][:]
            a, b = esutil.numpy_util.match(use_band, obj_index)

            # Update indexes and cut to band-selected stars/sources
            table_source['obj_index'][b] = a
            _, index_new = np.unique(a, return_index=True)
            table_cat[f'source_cat_index_{band}'][use_band] = index_new

            # After the following cuts, the catalogs have the following properties:
            # - table_cat only contains isolated stars that have at least one source
            #   in ``band``.
            # - table_source only contains ``band`` sources.
            # - The slice table_cat["source_cat_index_{band}"]: table_cat["source_cat_index_{band}"]
            #   + table_cat["nsource_{band}]
            #   applied to table_source will give all the sources associated with the star.
            # - For each source, table_source["obj_index"] points to the index of the associated
            #   isolated star.
            table_source = table_source[b]
            table_cat = table_cat[use_band]

            # Add reserved flag column to tables
            table_cat = np.lib.recfunctions.append_fields(
                table_cat,
                'reserved',
                np.zeros(table_cat.size, dtype=bool),
                usemask=False
            )
            table_source = np.lib.recfunctions.append_fields(
                table_source,
                'reserved',
                np.zeros(table_source.size, dtype=bool),
                usemask=False
            )

            # Get reserve star flags; the extra string makes the (random)
            # selection reproducible per band/tract.
            table_cat['reserved'][:] = self.reserve_selection.run(
                len(table_cat),
                extra=f'{band}_{tract}',
            )
            # A source is reserved iff its parent star is reserved.
            table_source['reserved'][:] = table_cat['reserved'][table_source['obj_index']]

            # Offset indexes to account for tract merging
            table_cat[f'source_cat_index_{band}'] += merge_source_counter
            table_source['obj_index'] += merge_cat_counter

            isolated_tables.append(table_cat)
            isolated_sources.append(table_source)

            merge_cat_counter += len(table_cat)
            merge_source_counter += len(table_source)

        if len(isolated_tables) > 0:
            isolated_table = np.concatenate(isolated_tables)
            isolated_source_table = np.concatenate(isolated_sources)
        else:
            isolated_table = None
            isolated_source_table = None

        return isolated_table, isolated_source_table
621
    def compute_psf_and_ap_corr_map(self, visit, detector, exposure, src, isolated_source_table):
        """Compute psf model and aperture correction map for a single exposure.

        Parameters
        ----------
        visit : `int`
            Visit number (for logging).
        detector : `int`
            Detector number (for logging).
        exposure : `lsst.afw.image.ExposureF`
            Calibrated exposure; its psf is replaced by the newly
            determined model.
        src : `lsst.afw.table.SourceCatalog`
            Input source catalog for the detector.
        isolated_source_table : `np.ndarray`
            Merged isolated-source table with the configured id column and
            a ``reserved`` flag column.

        Returns
        -------
        psf : `lsst.meas.algorithms.ImagePsf`
            PSF Model; None if determination failed.
        ap_corr_map : `lsst.afw.image.ApCorrMap`
            Aperture correction map; None if it could not be computed.
        measured_src : `lsst.afw.table.SourceCatalog`
            Updated source catalog with measurements, flags and aperture
            corrections; None if no usable sources were found.
        """
        # Extract footprints from the input src catalog for noise replacement.
        footprints = SingleFrameMeasurementTask.getFootprintsFromCatalog(src)

        # Apply source selector (s/n, flags, etc.)
        good_src = self.source_selector.selectSources(src)
        if sum(good_src.selected) == 0:
            self.log.warning('No good sources remain after cuts for visit %d, detector %d',
                             visit, detector)
            return None, None, None

        # Cut down input src to the selected sources
        # We use a separate schema/mapper here than for the output/measurement catalog because of
        # clashes between fields that were previously run and those that need to be rerun with
        # the new psf model.  This may be slightly inefficient but keeps input
        # and output values cleanly separated.
        selection_mapper, selection_schema = self._make_selection_schema_mapper(src.schema)

        selected_src = afwTable.SourceCatalog(selection_schema)
        selected_src.reserve(good_src.selected.sum())
        selected_src.extend(src[good_src.selected], mapper=selection_mapper)

        # The calib flags have been copied from the input table,
        # and we reset them here just to ensure they aren't propagated.
        selected_src['calib_psf_candidate'] = np.zeros(len(selected_src), dtype=bool)
        selected_src['calib_psf_used'] = np.zeros(len(selected_src), dtype=bool)
        selected_src['calib_psf_reserved'] = np.zeros(len(selected_src), dtype=bool)

        # Find the isolated sources and set flags
        matched_src, matched_iso = esutil.numpy_util.match(
            selected_src['id'],
            isolated_source_table[self.config.id_column]
        )
        if len(matched_src) == 0:
            self.log.warning(
                "No candidates from matched isolate stars for visit=%s, detector=%s "
                "(this is probably the result of an earlier astrometry failure).",
                visit, detector,
            )
            return None, None, None

        matched_arr = np.zeros(len(selected_src), dtype=bool)
        matched_arr[matched_src] = True
        selected_src['calib_psf_candidate'] = matched_arr

        # Propagate the reserved flag from the isolated-source table.
        reserved_arr = np.zeros(len(selected_src), dtype=bool)
        reserved_arr[matched_src] = isolated_source_table['reserved'][matched_iso]
        selected_src['calib_psf_reserved'] = reserved_arr

        # Keep only the psf candidates from here on.
        selected_src = selected_src[selected_src['calib_psf_candidate']].copy(deep=True)

        # Make the measured source catalog as well, based on the selected catalog.
        measured_src = afwTable.SourceCatalog(self.schema)
        measured_src.reserve(len(selected_src))
        measured_src.extend(selected_src, mapper=self.schema_mapper)

        # We need to copy over the calib_psf flags because they were not in the mapper
        measured_src['calib_psf_candidate'] = selected_src['calib_psf_candidate']
        measured_src['calib_psf_reserved'] = selected_src['calib_psf_reserved']

        # Select the psf candidates from the selection catalog
        try:
            psf_selection_result = self.make_psf_candidates.run(selected_src, exposure=exposure)
        except Exception as e:
            self.log.exception('Failed to make PSF candidates for visit %d, detector %d: %s',
                               visit, detector, e)
            return None, None, measured_src

        psf_cand_cat = psf_selection_result.goodStarCat

        # Make list of psf candidates to send to the determiner
        # (omitting those marked as reserved)
        psf_determiner_list = [cand for cand, use
                               in zip(psf_selection_result.psfCandidates,
                                      ~psf_cand_cat['calib_psf_reserved']) if use]
        flag_key = psf_cand_cat.schema['calib_psf_used'].asKey()
        try:
            psf, cell_set = self.psf_determiner.determinePsf(exposure,
                                                             psf_determiner_list,
                                                             self.metadata,
                                                             flagKey=flag_key)
        except Exception as e:
            self.log.exception('Failed to determine PSF for visit %d, detector %d: %s',
                               visit, detector, e)
            return None, None, measured_src
        # Verify that the PSF is usable by downstream tasks
        sigma = psf.computeShape(psf.getAveragePosition()).getDeterminantRadius()
        if np.isnan(sigma):
            self.log.warning('Failed to determine psf for visit %d, detector %d: '
                             'Computed final PSF size is NAN.',
                             visit, detector)
            return None, None, measured_src

        # Set the psf in the exposure for measurement/aperture corrections.
        exposure.setPsf(psf)

        # At this point, we need to transfer the psf used flag from the selection
        # catalog to the measurement catalog.
        matched_selected, matched_measured = esutil.numpy_util.match(
            selected_src['id'],
            measured_src['id']
        )
        measured_used = np.zeros(len(measured_src), dtype=bool)
        measured_used[matched_measured] = selected_src['calib_psf_used'][matched_selected]
        measured_src['calib_psf_used'] = measured_used

        # Next, we do the measurement on all the psf candidate, used, and reserved stars.
        # We use the full footprint list from the input src catalog for noise replacement.
        try:
            self.measurement.run(measCat=measured_src, exposure=exposure, footprints=footprints)
        except Exception as e:
            self.log.warning('Failed to make measurements for visit %d, detector %d: %s',
                             visit, detector, e)
            return psf, None, measured_src

        # And finally the ap corr map.
        try:
            ap_corr_map = self.measure_ap_corr.run(exposure=exposure,
                                                   catalog=measured_src).apCorrMap
        except Exception as e:
            self.log.warning('Failed to compute aperture corrections for visit %d, detector %d: %s',
                             visit, detector, e)
            return psf, None, measured_src

        # Need to merge the original normalization aperture correction map.
        ap_corr_map_input = exposure.apCorrMap
        for key in ap_corr_map_input:
            if key not in ap_corr_map:
                ap_corr_map[key] = ap_corr_map_input[key]

        self.apply_ap_corr.run(catalog=measured_src, apCorrMap=ap_corr_map)

        return psf, ap_corr_map, measured_src
Mapping class that holds aliases for a Schema.
Definition AliasMap.h:36
A mapping between the keys of two Schemas, used to copy data between them.
Class for storing ordered metadata with comments.
run(self, visit, band, isolated_star_cat_dict, isolated_star_source_dict, src_dict, calexp_dict)
compute_psf_and_ap_corr_map(self, visit, detector, exposure, src, isolated_source_table)
concat_isolated_star_cats(self, band, isolated_star_cat_dict, isolated_star_source_dict)