22"""Task to run a finalized image characterization, using additional data.
35from lsst.meas.base import SingleFrameMeasurementTask, ApplyApCorrTask
38from .reserveIsolatedStars
import ReserveIsolatedStarsTask
40__all__ = [
'FinalizeCharacterizationConnections',
41 'FinalizeCharacterizationConfig',
42 'FinalizeCharacterizationTask']
46 dimensions=(
'instrument',
'visit',),
48 src_schema = pipeBase.connectionTypes.InitInput(
49 doc=
'Input schema used for src catalogs.',
51 storageClass=
'SourceCatalog',
53 srcs = pipeBase.connectionTypes.Input(
54 doc=
'Source catalogs for the visit',
56 storageClass=
'SourceCatalog',
57 dimensions=(
'instrument',
'visit',
'detector'),
61 calexps = pipeBase.connectionTypes.Input(
62 doc=
'Calexps for the visit',
64 storageClass=
'ExposureF',
65 dimensions=(
'instrument',
'visit',
'detector'),
69 isolated_star_cats = pipeBase.connectionTypes.Input(
70 doc=(
'Catalog of isolated stars with average positions, number of associated '
71 'sources, and indexes to the isolated_star_sources catalogs.'),
72 name=
'isolated_star_cat',
73 storageClass=
'DataFrame',
74 dimensions=(
'instrument',
'tract',
'skymap'),
78 isolated_star_sources = pipeBase.connectionTypes.Input(
79 doc=(
'Catalog of isolated star sources with sourceIds, and indexes to the '
80 'isolated_star_cats catalogs.'),
81 name=
'isolated_star_sources',
82 storageClass=
'DataFrame',
83 dimensions=(
'instrument',
'tract',
'skymap'),
87 finalized_psf_ap_corr_cat = pipeBase.connectionTypes.Output(
88 doc=(
'Per-visit finalized psf models and aperture corrections. This '
89 'catalog uses detector id for the id and are sorted for fast '
90 'lookups of a detector.'),
91 name=
'finalized_psf_ap_corr_catalog',
92 storageClass=
'ExposureCatalog',
93 dimensions=(
'instrument',
'visit'),
95 finalized_src_table = pipeBase.connectionTypes.Output(
96 doc=(
'Per-visit catalog of measurements for psf/flag/etc.'),
97 name=
'finalized_src_table',
98 storageClass=
'DataFrame',
99 dimensions=(
'instrument',
'visit'),
104 pipelineConnections=FinalizeCharacterizationConnections):
105 """Configuration for FinalizeCharacterizationTask."""
106 source_selector = sourceSelectorRegistry.makeField(
107 doc=
"How to select sources",
110 id_column = pexConfig.Field(
111 doc=
'Name of column in isolated_star_sources with source id.',
115 reserve_selection = pexConfig.ConfigurableField(
116 target=ReserveIsolatedStarsTask,
117 doc=
'Task to select reserved stars',
119 make_psf_candidates = pexConfig.ConfigurableField(
120 target=measAlg.MakePsfCandidatesTask,
121 doc=
'Task to make psf candidates from selected stars.',
123 psf_determiner = measAlg.psfDeterminerRegistry.makeField(
124 'PSF Determination algorithm',
127 measurement = pexConfig.ConfigurableField(
128 target=SingleFrameMeasurementTask,
129 doc=
'Measure sources for aperture corrections'
131 measure_ap_corr = pexConfig.ConfigurableField(
132 target=MeasureApCorrTask,
133 doc=
"Subtask to measure aperture corrections"
135 apply_ap_corr = pexConfig.ConfigurableField(
136 target=ApplyApCorrTask,
137 doc=
"Subtask to apply aperture corrections"
144 source_selector.setDefaults()
150 source_selector.doFlags =
True
151 source_selector.doSignalToNoise =
True
152 source_selector.doFluxLimit =
False
153 source_selector.doUnresolved =
False
154 source_selector.doIsolated =
False
156 source_selector.signalToNoise.minimum = 20.0
157 source_selector.signalToNoise.maximum = 1000.0
159 source_selector.signalToNoise.fluxField =
'base_GaussianFlux_instFlux'
160 source_selector.signalToNoise.errField =
'base_GaussianFlux_instFluxErr'
162 source_selector.flags.bad = [
'base_PixelFlags_flag_edge',
163 'base_PixelFlags_flag_interpolatedCenter',
164 'base_PixelFlags_flag_saturatedCenter',
165 'base_PixelFlags_flag_crCenter',
166 'base_PixelFlags_flag_bad',
167 'base_PixelFlags_flag_interpolated',
168 'base_PixelFlags_flag_saturated',
169 'slot_Centroid_flag',
170 'base_GaussianFlux_flag']
177 ap_selector = self.
measure_ap_corrmeasure_ap_corr.sourceSelector[
'science']
178 ap_selector.doFluxLimit =
False
179 ap_selector.doFlags =
True
180 ap_selector.doUnresolved =
False
181 ap_selector.doSignalToNoise =
True
182 ap_selector.doIsolated =
False
183 ap_selector.flags.good = [
'calib_psf_used']
184 ap_selector.flags.bad = []
185 ap_selector.signalToNoise.minimum = 200.0
186 ap_selector.signalToNoise.maximum =
None
187 ap_selector.signalToNoise.fluxField =
'base_PsfFlux_instFlux'
188 ap_selector.signalToNoise.errField =
'base_PsfFlux_instFluxErr'
192 import lsst.meas.extensions.convolved
193 import lsst.meas.extensions.gaap
194 import lsst.meas.extensions.shapeHSM
200 'modelfit_DoubleShapeletPsfApprox',
202 'ext_photometryKron_KronFlux',
203 'ext_convolved_ConvolvedFlux',
205 'ext_shapeHSM_HsmShapeRegauss',
206 'ext_shapeHSM_HsmSourceMoments',
207 'ext_shapeHSM_HsmPsfMoments',
208 'ext_shapeHSM_HsmSourceMomentsRound',
210 self.
measurementmeasurement.slots.modelFlux =
'modelfit_CModel'
211 self.
measurementmeasurement.plugins[
'ext_convolved_ConvolvedFlux'].seeing.append(8.0)
212 self.
measurementmeasurement.plugins[
'ext_gaap_GaapFlux'].sigmas = [
220 self.
measurementmeasurement.plugins[
'ext_gaap_GaapFlux'].doPsfPhotometry =
True
221 self.
measurementmeasurement.slots.shape =
'ext_shapeHSM_HsmSourceMoments'
222 self.
measurementmeasurement.slots.psfShape =
'ext_shapeHSM_HsmPsfMoments'
223 self.
measurementmeasurement.plugins[
'ext_shapeHSM_HsmShapeRegauss'].deblendNChild =
""
230 names = self.
measurementmeasurement.plugins[
'ext_convolved_ConvolvedFlux'].getAllResultNames()
232 names = self.
measurementmeasurement.plugins[
"ext_gaap_GaapFlux"].getAllGaapResultNames()
237 """Run final characterization on exposures."""
238 ConfigClass = FinalizeCharacterizationConfig
239 _DefaultName =
'finalize_characterization'
242 super().
__init__(initInputs=initInputs, **kwargs)
245 initInputs[
'src_schema'].schema
248 self.makeSubtask(
'reserve_selection')
249 self.makeSubtask(
'source_selector')
250 self.makeSubtask(
'make_psf_candidates')
251 self.makeSubtask(
'psf_determiner')
252 self.makeSubtask(
'measurement', schema=self.
schemaschema)
253 self.makeSubtask(
'measure_ap_corr', schema=self.
schemaschema)
254 self.makeSubtask(
'apply_ap_corr', schema=self.
schemaschema)
257 self.source_selector.log.setLevel(self.source_selector.log.WARN)
260 input_handle_dict = butlerQC.get(inputRefs)
262 band = butlerQC.quantum.dataId[
'band']
263 visit = butlerQC.quantum.dataId[
'visit']
265 src_dict_temp = {handle.dataId[
'detector']: handle
266 for handle
in input_handle_dict[
'srcs']}
267 calexp_dict_temp = {handle.dataId[
'detector']: handle
268 for handle
in input_handle_dict[
'calexps']}
269 isolated_star_cat_dict_temp = {handle.dataId[
'tract']: handle
270 for handle
in input_handle_dict[
'isolated_star_cats']}
271 isolated_star_source_dict_temp = {handle.dataId[
'tract']: handle
272 for handle
in input_handle_dict[
'isolated_star_sources']}
275 src_dict = {detector: src_dict_temp[detector]
for
276 detector
in sorted(src_dict_temp.keys())}
277 calexp_dict = {detector: calexp_dict_temp[detector]
for
278 detector
in sorted(calexp_dict_temp.keys())}
279 isolated_star_cat_dict = {tract: isolated_star_cat_dict_temp[tract]
for
280 tract
in sorted(isolated_star_cat_dict_temp.keys())}
281 isolated_star_source_dict = {tract: isolated_star_source_dict_temp[tract]
for
282 tract
in sorted(isolated_star_source_dict_temp.keys())}
284 struct = self.
runrun(visit,
286 isolated_star_cat_dict,
287 isolated_star_source_dict,
291 butlerQC.put(struct.psf_ap_corr_cat,
292 outputRefs.finalized_psf_ap_corr_cat)
293 butlerQC.put(pd.DataFrame(struct.output_table),
294 outputRefs.finalized_src_table)
296 def run(self, visit, band, isolated_star_cat_dict, isolated_star_source_dict, src_dict, calexp_dict):
298 Run the FinalizeCharacterizationTask.
303 Visit number. Used in the output catalogs.
305 Band name. Used to select reserved stars.
306 isolated_star_cat_dict : `dict`
307 Per-tract dict of isolated star catalog handles.
308 isolated_star_source_dict : `dict`
309 Per-tract dict of isolated star source catalog handles.
311 Per-detector dict of src catalog handles.
313 Per-detector dict of calibrated exposure handles.
317 struct : `lsst.pipe.base.struct`
318 Struct
with outputs
for persistence.
324 isolated_star_cat_dict,
325 isolated_star_source_dict
328 exposure_cat_schema = afwTable.ExposureTable.makeMinimalSchema()
329 exposure_cat_schema.addField(
'visit', type=
'L', doc=
'Visit number')
332 metadata.add(
"COMMENT",
"Catalog id is detector id, sorted.")
333 metadata.add(
"COMMENT",
"Only detectors with data have entries.")
336 psf_ap_corr_cat.setMetadata(metadata)
338 measured_src_tables = []
340 for detector
in src_dict:
341 src = src_dict[detector].get()
342 exposure = calexp_dict[detector].get()
349 isolated_source_table
353 record = psf_ap_corr_cat.addNew()
354 record[
'id'] =
int(detector)
355 record[
'visit'] = visit
358 if ap_corr_map
is not None:
359 record.setApCorrMap(ap_corr_map)
361 measured_src[
'visit'][:] = visit
362 measured_src[
'detector'][:] = detector
364 measured_src_tables.append(measured_src.asAstropy().as_array())
366 measured_src_table = np.concatenate(measured_src_tables)
368 return pipeBase.Struct(psf_ap_corr_cat=psf_ap_corr_cat,
369 output_table=measured_src_table)
371 def _make_output_schema_mapper(self, input_schema):
372 """Make the schema mapper from the input schema to the output schema.
384 Output schema (with alias map)
387 mapper.addMinimalSchema(afwTable.SourceTable.makeMinimalSchema())
388 mapper.addMapping(input_schema['slot_Centroid_x'].asKey())
389 mapper.addMapping(input_schema[
'slot_Centroid_y'].asKey())
392 aper_fields = input_schema.extract(
'base_CircularApertureFlux_*')
393 for field, item
in aper_fields.items():
394 mapper.addMapping(item.key)
397 apflux_fields = input_schema.extract(
'slot_ApFlux_*')
398 for field, item
in apflux_fields.items():
399 mapper.addMapping(item.key)
401 calibflux_fields = input_schema.extract(
'slot_CalibFlux_*')
402 for field, item
in calibflux_fields.items():
403 mapper.addMapping(item.key)
406 input_schema[self.config.source_selector.active.signalToNoise.fluxField].asKey(),
407 'calib_psf_selection_flux')
409 input_schema[self.config.source_selector.active.signalToNoise.errField].asKey(),
410 'calib_psf_selection_flux_err')
412 output_schema = mapper.getOutputSchema()
414 output_schema.addField(
415 'calib_psf_candidate',
417 doc=(
'set if the source was a candidate for PSF determination, '
418 'as determined from FinalizeCharacterizationTask.'),
420 output_schema.addField(
421 'calib_psf_reserved',
423 doc=(
'set if source was reserved from PSF determination by '
424 'FinalizeCharacterizationTask.'),
426 output_schema.addField(
429 doc=(
'set if source was used in the PSF determination by '
430 'FinalizeCharacterizationTask.'),
432 output_schema.addField(
435 doc=
'Visit number for the sources.',
437 output_schema.addField(
440 doc=
'Detector number for the sources.',
443 alias_map = input_schema.getAliasMap()
445 alias_map_output.set(
'slot_Centroid', alias_map.get(
'slot_Centroid'))
446 alias_map_output.set(
'slot_ApFlux', alias_map.get(
'slot_ApFlux'))
447 alias_map_output.set(
'slot_CalibFlux', alias_map.get(
'slot_CalibFlux'))
449 output_schema.setAliasMap(alias_map_output)
451 return mapper, output_schema
453 def _make_selection_schema_mapper(self, input_schema):
454 """Make the schema mapper from the input schema to the selection schema.
466 Selection schema (with alias map)
469 mapper.addMinimalSchema(input_schema)
471 selection_schema = mapper.getOutputSchema()
473 selection_schema.setAliasMap(input_schema.getAliasMap())
475 return mapper, selection_schema
479 Concatenate isolated star catalogs and make reserve selection.
484 Band name. Used to select reserved stars.
485 isolated_star_cat_dict : `dict`
486 Per-tract dict of isolated star catalog handles.
487 isolated_star_source_dict : `dict`
488 Per-tract dict of isolated star source catalog handles.
492 isolated_table : `np.ndarray` (N,)
493 Table of isolated stars,
with indexes to isolated sources.
494 isolated_source_table : `np.ndarray` (M,)
495 Table of isolated sources,
with indexes to isolated stars.
498 isolated_sources = []
499 merge_cat_counter = 0
500 merge_source_counter = 0
502 for tract
in isolated_star_cat_dict:
503 df_cat = isolated_star_cat_dict[tract].get()
504 table_cat = df_cat.to_records()
506 df_source = isolated_star_source_dict[tract].get(
507 parameters={
'columns': [self.config.id_column,
510 table_source = df_source.to_records()
513 (use_band,) = (table_cat[f
'nsource_{band}'] > 0).nonzero()
515 if len(use_band) == 0:
517 self.log.
info(
"No sources found in %s band in tract %d.", band, tract)
522 obj_index = table_source[
'obj_index'][:]
523 a, b = esutil.numpy_util.match(use_band, obj_index)
526 table_source[
'obj_index'][b] = a
527 _, index_new = np.unique(a, return_index=
True)
528 table_cat[f
'source_cat_index_{band}'][use_band] = index_new
539 table_source = table_source[b]
540 table_cat = table_cat[use_band]
543 table_cat = np.lib.recfunctions.append_fields(
546 np.zeros(table_cat.size, dtype=bool),
549 table_source = np.lib.recfunctions.append_fields(
552 np.zeros(table_source.size, dtype=bool),
557 table_cat[
'reserved'][:] = self.reserve_selection.
run(
559 extra=f
'{band}_{tract}',
561 table_source[
'reserved'][:] = table_cat[
'reserved'][table_source[
'obj_index']]
564 table_cat[f
'source_cat_index_{band}'] += merge_source_counter
565 table_source[
'obj_index'] += merge_cat_counter
567 isolated_tables.append(table_cat)
568 isolated_sources.append(table_source)
570 merge_cat_counter += len(table_cat)
571 merge_source_counter += len(table_source)
573 isolated_table = np.concatenate(isolated_tables)
574 isolated_source_table = np.concatenate(isolated_sources)
576 return isolated_table, isolated_source_table
579 """Compute psf model and aperture correction map for a single exposure.
584 Visit number (for logging).
586 Detector number (
for logging).
587 exposure : `lsst.afw.image.ExposureF`
589 isolated_source_table : `np.ndarray`
596 Aperture correction map.
598 Updated source catalog
with measurements, flags
and aperture corrections.
601 good_src = self.source_selector.selectSources(src)
611 selected_src.reserve(good_src.selected.sum())
612 selected_src.extend(src[good_src.selected], mapper=selection_mapper)
616 selected_src[
'calib_psf_candidate'] = np.zeros(len(selected_src), dtype=bool)
617 selected_src[
'calib_psf_used'] = np.zeros(len(selected_src), dtype=bool)
618 selected_src[
'calib_psf_reserved'] = np.zeros(len(selected_src), dtype=bool)
621 matched_src, matched_iso = esutil.numpy_util.match(
623 isolated_source_table[self.config.id_column]
626 matched_arr = np.zeros(len(selected_src), dtype=bool)
627 matched_arr[matched_src] =
True
628 selected_src[
'calib_psf_candidate'] = matched_arr
630 reserved_arr = np.zeros(len(selected_src), dtype=bool)
631 reserved_arr[matched_src] = isolated_source_table[
'reserved'][matched_iso]
632 selected_src[
'calib_psf_reserved'] = reserved_arr
634 selected_src = selected_src[selected_src[
'calib_psf_candidate']].copy(deep=
True)
638 measured_src.reserve(len(selected_src))
639 measured_src.extend(selected_src, mapper=self.schema_mapper)
642 measured_src[
'calib_psf_candidate'] = selected_src[
'calib_psf_candidate']
643 measured_src[
'calib_psf_reserved'] = selected_src[
'calib_psf_reserved']
647 psf_selection_result = self.make_psf_candidates.
run(selected_src, exposure=exposure)
648 except Exception
as e:
649 self.log.
warning(
'Failed to make psf candidates for visit %d, detector %d: %s',
651 return None,
None, measured_src
653 psf_cand_cat = psf_selection_result.goodStarCat
657 psf_determiner_list = [cand
for cand, use
658 in zip(psf_selection_result.psfCandidates,
659 ~psf_cand_cat[
'calib_psf_reserved'])
if use]
660 flag_key = psf_cand_cat.schema[
'calib_psf_used'].asKey()
662 psf, cell_set = self.psf_determiner.determinePsf(exposure,
666 except Exception
as e:
667 self.log.
warning(
'Failed to determine psf for visit %d, detector %d: %s',
669 return None,
None, measured_src
676 matched_selected, matched_measured = esutil.numpy_util.match(
680 measured_used = np.zeros(len(measured_src), dtype=bool)
681 measured_used[matched_measured] = selected_src[
'calib_psf_used'][matched_selected]
682 measured_src[
'calib_psf_used'] = measured_used
686 self.measurement.
run(measCat=measured_src, exposure=exposure)
687 except Exception
as e:
688 self.log.
warning(
'Failed to make measurements for visit %d, detector %d: %s',
690 return psf,
None, measured_src
694 ap_corr_map = self.measure_ap_corr.
run(exposure=exposure,
695 catalog=measured_src).apCorrMap
696 except Exception
as e:
697 self.log.
warning(
'Failed to compute aperture corrections for visit %d, detector %d: %s',
699 return psf,
None, measured_src
701 self.apply_ap_corr.
run(catalog=measured_src, apCorrMap=ap_corr_map)
703 return psf, ap_corr_map, measured_src
A thin wrapper around std::map to allow aperture corrections to be attached to Exposures.
Mapping class that holds aliases for a Schema.
Custom catalog class for ExposureRecord/Table.
Defines the fields and offsets for a table.
A mapping between the keys of two Schemas, used to copy data between them.
Class for storing ordered metadata with comments.
An intermediate base class for Psfs that use an image representation.
def _make_output_schema_mapper(self, input_schema)
def __init__(self, initInputs=None, **kwargs)
def compute_psf_and_ap_corr_map(self, visit, detector, exposure, src, isolated_source_table)
def _make_selection_schema_mapper(self, input_schema)
def runQuantum(self, butlerQC, inputRefs, outputRefs)
def concat_isolated_star_cats(self, band, isolated_star_cat_dict, isolated_star_source_dict)
def run(self, visit, band, isolated_star_cat_dict, isolated_star_source_dict, src_dict, calexp_dict)
Fit spatial kernel using approximate fluxes for candidates, and solving a linear system of equations.