22"""Brighter Fatter Kernel calibration definition."""
25__all__ = [
'BrighterFatterKernel']
29from astropy.table
import Table
35 """Calibration of brighter-fatter kernels for an instrument.
37 ampKernels are the kernels for each amplifier
in a detector,
as
38 generated by having level ==
'AMP'
40 detectorKernel
is the kernel generated
for a detector
as a
41 whole,
as generated by having level ==
'DETECTOR'
43 makeDetectorKernelFromAmpwiseKernels
is a method to generate the
44 kernel
for a detector, constructed by averaging together the
45 ampwise kernels
in the detector. The existing application code
is
46 only defined
for kernels
with level ==
'DETECTOR', so this method
47 is used
if the supplied kernel was built
with level ==
'AMP'.
52 Level the kernels will be generated
for.
53 log : `logging.Logger`, optional
54 Log to write messages to.
56 Parameters to
pass to parent constructor.
61 Document what
is stored
in the BFK calibration.
63 Version 1.1 adds the `expIdMask` property,
and substitutes
64 `means`
and `variances`
for `rawMeans`
and `rawVariances`
68 _SCHEMA =
'Brighter-fatter kernel'
71 def __init__(self, camera=None, level=None, **kwargs):
95 self.
initFromCamerainitFromCamera(camera, detectorId=kwargs.get(
'detectorId',
None))
97 self.requiredAttributes.update([
'level',
'expIdMask',
'rawMeans',
'rawVariances',
'rawXcorrs',
98 'badAmps',
'gain',
'noise',
'meanXcorrs',
'valid',
99 'ampKernels',
'detKernels'])
102 """Update calibration metadata.
104 This calls the base class's method after ensuring the required
105 calibration keywords will be saved.
109 setDate : `bool`, optional
110 Update the CALIBDATE fields in the metadata to the current
111 time. Defaults to
False.
113 Other keyword parameters to set
in the metadata.
115 kwargs['LEVEL'] = self.
levellevel
116 kwargs[
'KERNEL_DX'] = self.
shapeshape[0]
117 kwargs[
'KERNEL_DY'] = self.
shapeshape[1]
122 """Initialize kernel structure from camera.
127 Camera to use to define geometry.
128 detectorId : `int`, optional
129 Index of the detector to generate.
134 The initialized calibration.
139 Raised if no detectorId
is supplied
for a calibration
with
144 if detectorId
is not None:
145 detector = camera[detectorId]
150 if self.
levellevel ==
'AMP':
151 if detectorId
is None:
152 raise RuntimeError(
"A detectorId must be supplied if level='AMP'.")
157 ampName = amp.getName()
162 self.
gaingain[ampName] = amp.getGain()
163 self.
noisenoise[ampName] = amp.getReadNoise()
166 self.
validvalid[ampName] = []
167 elif self.
levellevel ==
'DETECTOR':
168 if detectorId
is None:
170 detName = det.getName()
178 """Return the set of lengths needed for reshaping components.
183 Product of the elements of self.shapeshape.
185 Size of an untiled covariance.
187 Number of observation pairs used in the kernel.
189 kernelLength = self.shapeshape[0] * self.shapeshape[1]
190 smallLength = int((self.shapeshape[0] - 1)*(self.shapeshape[1] - 1)/4)
191 if self.
levellevel ==
'AMP':
193 if len(nObservations) != 1:
194 raise RuntimeError(
"Inconsistent number of observations found.")
195 nObs = nObservations.pop()
199 return (kernelLength, smallLength, nObs)
203 """Construct a calibration from a dictionary of properties.
208 Dictionary of properties.
213 Constructed calibration.
218 Raised if the supplied dictionary
is for a different
220 Raised
if the version of the supplied dictionary
is 1.0.
224 if calib._OBSTYPE != (found := dictionary[
'metadata'][
'OBSTYPE']):
225 raise RuntimeError(f
"Incorrect brighter-fatter kernel supplied. Expected {calib._OBSTYPE}, "
227 calib.setMetadata(dictionary[
'metadata'])
228 calib.calibInfoFromDict(dictionary)
230 calib.level = dictionary[
'metadata'].get(
'LEVEL',
'AMP')
231 calib.shape = (dictionary[
'metadata'].get(
'KERNEL_DX', 0),
232 dictionary[
'metadata'].get(
'KERNEL_DY', 0))
234 calibVersion = dictionary[
'metadata'][
'bfk_VERSION']
235 if calibVersion == 1.0:
236 calib.log.warning(
"Old Version of brighter-fatter kernel found. Current version: "
237 f
"{calib._VERSION}. The new attribute 'expIdMask' will be "
238 "populated with 'True' values, and the new attributes 'rawMeans'"
239 "and 'rawVariances' will be populated with the masked 'means'."
240 "and 'variances' values."
243 calib.expIdMask = {amp: np.repeat(
True, len(dictionary[
'means'][amp]))
for amp
in
245 calib.rawMeans = {amp: np.array(dictionary[
'means'][amp])
for amp
in dictionary[
'means']}
246 calib.rawVariances = {amp: np.array(dictionary[
'variances'][amp])
for amp
in
247 dictionary[
'variances']}
248 elif calibVersion == 1.1:
249 calib.expIdMask = {amp: np.array(dictionary[
'expIdMask'][amp])
for amp
in dictionary[
'expIdMask']}
250 calib.rawMeans = {amp: np.array(dictionary[
'rawMeans'][amp])
for amp
in dictionary[
'rawMeans']}
251 calib.rawVariances = {amp: np.array(dictionary[
'rawVariances'][amp])
for amp
in
252 dictionary[
'rawVariances']}
254 raise RuntimeError(f
"Unknown version for brighter-fatter kernel: {calibVersion}")
257 _, smallLength, nObs = calib.getLengths()
258 smallShapeSide =
int(np.sqrt(smallLength))
260 calib.rawXcorrs = {amp: np.array(dictionary[
'rawXcorrs'][amp]).reshape((nObs,
263 for amp
in dictionary[
'rawXcorrs']}
265 calib.gain = dictionary[
'gain']
266 calib.noise = dictionary[
'noise']
268 calib.meanXcorrs = {amp: np.array(dictionary[
'meanXcorrs'][amp]).reshape(calib.shape)
269 for amp
in dictionary[
'rawXcorrs']}
270 calib.ampKernels = {amp: np.array(dictionary[
'ampKernels'][amp]).reshape(calib.shape)
271 for amp
in dictionary[
'ampKernels']}
272 calib.valid = {amp: bool(value)
for amp, value
in dictionary[
'valid'].
items()}
273 calib.badAmps = [amp
for amp, valid
in dictionary[
'valid'].
items()
if valid
is False]
275 calib.detKernels = {det: np.array(dictionary[
'detKernels'][det]).reshape(calib.shape)
276 for det
in dictionary[
'detKernels']}
278 calib.updateMetadata()
282 """Return a dictionary containing the calibration properties.
284 The dictionary should be able to be round-tripped through
290 Dictionary of properties.
295 metadata = self.getMetadata()
296 outDict['metadata'] = metadata
299 kernelLength, smallLength, nObs = self.
getLengthsgetLengths()
301 outDict[
'expIdMask'] = {amp: np.array(self.
expIdMaskexpIdMask[amp]).tolist()
for amp
in self.
expIdMaskexpIdMask}
302 outDict[
'rawMeans'] = {amp: np.array(self.
rawMeansrawMeans[amp]).tolist()
for amp
in self.
rawMeansrawMeans}
303 outDict[
'rawVariances'] = {amp: np.array(self.
rawVariancesrawVariances[amp]).tolist()
for amp
in
305 outDict[
'rawXcorrs'] = {amp: np.array(self.
rawXcorrsrawXcorrs[amp]).reshape(nObs*smallLength).tolist()
307 outDict[
'badAmps'] = self.
badAmpsbadAmps
308 outDict[
'gain'] = self.
gaingain
309 outDict[
'noise'] = self.
noisenoise
311 outDict[
'meanXcorrs'] = {amp: self.
meanXcorrsmeanXcorrs[amp].reshape(kernelLength).tolist()
313 outDict[
'ampKernels'] = {amp: self.
ampKernelsampKernels[amp].reshape(kernelLength).tolist()
315 outDict[
'valid'] = self.
validvalid
317 outDict[
'detKernels'] = {det: self.
detKernelsdetKernels[det].reshape(kernelLength).tolist()
323 """Construct calibration from a list of tables.
325 This method uses the `fromDict` method to create the
326 calibration, after constructing an appropriate dictionary from
331 tableList : `list` [`astropy.table.Table`]
332 List of tables to use to construct the brighter-fatter
338 The calibration defined
in the tables.
340 ampTable = tableList[0]
342 metadata = ampTable.meta
344 inDict['metadata'] = metadata
346 amps = ampTable[
'AMPLIFIER']
351 calibVersion = metadata[
'bfk_VERSION']
353 if calibVersion == 1.0:
357 rawMeanList = ampTable[
'MEANS']
358 rawVarianceList = ampTable[
'VARIANCES']
360 inDict[
'means'] = {amp: mean
for amp, mean
in zip(amps, rawMeanList)}
361 inDict[
'variances'] = {amp: var
for amp, var
in zip(amps, rawVarianceList)}
362 elif calibVersion == 1.1:
365 expIdMaskList = ampTable[
'EXP_ID_MASK']
366 rawMeanList = ampTable[
'RAW_MEANS']
367 rawVarianceList = ampTable[
'RAW_VARIANCES']
369 inDict[
'expIdMask'] = {amp: mask
for amp, mask
in zip(amps, expIdMaskList)}
370 inDict[
'rawMeans'] = {amp: mean
for amp, mean
in zip(amps, rawMeanList)}
371 inDict[
'rawVariances'] = {amp: var
for amp, var
in zip(amps, rawVarianceList)}
373 raise RuntimeError(f
"Unknown version for brighter-fatter kernel: {calibVersion}")
375 rawXcorrs = ampTable[
'RAW_XCORRS']
376 gainList = ampTable[
'GAIN']
377 noiseList = ampTable[
'NOISE']
379 meanXcorrs = ampTable[
'MEAN_XCORRS']
380 ampKernels = ampTable[
'KERNEL']
381 validList = ampTable[
'VALID']
383 inDict[
'rawXcorrs'] = {amp: kernel
for amp, kernel
in zip(amps, rawXcorrs)}
384 inDict[
'gain'] = {amp: gain
for amp, gain
in zip(amps, gainList)}
385 inDict[
'noise'] = {amp: noise
for amp, noise
in zip(amps, noiseList)}
386 inDict[
'meanXcorrs'] = {amp: kernel
for amp, kernel
in zip(amps, meanXcorrs)}
387 inDict[
'ampKernels'] = {amp: kernel
for amp, kernel
in zip(amps, ampKernels)}
388 inDict[
'valid'] = {amp: bool(valid)
for amp, valid
in zip(amps, validList)}
390 inDict[
'badAmps'] = [amp
for amp, valid
in inDict[
'valid'].
items()
if valid
is False]
392 if len(tableList) > 1:
393 detTable = tableList[1]
394 inDict[
'detKernels'] = {det: kernel
for det, kernel
395 in zip(detTable[
'DETECTOR'], detTable[
'KERNEL'])}
397 inDict[
'detKernels'] = {}
402 """Construct a list of tables containing the information in this
405 The list of tables should create an identical calibration
406 after being passed to this class's fromTable method.
410 tableList : `list` [`lsst.afw.table.Table`]
411 List of tables containing the crosstalk calibration
419 kernelLength, smallLength, nObs = self.
getLengthsgetLengths()
433 if self.
levellevel ==
'AMP':
436 expIdMaskList.append(self.
expIdMaskexpIdMask[amp])
437 rawMeanList.append(self.
rawMeansrawMeans[amp])
438 rawVarianceList.append(self.
rawVariancesrawVariances[amp])
439 rawXcorrs.append(np.array(self.
rawXcorrsrawXcorrs[amp]).reshape(nObs*smallLength).tolist())
440 gainList.append(self.
gaingain[amp])
441 noiseList.append(self.
noisenoise[amp])
443 meanXcorrsList.append(self.
meanXcorrsmeanXcorrs[amp].reshape(kernelLength).tolist())
444 kernelList.append(self.
ampKernelsampKernels[amp].reshape(kernelLength).tolist())
445 validList.append(
int(self.
validvalid[amp]
and not (amp
in self.
badAmpsbadAmps)))
447 ampTable = Table({
'AMPLIFIER': ampList,
448 'EXP_ID_MASK': expIdMaskList,
449 'RAW_MEANS': rawMeanList,
450 'RAW_VARIANCES': rawVarianceList,
451 'RAW_XCORRS': rawXcorrs,
454 'MEAN_XCORRS': meanXcorrsList,
455 'KERNEL': kernelList,
459 ampTable.meta = self.getMetadata().
toDict()
460 tableList.append(ampTable)
467 kernelList.append(self.
detKernelsdetKernels[det].reshape(kernelLength).tolist())
469 detTable = Table({
'DETECTOR': detList,
470 'KERNEL': kernelList})
471 detTable.meta = self.getMetadata().
toDict()
472 tableList.append(detTable)
478 """Average the amplifier level kernels to create a detector level
481 inKernels = np.array([self.ampKernelsampKernels[amp] for amp
in
482 self.
ampKernelsampKernels
if amp
not in ampsToExclude])
483 averagingList = np.transpose(inKernels)
484 avgKernel = np.zeros_like(inKernels[0])
486 sctrl.setNumSigmaClip(5.0)
487 for i
in range(np.shape(avgKernel)[0]):
488 for j
in range(np.shape(avgKernel)[1]):
490 afwMath.MEANCLIP, sctrl).getValue()
492 self.
detKernelsdetKernels[detectorName] = avgKernel
495 self.detKernel[detectorName] = self.ampKernel[ampName]
std::vector< SchemaItem< Flag > > * items
An immutable representation of a camera.
Pass parameters to a Statistics object.
def __init__(self, camera=None, level=None, **kwargs)
def fromTable(cls, tableList)
def initFromCamera(self, camera, detectorId=None)
def updateMetadata(self, setDate=False, **kwargs)
def replaceDetectorKernelWithAmpKernel(self, ampName, detectorName)
def fromDict(cls, dictionary)
def makeDetectorKernelFromAmpwiseKernels(self, detectorName, ampsToExclude=[])
daf::base::PropertyList * list
daf::base::PropertySet * set
Statistics makeStatistics(lsst::afw::image::Image< Pixel > const &img, lsst::afw::image::Mask< image::MaskPixel > const &msk, int const flags, StatisticsControl const &sctrl=StatisticsControl())
Handle a watered-down front-end to the constructor (no variance)