22"""Brighter Fatter Kernel calibration definition."""
25__all__ = [
'BrighterFatterKernel']
29from astropy.table
import Table
35 """Calibration of brighter-fatter kernels for an instrument.
37 ampKernels are the kernels for each amplifier
in a detector,
as
38 generated by having ``level ==
'AMP'``.
40 detectorKernel
is the kernel generated
for a detector
as a
41 whole,
as generated by having ``level ==
'DETECTOR'``.
43 makeDetectorKernelFromAmpwiseKernels
is a method to generate the
44 kernel
for a detector, constructed by averaging together the
45 ampwise kernels
in the detector. The existing application code
is
46 only defined
for kernels
with ``level ==
'DETECTOR'``, so this method
47 is used
if the supplied kernel was built
with ``level ==
'AMP'``.
52 Camera describing detector geometry.
54 Level the kernels will be generated
for.
55 log : `logging.Logger`, optional
56 Log to write messages to.
58 Parameters to
pass to parent constructor.
62 Version 1.1 adds the `expIdMask` property,
and substitutes
63 `means`
and `variances`
for `rawMeans`
and `rawVariances`
66 expIdMask : `dict`, [`str`,`numpy.ndarray`]
67 Dictionary keyed by amp names containing the mask produced after
69 rawMeans : `dict`, [`str`, `numpy.ndarray`]
70 Dictionary keyed by amp names containing the unmasked average of the
71 means of the exposures
in each flat pair.
72 rawVariances : `dict`, [`str`, `numpy.ndarray`]
73 Dictionary keyed by amp names containing the variance of the
74 difference image of the exposures
in each flat pair.
75 Corresponds to rawVars of PTC.
76 rawXcorrs : `dict`, [`str`, `numpy.ndarray`]
77 Dictionary keyed by amp names containing an array of measured
78 covariances per mean flux.
79 Corresponds to covariances of PTC.
81 List of bad amplifiers names.
83 Tuple of the shape of the BFK kernels.
84 gain : `dict`, [`str`,`float`]
85 Dictionary keyed by amp names containing the fitted gains.
86 noise : `dict`, [`str`,`float`]
87 Dictionary keyed by amp names containing the fitted noise.
88 meanXcorrs : `dict`, [`str`,`numpy.ndarray`]
89 Dictionary keyed by amp names containing the averaged
91 valid : `dict`, [`str`,`bool`]
92 Dictionary keyed by amp names containing validity of data.
93 ampKernels : `dict`, [`str`, `numpy.ndarray`]
94 Dictionary keyed by amp names containing the BF kernels.
96 Dictionary keyed by detector names containing the BF kernels.
99 _SCHEMA =
'Brighter-fatter kernel'
102 def __init__(self, camera=None, level=None, **kwargs):
126 self.
initFromCamera(camera, detectorId=kwargs.get(
'detectorId',
None))
128 self.requiredAttributes.update([
'level',
'expIdMask',
'rawMeans',
'rawVariances',
'rawXcorrs',
129 'badAmps',
'gain',
'noise',
'meanXcorrs',
'valid',
130 'ampKernels',
'detKernels'])
133 """Update calibration metadata.
135 This calls the base class's method after ensuring the required
136 calibration keywords will be saved.
140 setDate : `bool`, optional
141 Update the CALIBDATE fields in the metadata to the current
142 time. Defaults to
False.
144 Other keyword parameters to set
in the metadata.
146 kwargs['LEVEL'] = self.
level
147 kwargs[
'KERNEL_DX'] = self.
shape[0]
148 kwargs[
'KERNEL_DY'] = self.
shape[1]
153 """Initialize kernel structure from camera.
158 Camera to use to define geometry.
159 detectorId : `int`, optional
160 Index of the detector to generate.
165 The initialized calibration.
170 Raised if no detectorId
is supplied
for a calibration
with
175 if detectorId
is not None:
176 detector = camera[detectorId]
181 if self.
level ==
'AMP':
182 if detectorId
is None:
183 raise RuntimeError(
"A detectorId must be supplied if level='AMP'.")
188 ampName = amp.getName()
193 self.
gain[ampName] = amp.getGain()
194 self.
noise[ampName] = amp.getReadNoise()
197 self.
valid[ampName] = []
198 elif self.
level ==
'DETECTOR':
199 if detectorId
is None:
201 detName = det.getName()
209 """Return the set of lengths needed for reshaping components.
214 Product of the elements of self.shape.
216 Size of an untiled covariance.
218 Number of observation pairs used in the kernel.
221 smallLength = int((self.shape[0] - 1)*(self.shape[1] - 1)/4)
222 if self.
level ==
'AMP':
224 if len(nObservations) != 1:
225 raise RuntimeError(
"Inconsistent number of observations found.")
226 nObs = nObservations.pop()
230 return (kernelLength, smallLength, nObs)
234 """Construct a calibration from a dictionary of properties.
239 Dictionary of properties.
244 Constructed calibration.
249 Raised if the supplied dictionary
is for a different
251 Raised
if the version of the supplied dictionary
is 1.0.
255 if calib._OBSTYPE != (found := dictionary[
'metadata'][
'OBSTYPE']):
256 raise RuntimeError(f
"Incorrect brighter-fatter kernel supplied. Expected {calib._OBSTYPE}, "
258 calib.setMetadata(dictionary[
'metadata'])
259 calib.calibInfoFromDict(dictionary)
261 calib.level = dictionary[
'metadata'].get(
'LEVEL',
'AMP')
262 calib.shape = (dictionary[
'metadata'].get(
'KERNEL_DX', 0),
263 dictionary[
'metadata'].get(
'KERNEL_DY', 0))
265 calibVersion = dictionary[
'metadata'][
'bfk_VERSION']
266 if calibVersion == 1.0:
267 calib.log.info(
"Old Version of brighter-fatter kernel found. Current version: "
268 f
"{calib._VERSION}. The new attribute 'expIdMask' will be "
269 "populated with 'True' values, and the new attributes 'rawMeans' "
270 "and 'rawVariances' will be populated with the masked 'means' "
271 "and 'variances' values."
274 calib.expIdMask = {amp: np.repeat(
True, len(dictionary[
'means'][amp]))
for amp
in
276 calib.rawMeans = {amp: np.array(dictionary[
'means'][amp])
for amp
in dictionary[
'means']}
277 calib.rawVariances = {amp: np.array(dictionary[
'variances'][amp])
for amp
in
278 dictionary[
'variances']}
279 elif calibVersion == 1.1:
280 calib.expIdMask = {amp: np.array(dictionary[
'expIdMask'][amp])
for amp
in dictionary[
'expIdMask']}
281 calib.rawMeans = {amp: np.array(dictionary[
'rawMeans'][amp])
for amp
in dictionary[
'rawMeans']}
282 calib.rawVariances = {amp: np.array(dictionary[
'rawVariances'][amp])
for amp
in
283 dictionary[
'rawVariances']}
285 raise RuntimeError(f
"Unknown version for brighter-fatter kernel: {calibVersion}")
288 _, smallLength, nObs = calib.getLengths()
289 smallShapeSide = int(np.sqrt(smallLength))
291 calib.rawXcorrs = {amp: np.array(dictionary[
'rawXcorrs'][amp]).reshape((nObs,
294 for amp
in dictionary[
'rawXcorrs']}
296 calib.gain = dictionary[
'gain']
297 calib.noise = dictionary[
'noise']
299 calib.meanXcorrs = {amp: np.array(dictionary[
'meanXcorrs'][amp]).reshape(calib.shape)
300 for amp
in dictionary[
'rawXcorrs']}
301 calib.ampKernels = {amp: np.array(dictionary[
'ampKernels'][amp]).reshape(calib.shape)
302 for amp
in dictionary[
'ampKernels']}
303 calib.valid = {amp: bool(value)
for amp, value
in dictionary[
'valid'].
items()}
304 calib.badAmps = [amp
for amp, valid
in dictionary[
'valid'].
items()
if valid
is False]
306 calib.detKernels = {det: np.array(dictionary[
'detKernels'][det]).reshape(calib.shape)
307 for det
in dictionary[
'detKernels']}
309 calib.updateMetadata()
313 """Return a dictionary containing the calibration properties.
315 The dictionary should be able to be round-tripped through
321 Dictionary of properties.
326 metadata = self.getMetadata()
327 outDict['metadata'] = metadata
330 kernelLength, smallLength, nObs = self.
getLengths()
332 outDict[
'expIdMask'] = {amp: np.array(self.
expIdMask[amp]).tolist()
for amp
in self.
expIdMask}
333 outDict[
'rawMeans'] = {amp: np.array(self.
rawMeans[amp]).tolist()
for amp
in self.
rawMeans}
334 outDict[
'rawVariances'] = {amp: np.array(self.
rawVariances[amp]).tolist()
for amp
in
339 correlationShape = np.array(self.
rawXcorrs[amp]).shape
340 if nObs != correlationShape[0]:
341 if correlationShape[0] == np.sum(self.
expIdMask[amp]):
345 raise ValueError(
"Could not coerce rawXcorrs into appropriate shape "
346 "(have %d correlations, but expect to see %d.",
347 correlationShape[0], np.sum(self.
expIdMask[amp]))
349 outDict[
'rawXcorrs'] = {amp: np.array(self.
rawXcorrs[amp]).reshape(nObs*smallLength).tolist()
351 outDict[
'badAmps'] = self.
badAmps
352 outDict[
'gain'] = self.
gain
353 outDict[
'noise'] = self.
noise
355 outDict[
'meanXcorrs'] = {amp: self.
meanXcorrs[amp].reshape(kernelLength).tolist()
357 outDict[
'ampKernels'] = {amp: self.
ampKernels[amp].reshape(kernelLength).tolist()
359 outDict[
'valid'] = self.
valid
361 outDict[
'detKernels'] = {det: self.
detKernels[det].reshape(kernelLength).tolist()
367 """Construct calibration from a list of tables.
369 This method uses the `fromDict` method to create the
370 calibration, after constructing an appropriate dictionary from
375 tableList : `list` [`astropy.table.Table`]
376 List of tables to use to construct the brighter-fatter
382 The calibration defined
in the tables.
384 ampTable = tableList[0]
386 metadata = ampTable.meta
388 inDict['metadata'] = metadata
390 amps = ampTable[
'AMPLIFIER']
395 calibVersion = metadata[
'bfk_VERSION']
397 if calibVersion == 1.0:
401 rawMeanList = ampTable[
'MEANS']
402 rawVarianceList = ampTable[
'VARIANCES']
404 inDict[
'means'] = {amp: mean
for amp, mean
in zip(amps, rawMeanList)}
405 inDict[
'variances'] = {amp: var
for amp, var
in zip(amps, rawVarianceList)}
406 elif calibVersion == 1.1:
409 expIdMaskList = ampTable[
'EXP_ID_MASK']
410 rawMeanList = ampTable[
'RAW_MEANS']
411 rawVarianceList = ampTable[
'RAW_VARIANCES']
413 inDict[
'expIdMask'] = {amp: mask
for amp, mask
in zip(amps, expIdMaskList)}
414 inDict[
'rawMeans'] = {amp: mean
for amp, mean
in zip(amps, rawMeanList)}
415 inDict[
'rawVariances'] = {amp: var
for amp, var
in zip(amps, rawVarianceList)}
417 raise RuntimeError(f
"Unknown version for brighter-fatter kernel: {calibVersion}")
419 rawXcorrs = ampTable[
'RAW_XCORRS']
420 gainList = ampTable[
'GAIN']
421 noiseList = ampTable[
'NOISE']
423 meanXcorrs = ampTable[
'MEAN_XCORRS']
424 ampKernels = ampTable[
'KERNEL']
425 validList = ampTable[
'VALID']
427 inDict[
'rawXcorrs'] = {amp: kernel
for amp, kernel
in zip(amps, rawXcorrs)}
428 inDict[
'gain'] = {amp: gain
for amp, gain
in zip(amps, gainList)}
429 inDict[
'noise'] = {amp: noise
for amp, noise
in zip(amps, noiseList)}
430 inDict[
'meanXcorrs'] = {amp: kernel
for amp, kernel
in zip(amps, meanXcorrs)}
431 inDict[
'ampKernels'] = {amp: kernel
for amp, kernel
in zip(amps, ampKernels)}
432 inDict[
'valid'] = {amp: bool(valid)
for amp, valid
in zip(amps, validList)}
434 inDict[
'badAmps'] = [amp
for amp, valid
in inDict[
'valid'].
items()
if valid
is False]
436 if len(tableList) > 1:
437 detTable = tableList[1]
438 inDict[
'detKernels'] = {det: kernel
for det, kernel
439 in zip(detTable[
'DETECTOR'], detTable[
'KERNEL'])}
441 inDict[
'detKernels'] = {}
446 """Construct a list of tables containing the information in this
449 The list of tables should create an identical calibration
450 after being passed to this class's fromTable method.
454 tableList : `list` [`lsst.afw.table.Table`]
455 List of tables containing the crosstalk calibration
463 kernelLength, smallLength, nObs = self.
getLengths()
477 if self.
level ==
'AMP':
480 expIdMaskList.append(self.
expIdMask[amp])
481 rawMeanList.append(self.
rawMeans[amp])
484 correlationShape = np.array(self.
rawXcorrs[amp]).shape
485 if nObs != correlationShape[0]:
486 if correlationShape[0] == np.sum(self.
expIdMask[amp]):
490 raise ValueError(
"Could not coerce rawXcorrs into appropriate shape "
491 "(have %d correlations, but expect to see %d.",
492 correlationShape[0], np.sum(self.
expIdMask[amp]))
494 rawXcorrs.append(np.array(self.
rawXcorrs[amp]).reshape(nObs*smallLength).tolist())
495 gainList.append(self.
gain[amp])
496 noiseList.append(self.
noise[amp])
498 meanXcorrsList.append(self.
meanXcorrs[amp].reshape(kernelLength).tolist())
499 kernelList.append(self.
ampKernels[amp].reshape(kernelLength).tolist())
500 validList.append(int(self.
valid[amp]
and not (amp
in self.
badAmps)))
502 ampTable = Table({
'AMPLIFIER': ampList,
503 'EXP_ID_MASK': expIdMaskList,
504 'RAW_MEANS': rawMeanList,
505 'RAW_VARIANCES': rawVarianceList,
506 'RAW_XCORRS': rawXcorrs,
509 'MEAN_XCORRS': meanXcorrsList,
510 'KERNEL': kernelList,
514 ampTable.meta = self.getMetadata().
toDict()
515 tableList.append(ampTable)
522 kernelList.append(self.
detKernels[det].reshape(kernelLength).tolist())
524 detTable = Table({
'DETECTOR': detList,
525 'KERNEL': kernelList})
526 detTable.meta = self.getMetadata().
toDict()
527 tableList.append(detTable)
532 """If the correlations were masked, they need to be repacked into the
538 Amplifier needing repacked.
539 correlationShape : `tuple` [`int`], (3, )
540 Shape the correlations are expected to take.
542 repackedCorrelations = []
546 repackedCorrelations.append(self.
rawXcorrs[amp][idx])
549 repackedCorrelations.append(np.full((correlationShape[1], correlationShape[2]), np.nan))
550 self.
rawXcorrs[amp] = repackedCorrelations
554 """Average the amplifier level kernels to create a detector level
555 kernel. There is no change
in index ordering/orientation
from
561 Detector
for which the averaged kernel will be used.
562 ampsToExclude : `list` [`str`], optional
563 Amps that should
not be included
in the average.
565 inKernels = np.array([self.ampKernels[amp] for amp
in
567 avgKernel = np.zeros_like(inKernels[0])
569 sctrl.setNumSigmaClip(5.0)
570 for i
in range(np.shape(avgKernel)[0]):
571 for j
in range(np.shape(avgKernel)[1]):
573 afwMath.MEANCLIP, sctrl).getValue()
578 self.detKernel[detectorName] = self.ampKernel[ampName]
std::vector< SchemaItem< Flag > > * items
An immutable representation of a camera.
Pass parameters to a Statistics object.
fromDict(cls, dictionary)
initFromCamera(self, camera, detectorId=None)
replaceDetectorKernelWithAmpKernel(self, ampName, detectorName)
fromTable(cls, tableList)
makeDetectorKernelFromAmpwiseKernels(self, detectorName, ampsToExclude=[])
updateMetadata(self, setDate=False, **kwargs)
__init__(self, camera=None, level=None, **kwargs)
repackCorrelations(self, amp, correlationShape)
daf::base::PropertyList * list
daf::base::PropertySet * set
Statistics makeStatistics(lsst::afw::image::Image< Pixel > const &img, lsst::afw::image::Mask< image::MaskPixel > const &msk, int const flags, StatisticsControl const &sctrl=StatisticsControl())
Handle a watered-down front-end to the constructor (no variance)