22"""Task to run a finalized image characterization, using additional data.
25__all__ = [
'FinalizeCharacterizationConnections',
26 'FinalizeCharacterizationConfig',
27 'FinalizeCharacterizationTask']
40from lsst.meas.base import SingleFrameMeasurementTask, ApplyApCorrTask
43from .reserveIsolatedStars
import ReserveIsolatedStarsTask
47 dimensions=(
'instrument',
'visit',),
49 src_schema = pipeBase.connectionTypes.InitInput(
50 doc=
'Input schema used for src catalogs.',
52 storageClass=
'SourceCatalog',
54 srcs = pipeBase.connectionTypes.Input(
55 doc=
'Source catalogs for the visit',
57 storageClass=
'SourceCatalog',
58 dimensions=(
'instrument',
'visit',
'detector'),
62 calexps = pipeBase.connectionTypes.Input(
63 doc=
'Calexps for the visit',
65 storageClass=
'ExposureF',
66 dimensions=(
'instrument',
'visit',
'detector'),
70 isolated_star_cats = pipeBase.connectionTypes.Input(
71 doc=(
'Catalog of isolated stars with average positions, number of associated '
72 'sources, and indexes to the isolated_star_sources catalogs.'),
73 name=
'isolated_star_cat',
74 storageClass=
'DataFrame',
75 dimensions=(
'instrument',
'tract',
'skymap'),
79 isolated_star_sources = pipeBase.connectionTypes.Input(
80 doc=(
'Catalog of isolated star sources with sourceIds, and indexes to the '
81 'isolated_star_cats catalogs.'),
82 name=
'isolated_star_sources',
83 storageClass=
'DataFrame',
84 dimensions=(
'instrument',
'tract',
'skymap'),
88 finalized_psf_ap_corr_cat = pipeBase.connectionTypes.Output(
89 doc=(
'Per-visit finalized psf models and aperture corrections. This '
90 'catalog uses detector id for the id and are sorted for fast '
91 'lookups of a detector.'),
92 name=
'finalized_psf_ap_corr_catalog',
93 storageClass=
'ExposureCatalog',
94 dimensions=(
'instrument',
'visit'),
96 finalized_src_table = pipeBase.connectionTypes.Output(
97 doc=(
'Per-visit catalog of measurements for psf/flag/etc.'),
98 name=
'finalized_src_table',
99 storageClass=
'DataFrame',
100 dimensions=(
'instrument',
'visit'),
105 pipelineConnections=FinalizeCharacterizationConnections):
106 """Configuration for FinalizeCharacterizationTask."""
107 source_selector = sourceSelectorRegistry.makeField(
108 doc=
"How to select sources",
111 id_column = pexConfig.Field(
112 doc=
'Name of column in isolated_star_sources with source id.',
116 reserve_selection = pexConfig.ConfigurableField(
117 target=ReserveIsolatedStarsTask,
118 doc=
'Task to select reserved stars',
120 make_psf_candidates = pexConfig.ConfigurableField(
121 target=measAlg.MakePsfCandidatesTask,
122 doc=
'Task to make psf candidates from selected stars.',
124 psf_determiner = measAlg.psfDeterminerRegistry.makeField(
125 'PSF Determination algorithm',
128 measurement = pexConfig.ConfigurableField(
129 target=SingleFrameMeasurementTask,
130 doc=
'Measure sources for aperture corrections'
132 measure_ap_corr = pexConfig.ConfigurableField(
133 target=MeasureApCorrTask,
134 doc=
"Subtask to measure aperture corrections"
136 apply_ap_corr = pexConfig.ConfigurableField(
137 target=ApplyApCorrTask,
138 doc=
"Subtask to apply aperture corrections"
145 source_selector.setDefaults()
151 source_selector.doFlags =
True
152 source_selector.doSignalToNoise =
True
153 source_selector.doFluxLimit =
False
154 source_selector.doUnresolved =
False
155 source_selector.doIsolated =
False
157 source_selector.signalToNoise.minimum = 20.0
158 source_selector.signalToNoise.maximum = 1000.0
160 source_selector.signalToNoise.fluxField =
'base_GaussianFlux_instFlux'
161 source_selector.signalToNoise.errField =
'base_GaussianFlux_instFluxErr'
163 source_selector.flags.bad = [
'base_PixelFlags_flag_edge',
164 'base_PixelFlags_flag_interpolatedCenter',
165 'base_PixelFlags_flag_saturatedCenter',
166 'base_PixelFlags_flag_crCenter',
167 'base_PixelFlags_flag_bad',
168 'base_PixelFlags_flag_interpolated',
169 'base_PixelFlags_flag_saturated',
170 'slot_Centroid_flag',
171 'base_GaussianFlux_flag']
179 ap_selector.doFluxLimit =
False
180 ap_selector.doFlags =
True
181 ap_selector.doUnresolved =
False
182 ap_selector.doSignalToNoise =
True
183 ap_selector.doIsolated =
False
184 ap_selector.flags.good = [
'calib_psf_used']
185 ap_selector.flags.bad = []
186 ap_selector.signalToNoise.minimum = 200.0
187 ap_selector.signalToNoise.maximum =
None
188 ap_selector.signalToNoise.fluxField =
'base_PsfFlux_instFlux'
189 ap_selector.signalToNoise.errField =
'base_PsfFlux_instFluxErr'
193 import lsst.meas.extensions.convolved
194 import lsst.meas.extensions.gaap
195 import lsst.meas.extensions.shapeHSM
201 'modelfit_DoubleShapeletPsfApprox',
203 'ext_photometryKron_KronFlux',
204 'ext_convolved_ConvolvedFlux',
206 'ext_shapeHSM_HsmShapeRegauss',
207 'ext_shapeHSM_HsmSourceMoments',
208 'ext_shapeHSM_HsmPsfMoments',
209 'ext_shapeHSM_HsmSourceMomentsRound',
211 self.
measurement.slots.modelFlux =
'modelfit_CModel'
212 self.
measurement.plugins[
'ext_convolved_ConvolvedFlux'].seeing.append(8.0)
213 self.
measurement.plugins[
'ext_gaap_GaapFlux'].sigmas = [
221 self.
measurement.plugins[
'ext_gaap_GaapFlux'].doPsfPhotometry =
True
222 self.
measurement.slots.shape =
'ext_shapeHSM_HsmSourceMoments'
223 self.
measurement.slots.psfShape =
'ext_shapeHSM_HsmPsfMoments'
224 self.
measurement.plugins[
'ext_shapeHSM_HsmShapeRegauss'].deblendNChild =
""
231 names = self.
measurement.plugins[
'ext_convolved_ConvolvedFlux'].getAllResultNames()
233 names = self.
measurement.plugins[
"ext_gaap_GaapFlux"].getAllGaapResultNames()
238 """Run final characterization on exposures."""
239 ConfigClass = FinalizeCharacterizationConfig
240 _DefaultName =
'finalize_characterization'
242 def __init__(self, initInputs=None, **kwargs):
243 super().__init__(initInputs=initInputs, **kwargs)
246 initInputs[
'src_schema'].schema
249 self.makeSubtask(
'reserve_selection')
250 self.makeSubtask(
'source_selector')
251 self.makeSubtask(
'make_psf_candidates')
252 self.makeSubtask(
'psf_determiner')
253 self.makeSubtask(
'measurement', schema=self.
schema)
254 self.makeSubtask(
'measure_ap_corr', schema=self.
schema)
255 self.makeSubtask(
'apply_ap_corr', schema=self.
schema)
258 self.source_selector.log.setLevel(self.source_selector.log.WARN)
261 input_handle_dict = butlerQC.get(inputRefs)
263 band = butlerQC.quantum.dataId[
'band']
264 visit = butlerQC.quantum.dataId[
'visit']
266 src_dict_temp = {handle.dataId[
'detector']: handle
267 for handle
in input_handle_dict[
'srcs']}
268 calexp_dict_temp = {handle.dataId[
'detector']: handle
269 for handle
in input_handle_dict[
'calexps']}
270 isolated_star_cat_dict_temp = {handle.dataId[
'tract']: handle
271 for handle
in input_handle_dict[
'isolated_star_cats']}
272 isolated_star_source_dict_temp = {handle.dataId[
'tract']: handle
273 for handle
in input_handle_dict[
'isolated_star_sources']}
276 src_dict = {detector: src_dict_temp[detector]
for
277 detector
in sorted(src_dict_temp.keys())}
278 calexp_dict = {detector: calexp_dict_temp[detector]
for
279 detector
in sorted(calexp_dict_temp.keys())}
280 isolated_star_cat_dict = {tract: isolated_star_cat_dict_temp[tract]
for
281 tract
in sorted(isolated_star_cat_dict_temp.keys())}
282 isolated_star_source_dict = {tract: isolated_star_source_dict_temp[tract]
for
283 tract
in sorted(isolated_star_source_dict_temp.keys())}
285 struct = self.
run(visit,
287 isolated_star_cat_dict,
288 isolated_star_source_dict,
292 butlerQC.put(struct.psf_ap_corr_cat,
293 outputRefs.finalized_psf_ap_corr_cat)
294 butlerQC.put(pd.DataFrame(struct.output_table),
295 outputRefs.finalized_src_table)
297 def run(self, visit, band, isolated_star_cat_dict, isolated_star_source_dict, src_dict, calexp_dict):
299 Run the FinalizeCharacterizationTask.
304 Visit number. Used in the output catalogs.
306 Band name. Used to select reserved stars.
307 isolated_star_cat_dict : `dict`
308 Per-tract dict of isolated star catalog handles.
309 isolated_star_source_dict : `dict`
310 Per-tract dict of isolated star source catalog handles.
312 Per-detector dict of src catalog handles.
314 Per-detector dict of calibrated exposure handles.
318 struct : `lsst.pipe.base.Struct`
319 Struct with outputs for persistence.
325 isolated_star_cat_dict,
326 isolated_star_source_dict
329 exposure_cat_schema = afwTable.ExposureTable.makeMinimalSchema()
330 exposure_cat_schema.addField(
'visit', type=
'L', doc=
'Visit number')
333 metadata.add(
"COMMENT",
"Catalog id is detector id, sorted.")
334 metadata.add(
"COMMENT",
"Only detectors with data have entries.")
337 psf_ap_corr_cat.setMetadata(metadata)
339 measured_src_tables = []
341 for detector
in src_dict:
342 src = src_dict[detector].get()
343 exposure = calexp_dict[detector].get()
350 isolated_source_table
354 record = psf_ap_corr_cat.addNew()
355 record[
'id'] = int(detector)
356 record[
'visit'] = visit
359 if ap_corr_map
is not None:
360 record.setApCorrMap(ap_corr_map)
362 measured_src[
'visit'][:] = visit
363 measured_src[
'detector'][:] = detector
365 measured_src_tables.append(measured_src.asAstropy().as_array())
367 measured_src_table = np.concatenate(measured_src_tables)
369 return pipeBase.Struct(psf_ap_corr_cat=psf_ap_corr_cat,
370 output_table=measured_src_table)
372 def _make_output_schema_mapper(self, input_schema):
373 """Make the schema mapper from the input schema to the output schema.
385 Output schema (with alias map)
388 mapper.addMinimalSchema(afwTable.SourceTable.makeMinimalSchema())
389 mapper.addMapping(input_schema['slot_Centroid_x'].asKey())
390 mapper.addMapping(input_schema[
'slot_Centroid_y'].asKey())
393 aper_fields = input_schema.extract(
'base_CircularApertureFlux_*')
394 for field, item
in aper_fields.items():
395 mapper.addMapping(item.key)
398 apflux_fields = input_schema.extract(
'slot_ApFlux_*')
399 for field, item
in apflux_fields.items():
400 mapper.addMapping(item.key)
402 calibflux_fields = input_schema.extract(
'slot_CalibFlux_*')
403 for field, item
in calibflux_fields.items():
404 mapper.addMapping(item.key)
407 input_schema[self.config.source_selector.active.signalToNoise.fluxField].asKey(),
408 'calib_psf_selection_flux')
410 input_schema[self.config.source_selector.active.signalToNoise.errField].asKey(),
411 'calib_psf_selection_flux_err')
413 output_schema = mapper.getOutputSchema()
415 output_schema.addField(
416 'calib_psf_candidate',
418 doc=(
'set if the source was a candidate for PSF determination, '
419 'as determined from FinalizeCharacterizationTask.'),
421 output_schema.addField(
422 'calib_psf_reserved',
424 doc=(
'set if source was reserved from PSF determination by '
425 'FinalizeCharacterizationTask.'),
427 output_schema.addField(
430 doc=(
'set if source was used in the PSF determination by '
431 'FinalizeCharacterizationTask.'),
433 output_schema.addField(
436 doc=
'Visit number for the sources.',
438 output_schema.addField(
441 doc=
'Detector number for the sources.',
444 alias_map = input_schema.getAliasMap()
446 alias_map_output.set(
'slot_Centroid', alias_map.get(
'slot_Centroid'))
447 alias_map_output.set(
'slot_ApFlux', alias_map.get(
'slot_ApFlux'))
448 alias_map_output.set(
'slot_CalibFlux', alias_map.get(
'slot_CalibFlux'))
450 output_schema.setAliasMap(alias_map_output)
452 return mapper, output_schema
454 def _make_selection_schema_mapper(self, input_schema):
455 """Make the schema mapper from the input schema to the selection schema.
467 Selection schema (with alias map)
470 mapper.addMinimalSchema(input_schema)
472 selection_schema = mapper.getOutputSchema()
474 selection_schema.setAliasMap(input_schema.getAliasMap())
476 return mapper, selection_schema
480 Concatenate isolated star catalogs and make reserve selection.
485 Band name. Used to select reserved stars.
486 isolated_star_cat_dict : `dict`
487 Per-tract dict of isolated star catalog handles.
488 isolated_star_source_dict : `dict`
489 Per-tract dict of isolated star source catalog handles.
493 isolated_table : `np.ndarray` (N,)
494 Table of isolated stars, with indexes to isolated sources.
495 isolated_source_table : `np.ndarray` (M,)
496 Table of isolated sources, with indexes to isolated stars.
499 isolated_sources = []
500 merge_cat_counter = 0
501 merge_source_counter = 0
503 for tract
in isolated_star_cat_dict:
504 df_cat = isolated_star_cat_dict[tract].get()
505 table_cat = df_cat.to_records()
507 df_source = isolated_star_source_dict[tract].get(
508 parameters={
'columns': [self.config.id_column,
511 table_source = df_source.to_records()
514 (use_band,) = (table_cat[f
'nsource_{band}'] > 0).nonzero()
516 if len(use_band) == 0:
518 self.log.info(
"No sources found in %s band in tract %d.", band, tract)
523 obj_index = table_source[
'obj_index'][:]
524 a, b = esutil.numpy_util.match(use_band, obj_index)
527 table_source[
'obj_index'][b] = a
528 _, index_new = np.unique(a, return_index=
True)
529 table_cat[f
'source_cat_index_{band}'][use_band] = index_new
540 table_source = table_source[b]
541 table_cat = table_cat[use_band]
544 table_cat = np.lib.recfunctions.append_fields(
547 np.zeros(table_cat.size, dtype=bool),
550 table_source = np.lib.recfunctions.append_fields(
553 np.zeros(table_source.size, dtype=bool),
558 table_cat[
'reserved'][:] = self.reserve_selection.run(
560 extra=f
'{band}_{tract}',
562 table_source[
'reserved'][:] = table_cat[
'reserved'][table_source[
'obj_index']]
565 table_cat[f
'source_cat_index_{band}'] += merge_source_counter
566 table_source[
'obj_index'] += merge_cat_counter
568 isolated_tables.append(table_cat)
569 isolated_sources.append(table_source)
571 merge_cat_counter += len(table_cat)
572 merge_source_counter += len(table_source)
574 isolated_table = np.concatenate(isolated_tables)
575 isolated_source_table = np.concatenate(isolated_sources)
577 return isolated_table, isolated_source_table
580 """Compute psf model and aperture correction map for a single exposure.
585 Visit number (for logging).
587 Detector number (for logging).
588 exposure : `lsst.afw.image.ExposureF`
590 isolated_source_table : `np.ndarray`
597 Aperture correction map.
599 Updated source catalog with measurements, flags and aperture corrections.
602 good_src = self.source_selector.selectSources(src)
612 selected_src.reserve(good_src.selected.sum())
613 selected_src.extend(src[good_src.selected], mapper=selection_mapper)
617 selected_src[
'calib_psf_candidate'] = np.zeros(len(selected_src), dtype=bool)
618 selected_src[
'calib_psf_used'] = np.zeros(len(selected_src), dtype=bool)
619 selected_src[
'calib_psf_reserved'] = np.zeros(len(selected_src), dtype=bool)
622 matched_src, matched_iso = esutil.numpy_util.match(
624 isolated_source_table[self.config.id_column]
627 matched_arr = np.zeros(len(selected_src), dtype=bool)
628 matched_arr[matched_src] =
True
629 selected_src[
'calib_psf_candidate'] = matched_arr
631 reserved_arr = np.zeros(len(selected_src), dtype=bool)
632 reserved_arr[matched_src] = isolated_source_table[
'reserved'][matched_iso]
633 selected_src[
'calib_psf_reserved'] = reserved_arr
635 selected_src = selected_src[selected_src[
'calib_psf_candidate']].copy(deep=
True)
639 measured_src.reserve(len(selected_src))
640 measured_src.extend(selected_src, mapper=self.schema_mapper)
643 measured_src[
'calib_psf_candidate'] = selected_src[
'calib_psf_candidate']
644 measured_src[
'calib_psf_reserved'] = selected_src[
'calib_psf_reserved']
648 psf_selection_result = self.make_psf_candidates.run(selected_src, exposure=exposure)
649 except Exception
as e:
650 self.log.warning(
'Failed to make psf candidates for visit %d, detector %d: %s',
652 return None,
None, measured_src
654 psf_cand_cat = psf_selection_result.goodStarCat
658 psf_determiner_list = [cand
for cand, use
659 in zip(psf_selection_result.psfCandidates,
660 ~psf_cand_cat[
'calib_psf_reserved'])
if use]
661 flag_key = psf_cand_cat.schema[
'calib_psf_used'].asKey()
663 psf, cell_set = self.psf_determiner.determinePsf(exposure,
667 except Exception
as e:
668 self.log.warning(
'Failed to determine psf for visit %d, detector %d: %s',
670 return None,
None, measured_src
677 matched_selected, matched_measured = esutil.numpy_util.match(
681 measured_used = np.zeros(len(measured_src), dtype=bool)
682 measured_used[matched_measured] = selected_src[
'calib_psf_used'][matched_selected]
683 measured_src[
'calib_psf_used'] = measured_used
687 self.measurement.run(measCat=measured_src, exposure=exposure)
688 except Exception
as e:
689 self.log.warning(
'Failed to make measurements for visit %d, detector %d: %s',
691 return psf,
None, measured_src
695 ap_corr_map = self.measure_ap_corr.run(exposure=exposure,
696 catalog=measured_src).apCorrMap
697 except Exception
as e:
698 self.log.warning(
'Failed to compute aperture corrections for visit %d, detector %d: %s',
700 return psf,
None, measured_src
702 self.apply_ap_corr.run(catalog=measured_src, apCorrMap=ap_corr_map)
704 return psf, ap_corr_map, measured_src
A thin wrapper around std::map to allow aperture corrections to be attached to Exposures.
Mapping class that holds aliases for a Schema.
Custom catalog class for ExposureRecord/Table.
Defines the fields and offsets for a table.
A mapping between the keys of two Schemas, used to copy data between them.
Class for storing ordered metadata with comments.
An intermediate base class for Psfs that use an image representation.
def _make_output_schema_mapper(self, input_schema)
def compute_psf_and_ap_corr_map(self, visit, detector, exposure, src, isolated_source_table)
def _make_selection_schema_mapper(self, input_schema)
def runQuantum(self, butlerQC, inputRefs, outputRefs)
def concat_isolated_star_cats(self, band, isolated_star_cat_dict, isolated_star_source_dict)
def run(self, visit, band, isolated_star_cat_dict, isolated_star_source_dict, src_dict, calexp_dict)