LSST Applications 28.0.2,g0fba68d861+5b923b673a,g1fd858c14a+7a7b9dd5ed,g2c84ff76c0+5548bfee71,g30358e5240+f0e04ebe90,g35bb328faa+fcb1d3bbc8,g436fd98eb5+bdc6fcdd04,g4af146b050+742274f7cd,g4d2262a081+3efd3f8190,g4e0f332c67+cb09b8a5b6,g53246c7159+fcb1d3bbc8,g5a012ec0e7+477f9c599b,g5edb6fd927+826dfcb47f,g60b5630c4e+bdc6fcdd04,g67b6fd64d1+2218407a0c,g78460c75b0+2f9a1b4bcd,g786e29fd12+cf7ec2a62a,g7b71ed6315+fcb1d3bbc8,g87b7deb4dc+f9ac2ab1bd,g8852436030+ebf28f0d95,g89139ef638+2218407a0c,g9125e01d80+fcb1d3bbc8,g989de1cb63+2218407a0c,g9f33ca652e+42fb53f4c8,g9f7030ddb1+11b9b6f027,ga2b97cdc51+bdc6fcdd04,gab72ac2889+bdc6fcdd04,gabe3b4be73+1e0a283bba,gabf8522325+3210f02652,gb1101e3267+9c79701da9,gb58c049af0+f03b321e39,gb89ab40317+2218407a0c,gcf25f946ba+ebf28f0d95,gd6cbbdb0b4+e8f9c9c900,gd9a9a58781+fcb1d3bbc8,gde0f65d7ad+a08f294619,ge278dab8ac+3ef3db156b,ge410e46f29+2218407a0c,gf67bdafdda+2218407a0c
LSST Data Management Base Package
|
Classes | |
class | fixes |
class | PrettyPictureConnections |
Variables | |
doWrite | |
r | |
g | |
b | |
images : `Mapping` of `str` to `Exposure` | |
result : `Struct` | |
refs : `Iterable` of `DatasetRef` | |
butler : `Butler` or `QuantumContext` | |
sortedImages : `dict` of `str` to `Exposure` | |
kwargs : `NDArray` | |
R : `int` | |
C : `int` | |
slices : `list` of `tuple` | |
image : `NDArray` | |
inputCoadd : `Exposure` | |
results : `Struct` of `Mapping` of `str` to `ExposureF` | |
lsst.pipe.tasks.prettyPictureMaker._task.butler : `Butler` or `QuantumContext` |
lsst.pipe.tasks.prettyPictureMaker._task.image : `NDArray` |
M = arr.shape[0] N = arr.shape[1] # Function to compute slices for a given dimension size and number of divisions def get_slices(total_size, num_divisions): base = total_size // num_divisions remainder = total_size % num_divisions slices = [] start = 0 for i in range(num_divisions): end = start + base if i < remainder: end += 1 slices.append((start, end)) start = end return slices # Get row and column slices row_slices = get_slices(M, R) col_slices = get_slices(N, C) # Generate all possible tile combinations of row and column slices tiles = [] for rs in row_slices: r_start, r_end = rs for cs in col_slices: c_start, c_end = cs tile_slice = (slice(r_start, r_end), slice(c_start, c_end)) tiles.append(tile_slice) return tiles def fixBackground(self, image):
lsst.pipe.tasks.prettyPictureMaker._task.images : `Mapping` of `str` to `Exposure` |
# Field declarations of a config class whose header is not part of this
# fragment (presumably ``ChannelRGBConfig``, which is constructed with
# ``r=``, ``g=`` and ``b=`` keywords elsewhere in this listing).
# Each field is the weight a band contributes to one output colour channel.
r = Field[float](doc="The amount of red contained in this channel")
g = Field[float](doc="The amount of green contained in this channel")
b = Field[float](doc="The amount of blue contained in this channel")
# Header of the next config class; its fields appear in the next fragment.
class LumConfig(Config):
# Fields of ``LumConfig`` (class header is at the end of the previous code
# fragment). ``PrettyPictureTask.run`` forwards these, via ``toDict()``, as
# the ``scaleLumKWargs`` argument of ``lsstRGB``.
# NOTE(review): "streaching" in the three doc strings below looks like a typo
# for "stretching"; left untouched because the text is user-visible help.
stretch = Field[float](doc="The stretch of the luminance in asinh", default=400)
max = Field[float](doc="The maximum allowed luminance on a 0 to 100 scale", default=85)
floor = Field[float](doc="A scaling factor to apply to the luminance before asinh scaling", default=0.0)
Q = Field[float](doc="softening parameter", default=0.7)
highlight = Field[float](
    doc="The value of highlights in scaling factor applied to post asinh streaching", default=1.0
)
shadow = Field[float](
    doc="The value of shadows in scaling factor applied to post asinh streaching", default=0.0
)
midtone = Field[float](
    doc="The value of midtone in scaling factor applied to post asinh streaching", default=0.5
)
# Header of the next config class; its fields appear in the next fragment.
class LocalContrastConfig(Config):
# Fields of ``LocalContrastConfig`` (class header is at the end of the
# previous code fragment). ``PrettyPictureTask.run`` expands these with
# ``**toDict()`` directly into the ``lsstRGB`` call, so every field name here
# is also a keyword argument name of that call.
doLocalContrast = Field[bool](
    doc="Apply local contrast enhancements to the luminance channel", default=True
)
highlights = Field[float](doc="Adjustment factor for the highlights", default=-0.9)
shadows = Field[float](doc="Adjustment factor for the shadows", default=0.5)
clarity = Field[float](doc="Amount of clarity to apply to contrast modification", default=0.1)
sigma = Field[float](
    doc="The scale size of what is considered local in the contrast enhancement", default=30
)
# ``optional=True`` allows this field to be set to None ("all scales").
maxLevel = Field[int](
    doc="The maximum number of scales the contrast should be enhanced over, if None then all",
    default=4,
    optional=True,
)
# Header of the next config class; its fields appear in the next fragment.
class ScaleColorConfig(Config):
# Fields of ``ScaleColorConfig`` (class header is at the end of the previous
# code fragment). ``PrettyPictureTask.run`` forwards these, via ``toDict()``,
# as the ``scaleColorKWargs`` argument of ``lsstRGB``.
saturation = Field[float](
    doc=(
        "The overall saturation factor with the scaled luminance between zero and one. "
        "A value of one is not recommended as it makes bright pixels very saturated"
    ),
    default=0.5,
)
maxChroma = Field[float](
    doc=(
        "The maximum chromaticity in the CIELCh color space, large "
        "values will cause bright pixels to fall outside the RGB gamut."
    ),
    default=50.0,
)
# Header of the next config class; its fields appear in the next fragment.
class RemapBoundsConfig(Config):
# Fields of ``RemapBoundsConfig`` (class header is at the end of the previous
# code fragment). ``PrettyPictureTask.run`` forwards these, via ``toDict()``,
# as the ``remapBoundsKwargs`` argument of ``lsstRGB``.
quant = Field[float](
    doc=(
        "The maximum values of each of the three channels will be multiplied by this factor to "
        "determine the maximum flux of the image, values larger than this quantity will be clipped."
    ),
    default=0.8,
)
# ``optional=True`` allows None, i.e. no fixed maximum.
absMax = Field[float](
    doc="Instead of determining the maximum value from the image, use this fixed value instead",
    default=220,
    optional=True,
)
# Top-level configuration of ``PrettyPictureTask``: the per-band colour
# mixing plus one sub-config per processing stage and a few output switches.
class PrettyPictureConfig(PipelineTaskConfig, pipelineConnections=PrettyPictureConnections):
    # Band name -> r/g/b mixing weights; populated in ``setDefaults``.
    channelConfig = ConfigDictField(
        doc="A dictionary that maps band names to their rgb channel configurations",
        keytype=str,
        itemtype=ChannelRGBConfig,
        default={},
    )
    imageRemappingConfig = ConfigField[RemapBoundsConfig](
        doc="Configuration controlling channel normalization process"
    )
    luminanceConfig = ConfigField[LumConfig](
        doc="Configuration for the luminance scaling when making an RGB image"
    )
    localContrastConfig = ConfigField[LocalContrastConfig](
        doc="Configuration controlling the local contrast correction in RGB image production"
    )
    colorConfig = ConfigField[ScaleColorConfig](
        doc="Configuration to control the color scaling process in RGB image production"
    )
    # At most two values (``maxLength=2``); passed to ``lsstRGB`` as a tuple.
    cieWhitePoint = ListField[float](
        doc="The white point of the input arrays in ciexz coordinates", maxLength=2, default=[0.28, 0.28]
    )
    # The allowed keys must stay in sync with the ``match`` statement in
    # ``PrettyPictureTask.run`` that maps them to a numpy dtype and maximum.
    arrayType = ChoiceField[str](
        doc="The dataset type for the output image array",
        default="uint8",
        allowed={
            "uint8": "Use 8 bit arrays, 255 max",
            "uint16": "Use 16 bit arrays, 65535 max",
            "half": "Use 16 bit float arrays, 1 max",
            "float": "Use 32 bit float arrays, 1 max",
        },
    )
    # NOTE(review): the field name is misspelled ("Deconcovlve"), but it is
    # read under this exact name in ``PrettyPictureTask.run``; renaming it
    # would change the public configuration interface.
    doPSFDeconcovlve = Field[bool](
        doc="Use the PSF in a richardson lucy deconvolution on the luminance channel.", default=True
    )
    # May be None (``optional=True``); ``run`` then passes ``brackets=None``.
    exposureBrackets = ListField[float](
        doc=(
            "Exposure scaling factors used in creating multiple exposures with different scalings which will "
            "then be fused into a final image"
        ),
        optional=True,
        default=[1.25, 1, 0.75],
    )
    doRemapGamut = Field[bool](
        doc="Apply a color correction to unrepresentable colors, if false they will clip", default=True
    )
    gamutMethod = ChoiceField[str](
        doc="If doRemapGamut is True this determines the method",
        default="inpaint",
        allowed={
            "mapping": "Use a mapping function",
            "inpaint": "Use surrounding pixels to determine likely value",
        },
    )

    def setDefaults(self):
        # Default colour mapping: band "i" -> red, "r" -> green, "g" -> blue.
        self.channelConfig["i"] = ChannelRGBConfig(r=1, g=0, b=0)
        self.channelConfig["r"] = ChannelRGBConfig(r=0, g=1, b=0)
        self.channelConfig["g"] = ChannelRGBConfig(r=0, g=0, b=1)
        # The base-class defaults are applied after the channel mapping is set.
        return super().setDefaults()
