153 def run(self, inputPtc, dummy, camera, inputDims):
154 """Fit non-linearity to PTC data, returning the correct Linearizer
159 inputPtc : `lsst.cp.pipe.PtcDataset`
160 Pre-measured PTC dataset.
161 dummy : `lsst.afw.image.Exposure`
162 The exposure used to select the appropriate PTC dataset.
163 In almost all circumstances, one of the input exposures
164 used to generate the PTC dataset is the best option.
165 camera : `lsst.afw.cameraGeom.Camera`
167 inputDims : `lsst.daf.butler.DataCoordinate` or `dict`
168 DataIds to use to populate the output calibration.
172 results : `lsst.pipe.base.Struct`
173 The results struct containing:
175 ``outputLinearizer`` : `lsst.ip.isr.Linearizer`
176 Final linearizer calibration.
177 ``outputProvenance`` : `lsst.ip.isr.IsrProvenance`
178 Provenance data for the new calibration.
182 This task currently fits only polynomial-defined corrections,
183 where the correction coefficients are defined such that:
184 corrImage = uncorrImage + sum_i c_i uncorrImage^(2 + i)
185 These `c_i` are defined in terms of the direct polynomial fit:
186 meanVector ~ P(x=timeVector) = sum_j k_j x^j
187 such that c_(j-2) = -k_j/(k_1^j) in units of DN^(1-j) (c.f.,
188 Eq. 37 of 2003.05978). The `config.polynomialOrder` or
189 `config.splineKnots` define the maximum order of x^j to fit.
190 As k_0 and k_1 are degenerate with bias level and gain, they
191 are not included in the non-linearity correction.
194 self.log.
warn(
"No dummy exposure found.")
196 detector = camera[inputDims[
'detector']]
197 if self.config.linearityType ==
'LookupTable':
198 table = np.zeros((len(detector), self.config.maxLookupTableAdu), dtype=np.float32)
204 if self.config.linearityType ==
'Spline':
205 fitOrder = self.config.splineKnots
207 fitOrder = self.config.polynomialOrder
210 linearizer = Linearizer(detector=detector, table=table, log=self.log)
212 for i, amp
in enumerate(detector):
213 ampName = amp.getName()
214 if ampName
in inputPtc.badAmps:
217 if self.config.linearityType
in [
'Polynomial']:
218 nEntries = fitOrder + 1
219 pEntries = fitOrder + 1
220 elif self.config.linearityType
in [
'Spline']:
221 nEntries = fitOrder * 2
222 elif self.config.linearityType
in [
'Squared',
'None']:
224 pEntries = fitOrder + 1
225 elif self.config.linearityType
in [
'LookupTable']:
227 pEntries = fitOrder + 1
229 linearizer.linearityType[ampName] =
"None"
230 linearizer.linearityCoeffs[ampName] = np.zeros(nEntries)
231 linearizer.linearityBBox[ampName] = amp.getBBox()
232 linearizer.fitParams[ampName] = np.zeros(pEntries)
233 linearizer.fitParamsErr[ampName] = np.zeros(pEntries)
234 linearizer.fitChiSq[ampName] = np.nan
235 self.log.
warn(
"Amp %s has no usable PTC information. Skipping!", ampName)
238 if (len(inputPtc.expIdMask[ampName]) == 0)
or self.config.ignorePtcMask:
239 self.log.
warn(f
"Mask not found for {ampName} in non-linearity fit. Using all points.")
240 mask = np.repeat(
True, len(inputPtc.expIdMask[ampName]))
242 mask = np.array(inputPtc.expIdMask[ampName], dtype=bool)
244 inputAbscissa = np.array(inputPtc.rawExpTimes[ampName])[mask]
245 inputOrdinate = np.array(inputPtc.rawMeans[ampName])[mask]
248 fluxMask = inputOrdinate < self.config.maxLinearAdu
249 lowMask = inputOrdinate > self.config.minLinearAdu
250 fluxMask = fluxMask & lowMask
251 linearAbscissa = inputAbscissa[fluxMask]
252 linearOrdinate = inputOrdinate[fluxMask]
254 linearFit, linearFitErr, chiSq, weights =
irlsFit([0.0, 100.0], linearAbscissa,
255 linearOrdinate, funcPolynomial)
257 linearOrdinate = linearFit[0] + linearFit[1] * inputAbscissa
260 threshold = self.config.nSigmaClipLinear * np.sqrt(linearOrdinate)
261 fluxMask = np.abs(inputOrdinate - linearOrdinate) < threshold
262 linearOrdinate = linearOrdinate[fluxMask]
263 fitOrdinate = inputOrdinate[fluxMask]
264 self.debugFit(
'linearFit', inputAbscissa, inputOrdinate, linearOrdinate, fluxMask, ampName)
266 if self.config.linearityType
in [
'Polynomial',
'Squared',
'LookupTable']:
267 polyFit = np.zeros(fitOrder + 1)
269 polyFit, polyFitErr, chiSq, weights =
irlsFit(polyFit, linearOrdinate,
270 fitOrdinate, funcPolynomial)
274 linearityFit = [-coeff/(k1**order)
for order, coeff
in enumerate(polyFit)]
275 significant = np.where(np.abs(linearityFit) > 1e-10,
True,
False)
276 self.log.
info(f
"Significant polynomial fits: {significant}")
279 self.debugFit(
'polyFit', linearAbscissa, fitOrdinate, modelOrdinate,
None, ampName)
281 if self.config.linearityType ==
'Squared':
282 linearityFit = [linearityFit[2]]
283 elif self.config.linearityType ==
'LookupTable':
285 tMax = (self.config.maxLookupTableAdu - polyFit[0])/polyFit[1]
286 timeRange = np.linspace(0, tMax, self.config.maxLookupTableAdu)
287 signalIdeal = polyFit[0] + polyFit[1]*timeRange
289 lookupTableRow = signalIdeal - signalUncorrected
291 linearizer.tableData[tableIndex, :] = lookupTableRow
292 linearityFit = [tableIndex, 0]
294 elif self.config.linearityType
in [
'Spline']:
296 numPerBin, binEdges = np.histogram(linearOrdinate, bins=fitOrder)
297 with np.errstate(invalid=
"ignore"):
302 values = np.histogram(linearOrdinate, bins=fitOrder,
303 weights=(inputOrdinate[fluxMask] - linearOrdinate))[0]/numPerBin
315 binCenters = np.histogram(linearOrdinate, bins=fitOrder,
316 weights=linearOrdinate)[0]/numPerBin
317 values = values[numPerBin > 0]
318 binCenters = binCenters[numPerBin > 0]
320 self.debugFit(
'splineFit', binCenters, np.abs(values), values,
None, ampName)
323 modelOrdinate = linearOrdinate + interp.interpolate(linearOrdinate)
324 self.debugFit(
'splineFit', linearOrdinate, fitOrdinate, modelOrdinate,
None, ampName)
329 if len(binCenters) != fitOrder:
330 padN = fitOrder - len(binCenters)
331 binCenters = np.pad(binCenters, (padN, 0),
'linear_ramp',
332 end_values=(binCenters.min() - 1.0, ))
334 values = np.pad(values, (padN, 0))
337 linearityFit = np.concatenate((binCenters.tolist(), values.tolist())).tolist()
347 linearizer.linearityType[ampName] = self.config.linearityType
348 linearizer.linearityCoeffs[ampName] = np.array(linearityFit)
349 linearizer.linearityBBox[ampName] = amp.getBBox()
350 linearizer.fitParams[ampName] = np.array(polyFit)
351 linearizer.fitParamsErr[ampName] = np.array(polyFitErr)
352 linearizer.fitChiSq[ampName] = chiSq
354 image = afwImage.ImageF(len(inputOrdinate), 1)
355 image.getArray()[:, :] = inputOrdinate
356 linearizeFunction = linearizer.getLinearityTypeByName(linearizer.linearityType[ampName])
357 linearizeFunction()(image,
358 **{
'coeffs': linearizer.linearityCoeffs[ampName],
359 'table': linearizer.tableData,
360 'log': linearizer.log})
361 linearizeModel = image.getArray()[0, :]
363 self.debugFit(
'solution', inputOrdinate[fluxMask], linearOrdinate,
364 linearizeModel[fluxMask],
None, ampName)
366 linearizer.hasLinearity =
True
367 linearizer.validate()
368 linearizer.updateMetadata(camera=camera, detector=detector, filterName=
'NONE')
369 linearizer.updateMetadata(setDate=
True, setCalibId=
True)
370 provenance = IsrProvenance(calibType=
'linearizer')
372 return pipeBase.Struct(
373 outputLinearizer=linearizer,
374 outputProvenance=provenance,
Interpolate::Style stringToInterpStyle(std::string const &style)
Conversion function to switch a string to an Interpolate::Style.
std::shared_ptr< Interpolate > makeInterpolate(std::vector< double > const &x, std::vector< double > const &y, Interpolate::Style const style=Interpolate::AKIMA_SPLINE)
A factory function to make Interpolate objects.
def irlsFit(initialParams, dataX, dataY, function, weightsY=None, weightType='Cauchy')
def funcPolynomial(pars, x)
def run(self, skyInfo, tempExpRefList, imageScalerList, weightList, altMaskList=None, mask=None, supplementaryData=None)