22"""Task to run a finalized image characterization, using additional data.
25__all__ = [
'FinalizeCharacterizationConnections',
26 'FinalizeCharacterizationConfig',
27 'FinalizeCharacterizationTask']
40from lsst.meas.base import SingleFrameMeasurementTask, ApplyApCorrTask
43from .reserveIsolatedStars
import ReserveIsolatedStarsTask
47 dimensions=(
'instrument',
'visit',),
49 src_schema = pipeBase.connectionTypes.InitInput(
50 doc=
'Input schema used for src catalogs.',
52 storageClass=
'SourceCatalog',
54 srcs = pipeBase.connectionTypes.Input(
55 doc=
'Source catalogs for the visit',
57 storageClass=
'SourceCatalog',
58 dimensions=(
'instrument',
'visit',
'detector'),
62 calexps = pipeBase.connectionTypes.Input(
63 doc=
'Calexps for the visit',
65 storageClass=
'ExposureF',
66 dimensions=(
'instrument',
'visit',
'detector'),
70 isolated_star_cats = pipeBase.connectionTypes.Input(
71 doc=(
'Catalog of isolated stars with average positions, number of associated '
72 'sources, and indexes to the isolated_star_sources catalogs.'),
73 name=
'isolated_star_cat',
74 storageClass=
'DataFrame',
75 dimensions=(
'instrument',
'tract',
'skymap'),
79 isolated_star_sources = pipeBase.connectionTypes.Input(
80 doc=(
'Catalog of isolated star sources with sourceIds, and indexes to the '
81 'isolated_star_cats catalogs.'),
82 name=
'isolated_star_sources',
83 storageClass=
'DataFrame',
84 dimensions=(
'instrument',
'tract',
'skymap'),
88 finalized_psf_ap_corr_cat = pipeBase.connectionTypes.Output(
89 doc=(
'Per-visit finalized psf models and aperture corrections. This '
90 'catalog uses detector id for the id and are sorted for fast '
91 'lookups of a detector.'),
92 name=
'finalized_psf_ap_corr_catalog',
93 storageClass=
'ExposureCatalog',
94 dimensions=(
'instrument',
'visit'),
96 finalized_src_table = pipeBase.connectionTypes.Output(
97 doc=(
'Per-visit catalog of measurements for psf/flag/etc.'),
98 name=
'finalized_src_table',
99 storageClass=
'DataFrame',
100 dimensions=(
'instrument',
'visit'),
105 pipelineConnections=FinalizeCharacterizationConnections):
106 """Configuration for FinalizeCharacterizationTask."""
107 source_selector = sourceSelectorRegistry.makeField(
108 doc=
"How to select sources",
111 id_column = pexConfig.Field(
112 doc=
'Name of column in isolated_star_sources with source id.',
116 reserve_selection = pexConfig.ConfigurableField(
117 target=ReserveIsolatedStarsTask,
118 doc=
'Task to select reserved stars',
120 make_psf_candidates = pexConfig.ConfigurableField(
121 target=measAlg.MakePsfCandidatesTask,
122 doc=
'Task to make psf candidates from selected stars.',
124 psf_determiner = measAlg.psfDeterminerRegistry.makeField(
125 'PSF Determination algorithm',
128 measurement = pexConfig.ConfigurableField(
129 target=SingleFrameMeasurementTask,
130 doc=
'Measure sources for aperture corrections'
132 measure_ap_corr = pexConfig.ConfigurableField(
133 target=MeasureApCorrTask,
134 doc=
"Subtask to measure aperture corrections"
136 apply_ap_corr = pexConfig.ConfigurableField(
137 target=ApplyApCorrTask,
138 doc=
"Subtask to apply aperture corrections"
145 source_selector.setDefaults()
151 source_selector.doFlags =
True
152 source_selector.doSignalToNoise =
True
153 source_selector.doFluxLimit =
False
154 source_selector.doUnresolved =
False
155 source_selector.doIsolated =
False
157 source_selector.signalToNoise.minimum = 20.0
158 source_selector.signalToNoise.maximum = 1000.0
160 source_selector.signalToNoise.fluxField =
'base_GaussianFlux_instFlux'
161 source_selector.signalToNoise.errField =
'base_GaussianFlux_instFluxErr'
163 source_selector.flags.bad = [
'base_PixelFlags_flag_edge',
164 'base_PixelFlags_flag_interpolatedCenter',
165 'base_PixelFlags_flag_saturatedCenter',
166 'base_PixelFlags_flag_crCenter',
167 'base_PixelFlags_flag_bad',
168 'base_PixelFlags_flag_interpolated',
169 'base_PixelFlags_flag_saturated',
170 'slot_Centroid_flag',
171 'base_GaussianFlux_flag']
181 ap_selector.doFlags =
False
182 ap_selector.doUnresolved =
False
186 import lsst.meas.extensions.convolved
187 import lsst.meas.extensions.gaap
188 import lsst.meas.extensions.shapeHSM
194 'modelfit_DoubleShapeletPsfApprox',
196 'ext_photometryKron_KronFlux',
197 'ext_convolved_ConvolvedFlux',
199 'ext_shapeHSM_HsmShapeRegauss',
200 'ext_shapeHSM_HsmSourceMoments',
201 'ext_shapeHSM_HsmPsfMoments',
202 'ext_shapeHSM_HsmSourceMomentsRound',
204 self.
measurement.slots.modelFlux =
'modelfit_CModel'
205 self.
measurement.plugins[
'ext_convolved_ConvolvedFlux'].seeing.append(8.0)
206 self.
measurement.plugins[
'ext_gaap_GaapFlux'].sigmas = [
214 self.
measurement.plugins[
'ext_gaap_GaapFlux'].doPsfPhotometry =
True
215 self.
measurement.slots.shape =
'ext_shapeHSM_HsmSourceMoments'
216 self.
measurement.slots.psfShape =
'ext_shapeHSM_HsmPsfMoments'
217 self.
measurement.plugins[
'ext_shapeHSM_HsmShapeRegauss'].deblendNChild =
""
224 names = self.
measurement.plugins[
'ext_convolved_ConvolvedFlux'].getAllResultNames()
226 names = self.
measurement.plugins[
"ext_gaap_GaapFlux"].getAllGaapResultNames()
231 """Run final characterization on exposures."""
232 ConfigClass = FinalizeCharacterizationConfig
233 _DefaultName =
'finalize_characterization'
235 def __init__(self, initInputs=None, **kwargs):
236 super().__init__(initInputs=initInputs, **kwargs)
239 initInputs[
'src_schema'].schema
242 self.makeSubtask(
'reserve_selection')
243 self.makeSubtask(
'source_selector')
244 self.makeSubtask(
'make_psf_candidates')
245 self.makeSubtask(
'psf_determiner')
246 self.makeSubtask(
'measurement', schema=self.
schema)
247 self.makeSubtask(
'measure_ap_corr', schema=self.
schema)
248 self.makeSubtask(
'apply_ap_corr', schema=self.
schema)
251 self.source_selector.log.setLevel(self.source_selector.log.WARN)
254 input_handle_dict = butlerQC.get(inputRefs)
256 band = butlerQC.quantum.dataId[
'band']
257 visit = butlerQC.quantum.dataId[
'visit']
259 src_dict_temp = {handle.dataId[
'detector']: handle
260 for handle
in input_handle_dict[
'srcs']}
261 calexp_dict_temp = {handle.dataId[
'detector']: handle
262 for handle
in input_handle_dict[
'calexps']}
263 isolated_star_cat_dict_temp = {handle.dataId[
'tract']: handle
264 for handle
in input_handle_dict[
'isolated_star_cats']}
265 isolated_star_source_dict_temp = {handle.dataId[
'tract']: handle
266 for handle
in input_handle_dict[
'isolated_star_sources']}
269 src_dict = {detector: src_dict_temp[detector]
for
270 detector
in sorted(src_dict_temp.keys())}
271 calexp_dict = {detector: calexp_dict_temp[detector]
for
272 detector
in sorted(calexp_dict_temp.keys())}
273 isolated_star_cat_dict = {tract: isolated_star_cat_dict_temp[tract]
for
274 tract
in sorted(isolated_star_cat_dict_temp.keys())}
275 isolated_star_source_dict = {tract: isolated_star_source_dict_temp[tract]
for
276 tract
in sorted(isolated_star_source_dict_temp.keys())}
278 struct = self.
run(visit,
280 isolated_star_cat_dict,
281 isolated_star_source_dict,
285 butlerQC.put(struct.psf_ap_corr_cat,
286 outputRefs.finalized_psf_ap_corr_cat)
287 butlerQC.put(pd.DataFrame(struct.output_table),
288 outputRefs.finalized_src_table)
290 def run(self, visit, band, isolated_star_cat_dict, isolated_star_source_dict, src_dict, calexp_dict):
292 Run the FinalizeCharacterizationTask.
297 Visit number. Used in the output catalogs.
299 Band name. Used to select reserved stars.
300 isolated_star_cat_dict : `dict`
301 Per-tract dict of isolated star catalog handles.
302 isolated_star_source_dict : `dict`
303 Per-tract dict of isolated star source catalog handles.
305 Per-detector dict of src catalog handles.
307 Per-detector dict of calibrated exposure handles.
311 struct : `lsst.pipe.base.struct`
312 Struct
with outputs
for persistence.
318 isolated_star_cat_dict,
319 isolated_star_source_dict
322 exposure_cat_schema = afwTable.ExposureTable.makeMinimalSchema()
323 exposure_cat_schema.addField(
'visit', type=
'L', doc=
'Visit number')
326 metadata.add(
"COMMENT",
"Catalog id is detector id, sorted.")
327 metadata.add(
"COMMENT",
"Only detectors with data have entries.")
330 psf_ap_corr_cat.setMetadata(metadata)
332 measured_src_tables = []
334 for detector
in src_dict:
335 src = src_dict[detector].get()
336 exposure = calexp_dict[detector].get()
343 isolated_source_table
347 record = psf_ap_corr_cat.addNew()
348 record[
'id'] = int(detector)
349 record[
'visit'] = visit
352 if ap_corr_map
is not None:
353 record.setApCorrMap(ap_corr_map)
355 measured_src[
'visit'][:] = visit
356 measured_src[
'detector'][:] = detector
358 measured_src_tables.append(measured_src.asAstropy().as_array())
360 measured_src_table = np.concatenate(measured_src_tables)
362 return pipeBase.Struct(psf_ap_corr_cat=psf_ap_corr_cat,
363 output_table=measured_src_table)
366 """Make the schema mapper from the input schema to the output schema.
378 Output schema (with alias map)
381 mapper.addMinimalSchema(afwTable.SourceTable.makeMinimalSchema())
382 mapper.addMapping(input_schema['slot_Centroid_x'].asKey())
383 mapper.addMapping(input_schema[
'slot_Centroid_y'].asKey())
386 aper_fields = input_schema.extract(
'base_CircularApertureFlux_*')
387 for field, item
in aper_fields.items():
388 mapper.addMapping(item.key)
391 apflux_fields = input_schema.extract(
'slot_ApFlux_*')
392 for field, item
in apflux_fields.items():
393 mapper.addMapping(item.key)
395 calibflux_fields = input_schema.extract(
'slot_CalibFlux_*')
396 for field, item
in calibflux_fields.items():
397 mapper.addMapping(item.key)
400 input_schema[self.config.source_selector.active.signalToNoise.fluxField].asKey(),
401 'calib_psf_selection_flux')
403 input_schema[self.config.source_selector.active.signalToNoise.errField].asKey(),
404 'calib_psf_selection_flux_err')
406 output_schema = mapper.getOutputSchema()
408 output_schema.addField(
409 'calib_psf_candidate',
411 doc=(
'set if the source was a candidate for PSF determination, '
412 'as determined from FinalizeCharacterizationTask.'),
414 output_schema.addField(
415 'calib_psf_reserved',
417 doc=(
'set if source was reserved from PSF determination by '
418 'FinalizeCharacterizationTask.'),
420 output_schema.addField(
423 doc=(
'set if source was used in the PSF determination by '
424 'FinalizeCharacterizationTask.'),
426 output_schema.addField(
429 doc=
'Visit number for the sources.',
431 output_schema.addField(
434 doc=
'Detector number for the sources.',
437 alias_map = input_schema.getAliasMap()
439 alias_map_output.set(
'slot_Centroid', alias_map.get(
'slot_Centroid'))
440 alias_map_output.set(
'slot_ApFlux', alias_map.get(
'slot_ApFlux'))
441 alias_map_output.set(
'slot_CalibFlux', alias_map.get(
'slot_CalibFlux'))
443 output_schema.setAliasMap(alias_map_output)
445 return mapper, output_schema
448 """Make the schema mapper from the input schema to the selection schema.
460 Selection schema (with alias map)
463 mapper.addMinimalSchema(input_schema)
465 selection_schema = mapper.getOutputSchema()
467 selection_schema.setAliasMap(input_schema.getAliasMap())
469 return mapper, selection_schema
473 Concatenate isolated star catalogs and make reserve selection.
478 Band name. Used to select reserved stars.
479 isolated_star_cat_dict : `dict`
480 Per-tract dict of isolated star catalog handles.
481 isolated_star_source_dict : `dict`
482 Per-tract dict of isolated star source catalog handles.
486 isolated_table : `np.ndarray` (N,)
487 Table of isolated stars,
with indexes to isolated sources.
488 isolated_source_table : `np.ndarray` (M,)
489 Table of isolated sources,
with indexes to isolated stars.
492 isolated_sources = []
493 merge_cat_counter = 0
494 merge_source_counter = 0
496 for tract
in isolated_star_cat_dict:
497 df_cat = isolated_star_cat_dict[tract].get()
498 table_cat = df_cat.to_records()
500 df_source = isolated_star_source_dict[tract].get(
501 parameters={
'columns': [self.config.id_column,
504 table_source = df_source.to_records()
507 (use_band,) = (table_cat[f
'nsource_{band}'] > 0).nonzero()
509 if len(use_band) == 0:
511 self.log.info(
"No sources found in %s band in tract %d.", band, tract)
516 obj_index = table_source[
'obj_index'][:]
517 a, b = esutil.numpy_util.match(use_band, obj_index)
520 table_source[
'obj_index'][b] = a
521 _, index_new = np.unique(a, return_index=
True)
522 table_cat[f
'source_cat_index_{band}'][use_band] = index_new
533 table_source = table_source[b]
534 table_cat = table_cat[use_band]
537 table_cat = np.lib.recfunctions.append_fields(
540 np.zeros(table_cat.size, dtype=bool),
543 table_source = np.lib.recfunctions.append_fields(
546 np.zeros(table_source.size, dtype=bool),
551 table_cat[
'reserved'][:] = self.reserve_selection.run(
553 extra=f
'{band}_{tract}',
555 table_source[
'reserved'][:] = table_cat[
'reserved'][table_source[
'obj_index']]
558 table_cat[f
'source_cat_index_{band}'] += merge_source_counter
559 table_source[
'obj_index'] += merge_cat_counter
561 isolated_tables.append(table_cat)
562 isolated_sources.append(table_source)
564 merge_cat_counter += len(table_cat)
565 merge_source_counter += len(table_source)
567 isolated_table = np.concatenate(isolated_tables)
568 isolated_source_table = np.concatenate(isolated_sources)
570 return isolated_table, isolated_source_table
573 """Compute psf model and aperture correction map for a single exposure.
578 Visit number (for logging).
580 Detector number (
for logging).
581 exposure : `lsst.afw.image.ExposureF`
583 isolated_source_table : `np.ndarray`
590 Aperture correction map.
592 Updated source catalog
with measurements, flags
and aperture corrections.
595 good_src = self.source_selector.selectSources(src)
605 selected_src.reserve(good_src.selected.sum())
606 selected_src.extend(src[good_src.selected], mapper=selection_mapper)
610 selected_src[
'calib_psf_candidate'] = np.zeros(len(selected_src), dtype=bool)
611 selected_src[
'calib_psf_used'] = np.zeros(len(selected_src), dtype=bool)
612 selected_src[
'calib_psf_reserved'] = np.zeros(len(selected_src), dtype=bool)
615 matched_src, matched_iso = esutil.numpy_util.match(
617 isolated_source_table[self.config.id_column]
620 matched_arr = np.zeros(len(selected_src), dtype=bool)
621 matched_arr[matched_src] =
True
622 selected_src[
'calib_psf_candidate'] = matched_arr
624 reserved_arr = np.zeros(len(selected_src), dtype=bool)
625 reserved_arr[matched_src] = isolated_source_table[
'reserved'][matched_iso]
626 selected_src[
'calib_psf_reserved'] = reserved_arr
628 selected_src = selected_src[selected_src[
'calib_psf_candidate']].copy(deep=
True)
632 measured_src.reserve(len(selected_src))
636 measured_src[
'calib_psf_candidate'] = selected_src[
'calib_psf_candidate']
637 measured_src[
'calib_psf_reserved'] = selected_src[
'calib_psf_reserved']
641 psf_selection_result = self.make_psf_candidates.run(selected_src, exposure=exposure)
642 except Exception
as e:
643 self.log.warning(
'Failed to make psf candidates for visit %d, detector %d: %s',
645 return None,
None, measured_src
647 psf_cand_cat = psf_selection_result.goodStarCat
651 psf_determiner_list = [cand
for cand, use
652 in zip(psf_selection_result.psfCandidates,
653 ~psf_cand_cat[
'calib_psf_reserved'])
if use]
654 flag_key = psf_cand_cat.schema[
'calib_psf_used'].asKey()
656 psf, cell_set = self.psf_determiner.determinePsf(exposure,
660 except Exception
as e:
661 self.log.warning(
'Failed to determine psf for visit %d, detector %d: %s',
663 return None,
None, measured_src
670 matched_selected, matched_measured = esutil.numpy_util.match(
674 measured_used = np.zeros(len(measured_src), dtype=bool)
675 measured_used[matched_measured] = selected_src[
'calib_psf_used'][matched_selected]
676 measured_src[
'calib_psf_used'] = measured_used
680 self.measurement.run(measCat=measured_src, exposure=exposure)
681 except Exception
as e:
682 self.log.warning(
'Failed to make measurements for visit %d, detector %d: %s',
684 return psf,
None, measured_src
688 ap_corr_map = self.measure_ap_corr.run(exposure=exposure,
689 catalog=measured_src).apCorrMap
690 except Exception
as e:
691 self.log.warning(
'Failed to compute aperture corrections for visit %d, detector %d: %s',
693 return psf,
None, measured_src
695 self.apply_ap_corr.run(catalog=measured_src, apCorrMap=ap_corr_map)
697 return psf, ap_corr_map, measured_src
A thin wrapper around std::map to allow aperture corrections to be attached to Exposures.
Mapping class that holds aliases for a Schema.
Custom catalog class for ExposureRecord/Table.
Defines the fields and offsets for a table.
A mapping between the keys of two Schemas, used to copy data between them.
Class for storing ordered metadata with comments.
An intermediate base class for Psfs that use an image representation.
_make_output_schema_mapper(self, input_schema)
runQuantum(self, butlerQC, inputRefs, outputRefs)
run(self, visit, band, isolated_star_cat_dict, isolated_star_source_dict, src_dict, calexp_dict)
compute_psf_and_ap_corr_map(self, visit, detector, exposure, src, isolated_source_table)
_make_selection_schema_mapper(self, input_schema)
concat_isolated_star_cats(self, band, isolated_star_cat_dict, isolated_star_source_dict)