23 __all__ = [
"MaskStreaksConfig",
"MaskStreaksTask",
"setDetectionMask"]
33 from skimage.feature
import canny
34 from sklearn.cluster
import KMeans
36 from dataclasses
import dataclass
39 def setDetectionMask(maskedImage, forceSlowBin=False, binning=None, detectedPlane="DETECTED",
40 badMaskPlanes=(
"NO_DATA",
"INTRP",
"BAD",
"SAT",
"EDGE"), detectionThreshold=5):
41 """Make detection mask and set the mask plane
43 Creat a binary image from a masked image by setting all data with signal-to-
44 noise below some threshold to zero, and all data above the threshold to one.
45 If the binning parameter has been set, this procedure will be preceded by a
46 weighted binning of the data in order to smooth the result, after which the
47 result is scaled back to the original dimensions. Set the detection mask
48 plane with this binary image.
52 maskedImage : `lsst.afw.image.maskedImage`
53 Image to be (optionally) binned and converted
54 forceSlowBin : bool (optional)
55 Force usage of slower binning method to check that the two methods
57 binning : int (optional)
58 Number of pixels by which to bin image
59 detectedPlane : str (optional)
60 Name of mask with pixels that were detected above threshold in image
61 badMaskPlanes : set (optional)
62 Names of masks with pixels that are rejected
63 detectionThreshold : float (optional)
64 Boundary in signal-to-noise between non-detections and detections for
65 making a binary image from the original input image
67 data = maskedImage.image.array
68 weights = 1 / maskedImage.variance.array
69 mask = maskedImage.getMask()
71 detectionMask = ((mask.array & mask.getPlaneBitMask(detectedPlane)))
72 badPixelMask = mask.getPlaneBitMask(badMaskPlanes)
73 badMask = (mask.array & badPixelMask) > 0
74 fitMask = detectionMask.astype(bool) & ~badMask
76 fitData = np.copy(data)
78 fitWeights = np.copy(weights)
79 fitWeights[~fitMask] = 0
83 ymax, xmax = fitData.shape
84 if (ymax % binning == 0)
and (xmax % binning == 0)
and (
not forceSlowBin):
86 binNumeratorReshape = (fitData * fitWeights).reshape(ymax // binning, binning,
87 xmax // binning, binning)
88 binDenominatorReshape = fitWeights.reshape(binNumeratorReshape.shape)
89 binnedNumerator = binNumeratorReshape.sum(axis=3).sum(axis=1)
90 binnedDenominator = binDenominatorReshape.sum(axis=3).sum(axis=1)
93 warnings.warn(
'Using slow binning method--consider choosing a binsize that evenly divides '
94 f
'into the image size, so that {ymax} mod binning == 0 '
95 f
'and {xmax} mod binning == 0')
96 xarray = np.arange(xmax)
97 yarray = np.arange(ymax)
98 xmesh, ymesh = np.meshgrid(xarray, yarray)
99 xbins = np.arange(0, xmax + binning, binning)
100 ybins = np.arange(0, ymax + binning, binning)
101 numerator = fitWeights * fitData
102 binnedNumerator, *_ = scipy.stats.binned_statistic_2d(ymesh.ravel(), xmesh.ravel(),
103 numerator.ravel(), statistic=
'sum',
105 binnedDenominator, *_ = scipy.stats.binned_statistic_2d(ymesh.ravel(), xmesh.ravel(),
106 fitWeights.ravel(), statistic=
'sum',
108 binnedData = np.zeros(binnedNumerator.shape)
109 ind = binnedDenominator != 0
110 np.divide(binnedNumerator, binnedDenominator, out=binnedData, where=ind)
111 binnedWeight = binnedDenominator
112 binMask = (binnedData * binnedWeight**0.5) > detectionThreshold
113 tmpOutputMask = binMask.repeat(binning, axis=0)[:ymax]
114 outputMask = tmpOutputMask.repeat(binning, axis=1)[:, :xmax]
116 outputMask = (fitData * fitWeights**0.5) > detectionThreshold
119 maskedImage.mask.array &= ~maskedImage.mask.getPlaneBitMask(detectedPlane)
122 maskedImage.mask.array[outputMask] |= maskedImage.mask.getPlaneBitMask(detectedPlane)
127 """A simple data class to describe a line profile. The parameter `rho`
128 describes the distance from the center of the image, `theta` describes
129 the angle, and `sigma` describes the width of the line.
137 """Collection of `Line` objects.
142 Array of `Line` rho parameters
144 Array of `Line` theta parameters
145 sigmas : np.ndarray (optional)
146 Array of `Line` sigma parameters
151 sigmas = np.zeros(len(rhos))
153 self.
_lines_lines = [
Line(rho, theta, sigma)
for (rho, theta, sigma)
in
154 zip(rhos, thetas, sigmas)]
157 return len(self.
_lines_lines)
160 return self.
_lines_lines[index]
166 joinedString =
", ".join(str(line)
for line
in self.
_lines_lines)
167 return textwrap.shorten(joinedString, width=160, placeholder=
"...")
171 return np.array([line.rho
for line
in self.
_lines_lines])
175 return np.array([line.theta
for line
in self.
_lines_lines])
178 """Add line to current collection of lines.
183 `Line` to add to current collection of lines
189 """Construct and/or fit a model for a linear streak.
191 This assumes a simple model for a streak, in which the streak
192 follows a straight line in pixels space, with a Moffat-shaped profile. The
193 model is fit to data using a Newton-Raphson style minimization algorithm.
194 The initial guess for the line parameters is assumed to be fairly accurate,
195 so only a narrow band of pixels around the initial line estimate is used in
196 fitting the model, which provides a significant speed-up over using all the
197 data. The class can also be used just to construct a model for the data with
198 a line following the given coordinates.
206 line : `Line` (optional)
207 Guess for position of line. Data far from line guess is masked out.
208 Defaults to None, in which case only data with `weights` = 0 is masked
215 self._ymax, self.
_xmax_xmax = data.shape
216 self.
_dtype_dtype = data.dtype
217 xrange = np.arange(self.
_xmax_xmax) - self.
_xmax_xmax / 2.
218 yrange = np.arange(self._ymax) - self._ymax / 2.
219 self.
_rhoMax_rhoMax = ((0.5 * self._ymax)**2 + (0.5 * self.
_xmax_xmax)**2)**0.5
220 self._xmesh, self.
_ymesh_ymesh = np.meshgrid(xrange, yrange)
227 """Set mask around the image region near the line
232 Parameters of line in the image
236 radtheta = np.deg2rad(line.theta)
237 distance = (np.cos(radtheta) * self._xmesh + np.sin(radtheta) * self.
_ymesh_ymesh - line.rho)
238 m = (
abs(distance) < 5 * line.sigma)
249 def _makeMaskedProfile(self, line, fitFlux=True):
250 """Construct the line model in the masked region and calculate its
256 Parameters of line profile for which to make profile in the masked
259 Fit the amplitude of the line profile to the data
264 Model in the masked region
266 Derivative of the model in the masked region
268 invSigma = line.sigma**-1
270 radtheta = np.deg2rad(line.theta)
271 costheta = np.cos(radtheta)
272 sintheta = np.sin(radtheta)
273 distance = (costheta * self.
_mxmesh_mxmesh + sintheta * self.
_mymesh_mymesh - line.rho)
274 distanceSquared = distance**2
278 dDistanceSqdRho = 2 * distance * (-np.ones_like(self.
_mxmesh_mxmesh))
279 dDistanceSqdTheta = (2 * distance * (-sintheta * self.
_mxmesh_mxmesh + costheta * self.
_mymesh_mymesh) * drad)
282 profile = (1 + distanceSquared * invSigma**2)**-2.5
283 dProfile = -2.5 * (1 + distanceSquared * invSigma**2)**-3.5
295 model = flux * profile
298 fluxdProfile = flux * dProfile
299 fluxdProfileInvSigma = fluxdProfile * invSigma**2
300 dModeldRho = fluxdProfileInvSigma * dDistanceSqdRho
301 dModeldTheta = fluxdProfileInvSigma * dDistanceSqdTheta
302 dModeldInvSigma = fluxdProfile * distanceSquared * 2 * invSigma
304 dModel = np.array([dModeldRho, dModeldTheta, dModeldInvSigma])
308 """Construct the line profile model
313 Parameters of the line profile to model
314 fitFlux : bool (optional)
315 Fit the amplitude of the line profile to the data
319 finalModel : np.ndarray
320 Model for line profile
323 finalModel = np.zeros((self._ymax, self.
_xmax_xmax), dtype=self.
_dtype_dtype)
324 finalModel[self.
lineMasklineMask] = model
327 def _lineChi2(self, line, grad=True):
328 """Construct the chi2 between the data and the model
333 `Line` parameters for which to build model and calculate chi2
334 grad : bool (optional)
335 Whether or not to return the gradient and hessian
340 Reduced chi2 of the model
341 reducedDChi : np.ndarray
342 Derivative of the chi2 with respect to rho, theta, invSigma
343 reducedHessianChi : np.ndarray
344 Hessian of the chi2 with respect to rho, theta, invSigma
353 derivChi2 = ((-2 * self.
_maskWeights_maskWeights * (self.
_maskData_maskData - model))[
None, :] * dModel).sum(axis=1)
354 hessianChi2 = (2 * self.
_maskWeights_maskWeights * dModel[:,
None, :] * dModel[
None, :, :]).sum(axis=2)
358 reducedHessianChi = hessianChi2 / self.
lineMaskSizelineMaskSize
359 return reducedChi, reducedDChi, reducedHessianChi
361 def fit(self, dChi2Tol=0.1, maxIter=100):
362 """Perform Newton-Raphson minimization to find line parameters
364 This method takes advantage of having known derivative and Hessian of
365 the multivariate function to quickly and efficiently find the minimum.
366 This is more efficient than the scipy implementation of the Newton-
367 Raphson method, which doesn't take advantage of the Hessian matrix. The
368 method here also performs a line search in the direction of the steepest
369 derivative at each iteration, which reduces the number of iterations
374 dChi2Tol : float (optional)
375 Change in Chi2 tolerated for fit convergence
376 maxIter : int (optional)
377 Maximum number of fit iterations allowed. The fit should converge in
378 ~10 iterations, depending on the value of dChi2Tol, but this
379 maximum provides a backup.
384 Coordinates and inverse width of fit line
386 Reduced Chi2 of model fit to data
388 Boolean where `False` corresponds to a successful fit
398 def line_search(c, dx):
400 testLine =
Line(testx[0], testx[1], testx[2]**-1)
401 return self.
_lineChi2_lineChi2(testLine, grad=
False)
403 while abs(dChi2) > dChi2Tol:
404 line =
Line(x[0], x[1], x[2]**-1)
405 chi2, b, A = self.
_lineChi2_lineChi2(line)
408 if not np.isfinite(A).
all():
412 dChi2 = oldChi2 - chi2
413 cholesky = scipy.linalg.cho_factor(A)
414 dx = scipy.linalg.cho_solve(cholesky, b)
416 factor, fmin, _, _ = scipy.optimize.brent(line_search, args=(dx,), full_output=
True, tol=0.05)
418 if (
abs(x[0]) > 1.5 * self.
_rhoMax_rhoMax)
or (iter > maxIter):
424 outline =
Line(x[0], x[1],
abs(x[2])**-1)
426 return outline, chi2, fitFailure
430 """Configuration parameters for `MaskStreaksTask`
432 minimumKernelHeight = pexConfig.Field(
433 doc=
"Minimum height of the streak-finding kernel relative to the tallest kernel",
437 absMinimumKernelHeight = pexConfig.Field(
438 doc=
"Minimum absolute height of the streak-finding kernel",
442 clusterMinimumSize = pexConfig.Field(
443 doc=
"Minimum size in pixels of detected clusters",
447 clusterMinimumDeviation = pexConfig.Field(
448 doc=
"Allowed deviation (in pixels) from a straight line for a detected "
453 delta = pexConfig.Field(
454 doc=
"Stepsize in angle-radius parameter space",
458 nSigma = pexConfig.Field(
459 doc=
"Number of sigmas from center of kernel to include in voting "
464 rhoBinSize = pexConfig.Field(
465 doc=
"Binsize in pixels for position parameter rho when finding "
466 "clusters of detected lines",
470 thetaBinSize = pexConfig.Field(
471 doc=
"Binsize in degrees for angle parameter theta when finding "
472 "clusters of detected lines",
476 invSigma = pexConfig.Field(
477 doc=
"Inverse of the Moffat sigma parameter (in units of pixels)"
478 "describing the profile of the streak",
482 footprintThreshold = pexConfig.Field(
483 doc=
"Threshold at which to determine edge of line, in units of the line"
488 dChi2Tolerance = pexConfig.Field(
489 doc=
"Absolute difference in Chi2 between iterations of line profile"
490 "fitting that is acceptable for convergence",
494 detectedMaskPlane = pexConfig.Field(
495 doc=
"Name of mask with pixels above detection threshold, used for first"
496 "estimate of streak locations",
500 streaksMaskPlane = pexConfig.Field(
501 doc=
"Name of mask plane holding detected streaks",
508 """Find streaks or other straight lines in image data.
510 Nearby objects passing through the field of view of the telescope leave a
511 bright trail in images. This class uses the Kernel Hough Transform (KHT)
512 (Fernandes and Oliveira, 2007), implemented in `lsst.houghtransform`. The
513 procedure works by taking a binary image, either provided as put or produced
514 from the input data image, using a Canny filter to make an image of the
515 edges in the original image, then running the KHT on the edge image. The KHT
516 identifies clusters of non-zero points, breaks those clusters of points into
517 straight lines, keeps clusters with a size greater than the user-set
518 threshold, then performs a voting procedure to find the best-fit coordinates
519 of any straight lines. Given the results of the KHT algorithm, clusters of
520 lines are identified and grouped (generally these correspond to the two
521 edges of a strea) and a profile is fit to the streak in the original
525 ConfigClass = MaskStreaksConfig
526 _DefaultName =
"maskStreaks"
530 """Find streaks in a masked image
534 maskedImage : `lsst.afw.image.maskedImage`
535 The image in which to search for streaks.
539 result : `lsst.pipe.base.Struct`
540 Result struct with components:
542 - ``originalLines``: lines identified by kernel hough transform
543 - ``lineClusters``: lines grouped into clusters in rho-theta space
544 - ``lines``: final result for lines after line-profile fit
545 - ``mask``: 2-d boolean mask where detected lines are True
547 mask = maskedImage.getMask()
548 detectionMask = (mask.array & mask.getPlaneBitMask(self.config.detectedMaskPlane))
553 if len(self.
lineslines) == 0:
554 lineMask = np.zeros(detectionMask.shape, dtype=bool)
559 fitLines, lineMask = self.
_fitProfile_fitProfile(clusters, maskedImage)
562 outputMask = lineMask & detectionMask.astype(bool)
564 return pipeBase.Struct(
566 lineClusters=clusters,
567 originalLines=self.
lineslines,
572 def run(self, maskedImage):
573 """Find and mask streaks in a masked image.
575 Finds streaks in the image and modifies maskedImage in place by adding a
576 mask plane with any identified streaks.
580 maskedImage : `lsst.afw.image.maskedImage`
581 The image in which to search for streaks. The mask detection plane
582 corresponding to `config.detectedMaskPlane` must be set with the
587 result : `lsst.pipe.base.Struct`
588 Result struct with components:
590 - ``originalLines``: lines identified by kernel hough transform
591 - ``lineClusters``: lines grouped into clusters in rho-theta space
592 - ``lines``: final result for lines after line-profile fit
594 streaks = self.
findfind(maskedImage)
596 maskedImage.mask.addMaskPlane(self.config.streaksMaskPlane)
597 maskedImage.mask.array[streaks.mask] |= maskedImage.mask.getPlaneBitMask(self.config.streaksMaskPlane)
599 return pipeBase.Struct(
601 lineClusters=streaks.lineClusters,
602 originalLines=streaks.originalLines,
605 def _cannyFilter(self, image):
606 """Apply a canny filter to the data in order to detect edges
611 2-d image data on which to run filter
615 cannyData : `np.ndarray`
616 2-d image of edges found in input image
618 filterData = image.astype(int)
619 return canny(filterData, low_threshold=0, high_threshold=1, sigma=0.1)
621 def _runKHT(self, image):
622 """Run Kernel Hough Transform on image.
627 2-d image data on which to detect lines
631 result : `LineCollection`
632 Collection of detected lines, with their detected rho and theta
635 lines = lsst.kht.find_lines(image, self.config.clusterMinimumSize,
636 self.config.clusterMinimumDeviation, self.config.delta,
637 self.config.minimumKernelHeight, self.config.nSigma,
638 self.config.absMinimumKernelHeight)
642 def _findClusters(self, lines):
643 """Group lines that are close in parameter space and likely describe
648 lines : `LineCollection`
649 Collection of lines to group into clusters
653 result : `LineCollection`
654 Average `Line` for each cluster of `Line`s in the input
661 x = lines.rhos / self.config.rhoBinSize
662 y = lines.thetas / self.config.thetaBinSize
663 X = np.array([x, y]).T
672 kmeans = KMeans(n_clusters=nClusters).fit(X)
673 clusterStandardDeviations = np.zeros((nClusters, 2))
674 for c
in range(nClusters):
675 inCluster = X[kmeans.labels_ == c]
676 clusterStandardDeviations[c] = np.std(inCluster, axis=0)
678 if (clusterStandardDeviations <= 1).
all():
683 finalClusters = kmeans.cluster_centers_.T
686 finalRhos = finalClusters[0] * self.config.rhoBinSize
687 finalThetas = finalClusters[1] * self.config.thetaBinSize
692 def _fitProfile(self, lines, maskedImage):
693 """Fit the profile of the streak.
695 Given the initial parameters of detected lines, fit a model for the
696 streak to the original (non-binary image). The assumed model is a
697 straight line with a Moffat profile.
701 lines : `LineCollection`
702 Collection of guesses for `Line`s detected in the image
703 maskedImage : `lsst.afw.image.maskedImage`
704 Original image to be used to fit profile of streak.
708 lineFits : `LineCollection`
709 Collection of `Line` profiles fit to the data
710 finalMask : `np.ndarray`
711 2d mask array with detected streaks=1.
713 data = maskedImage.image.array
714 weights = maskedImage.variance.array**-1
716 weights[~np.isfinite(weights) | ~np.isfinite(data)] = 0
719 finalLineMasks = [np.zeros(data.shape, dtype=bool)]
721 line.sigma = self.config.invSigma**-1
724 if lineModel.lineMaskSize == 0:
727 fit, chi2, fitFailure = lineModel.fit(dChi2Tol=self.config.dChi2Tolerance)
731 if ((
abs(fit.rho - line.rho) > 2 * self.config.rhoBinSize)
732 or (
abs(fit.theta - line.theta) > 2 * self.config.thetaBinSize)):
739 lineModel.setLineMask(fit)
740 finalModel = lineModel.makeProfile(fit)
742 finalModelMax =
abs(finalModel).
max()
743 finalLineMask =
abs(finalModel) > self.config.footprintThreshold
745 if not finalLineMask.any():
748 fit.finalModelMax = finalModelMax
750 finalLineMasks.append(finalLineMask)
752 finalMask = np.array(finalLineMasks).
any(axis=0)
754 return lineFits, finalMask
def __init__(self, rhos, thetas, sigmas=None)
def append(self, newLine)
def __getitem__(self, index)
def __init__(self, data, weights, line=None)
def makeProfile(self, line, fitFlux=True)
def _makeMaskedProfile(self, line, fitFlux=True)
def _lineChi2(self, line, grad=True)
def fit(self, dChi2Tol=0.1, maxIter=100)
def setLineMask(self, line)
def run(self, maskedImage)
def find(self, maskedImage)
def _fitProfile(self, lines, maskedImage)
def _findClusters(self, lines)
def _cannyFilter(self, image)
bool any(CoordinateExpr< N > const &expr) noexcept
Return true if any elements are true.
bool all(CoordinateExpr< N > const &expr) noexcept
Return true if all elements are true.
def setDetectionMask(maskedImage, forceSlowBin=False, binning=None, detectedPlane="DETECTED", badMaskPlanes=("NO_DATA", "INTRP", "BAD", "SAT", "EDGE"), detectionThreshold=5)
Angle abs(Angle const &a)