# Header of the task class; its attributes appear in the next code fragment.
class PrettyPictureTask(PipelineTask):
# Class-level attributes of ``PrettyPictureTask`` (class header is at the end
# of the previous code fragment).
_DefaultName = "prettyPictureTask"
ConfigClass = PrettyPictureConfig
# Annotation only: tells type checkers that ``self.config`` is a ConfigClass.
config: ConfigClass
# Header of ``run``; its body appears in a later fragment of this listing.
def run(self, images: Mapping[str, Exposure]) -> Struct:
lsst.pipe.tasks.prettyPictureMaker._task.inputCoadd : `Exposure` |
# Find the median value in the image, which is likely to be # close to average background. Note this doesn't work well # in fields with high density or diffuse flux. maxLikely = np.median(image, axis=None) # find all the pixels that are fainter than this # and find the std. This is just used as an initialization # parameter and doesn't need to be accurate. mask = image < maxLikely initial_std = (image[mask] - maxLikely).std() # Don't do anything if there are no pixels to check if np.any(mask): # use a minimizer to determine best mu and sigma for a Gaussian # given only samples below the mean of the Gaussian. result = minimize( self._neg_log_likelihood, (maxLikely, initial_std), args=(image[mask]), bounds=((maxLikely, None), (1e-8, None)), ) mu_hat, sigma_hat = result.x else: mu_hat, sigma_hat = (maxLikely, 2 * initial_std) # create a new masking threshold that is the determined # mean plus std from the fit threshhold = mu_hat + sigma_hat image_mask = image < threshhold # create python slices that tile the image. tiles = self._tile_slices(image, 25, 25) yloc = [] xloc = [] values = [] # for each box find the middle position and the median background # value in the window. for xslice, yslice in tiles: ypos = (yslice.stop - yslice.start) / 2 + yslice.start xpos = (xslice.stop - xslice.start) / 2 + xslice.start yloc.append(ypos) xloc.append(xpos) window = image[yslice, xslice][image_mask[yslice, xslice]] if window.size > 0: value = np.median(window) else: value = 0 values.append(value) positions = np.meshgrid(np.arange(image.shape[0]), np.arange(image.shape[1])) # create an interpolant for the background and interpolate over the image. inter = RBFInterpolator( np.vstack((yloc, xloc)).T, values, kernel="thin_plate_spline", degree=4, smoothing=0.05 ) backgrounds = inter(np.array(positions)[::-1].reshape(2, -1).T).reshape(image.shape) return backgrounds def run(self, inputCoadd: Exposure):
lsst.pipe.tasks.prettyPictureMaker._task.kwargs : `NDArray` |
sortedImages: dict[str, Exposure] = {} for ref in refs: key: str = cast(str, ref.dataId["band"]) image = butler.get(ref) sortedImages[key] = image return sortedImages def makeInputsFromArrays(self, **kwargs) -> dict[int, DeferredDatasetHandle]: r
# ignore type because there aren't proper stubs for afw temp = {} for key, array in kwargs.items(): temp[key] = Exposure(Box2I(Point2I(0, 0), Extent2I(*array.shape)), dtype=array.dtype) temp[key].image.array[:] = array return self.makeInputsFromExposures(**temp) def makeInputsFromExposures(self, **kwargs) -> dict[int, DeferredDatasetHandle]: r
lsst.pipe.tasks.prettyPictureMaker._task.R : `int` |
sortedImages = {} for key, value in kwargs.items(): sortedImages[key] = value return sortedImages class PrettyPictureBackgroundFixerConnections( PipelineTaskConnections, dimensions=("tract", "patch", "skymap", "band"), defaultTemplates={"coaddTypeName": "deep"}, ): inputCoadd = Input( doc=("Input coadd for which the background is to be removed"), name="{coaddTypeName}CoaddPsfMatched", storageClass="ExposureF", dimensions=("tract", "patch", "skymap", "band"), ) outputCoadd = Output( doc="The coadd with the background fixed and subtracted", name="pretty_picture_coadd_bg_subtracted", storageClass="ExposureF", dimensions=("tract", "patch", "skymap", "band"), ) class PrettyPictureBackgroundFixerConfig( PipelineTaskConfig, pipelineConnections=PrettyPictureBackgroundFixerConnections ): pass class PrettyPictureBackgroundFixerTask(PipelineTask):
# Class-level attributes of ``PrettyPictureBackgroundFixerTask`` (class header
# is at the end of the previous code fragment).
_DefaultName = "prettyPictureBackgroundFixerTask"
ConfigClass = PrettyPictureBackgroundFixerConfig
# Annotation only: tells type checkers that ``self.config`` is a ConfigClass.
config: ConfigClass
# Header of the objective function that ``fixBackground`` passes to
# ``minimize``; its body is not part of this listing.
def _neg_log_likelihood(self, params, x):
lsst.pipe.tasks.prettyPictureMaker._task.refs : `Iterable` of `DatasetRef` |
channels = {} shape = (0, 0) jointMask: None | NDArray = None maskDict: Mapping[str, int] = {} doJointMaskInit = False if jointMask is None: doJointMask = True doJointMaskInit = True for channel, imageExposure in images.items(): imageArray = imageExposure.image.array # run all the plugins designed for array based interaction for plug in plugins.channel(): imageArray = plug( imageArray, imageExposure.mask.array, imageExposure.mask.getMaskPlaneDict(), self.config ).astype(np.float32) channels[channel] = imageArray # These operations are trivial look-ups and don't matter if they # happen in each loop. shape = imageArray.shape maskDict = imageExposure.mask.getMaskPlaneDict() if doJointMaskInit: jointMask = np.zeros(shape, dtype=imageExposure.mask.dtype) doJointMaskInit = False if doJointMask: jointMask |= imageExposure.mask.array # mix the images to RGB imageRArray = np.zeros(shape, dtype=np.float32) imageGArray = np.zeros(shape, dtype=np.float32) imageBArray = np.zeros(shape, dtype=np.float32) for band, image in channels.items(): mix = self.config.channelConfig[band] if mix.r: imageRArray += mix.r * image if mix.g: imageGArray += mix.g * image if mix.b: imageBArray += mix.b * image exposure = next(iter(images.values())) box: Box2I = exposure.getBBox() boxCenter = box.getCenter() try: psf = exposure.psf.computeImage(boxCenter).array except Exception: psf = None # assert for typing reasons assert jointMask is not None # Run any image level correction plugins colorImage = np.zeros((*imageRArray.shape, 3)) colorImage[:, :, 0] = imageRArray colorImage[:, :, 1] = imageGArray colorImage[:, :, 2] = imageBArray for plug in plugins.partial(): colorImage = plug(colorImage, jointMask, maskDict, self.config) # Ignore type because Exposures do in fact have a bbox, but it's c++ # and not typed. 
colorImage = lsstRGB( colorImage[:, :, 0], colorImage[:, :, 1], colorImage[:, :, 2], scaleLumKWargs=self.config.luminanceConfig.toDict(), remapBoundsKwargs=self.config.imageRemappingConfig.toDict(), scaleColorKWargs=self.config.colorConfig.toDict(), **(self.config.localContrastConfig.toDict()), cieWhitePoint=tuple(self.config.cieWhitePoint), # type: ignore psf=psf if self.config.doPSFDeconcovlve else None, brackets=list(self.config.exposureBrackets) if self.config.exposureBrackets else None, doRemapGamut=self.config.doRemapGamut, gamutMethod=self.config.gamutMethod, ) # Find the dataset type and thus the maximum values as well maxVal: int | float match self.config.arrayType: case "uint8": dtype = np.uint8 maxVal = 255 case "uint16": dtype = np.uint16 maxVal = 65535 case "half": dtype = np.half maxVal = 1.0 case "float": dtype = np.float32 maxVal = 1.0 case _: assert True, "This code path should be unreachable" # lsstRGB returns an image in 0-1 scale it to the maximum value colorImage *= maxVal # type: ignore # pack the joint mask back into a mask object lsstMask = Mask(width=jointMask.shape[1], height=jointMask.shape[0], planeDefs=maskDict) lsstMask.array = jointMask # type: ignore return Struct(outputRGB=colorImage.astype(dtype), outputRGBMask=lsstMask) # type: ignore def runQuantum( self, butlerQC: QuantumContext, inputRefs: InputQuantizedConnection, outputRefs: OutputQuantizedConnection, ) -> None: imageRefs: list[DatasetRef] = inputRefs.inputCoadds sortedImages = self.makeInputsFromRefs(imageRefs, butlerQC) outputs = self.run(sortedImages) butlerQC.put(outputs, outputRefs) def makeInputsFromRefs( self, refs: Iterable[DatasetRef], butler: Butler | QuantumContext ) -> dict[str, Exposure]: r
lsst.pipe.tasks.prettyPictureMaker._task.results : `Struct` of `Mapping` of `str` to `ExposureF` |
lsst.pipe.tasks.prettyPictureMaker._task.slices : `list` of `tuple` |