30 from lsstDebug
import getDebugFrame
33 from .utils
import (funcPolynomial, irlsFit)
36 __all__ = [
"LinearitySolveTask",
"LinearitySolveConfig",
"MeasureLinearityTask"]
40 dimensions=(
"instrument",
"detector")):
43 doc=
"Input PTC dataset.",
44 storageClass=
"StructuredDataDict",
45 dimensions=(
"instrument",
"detector"),
50 doc=
"Camera Geometry definition.",
51 storageClass=
"Camera",
52 dimensions=(
"instrument", ),
54 outputLinearizer = cT.Output(
56 doc=
"Output linearity measurements.",
57 storageClass=
"Linearizer",
58 dimensions=(
"instrument",
"detector"),
64 pipelineConnections=LinearitySolveConnections):
65 """Configuration for solving the linearity from PTC dataset.
67 linearityType = pexConfig.ChoiceField(
69 doc=
"Type of linearizer to construct.",
72 "LookupTable":
"Create a lookup table solution.",
73 "Polynomial":
"Create an arbitrary polynomial solution.",
74 "Squared":
"Create a single order squared solution.",
75 "Spline":
"Create a spline based solution.",
76 "None":
"Create a dummy solution.",
79 polynomialOrder = pexConfig.Field(
81 doc=
"Degree of polynomial to fit.",
84 splineKnots = pexConfig.Field(
86 doc=
"Number of spline knots to use in fit.",
89 maxLookupTableAdu = pexConfig.Field(
91 doc=
"Maximum DN value for a LookupTable linearizer.",
94 maxLinearAdu = pexConfig.Field(
96 doc=
"Maximum DN value to use to estimate linear term.",
99 minLinearAdu = pexConfig.Field(
101 doc=
"Minimum DN value to use to estimate linear term.",
104 nSigmaClipLinear = pexConfig.Field(
106 doc=
"Maximum deviation from linear solution for Poissonian noise.",
112 """Fit the linearity from the PTC dataset.
114 ConfigClass = LinearitySolveConfig
115 _DefaultName =
'cpLinearitySolve'
118 """Ensure that the input and output dimensions are passed along.
122 butlerQC : `lsst.daf.butler.butlerQuantumContext.ButlerQuantumContext`
123 Butler to operate on.
124 inputRefs : `lsst.pipe.base.connections.InputQuantizedConnection`
125 Input data refs to load.
126 ouptutRefs : `lsst.pipe.base.connections.OutputQuantizedConnection`
127 Output data refs to persist.
129 inputs = butlerQC.get(inputRefs)
132 inputs[
'inputDims'] = [exp.dataId.byName()
for exp
in inputRefs.inputPtc]
134 outputs = self.
runrun(**inputs)
135 butlerQC.put(outputs, outputRefs)
137 def run(self, inputPtc, camera, inputDims):
138 """Fit non-linearity to PTC data, returning the correct Linearizer
143 inputPtc : `lsst.cp.pipe.PtcDataset`
144 Pre-measured PTC dataset.
145 camera : `lsst.afw.cameraGeom.Camera`
147 inputDims : `lsst.daf.butler.DataCoordinate` or `dict`
148 DataIds to use to populate the output calibration.
152 results : `lsst.pipe.base.Struct`
153 The results struct containing:
155 ``outputLinearizer`` : `lsst.ip.isr.Linearizer`
156 Final linearizer calibration.
157 ``outputProvenance`` : `lsst.ip.isr.IsrProvenance`
158 Provenance data for the new calibration.
162 This task currently fits only polynomial-defined corrections,
163 where the correction coefficients are defined such that:
164 corrImage = uncorrImage + sum_i c_i uncorrImage^(2 + i)
165 These `c_i` are defined in terms of the direct polynomial fit:
166 meanVector ~ P(x=timeVector) = sum_j k_j x^j
167 such that c_(j-2) = -k_j/(k_1^j) in units of DN^(1-j) (c.f.,
168 Eq. 37 of 2003.05978). The `config.polynomialOrder` or
169 `config.splineKnots` define the maximum order of x^j to fit.
170 As k_0 and k_1 are degenerate with bias level and gain, they
171 are not included in the non-linearity correction.
173 detector = camera[inputDims[
'detector']]
174 if self.config.linearityType ==
'LookupTable':
175 table = np.zeros((len(detector), self.config.maxLookupTableAdu), dtype=np.float32)
181 if self.config.linearityType ==
'Spline':
182 fitOrder = self.config.splineKnots
184 fitOrder = self.config.polynomialOrder
187 linearizer =
Linearizer(detector=detector, table=table, log=self.log)
189 for i, amp
in enumerate(detector):
190 ampName = amp.getName()
191 if (len(inputPtc.expIdMask[ampName]) == 0):
192 self.log.
warn(f
"Mask not found for {ampName} in non-linearity fit. Using all points.")
193 mask = np.repeat(
True, len(inputPtc.expIdMask[ampName]))
195 mask = inputPtc.expIdMask[ampName]
197 inputAbscissa = np.array(inputPtc.rawExpTimes[ampName])[mask]
198 inputOrdinate = np.array(inputPtc.rawMeans[ampName])[mask]
201 fluxMask = inputOrdinate < self.config.maxLinearAdu
202 lowMask = inputOrdinate > self.config.minLinearAdu
203 fluxMask = fluxMask & lowMask
204 linearAbscissa = inputAbscissa[fluxMask]
205 linearOrdinate = inputOrdinate[fluxMask]
207 linearFit, linearFitErr, chiSq, weights =
irlsFit([0.0, 100.0], linearAbscissa,
208 linearOrdinate, funcPolynomial)
210 linearOrdinate = linearFit[0] + linearFit[1] * inputAbscissa
213 threshold = self.config.nSigmaClipLinear * np.sqrt(linearOrdinate)
214 fluxMask = np.abs(inputOrdinate - linearOrdinate) < threshold
215 linearOrdinate = linearOrdinate[fluxMask]
216 fitOrdinate = inputOrdinate[fluxMask]
217 self.
debugFitdebugFit(
'linearFit', inputAbscissa, inputOrdinate, linearOrdinate, fluxMask, ampName)
219 if self.config.linearityType
in [
'Polynomial',
'Squared',
'LookupTable']:
220 polyFit = np.zeros(fitOrder + 1)
222 polyFit, polyFitErr, chiSq, weights =
irlsFit(polyFit, linearOrdinate,
223 fitOrdinate, funcPolynomial)
227 linearityFit = [-coeff/(k1**order)
for order, coeff
in enumerate(polyFit)]
228 significant = np.where(np.abs(linearityFit) > 1e-10,
True,
False)
229 self.log.
info(f
"Significant polynomial fits: {significant}")
232 self.
debugFitdebugFit(
'polyFit', linearAbscissa, fitOrdinate, modelOrdinate,
None, ampName)
234 if self.config.linearityType ==
'Squared':
235 linearityFit = [linearityFit[2]]
236 elif self.config.linearityType ==
'LookupTable':
238 tMax = (self.config.maxLookupTableAdu - polyFit[0])/polyFit[1]
239 timeRange = np.linspace(0, tMax, self.config.maxLookupTableAdu)
240 signalIdeal = polyFit[0] + polyFit[1]*timeRange
242 lookupTableRow = signalIdeal - signalUncorrected
244 linearizer.tableData[tableIndex, :] = lookupTableRow
245 linearityFit = [tableIndex, 0]
247 elif self.config.linearityType
in [
'Spline']:
249 numPerBin, binEdges = np.histogram(linearOrdinate, bins=fitOrder)
250 with np.errstate(invalid=
"ignore"):
255 values = np.histogram(linearOrdinate, bins=fitOrder,
256 weights=(inputOrdinate[fluxMask] - linearOrdinate))[0]/numPerBin
268 binCenters = np.histogram(linearOrdinate, bins=fitOrder,
269 weights=linearOrdinate)[0]/numPerBin
270 values = values[numPerBin > 0]
271 binCenters = binCenters[numPerBin > 0]
273 self.
debugFitdebugFit(
'splineFit', binCenters, np.abs(values), values,
None, ampName)
276 modelOrdinate = linearOrdinate + interp.interpolate(linearOrdinate)
277 self.
debugFitdebugFit(
'splineFit', linearOrdinate, fitOrdinate, modelOrdinate,
None, ampName)
282 if len(binCenters) != fitOrder:
283 padN = fitOrder - len(binCenters)
284 binCenters = np.pad(binCenters, (padN, 0),
'linear_ramp',
285 end_values=(binCenters.min() - 1.0, ))
287 values = np.pad(values, (padN, 0))
290 linearityFit = np.concatenate((binCenters.tolist(), values.tolist())).tolist()
300 linearizer.linearityType[ampName] = self.config.linearityType
301 linearizer.linearityCoeffs[ampName] = np.array(linearityFit)
302 linearizer.linearityBBox[ampName] = amp.getBBox()
303 linearizer.fitParams[ampName] = np.array(polyFit)
304 linearizer.fitParamsErr[ampName] = np.array(polyFitErr)
305 linearizer.fitChiSq[ampName] = chiSq
307 image = afwImage.ImageF(len(inputOrdinate), 1)
308 image.getArray()[:, :] = inputOrdinate
309 linearizeFunction = linearizer.getLinearityTypeByName(linearizer.linearityType[ampName])
310 linearizeFunction()(image,
311 **{
'coeffs': linearizer.linearityCoeffs[ampName],
312 'table': linearizer.tableData,
313 'log': linearizer.log})
314 linearizeModel = image.getArray()[0, :]
316 self.
debugFitdebugFit(
'solution', inputOrdinate[fluxMask], linearOrdinate,
317 linearizeModel[fluxMask],
None, ampName)
319 linearizer.hasLinearity =
True
320 linearizer.validate()
321 linearizer.updateMetadata(camera=camera, detector=detector, filterName=
'NONE')
322 linearizer.updateMetadata(setDate=
True, setCalibId=
True)
325 return pipeBase.Struct(
326 outputLinearizer=linearizer,
327 outputProvenance=provenance,
330 def debugFit(self, stepname, xVector, yVector, yModel, mask, ampName):
331 """Debug method for linearity fitting.
336 A label to use to check if we care to debug at a given
338 xVector : `numpy.array`
339 The values to use as the independent variable in the
341 yVector : `numpy.array`
342 The values to use as the dependent variable in the
344 yModel : `numpy.array`
345 The values to use as the linearized result.
346 mask : `numpy.array` [ `bool` ], optional
347 A mask to indicate which entries of ``xVector`` and
350 Amplifier name to lookup linearity correction values.
355 import matplotlib.pyplot
as plt
356 fig, axs = plt.subplots(2)
359 mask = np.ones_like(xVector, dtype=bool)
361 fig.suptitle(f
"{stepname} {ampName} {self.config.linearityType}")
362 if stepname ==
'linearFit':
363 axs[0].set_xlabel(
"Input Abscissa (time or mondiode)")
364 axs[0].set_ylabel(
"Input Ordinate (flux)")
365 axs[1].set_xlabel(
"Linear Ordinate (linear flux)")
366 axs[1].set_ylabel(
"Flux Difference: (input - linear)")
367 elif stepname
in (
'polyFit',
'splineFit'):
368 axs[0].set_xlabel(
"Linear Abscissa (linear flux)")
369 axs[0].set_ylabel(
"Input Ordinate (flux)")
370 axs[1].set_xlabel(
"Linear Ordinate (linear flux)")
371 axs[1].set_ylabel(
"Flux Difference: (input - full model fit)")
372 elif stepname ==
'solution':
373 axs[0].set_xlabel(
"Input Abscissa (time or mondiode)")
374 axs[0].set_ylabel(
"Linear Ordinate (linear flux)")
375 axs[1].set_xlabel(
"Model flux (linear flux)")
376 axs[1].set_ylabel(
"Flux Difference: (linear - model)")
378 axs[0].set_yscale(
'log')
379 axs[0].set_xscale(
'log')
380 axs[0].scatter(xVector, yVector)
381 axs[0].scatter(xVector[~mask], yVector[~mask], c=
'red', marker=
'x')
382 axs[1].set_xscale(
'log')
384 axs[1].scatter(yModel, yVector[mask] - yModel)
387 prompt =
"Press Enter or c to continue [chpx]..."
389 ans = input(prompt).lower()
390 if ans
in (
"",
" ",
"c",):
396 print(
"[h]elp [c]ontinue [p]db")
403 solver = pexConfig.ConfigurableField(
404 target=LinearitySolveTask,
405 doc=
"Task to convert PTC data to linearity solutions.",
410 """Stand alone Gen2 linearity measurement.
412 This class wraps the Gen3 linearity task to allow it to be run as
415 ConfigClass = MeasureLinearityConfig
416 _DefaultName =
"measureLinearity"
420 self.makeSubtask(
"solver")
423 """Run new linearity code for gen2.
427 dataRef : `lsst.daf.persistence.ButlerDataRef`
428 Input dataref for the photon transfer curve data.
432 results : `lsst.pipe.base.Struct`
433 The results struct containing:
435 ``outputLinearizer`` : `lsst.ip.isr.Linearizer`
436 Final linearizer calibration.
437 ``outputProvenance`` : `lsst.ip.isr.IsrProvenance`
438 Provenance data for the new calibration.
440 ptc = dataRef.get(
'photonTransferCurveDataset')
441 camera = dataRef.get(
'camera')
442 inputDims = dataRef.dataId
443 linearityResults = self.solver.
run(ptc, camera=camera, inputDims=inputDims)
445 inputDims[
'calibDate'] = linearityResults.outputLinearizer.getMetadata().get(
'CALIBDATE')
446 butler = dataRef.getButler()
447 butler.put(linearityResults.outputLinearizer,
"linearizer", inputDims)
448 return linearityResults
def run(self, inputPtc, camera, inputDims)
def runQuantum(self, butlerQC, inputRefs, outputRefs)
def debugFit(self, stepname, xVector, yVector, yModel, mask, ampName)
def __init__(self, **kwargs)
def runDataRef(self, dataRef)
Backwards-compatibility support for depersisting the old Calib (FluxMag0/FluxMag0Err) objects.
Interpolate::Style stringToInterpStyle(std::string const &style)
Conversion function to switch a string to an Interpolate::Style.
std::shared_ptr< Interpolate > makeInterpolate(std::vector< double > const &x, std::vector< double > const &y, Interpolate::Style const style=Interpolate::AKIMA_SPLINE)
A factory function to make Interpolate objects.
def irlsFit(initialParams, dataX, dataY, function, weightsY=None)
def funcPolynomial(pars, x)
def run(self, skyInfo, tempExpRefList, imageScalerList, weightList, altMaskList=None, mask=None, supplementaryData=None)
def getDebugFrame(debugDisplay, name)