Loading [MathJax]/extensions/tex2jax.js
LSST Applications g032c94a9f9+a1301e4c20,g04a91732dc+321623803d,g07dc498a13+dc60e07e33,g0fba68d861+b7e4830700,g1409bbee79+dc60e07e33,g1a7e361dbc+dc60e07e33,g1fd858c14a+bc317df4c0,g208c678f98+64d2817f4c,g2c84ff76c0+9484f2668e,g35bb328faa+fcb1d3bbc8,g4d2262a081+fb060387ce,g4d39ba7253+0f38e7b1d1,g4e0f332c67+5d362be553,g53246c7159+fcb1d3bbc8,g60b5630c4e+0f38e7b1d1,g78460c75b0+2f9a1b4bcd,g786e29fd12+cf7ec2a62a,g7b71ed6315+fcb1d3bbc8,g8852436030+1ad2ae6bba,g89139ef638+dc60e07e33,g8d6b6b353c+0f38e7b1d1,g9125e01d80+fcb1d3bbc8,g989de1cb63+dc60e07e33,g9f33ca652e+602c5da793,ga9baa6287d+0f38e7b1d1,gaaedd4e678+dc60e07e33,gabe3b4be73+1e0a283bba,gb1101e3267+4e433ac613,gb58c049af0+f03b321e39,gb90eeb9370+6b7d01c6c0,gcf25f946ba+1ad2ae6bba,gd315a588df+cb74d54ad7,gd6cbbdb0b4+c8606af20c,gd9a9a58781+fcb1d3bbc8,gde0f65d7ad+93c67b85fe,ge278dab8ac+932305ba37,ge82c20c137+76d20ab76d,gf18def8413+4ce00804e3,w.2025.11
LSST Data Management Base Package
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
deferredCharge.py
Go to the documentation of this file.
1# This file is part of ip_isr.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21
22__all__ = ('DeferredChargeConfig',
23 'DeferredChargeTask',
24 'SerialTrap',
25 'OverscanModel',
26 'SimpleModel',
27 'SimulatedModel',
28 'SegmentSimulator',
29 'FloatingOutputAmplifier',
30 'DeferredChargeCalib',
31 )
32
33import copy
34import numpy as np
35import warnings
36from astropy.table import Table
37
38from lsst.afw.cameraGeom import ReadoutCorner
39from lsst.pex.config import Config, Field
40from lsst.pipe.base import Task
41from .isrFunctions import gainContext
42from .calibType import IsrCalib
43
44import scipy.interpolate as interp
45
46
47class SerialTrap():
48 """Represents a serial register trap.
49
50 Parameters
51 ----------
52 size : `float`
53 Size of the charge trap, in electrons.
54 emission_time : `float`
55 Trap emission time constant, in inverse transfers.
56 pixel : `int`
57 Serial pixel location of the trap, including the prescan.
58 trap_type : `str`
59 Type of trap capture to use. Should be one of ``linear``,
60 ``logistic``, or ``spline``.
61 coeffs : `list` [`float`]
62 Coefficients for the capture process. Linear traps need one
63 coefficient, logistic traps need two, and spline based traps
64 need to have an even number of coefficients that can be split
65 into their spline locations and values.
66
67 Raises
68 ------
69 ValueError
70 Raised if the specified parameters are out of expected range.
71 """
72
73 def __init__(self, size, emission_time, pixel, trap_type, coeffs):
74 if size < 0.0:
75 raise ValueError('Trap size must be greater than or equal to 0.')
76 self.size = size
77
78 if emission_time <= 0.0:
79 raise ValueError('Emission time must be greater than 0.')
80 if np.isnan(emission_time):
81 raise ValueError('Emission time must be real-valued, not NaN')
82 self.emission_time = emission_time
83
84 if int(pixel) != pixel:
85 raise ValueError('Fraction value for pixel not allowed.')
86 self.pixel = int(pixel)
87
88 self.trap_type = trap_type
89 self.coeffs = coeffs
90
91 if self.trap_type not in ('linear', 'logistic', 'spline'):
92 raise ValueError('Unknown trap type: %s', self.trap_type)
93
94 if self.trap_type == 'spline':
95 # Note that ``spline`` is actually a piecewise linear interpolation
96 # in the model and the application, and not a true spline.
97 centers, values = np.split(np.array(self.coeffs, dtype=np.float64), 2)
98 # Ensure all NaN values are stripped out
99 values = values[~np.isnan(centers)]
100 centers = centers[~np.isnan(centers)]
101 centers = centers[~np.isnan(values)]
102 values = values[~np.isnan(values)]
103 self.interp = interp.interp1d(
104 centers,
105 values,
106 bounds_error=False,
107 fill_value=(values[0], values[-1]),
108 )
109
110 self._trap_array = None
111 self._trapped_charge = None
112
113 def __eq__(self, other):
114 # A trap is equal to another trap if all of the initialization
115 # parameters are equal. All other properties are only filled
116 # during use, and are not persisted into the calibration.
117 if self.size != other.size:
118 return False
119 if self.emission_time != other.emission_time:
120 return False
121 if self.pixel != other.pixel:
122 return False
123 if self.trap_type != other.trap_type:
124 return False
125 if self.coeffs != other.coeffs:
126 return False
127 return True
128
129 @property
130 def trap_array(self):
131 return self._trap_array
132
133 @property
134 def trapped_charge(self):
135 return self._trapped_charge
136
137 def initialize(self, ny, nx, prescan_width):
138 """Initialize trapping arrays for simulated readout.
139
140 Parameters
141 ----------
142 ny : `int`
143 Number of rows to simulate.
144 nx : `int`
145 Number of columns to simulate.
146 prescan_width : `int`
147 Additional transfers due to prescan.
148
149 Raises
150 ------
151 ValueError
152 Raised if the trap falls outside of the image.
153 """
154 if self.pixel > nx+prescan_width:
155 raise ValueError('Trap location {0} must be less than {1}'.format(self.pixel,
156 nx+prescan_width))
157
158 self._trap_array = np.zeros((ny, nx+prescan_width))
159 self._trap_array[:, self.pixel] = self.size
160 self._trapped_charge = np.zeros((ny, nx+prescan_width))
161
162 def release_charge(self):
163 """Release charge through exponential decay.
164
165 Returns
166 -------
167 released_charge : `float`
168 Charge released.
169 """
170 released_charge = self._trapped_charge*(1-np.exp(-1./self.emission_time))
171 self._trapped_charge -= released_charge
172
173 return released_charge
174
175 def trap_charge(self, free_charge):
176 """Perform charge capture using a logistic function.
177
178 Parameters
179 ----------
180 free_charge : `float`
181 Charge available to be trapped.
182
183 Returns
184 -------
185 captured_charge : `float`
186 Amount of charge actually trapped.
187 """
188 captured_charge = (np.clip(self.capture(free_charge), self.trapped_charge, self._trap_array)
189 - self.trapped_charge)
190 self._trapped_charge += captured_charge
191
192 return captured_charge
193
194 def capture(self, pixel_signals):
195 """Trap capture function.
196
197 Parameters
198 ----------
199 pixel_signals : `list` [`float`]
200 Input pixel values.
201
202 Returns
203 -------
204 captured_charge : `list` [`float`]
205 Amount of charge captured from each pixel.
206
207 Raises
208 ------
209 RuntimeError
210 Raised if the trap type is invalid.
211 """
212 if self.trap_type == 'linear':
213 scaling = self.coeffs[0]
214 return np.minimum(self.size, pixel_signals*scaling)
215 elif self.trap_type == 'logistic':
216 f0, k = (self.coeffs[0], self.coeffs[1])
217 return self.size/(1.+np.exp(-k*(pixel_signals-f0)))
218 elif self.trap_type == 'spline':
219 return self.interp(pixel_signals)
220 else:
221 raise RuntimeError(f"Invalid trap capture type: {self.trap_type}.")
222
223
225 """Base class for handling model/data fit comparisons.
226 This handles all of the methods needed for the lmfit Minimizer to
227 run.
228 """
229
230 @staticmethod
231 def model_results(params, signal, num_transfers, start=1, stop=10):
232 """Generate a realization of the overscan model, using the specified
233 fit parameters and input signal.
234
235 Parameters
236 ----------
237 params : `lmfit.Parameters`
238 Object containing the model parameters.
239 signal : `np.ndarray`, (nMeasurements)
240 Array of image means.
241 num_transfers : `int`
242 Number of serial transfers that the charge undergoes.
243 start : `int`, optional
244 First overscan column to fit. This number includes the
245 last imaging column, and needs to be adjusted by one when
246 using the overscan bounding box.
247 stop : `int`, optional
248 Last overscan column to fit. This number includes the
249 last imaging column, and needs to be adjusted by one when
250 using the overscan bounding box.
251
252 Returns
253 -------
254 results : `np.ndarray`, (nMeasurements, nCols)
255 Model results.
256 """
257 raise NotImplementedError("Subclasses must implement the model calculation.")
258
259 def loglikelihood(self, params, signal, data, error, *args, **kwargs):
260 """Calculate log likelihood of the model.
261
262 Parameters
263 ----------
264 params : `lmfit.Parameters`
265 Object containing the model parameters.
266 signal : `np.ndarray`, (nMeasurements)
267 Array of image means.
268 data : `np.ndarray`, (nMeasurements, nCols)
269 Array of overscan column means from each measurement.
270 error : `float`
271 Fixed error value.
272 *args :
273 Additional position arguments.
274 **kwargs :
275 Additional keyword arguments.
276
277 Returns
278 -------
279 logL : `float`
280 The log-likelihood of the observed data given the model
281 parameters.
282 """
283 model_results = self.model_results(params, signal, *args, **kwargs)
284
285 inv_sigma2 = 1.0/(error**2.0)
286 diff = model_results - data
287
288 return -0.5*(np.sum(inv_sigma2*(diff)**2.))
289
290 def negative_loglikelihood(self, params, signal, data, error, *args, **kwargs):
291 """Calculate negative log likelihood of the model.
292
293 Parameters
294 ----------
295 params : `lmfit.Parameters`
296 Object containing the model parameters.
297 signal : `np.ndarray`, (nMeasurements)
298 Array of image means.
299 data : `np.ndarray`, (nMeasurements, nCols)
300 Array of overscan column means from each measurement.
301 error : `float`
302 Fixed error value.
303 *args :
304 Additional position arguments.
305 **kwargs :
306 Additional keyword arguments.
307
308 Returns
309 -------
310 negativelogL : `float`
311 The negative log-likelihood of the observed data given the
312 model parameters.
313 """
314 ll = self.loglikelihood(params, signal, data, error, *args, **kwargs)
315
316 return -ll
317
318 def rms_error(self, params, signal, data, error, *args, **kwargs):
319 """Calculate RMS error between model and data.
320
321 Parameters
322 ----------
323 params : `lmfit.Parameters`
324 Object containing the model parameters.
325 signal : `np.ndarray`, (nMeasurements)
326 Array of image means.
327 data : `np.ndarray`, (nMeasurements, nCols)
328 Array of overscan column means from each measurement.
329 error : `float`
330 Fixed error value.
331 *args :
332 Additional position arguments.
333 **kwargs :
334 Additional keyword arguments.
335
336 Returns
337 -------
338 rms : `float`
339 The rms error between the model and input data.
340 """
341 model_results = self.model_results(params, signal, *args, **kwargs)
342
343 diff = model_results - data
344 rms = np.sqrt(np.mean(np.square(diff)))
345
346 return rms
347
348 def difference(self, params, signal, data, error, *args, **kwargs):
349 """Calculate the flattened difference array between model and data.
350
351 Parameters
352 ----------
353 params : `lmfit.Parameters`
354 Object containing the model parameters.
355 signal : `np.ndarray`, (nMeasurements)
356 Array of image means.
357 data : `np.ndarray`, (nMeasurements, nCols)
358 Array of overscan column means from each measurement.
359 error : `float`
360 Fixed error value.
361 *args :
362 Additional position arguments.
363 **kwargs :
364 Additional keyword arguments.
365
366 Returns
367 -------
368 difference : `np.ndarray`, (nMeasurements*nCols)
369 The rms error between the model and input data.
370 """
371 model_results = self.model_results(params, signal, *args, **kwargs)
372 diff = (model_results-data).flatten()
373
374 return diff
375
376
378 """Simple analytic overscan model."""
379
380 @staticmethod
381 def model_results(params, signal, num_transfers, start=1, stop=10):
382 """Generate a realization of the overscan model, using the specified
383 fit parameters and input signal.
384
385 Parameters
386 ----------
387 params : `lmfit.Parameters`
388 Object containing the model parameters.
389 signal : `np.ndarray`, (nMeasurements)
390 Array of image means.
391 num_transfers : `int`
392 Number of serial transfers that the charge undergoes.
393 start : `int`, optional
394 First overscan column to fit. This number includes the
395 last imaging column, and needs to be adjusted by one when
396 using the overscan bounding box.
397 stop : `int`, optional
398 Last overscan column to fit. This number includes the
399 last imaging column, and needs to be adjusted by one when
400 using the overscan bounding box.
401
402 Returns
403 -------
404 res : `np.ndarray`, (nMeasurements, nCols)
405 Model results.
406 """
407 v = params.valuesdict()
408 v['cti'] = 10**v['ctiexp']
409
410 # Adjust column numbering to match DM overscan bbox.
411 start += 1
412 stop += 1
413
414 x = np.arange(start, stop+1)
415 res = np.zeros((signal.shape[0], x.shape[0]))
416
417 for i, s in enumerate(signal):
418 # This is largely equivalent to equation 2. The minimum
419 # indicates that a trap cannot emit more charge than is
420 # available, nor can it emit more charge than it can hold.
421 # This scales the exponential release of charge from the
422 # trap. The next term defines the contribution from the
423 # global CTI at each pixel transfer, and the final term
424 # includes the contribution from local CTI effects.
425 res[i, :] = (np.minimum(v['trapsize'], s*v['scaling'])
426 * (np.exp(1/v['emissiontime']) - 1.0)
427 * np.exp(-x/v['emissiontime'])
428 + s*num_transfers*v['cti']**x
429 + v['driftscale']*s*np.exp(-x/float(v['decaytime'])))
430
431 return res
432
433
435 """Simulated overscan model."""
436
437 @staticmethod
438 def model_results(params, signal, num_transfers, amp, start=1, stop=10, trap_type=None):
439 """Generate a realization of the overscan model, using the specified
440 fit parameters and input signal.
441
442 Parameters
443 ----------
444 params : `lmfit.Parameters`
445 Object containing the model parameters.
446 signal : `np.ndarray`, (nMeasurements)
447 Array of image means.
448 num_transfers : `int`
449 Number of serial transfers that the charge undergoes.
450 amp : `lsst.afw.cameraGeom.Amplifier`
451 Amplifier to use for geometry information.
452 start : `int`, optional
453 First overscan column to fit. This number includes the
454 last imaging column, and needs to be adjusted by one when
455 using the overscan bounding box.
456 stop : `int`, optional
457 Last overscan column to fit. This number includes the
458 last imaging column, and needs to be adjusted by one when
459 using the overscan bounding box.
460 trap_type : `str`, optional
461 Type of trap model to use.
462
463 Returns
464 -------
465 results : `np.ndarray`, (nMeasurements, nCols)
466 Model results.
467 """
468 v = params.valuesdict()
469
470 # Adjust column numbering to match DM overscan bbox.
471 start += 1
472 stop += 1
473
474 # Electronics effect optimization
475 output_amplifier = FloatingOutputAmplifier(1.0, v['driftscale'], v['decaytime'])
476
477 # CTI optimization
478 v['cti'] = 10**v['ctiexp']
479
480 # Trap type for optimization
481 if trap_type is None:
482 trap = None
483 elif trap_type == 'linear':
484 trap = SerialTrap(v['trapsize'], v['emissiontime'], 1, 'linear',
485 [v['scaling']])
486 elif trap_type == 'logistic':
487 trap = SerialTrap(v['trapsize'], v['emissiontime'], 1, 'logistic',
488 [v['f0'], v['k']])
489 else:
490 raise ValueError('Trap type must be linear or logistic or None')
491
492 # Simulate ramp readout
493 imarr = np.zeros((signal.shape[0], amp.getRawDataBBox().getWidth()))
494 ramp = SegmentSimulator(imarr, amp.getRawSerialPrescanBBox().getWidth(), output_amplifier,
495 cti=v['cti'], traps=trap)
496 ramp.ramp_exp(signal)
497 model_results = ramp.readout(serial_overscan_width=amp.getRawSerialOverscanBBox().getWidth(),
498 parallel_overscan_width=0)
499
500 ncols = amp.getRawSerialPrescanBBox().getWidth() + amp.getRawDataBBox().getWidth()
501
502 return model_results[:, ncols+start-1:ncols+stop]
503
504
506 """Controls the creation of simulated segment images.
507
508 Parameters
509 ----------
510 imarr : `np.ndarray` (nx, ny)
511 Image data array.
512 prescan_width : `int`
513 Number of serial prescan columns.
514 output_amplifier : `lsst.cp.pipe.FloatingOutputAmplifier`
515 An object holding some deferred charge parameters.
516 cti : `float`
517 Global CTI value.
518 traps : `list` [`lsst.ip.isr.SerialTrap`]
519 Serial traps to simulate.
520 """
521
522 def __init__(self, imarr, prescan_width, output_amplifier, cti=0.0, traps=None):
523 # Image array geometry
524 self.prescan_width = prescan_width
525 self.ny, self.nx = imarr.shape
526
527 self.segarr = np.zeros((self.ny, self.nx+prescan_width))
528 self.segarr[:, prescan_width:] = imarr
529
530 # Serial readout information
531 self.output_amplifier = output_amplifier
532 if isinstance(cti, np.ndarray):
533 raise ValueError("cti must be single value, not an array.")
534 self.cti = cti
535
536 self.serial_traps = None
537 self.do_trapping = False
538 if traps is not None:
539 if not isinstance(traps, list):
540 traps = [traps]
541 for trap in traps:
542 self.add_trap(trap)
543
544 def add_trap(self, serial_trap):
545 """Add a trap to the serial register.
546
547 Parameters
548 ----------
549 serial_trap : `lsst.ip.isr.SerialTrap`
550 The trap to add.
551 """
552 try:
553 self.serial_traps.append(serial_trap)
554 except AttributeError:
555 self.serial_traps = [serial_trap]
556 self.do_trapping = True
557
558 def ramp_exp(self, signal_list):
559 """Simulate an image with varying flux illumination per row.
560
561 This method simulates a segment image where the signal level
562 increases along the horizontal direction, according to the
563 provided list of signal levels.
564
565 Parameters
566 ----------
567 signal_list : `list` [`float`]
568 List of signal levels.
569
570 Raises
571 ------
572 ValueError
573 Raised if the length of the signal list does not equal the
574 number of rows.
575 """
576 if len(signal_list) != self.ny:
577 raise ValueError("Signal list does not match row count.")
578
579 ramp = np.tile(signal_list, (self.nx, 1)).T
580 self.segarr[:, self.prescan_width:] += ramp
581
582 def readout(self, serial_overscan_width=10, parallel_overscan_width=0):
583 """Simulate serial readout of the segment image.
584
585 This method performs the serial readout of a segment image
586 given the appropriate SerialRegister object and the properties
587 of the ReadoutAmplifier. Additional arguments can be provided
588 to account for the number of desired overscan transfers. The
589 result is a simulated final segment image, in ADU.
590
591 Parameters
592 ----------
593 serial_overscan_width : `int`, optional
594 Number of serial overscan columns.
595 parallel_overscan_width : `int`, optional
596 Number of parallel overscan rows.
597
598 Returns
599 -------
600 result : `np.ndarray` (nx, ny)
601 Simulated image, including serial prescan, serial
602 overscan, and parallel overscan regions. Result in electrons.
603 """
604 # Create output array
605 iy = int(self.ny + parallel_overscan_width)
606 ix = int(self.nx + self.prescan_width + serial_overscan_width)
607
608 image = np.random.default_rng().normal(
609 loc=self.output_amplifier.global_offset,
610 scale=self.output_amplifier.noise,
611 size=(iy, ix),
612 )
613
614 free_charge = copy.deepcopy(self.segarr)
615
616 # Set flow control parameters
617 do_trapping = self.do_trapping
618 cti = self.cti
619
620 offset = np.zeros(self.ny)
621 cte = 1 - cti
622 if do_trapping:
623 for trap in self.serial_traps:
624 trap.initialize(self.ny, self.nx, self.prescan_width)
625
626 for i in range(ix):
627 # Trap capture
628 if do_trapping:
629 for trap in self.serial_traps:
630 captured_charge = trap.trap_charge(free_charge)
631 free_charge -= captured_charge
632
633 # Pixel-to-pixel proportional loss
634 transferred_charge = free_charge*cte
635 deferred_charge = free_charge*cti
636
637 # Pixel transfer and readout
638 offset = self.output_amplifier.local_offset(offset,
639 transferred_charge[:, 0])
640 image[:iy-parallel_overscan_width, i] += transferred_charge[:, 0] + offset
641
642 free_charge = np.pad(transferred_charge, ((0, 0), (0, 1)),
643 mode='constant')[:, 1:] + deferred_charge
644
645 # Trap emission
646 if do_trapping:
647 for trap in self.serial_traps:
648 released_charge = trap.release_charge()
649 free_charge += released_charge
650
651 return image
652
653
655 """Object representing the readout amplifier of a single channel.
656
657 Parameters
658 ----------
659 gain : `float`
660 Gain of the amplifier. Currently not used.
661 scale : `float`
662 Drift scale for the amplifier.
663 decay_time : `float`
664 Decay time for the bias drift.
665 noise : `float`, optional
666 Amplifier read noise.
667 offset : `float`, optional
668 Global CTI offset.
669 """
670
671 def __init__(self, gain, scale, decay_time, noise=0.0, offset=0.0):
672
673 self.gain = gain
674 self.noise = noise
675 self.global_offset = offset
676
677 self.update_parameters(scale, decay_time)
678
679 def local_offset(self, old, signal):
680 """Calculate local offset hysteresis.
681
682 Parameters
683 ----------
684 old : `np.ndarray`, (,)
685 Previous iteration.
686 signal : `np.ndarray`, (,)
687 Current column measurements.
688 Returns
689 -------
690 offset : `np.ndarray`
691 Local offset.
692 """
693 new = self.scale*signal
694
695 return np.maximum(new, old*np.exp(-1/self.decay_time))
696
697 def update_parameters(self, scale, decay_time):
698 """Update parameter values, if within acceptable values.
699
700 Parameters
701 ----------
702 scale : `float`
703 Drift scale for the amplifier.
704 decay_time : `float`
705 Decay time for the bias drift.
706
707 Raises
708 ------
709 ValueError
710 Raised if the input parameters are out of range.
711 """
712 if scale < 0.0:
713 raise ValueError("Scale must be greater than or equal to 0.")
714 if np.isnan(scale):
715 raise ValueError("Scale must be real-valued number, not NaN.")
716 self.scale = scale
717 if decay_time <= 0.0:
718 raise ValueError("Decay time must be greater than 0.")
719 if np.isnan(decay_time):
720 raise ValueError("Decay time must be real-valued number, not NaN.")
721 self.decay_time = decay_time
722
723
725 r"""Calibration containing deferred charge/CTI parameters.
726
727 This includes, parameters from Snyder+2021 and exstimates of
728 the serial and parallel CTI using the extended pixel edge
729 response (EPER) method (also defined in Snyder+2021).
730
731 Parameters
732 ----------
733 **kwargs :
734 Additional parameters to pass to parent constructor.
735
736 Notes
737 -----
738 The charge transfer inefficiency attributes stored are:
739
740 driftScale : `dict` [`str`, `float`]
741 A dictionary, keyed by amplifier name, of the local electronic
742 offset drift scale parameter, A_L in Snyder+2021.
743 decayTime : `dict` [`str`, `float`]
744 A dictionary, keyed by amplifier name, of the local electronic
745 offset decay time, \tau_L in Snyder+2021.
746 globalCti : `dict` [`str`, `float`]
747 A dictionary, keyed by amplifier name, of the mean global CTI
748 paramter, b in Snyder+2021.
749 serialTraps : `dict` [`str`, `lsst.ip.isr.SerialTrap`]
750 A dictionary, keyed by amplifier name, containing a single
751 serial trap for each amplifier.
752 signals : `dict` [`str`, `np.ndarray`]
753 A dictionary, keyed by amplifier name, of the mean signal
754 level for each input measurement.
755 serialEper : `dict` [`str`, `np.ndarray`, `float`]
756 A dictionary, keyed by amplifier name, of the serial EPER
757 estimator of serial CTI, given in a list for each input
758 measurement.
759 parallelEper : `dict` [`str`, `np.ndarray`, `float`]
760 A dictionary, keyed by amplifier name, of the parallel
761 EPER estimator of parallel CTI, given in a list for each
762 input measurement.
763 serialCtiTurnoff : `dict` [`str`, `float`]
764 A dictionary, keyed by amplifier name, of the serial CTI
765 turnoff (unit: electrons).
766 parallelCtiTurnoff : `dict` [`str`, `float`]
767 A dictionary, keyed by amplifier name, of the parallel CTI
768 turnoff (unit: electrons).
769 serialCtiTurnoffSamplingErr : `dict` [`str`, `float`]
770 A dictionary, keyed by amplifier name, of the serial CTI
771 turnoff sampling error (unit: electrons).
772 parallelCtiTurnoffSamplingErr : `dict` [`str`, `float`]
773 A dictionary, keyed by amplifier name, of the parallel CTI
774 turnoff sampling error (unit: electrons).
775
776 Also, the values contained in this calibration are all derived
777 from and image and overscan in units of electron as these are
778 the most natural units in which to compute deferred charge.
779 However, this means the the user should supply a reliable set
780 of gains when computing the CTI statistics during ISR.
781
782 Version 1.1 deprecates the USEGAINS attribute and standardizes
783 everything to electron units.
784 Version 1.2 adds the signal, serialEper, parallelEper,
785 serialCtiTurnoff, parallelCtiTurnoff,
786 serialCtiTurnoffSamplingErr, parallelCtiTurnoffSamplingErr
787 attributes.
788 """
789 _OBSTYPE = 'CTI'
790 _SCHEMA = 'Deferred Charge'
791 _VERSION = 1.2
792
793 def __init__(self, **kwargs):
794 self.driftScale = {}
795 self.decayTime = {}
796 self.globalCti = {}
797 self.serialTraps = {}
798 self.signals = {}
799 self.serialEper = {}
800 self.parallelEper = {}
805
806 # Check for deprecated kwargs
807 if kwargs.pop("useGains", None) is not None:
808 warnings.warn("useGains is deprecated, and will be removed "
809 "after v28.", FutureWarning)
810
811 super().__init__(**kwargs)
812
813 # Units are always in electron.
814 self.updateMetadata(UNITS='electron')
815
816 self.requiredAttributes.update(['driftScale', 'decayTime', 'globalCti', 'serialTraps',
817 'signals', 'serialEper', 'parallelEper', 'serialCtiTurnoff',
818 'parallelCtiTurnoff', 'serialCtiTurnoffSamplingErr',
819 'parallelCtiTurnoffSamplingErr'])
820
821 def fromDetector(self, detector):
822 """Read metadata parameters from a detector.
823
824 Parameters
825 ----------
826 detector : `lsst.afw.cameraGeom.detector`
827 Input detector with parameters to use.
828
829 Returns
830 -------
831 calib : `lsst.ip.isr.Linearizer`
832 The calibration constructed from the detector.
833 """
834
835 pass
836
837 @classmethod
838 def fromDict(cls, dictionary):
839 """Construct a calibration from a dictionary of properties.
840
841 Parameters
842 ----------
843 dictionary : `dict`
844 Dictionary of properties.
845
846 Returns
847 -------
848 calib : `lsst.ip.isr.CalibType`
849 Constructed calibration.
850
851 Raises
852 ------
853 RuntimeError
854 Raised if the supplied dictionary is for a different
855 calibration.
856 """
857 calib = cls()
858
859 if calib._OBSTYPE != dictionary['metadata']['OBSTYPE']:
860 raise RuntimeError(f"Incorrect CTI supplied. Expected {calib._OBSTYPE}, "
861 f"found {dictionary['metadata']['OBSTYPE']}")
862
863 calib.setMetadata(dictionary['metadata'])
864
865 calib.driftScale = dictionary['driftScale']
866 calib.decayTime = dictionary['decayTime']
867 calib.globalCti = dictionary['globalCti']
868 calib.serialCtiTurnoff = dictionary['serialCtiTurnoff']
869 calib.parallelCtiTurnoff = dictionary['parallelCtiTurnoff']
870 calib.serialCtiTurnoffSamplingErr = dictionary['serialCtiTurnoffSamplingErr']
871 calib.parallelCtiTurnoffSamplingErr = dictionary['parallelCtiTurnoffSamplingErr']
872
873 allAmpNames = dictionary['driftScale'].keys()
874
875 # Some amps might not have a serial trap solution, so
876 # dictionary['serialTraps'].keys() might not be equal
877 # to dictionary['driftScale'].keys()
878 for ampName in dictionary['serialTraps']:
879 ampTraps = dictionary['serialTraps'][ampName]
880 calib.serialTraps[ampName] = SerialTrap(ampTraps['size'], ampTraps['emissionTime'],
881 ampTraps['pixel'], ampTraps['trap_type'],
882 ampTraps['coeffs'])
883
884 for ampName in allAmpNames:
885 calib.signals[ampName] = np.array(dictionary['signals'][ampName], dtype=np.float64)
886 calib.serialEper[ampName] = np.array(dictionary['serialEper'][ampName], dtype=np.float64)
887 calib.parallelEper[ampName] = np.array(dictionary['parallelEper'][ampName], dtype=np.float64)
888
889 calib.updateMetadata()
890 return calib
891
892 def toDict(self):
893 """Return a dictionary containing the calibration properties.
894 The dictionary should be able to be round-tripped through
895 ``fromDict``.
896
897 Returns
898 -------
899 dictionary : `dict`
900 Dictionary of properties.
901 """
902 self.updateMetadata()
903 outDict = {}
904 outDict['metadata'] = self.getMetadata()
905
906 outDict['driftScale'] = self.driftScale
907 outDict['decayTime'] = self.decayTime
908 outDict['globalCti'] = self.globalCti
909 outDict['signals'] = self.signals
910 outDict['serialEper'] = self.serialEper
911 outDict['parallelEper'] = self.parallelEper
912 outDict['serialCtiTurnoff'] = self.serialCtiTurnoff
913 outDict['parallelCtiTurnoff'] = self.parallelCtiTurnoff
914 outDict['serialCtiTurnoffSamplingErr'] = self.serialCtiTurnoffSamplingErr
915 outDict['parallelCtiTurnoffSamplingErr'] = self.parallelCtiTurnoffSamplingErr
916
917 outDict['serialTraps'] = {}
918 for ampName in self.serialTraps:
919 ampTrap = {'size': self.serialTraps[ampName].size,
920 'emissionTime': self.serialTraps[ampName].emission_time,
921 'pixel': self.serialTraps[ampName].pixel,
922 'trap_type': self.serialTraps[ampName].trap_type,
923 'coeffs': self.serialTraps[ampName].coeffs}
924 outDict['serialTraps'][ampName] = ampTrap
925
926 return outDict
927
928 @classmethod
929 def fromTable(cls, tableList):
930 """Construct calibration from a list of tables.
931
932 This method uses the ``fromDict`` method to create the
933 calibration, after constructing an appropriate dictionary from
934 the input tables.
935
936 Parameters
937 ----------
938 tableList : `list` [`lsst.afw.table.Table`]
939 List of tables to use to construct the CTI
940 calibration. Two tables are expected in this list, the
941 first containing the per-amplifier CTI parameters, and the
942 second containing the parameters for serial traps.
943
944 Returns
945 -------
946 calib : `lsst.ip.isr.DeferredChargeCalib`
947 The calibration defined in the tables.
948
949 Raises
950 ------
951 ValueError
952 Raised if the trap type or trap coefficients are not
953 defined properly.
954 """
955 ampTable = tableList[0]
956
957 inDict = {}
958 inDict['metadata'] = ampTable.meta
959 calibVersion = inDict['metadata']['CTI_VERSION']
960
961 amps = ampTable['AMPLIFIER']
962 driftScale = ampTable['DRIFT_SCALE']
963 decayTime = ampTable['DECAY_TIME']
964 globalCti = ampTable['GLOBAL_CTI']
965
966 inDict['driftScale'] = {amp: value for amp, value in zip(amps, driftScale)}
967 inDict['decayTime'] = {amp: value for amp, value in zip(amps, decayTime)}
968 inDict['globalCti'] = {amp: value for amp, value in zip(amps, globalCti)}
969
970 # Version check
971 if calibVersion < 1.1:
972 # This version might be in the wrong units (not electron),
973 # and does not contain the gain information to convert
974 # into a new calibration version.
975 raise RuntimeError(f"Using old version of CTI calibration (ver. {calibVersion} < 1.1), "
976 "which is no longer supported.")
977 elif calibVersion < 1.2:
978 inDict['signals'] = {amp: np.array([np.nan]) for amp in amps}
979 inDict['serialEper'] = {amp: np.array([np.nan]) for amp in amps}
980 inDict['parallelEper'] = {amp: np.array([np.nan]) for amp in amps}
981 inDict['serialCtiTurnoff'] = {amp: np.nan for amp in amps}
982 inDict['parallelCtiTurnoff'] = {amp: np.nan for amp in amps}
983 inDict['serialCtiTurnoffSamplingErr'] = {amp: np.nan for amp in amps}
984 inDict['parallelCtiTurnoffSamplingErr'] = {amp: np.nan for amp in amps}
985 else:
986 signals = ampTable['SIGNALS']
987 serialEper = ampTable['SERIAL_EPER']
988 parallelEper = ampTable['PARALLEL_EPER']
989 serialCtiTurnoff = ampTable['SERIAL_CTI_TURNOFF']
990 parallelCtiTurnoff = ampTable['PARALLEL_CTI_TURNOFF']
991 serialCtiTurnoffSamplingErr = ampTable['SERIAL_CTI_TURNOFF_SAMPLING_ERR']
992 parallelCtiTurnoffSamplingErr = ampTable['PARALLEL_CTI_TURNOFF_SAMPLING_ERR']
993 inDict['signals'] = {amp: value for amp, value in zip(amps, signals)}
994 inDict['serialEper'] = {amp: value for amp, value in zip(amps, serialEper)}
995 inDict['parallelEper'] = {amp: value for amp, value in zip(amps, parallelEper)}
996 inDict['serialCtiTurnoff'] = {amp: value for amp, value in zip(amps, serialCtiTurnoff)}
997 inDict['parallelCtiTurnoff'] = {amp: value for amp, value in zip(amps, parallelCtiTurnoff)}
998 inDict['serialCtiTurnoffSamplingErr'] = {
999 amp: value for amp, value in zip(amps, serialCtiTurnoffSamplingErr)
1000 }
1001 inDict['parallelCtiTurnoffSamplingErr'] = {
1002 amp: value for amp, value in zip(amps, parallelCtiTurnoffSamplingErr)
1003 }
1004
1005 inDict['serialTraps'] = {}
1006 trapTable = tableList[1]
1007
1008 amps = trapTable['AMPLIFIER']
1009 sizes = trapTable['SIZE']
1010 emissionTimes = trapTable['EMISSION_TIME']
1011 pixels = trapTable['PIXEL']
1012 trap_type = trapTable['TYPE']
1013 coeffs = trapTable['COEFFS']
1014
1015 for index, amp in enumerate(amps):
1016 ampTrap = {}
1017 ampTrap['size'] = sizes[index]
1018 ampTrap['emissionTime'] = emissionTimes[index]
1019 ampTrap['pixel'] = pixels[index]
1020 ampTrap['trap_type'] = trap_type[index]
1021
1022 # Unpad any trailing NaN values: find the continuous array
1023 # of NaNs at the end of the coefficients, and remove them.
1024 inCoeffs = coeffs[index]
1025 breakIndex = 1
1026 nanValues = np.where(np.isnan(inCoeffs))[0]
1027 if nanValues is not None:
1028 coeffLength = len(inCoeffs)
1029 while breakIndex < coeffLength:
1030 if coeffLength - breakIndex in nanValues:
1031 breakIndex += 1
1032 else:
1033 break
1034 breakIndex -= 1 # Remove the fixed offset.
1035 if breakIndex != 0:
1036 outCoeffs = inCoeffs[0: coeffLength - breakIndex]
1037 else:
1038 outCoeffs = inCoeffs
1039 ampTrap['coeffs'] = outCoeffs.tolist()
1040
1041 if ampTrap['trap_type'] == 'linear':
1042 if len(ampTrap['coeffs']) < 1:
1043 raise ValueError("CTI Amplifier %s coefficients for trap has illegal length %d.",
1044 amp, len(ampTrap['coeffs']))
1045 elif ampTrap['trap_type'] == 'logistic':
1046 if len(ampTrap['coeffs']) < 2:
1047 raise ValueError("CTI Amplifier %s coefficients for trap has illegal length %d.",
1048 amp, len(ampTrap['coeffs']))
1049 elif ampTrap['trap_type'] == 'spline':
1050 if len(ampTrap['coeffs']) % 2 != 0:
1051 raise ValueError("CTI Amplifier %s coefficients for trap has illegal length %d.",
1052 amp, len(ampTrap['coeffs']))
1053 else:
1054 raise ValueError('Unknown trap type: %s', ampTrap['trap_type'])
1055
1056 inDict['serialTraps'][amp] = ampTrap
1057
1058 return cls.fromDict(inDict)
1059
1060 def toTable(self):
1061 """Construct a list of tables containing the information in this
1062 calibration.
1063
1064 The list of tables should create an identical calibration
1065 after being passed to this class's fromTable method.
1066
1067 Returns
1068 -------
1069 tableList : `list` [`lsst.afw.table.Table`]
1070 List of tables containing the crosstalk calibration
1071 information. Two tables are generated for this list, the
1072 first containing the per-amplifier CTI parameters, and the
1073 second containing the parameters for serial traps.
1074 """
1075 tableList = []
1076 self.updateMetadata()
1077
1078 ampList = []
1079 driftScale = []
1080 decayTime = []
1081 globalCti = []
1082 signals = []
1083 serialEper = []
1084 parallelEper = []
1085 serialCtiTurnoff = []
1086 parallelCtiTurnoff = []
1087 serialCtiTurnoffSamplingErr = []
1088 parallelCtiTurnoffSamplingErr = []
1089
1090 for amp in self.driftScale.keys():
1091 ampList.append(amp)
1092 driftScale.append(self.driftScale[amp])
1093 decayTime.append(self.decayTime[amp])
1094 globalCti.append(self.globalCti[amp])
1095 signals.append(self.signals[amp])
1096 serialEper.append(self.serialEper[amp])
1097 parallelEper.append(self.parallelEper[amp])
1098 serialCtiTurnoff.append(self.serialCtiTurnoff[amp])
1099 parallelCtiTurnoff.append(self.parallelCtiTurnoff[amp])
1100 serialCtiTurnoffSamplingErr.append(
1102 )
1103 parallelCtiTurnoffSamplingErr.append(
1105 )
1106
1107 ampTable = Table({
1108 'AMPLIFIER': ampList,
1109 'DRIFT_SCALE': driftScale,
1110 'DECAY_TIME': decayTime,
1111 'GLOBAL_CTI': globalCti,
1112 'SIGNALS': signals,
1113 'SERIAL_EPER': serialEper,
1114 'PARALLEL_EPER': parallelEper,
1115 'SERIAL_CTI_TURNOFF': serialCtiTurnoff,
1116 'PARALLEL_CTI_TURNOFF': parallelCtiTurnoff,
1117 'SERIAL_CTI_TURNOFF_SAMPLING_ERR': serialCtiTurnoffSamplingErr,
1118 'PARALLEL_CTI_TURNOFF_SAMPLING_ERR': parallelCtiTurnoffSamplingErr,
1119 })
1120
1121 ampTable.meta = self.getMetadata().toDict()
1122 tableList.append(ampTable)
1123
1124 ampList = []
1125 sizeList = []
1126 timeList = []
1127 pixelList = []
1128 typeList = []
1129 coeffList = []
1130
1131 # Get maximum coeff length
1132 maxCoeffLength = 0
1133 for trap in self.serialTraps.values():
1134 maxCoeffLength = np.maximum(maxCoeffLength, len(trap.coeffs))
1135
1136 # Pack and pad the end of the coefficients with NaN values.
1137 for amp, trap in self.serialTraps.items():
1138 ampList.append(amp)
1139 sizeList.append(trap.size)
1140 timeList.append(trap.emission_time)
1141 pixelList.append(trap.pixel)
1142 typeList.append(trap.trap_type)
1143
1144 coeffs = trap.coeffs
1145 if len(coeffs) != maxCoeffLength:
1146 coeffs = np.pad(coeffs, (0, maxCoeffLength - len(coeffs)),
1147 constant_values=np.nan).tolist()
1148 coeffList.append(coeffs)
1149
1150 trapTable = Table({'AMPLIFIER': ampList,
1151 'SIZE': sizeList,
1152 'EMISSION_TIME': timeList,
1153 'PIXEL': pixelList,
1154 'TYPE': typeList,
1155 'COEFFS': coeffList})
1156
1157 tableList.append(trapTable)
1158
1159 return tableList
1160
1161
1163 """Settings for deferred charge correction.
1164 """
1165 nPixelOffsetCorrection = Field(
1166 dtype=int,
1167 doc="Number of prior pixels to use for local offset correction.",
1168 default=15,
1169 )
1170 nPixelTrapCorrection = Field(
1171 dtype=int,
1172 doc="Number of prior pixels to use for trap correction.",
1173 default=6,
1174 )
1175 useGains = Field(
1176 dtype=bool,
1177 doc="If true, scale by the gain.",
1178 default=False,
1179 # TODO: DM-46721
1180 deprecated="This field is no longer used. Will be removed after v28.",
1181 )
1182 zeroUnusedPixels = Field(
1183 dtype=bool,
1184 doc="If true, set serial prescan and parallel overscan to zero before correction.",
1185 default=True,
1186 )
1187
1188
1190 """Task to correct an exposure for charge transfer inefficiency.
1191
1192 This uses the methods described by Snyder et al. 2021, Journal of
1193 Astronimcal Telescopes, Instruments, and Systems, 7,
1194 048002. doi:10.1117/1.JATIS.7.4.048002 (Snyder+21).
1195 """
1196 ConfigClass = DeferredChargeConfig
1197 _DefaultName = 'isrDeferredCharge'
1198
1199 def run(self, exposure, ctiCalib, gains=None):
1200 """Correct deferred charge/CTI issues.
1201
1202 Parameters
1203 ----------
1204 exposure : `lsst.afw.image.Exposure`
1205 Exposure to correct the deferred charge on.
1206 ctiCalib : `lsst.ip.isr.DeferredChargeCalib`
1207 Calibration object containing the charge transfer
1208 inefficiency model.
1209 gains : `dict` [`str`, `float`]
1210 A dictionary, keyed by amplifier name, of the gains to
1211 use. If gains is None, the nominal gains in the amplifier
1212 object are used.
1213
1214 Returns
1215 -------
1216 exposure : `lsst.afw.image.Exposure`
1217 The corrected exposure.
1218
1219 Notes
1220 -------
1221 This task will read the exposure metadata and determine if
1222 applying gains if necessary. The correction takes place in
1223 units of electrons. If bootstrapping, the gains used
1224 will just be 1.0. and the input/output units will stay in
1225 adu. If the input image is in adu, the output image will be
1226 in units of electrons. If the input image is in electron,
1227 the output image will be in electron.
1228 """
1229 image = exposure.getMaskedImage().image
1230 detector = exposure.getDetector()
1231
1232 # Get the image and overscan units.
1233 imageUnits = exposure.getMetadata().get("LSST ISR UNITS")
1234
1235 # The deferred charge correction assumes that everything is in
1236 # electron units. Make it so:
1237 applyGains = False
1238 if imageUnits == "adu":
1239 applyGains = True
1240
1241 # If we need to convert the image to electrons, check that gains
1242 # were supplied. CTI should not be solved or corrected without
1243 # supplied gains.
1244 if applyGains and gains is None:
1245 raise RuntimeError("No gains supplied for deferred charge correction.")
1246
1247 with gainContext(exposure, image, apply=applyGains, gains=gains, isTrimmed=False):
1248 # Both the image and the overscan are in electron units.
1249 for amp in detector.getAmplifiers():
1250 ampName = amp.getName()
1251
1252 ampImage = image[amp.getRawBBox()]
1253 if self.config.zeroUnusedPixels:
1254 # We don't apply overscan subtraction, so zero these
1255 # out for now.
1256 ampImage[amp.getRawParallelOverscanBBox()].array[:, :] = 0.0
1257 ampImage[amp.getRawSerialPrescanBBox()].array[:, :] = 0.0
1258
1259 # The algorithm expects that the readout corner is in
1260 # the lower left corner. Flip it to be so:
1261 ampData = self.flipData(ampImage.array, amp)
1262
1263 if ctiCalib.driftScale[ampName] > 0.0:
1264 correctedAmpData = self.local_offset_inverse(ampData,
1265 ctiCalib.driftScale[ampName],
1266 ctiCalib.decayTime[ampName],
1267 self.config.nPixelOffsetCorrection)
1268 else:
1269 correctedAmpData = ampData.copy()
1270
1271 correctedAmpData = self.local_trap_inverse(correctedAmpData,
1272 ctiCalib.serialTraps[ampName],
1273 ctiCalib.globalCti[ampName],
1274 self.config.nPixelTrapCorrection)
1275
1276 # Undo flips here. The method is symmetric.
1277 correctedAmpData = self.flipData(correctedAmpData, amp)
1278 image[amp.getRawBBox()].array[:, :] = correctedAmpData[:, :]
1279
1280 return exposure
1281
1282 @staticmethod
1283 def flipData(ampData, amp):
1284 """Flip data array such that readout corner is at lower-left.
1285
1286 Parameters
1287 ----------
1288 ampData : `numpy.ndarray`, (nx, ny)
1289 Image data to flip.
1290 amp : `lsst.afw.cameraGeom.Amplifier`
1291 Amplifier to get readout corner information.
1292
1293 Returns
1294 -------
1295 ampData : `numpy.ndarray`, (nx, ny)
1296 Flipped image data.
1297 """
1298 X_FLIP = {ReadoutCorner.LL: False,
1299 ReadoutCorner.LR: True,
1300 ReadoutCorner.UL: False,
1301 ReadoutCorner.UR: True}
1302 Y_FLIP = {ReadoutCorner.LL: False,
1303 ReadoutCorner.LR: False,
1304 ReadoutCorner.UL: True,
1305 ReadoutCorner.UR: True}
1306
1307 if X_FLIP[amp.getReadoutCorner()]:
1308 ampData = np.fliplr(ampData)
1309 if Y_FLIP[amp.getReadoutCorner()]:
1310 ampData = np.flipud(ampData)
1311
1312 return ampData
1313
1314 @staticmethod
1315 def local_offset_inverse(inputArr, drift_scale, decay_time, num_previous_pixels=15):
1316 r"""Remove CTI effects from local offsets.
1317
1318 This implements equation 10 of Snyder+21. For an image with
1319 CTI, s'(m, n), the correction factor is equal to the maximum
1320 value of the set of:
1321
1322 .. code-block::
1323
1324 {A_L s'(m, n - j) exp(-j t / \tau_L)}_j=0^jmax
1325
1326 Parameters
1327 ----------
1328 inputArr : `numpy.ndarray`, (nx, ny)
1329 Input image data to correct.
1330 drift_scale : `float`
1331 Drift scale (Snyder+21 A_L value) to use in correction.
1332 decay_time : `float`
1333 Decay time (Snyder+21 \tau_L) of the correction.
1334 num_previous_pixels : `int`, optional
1335 Number of previous pixels to use for correction. As the
1336 CTI has an exponential decay, this essentially truncates
1337 the correction where that decay scales the input charge to
1338 near zero.
1339
1340 Returns
1341 -------
1342 outputArr : `numpy.ndarray`, (nx, ny)
1343 Corrected image data.
1344 """
1345 r = np.exp(-1/decay_time)
1346 Ny, Nx = inputArr.shape
1347
1348 # j = 0 term:
1349 offset = np.zeros((num_previous_pixels, Ny, Nx))
1350 offset[0, :, :] = drift_scale*np.maximum(0, inputArr)
1351
1352 # j = 1..jmax terms:
1353 for n in range(1, num_previous_pixels):
1354 offset[n, :, n:] = drift_scale*np.maximum(0, inputArr[:, :-n])*(r**n)
1355
1356 Linv = np.amax(offset, axis=0)
1357 outputArr = inputArr - Linv
1358
1359 return outputArr
1360
1361 @staticmethod
1362 def local_trap_inverse(inputArr, trap, global_cti=0.0, num_previous_pixels=6):
1363 r"""Apply localized trapping inverse operator to pixel signals.
1364
1365 This implements equation 13 of Snyder+21. For an image with
1366 CTI, s'(m, n), the correction factor is equal to the maximum
1367 value of the set of:
1368
1369 .. code-block::
1370
1371 {A_L s'(m, n - j) exp(-j t / \tau_L)}_j=0^jmax
1372
1373 Parameters
1374 ----------
1375 inputArr : `numpy.ndarray`, (nx, ny)
1376 Input image data to correct.
1377 trap : `lsst.ip.isr.SerialTrap`
1378 Serial trap describing the capture and release of charge.
1379 global_cti: `float`
1380 Mean charge transfer inefficiency, b from Snyder+21.
1381 num_previous_pixels : `int`, optional
1382 Number of previous pixels to use for correction.
1383
1384 Returns
1385 -------
1386 outputArr : `numpy.ndarray`, (nx, ny)
1387 Corrected image data.
1388
1389 """
1390 Ny, Nx = inputArr.shape
1391 a = 1 - global_cti
1392 r = np.exp(-1/trap.emission_time)
1393
1394 # Estimate trap occupancies during readout
1395 trap_occupancy = np.zeros((num_previous_pixels, Ny, Nx))
1396 for n in range(num_previous_pixels):
1397 trap_occupancy[n, :, n+1:] = trap.capture(np.maximum(0, inputArr))[:, :-(n+1)]*(r**n)
1398 trap_occupancy = np.amax(trap_occupancy, axis=0)
1399
1400 # Estimate captured charge
1401 C = trap.capture(np.maximum(0, inputArr)) - trap_occupancy*r
1402 C[C < 0] = 0.
1403
1404 # Estimate released charge
1405 R = np.zeros(inputArr.shape)
1406 R[:, 1:] = trap_occupancy[:, 1:]*(1-r)
1407 T = R - C
1408
1409 outputArr = inputArr - a*T
1410
1411 return outputArr
fromDict(cls, dictionary, **kwargs)
Definition calibType.py:575
updateMetadata(self, camera=None, detector=None, filterName=None, setCalibId=False, setCalibInfo=False, setDate=False, **kwargs)
Definition calibType.py:207
local_trap_inverse(inputArr, trap, global_cti=0.0, num_previous_pixels=6)
local_offset_inverse(inputArr, drift_scale, decay_time, num_previous_pixels=15)
run(self, exposure, ctiCalib, gains=None)
__init__(self, gain, scale, decay_time, noise=0.0, offset=0.0)
negative_loglikelihood(self, params, signal, data, error, *args, **kwargs)
loglikelihood(self, params, signal, data, error, *args, **kwargs)
rms_error(self, params, signal, data, error, *args, **kwargs)
difference(self, params, signal, data, error, *args, **kwargs)
model_results(params, signal, num_transfers, start=1, stop=10)
readout(self, serial_overscan_width=10, parallel_overscan_width=0)
__init__(self, imarr, prescan_width, output_amplifier, cti=0.0, traps=None)
__init__(self, size, emission_time, pixel, trap_type, coeffs)
initialize(self, ny, nx, prescan_width)
model_results(params, signal, num_transfers, start=1, stop=10)
model_results(params, signal, num_transfers, amp, start=1, stop=10, trap_type=None)
gainContext(exp, image, apply, gains=None, invert=False, isTrimmed=True)