|
LSST Applications g00274db5b6+edbf708997,g00d0e8bbd7+edbf708997,g199a45376c+5137f08352,g1fd858c14a+1d4b6db739,g262e1987ae+f4d9505c4f,g29ae962dfc+7156fb1a53,g2cef7863aa+73c82f25e4,g35bb328faa+edbf708997,g3e17d7035e+5b3adc59f5,g3fd5ace14f+852fa6fbcb,g47891489e3+6dc8069a4c,g53246c7159+edbf708997,g64539dfbff+9f17e571f4,g67b6fd64d1+6dc8069a4c,g74acd417e5+ae494d68d9,g786e29fd12+af89c03590,g7ae74a0b1c+a25e60b391,g7aefaa3e3d+536efcc10a,g7cc15d900a+d121454f8d,g87389fa792+a4172ec7da,g89139ef638+6dc8069a4c,g8d7436a09f+28c28d8d6d,g8ea07a8fe4+db21c37724,g92c671f44c+9f17e571f4,g98df359435+b2e6376b13,g99af87f6a8+b0f4ad7b8d,gac66b60396+966efe6077,gb88ae4c679+7dec8f19df,gbaa8f7a6c5+38b34f4976,gbf99507273+edbf708997,gc24b5d6ed1+9f17e571f4,gca7fc764a6+6dc8069a4c,gcc769fe2a4+97d0256649,gd7ef33dd92+6dc8069a4c,gdab6d2f7ff+ae494d68d9,gdbb4c4dda9+9f17e571f4,ge410e46f29+6dc8069a4c,geaed405ab2+e194be0d2b,w.2025.47
LSST Data Management Base Package
|
Classes | |
| class | fixes |
| class | PrettyPictureConnections |
Variables | |
| doWrite | |
| r | |
| g | |
| b | |
| images : `Mapping` of `str` to `Exposure` | |
| result : `Struct` | |
| refs : `Iterable` of `DatasetRef` | |
| butler : `Butler` or `QuantumContext` | |
| sortedImages : `dict` of `str` to `Exposure` | |
| kwargs : `NDArray` | |
| R : `int` | |
| C : `int` | |
| slices : `list` of `tuple` | |
| image : `NDArray` | |
| inputCoadd : `Exposure` | |
| results : `Struct` of `Mapping` of `str` to `ExposureF` | |
| lsst.pipe.tasks.prettyPictureMaker._task.butler : `Butler` or `QuantumContext` |
| lsst.pipe.tasks.prettyPictureMaker._task.image : `NDArray` |
M = arr.shape[0]
N = arr.shape[1]
# Function to compute slices for a given dimension size and number of divisions
def get_slices(total_size, num_divisions):
base = total_size // num_divisions
remainder = total_size % num_divisions
slices = []
start = 0
for i in range(num_divisions):
end = start + base
if i < remainder:
end += 1
slices.append((start, end))
start = end
return slices
# Get row and column slices
row_slices = get_slices(M, R)
col_slices = get_slices(N, C)
# Generate all possible tile combinations of row and column slices
tiles = []
for rs in row_slices:
r_start, r_end = rs
for cs in col_slices:
c_start, c_end = cs
tile_slice = (slice(r_start, r_end), slice(c_start, c_end))
tiles.append(tile_slice)
return tiles
def fixBackground(self, image):
| lsst.pipe.tasks.prettyPictureMaker._task.images : `Mapping` of `str` to `Exposure` |
r = Field[float](doc="The amount of red contained in this channel") g = Field[float](doc="The amount of green contained in this channel") b = Field[float](doc="The amount of blue contained in this channel") class LumConfig(Config):
stretch = Field[float](doc="The stretch of the luminance in asinh", default=400)
max = Field[float](doc="The maximum allowed luminance on a 0 to 100 scale", default=85)
floor = Field[float](doc="A scaling factor to apply to the luminance before asinh scaling", default=0.0)
Q = Field[float](doc="softening parameter", default=0.7)
highlight = Field[float](
doc="The value of highlights in scaling factor applied to post asinh streaching", default=1.0
)
shadow = Field[float](
doc="The value of shadows in scaling factor applied to post asinh streaching", default=0.0
)
midtone = Field[float](
doc="The value of midtone in scaling factor applied to post asinh streaching", default=0.5
)
class LocalContrastConfig(Config):
doLocalContrast = Field[bool](
doc="Apply local contrast enhancements to the luminance channel", default=True
)
highlights = Field[float](doc="Adjustment factor for the highlights", default=-0.9)
shadows = Field[float](doc="Adjustment factor for the shadows", default=0.5)
clarity = Field[float](doc="Amount of clarity to apply to contrast modification", default=0.1)
sigma = Field[float](
doc="The scale size of what is considered local in the contrast enhancement", default=30
)
maxLevel = Field[int](
doc="The maximum number of scales the contrast should be enhanced over, if None then all",
default=4,
optional=True,
)
class ScaleColorConfig(Config):
saturation = Field[float](
doc=(
"The overall saturation factor with the scaled luminance between zero and one. "
"A value of one is not recommended as it makes bright pixels very saturated"
),
default=0.5,
)
maxChroma = Field[float](
doc=(
"The maximum chromaticity in the CIELCh color space, large "
"values will cause bright pixels to fall outside the RGB gamut."
),
default=50.0,
)
class RemapBoundsConfig(Config):
quant = Field[float](
doc=(
"The maximum values of each of the three channels will be multiplied by this factor to "
"determine the maximum flux of the image, values larger than this quantity will be clipped."
),
default=0.8,
)
absMax = Field[float](
doc="Instead of determining the maximum value from the image, use this fixed value instead",
default=220,
optional=True,
)
class PrettyPictureConfig(PipelineTaskConfig, pipelineConnections=PrettyPictureConnections):
channelConfig = ConfigDictField(
doc="A dictionary that maps band names to their rgb channel configurations",
keytype=str,
itemtype=ChannelRGBConfig,
default={},
)
imageRemappingConfig = ConfigField[RemapBoundsConfig](
doc="Configuration controlling channel normalization process"
)
luminanceConfig = ConfigField[LumConfig](
doc="Configuration for the luminance scaling when making an RGB image"
)
localContrastConfig = ConfigField[LocalContrastConfig](
doc="Configuration controlling the local contrast correction in RGB image production"
)
colorConfig = ConfigField[ScaleColorConfig](
doc="Configuration to control the color scaling process in RGB image production"
)
cieWhitePoint = ListField[float](
doc="The white point of the input arrays in ciexz coordinates", maxLength=2, default=[0.28, 0.28]
)
arrayType = ChoiceField[str](
doc="The dataset type for the output image array",
default="uint8",
allowed={
"uint8": "Use 8 bit arrays, 255 max",
"uint16": "Use 16 bit arrays, 65535 max",
"half": "Use 16 bit float arrays, 1 max",
"float": "Use 32 bit float arrays, 1 max",
},
)
doPSFDeconcovlve = Field[bool](
doc="Use the PSF in a richardson lucy deconvolution on the luminance channel.", default=True
)
exposureBrackets = ListField[float](
doc=(
"Exposure scaling factors used in creating multiple exposures with different scalings which will "
"then be fused into a final image"
),
optional=True,
default=[1.25, 1, 0.75],
)
doRemapGamut = Field[bool](
doc="Apply a color correction to unrepresentable colors, if false they will clip", default=True
)
gamutMethod = ChoiceField[str](
doc="If doRemapGamut is True this determines the method",
default="inpaint",
allowed={
"mapping": "Use a mapping function",
"inpaint": "Use surrounding pixels to determine likely value",
},
)
def setDefaults(self):
self.channelConfig["i"] = ChannelRGBConfig(r=1, g=0, b=0)
self.channelConfig["r"] = ChannelRGBConfig(r=0, g=1, b=0)
self.channelConfig["g"] = ChannelRGBConfig(r=0, g=0, b=1)
return super().setDefaults()
class PrettyPictureTask(PipelineTask):
_DefaultName = "prettyPictureTask" ConfigClass = PrettyPictureConfig config: ConfigClass def run(self, images: Mapping[str, Exposure]) -> Struct:
| lsst.pipe.tasks.prettyPictureMaker._task.inputCoadd : `Exposure` |
# Find the median value in the image, which is likely to be
# close to average background. Note this doesn't work well
# in fields with high density or diffuse flux.
maxLikely = np.median(image, axis=None)
# find all the pixels that are fainter than this
# and find the std. This is just used as an initialization
# parameter and doesn't need to be accurate.
mask = image < maxLikely
initial_std = (image[mask] - maxLikely).std()
# Don't do anything if there are no pixels to check
if np.any(mask):
# use a minimizer to determine best mu and sigma for a Gaussian
# given only samples below the mean of the Gaussian.
result = minimize(
self._neg_log_likelihood,
(maxLikely, initial_std),
args=(image[mask]),
bounds=((maxLikely, None), (1e-8, None)),
)
mu_hat, sigma_hat = result.x
else:
mu_hat, sigma_hat = (maxLikely, 2 * initial_std)
# create a new masking threshold that is the determined
# mean plus std from the fit
threshhold = mu_hat + sigma_hat
image_mask = image < threshhold
# create python slices that tile the image.
tiles = self._tile_slices(image, 25, 25)
yloc = []
xloc = []
values = []
# for each box find the middle position and the median background
# value in the window.
for xslice, yslice in tiles:
ypos = (yslice.stop - yslice.start) / 2 + yslice.start
xpos = (xslice.stop - xslice.start) / 2 + xslice.start
yloc.append(ypos)
xloc.append(xpos)
window = image[yslice, xslice][image_mask[yslice, xslice]]
if window.size > 0:
value = np.median(window)
else:
value = 0
values.append(value)
positions = np.meshgrid(np.arange(image.shape[0]), np.arange(image.shape[1]))
# create an interpolant for the background and interpolate over the image.
inter = RBFInterpolator(
np.vstack((yloc, xloc)).T, values, kernel="thin_plate_spline", degree=4, smoothing=0.05
)
backgrounds = inter(np.array(positions)[::-1].reshape(2, -1).T).reshape(image.shape)
return backgrounds
def run(self, inputCoadd: Exposure):
| lsst.pipe.tasks.prettyPictureMaker._task.kwargs : `NDArray` |
sortedImages: dict[str, Exposure] = {}
for ref in refs:
key: str = cast(str, ref.dataId["band"])
image = butler.get(ref)
sortedImages[key] = image
return sortedImages
def makeInputsFromArrays(self, **kwargs) -> dict[int, DeferredDatasetHandle]:
r# ignore type because there aren't proper stubs for afw
temp = {}
for key, array in kwargs.items():
temp[key] = Exposure(Box2I(Point2I(0, 0), Extent2I(*array.shape)), dtype=array.dtype)
temp[key].image.array[:] = array
return self.makeInputsFromExposures(**temp)
def makeInputsFromExposures(self, **kwargs) -> dict[int, DeferredDatasetHandle]:
r
| lsst.pipe.tasks.prettyPictureMaker._task.R : `int` |
sortedImages = {}
for key, value in kwargs.items():
sortedImages[key] = value
return sortedImages
class PrettyPictureBackgroundFixerConnections(
PipelineTaskConnections,
dimensions=("tract", "patch", "skymap", "band"),
defaultTemplates={"coaddTypeName": "deep"},
):
inputCoadd = Input(
doc=("Input coadd for which the background is to be removed"),
name="{coaddTypeName}CoaddPsfMatched",
storageClass="ExposureF",
dimensions=("tract", "patch", "skymap", "band"),
)
outputCoadd = Output(
doc="The coadd with the background fixed and subtracted",
name="pretty_picture_coadd_bg_subtracted",
storageClass="ExposureF",
dimensions=("tract", "patch", "skymap", "band"),
)
class PrettyPictureBackgroundFixerConfig(
PipelineTaskConfig, pipelineConnections=PrettyPictureBackgroundFixerConnections
):
pass
class PrettyPictureBackgroundFixerTask(PipelineTask):
_DefaultName = "prettyPictureBackgroundFixerTask" ConfigClass = PrettyPictureBackgroundFixerConfig config: ConfigClass def _neg_log_likelihood(self, params, x):
| lsst.pipe.tasks.prettyPictureMaker._task.refs : `Iterable` of `DatasetRef` |
channels = {}
shape = (0, 0)
jointMask: None | NDArray = None
maskDict: Mapping[str, int] = {}
doJointMaskInit = False
if jointMask is None:
doJointMask = True
doJointMaskInit = True
for channel, imageExposure in images.items():
imageArray = imageExposure.image.array
# run all the plugins designed for array based interaction
for plug in plugins.channel():
imageArray = plug(
imageArray, imageExposure.mask.array, imageExposure.mask.getMaskPlaneDict(), self.config
).astype(np.float32)
channels[channel] = imageArray
# These operations are trivial look-ups and don't matter if they
# happen in each loop.
shape = imageArray.shape
maskDict = imageExposure.mask.getMaskPlaneDict()
if doJointMaskInit:
jointMask = np.zeros(shape, dtype=imageExposure.mask.dtype)
doJointMaskInit = False
if doJointMask:
jointMask |= imageExposure.mask.array
# mix the images to RGB
imageRArray = np.zeros(shape, dtype=np.float32)
imageGArray = np.zeros(shape, dtype=np.float32)
imageBArray = np.zeros(shape, dtype=np.float32)
for band, image in channels.items():
mix = self.config.channelConfig[band]
if mix.r:
imageRArray += mix.r * image
if mix.g:
imageGArray += mix.g * image
if mix.b:
imageBArray += mix.b * image
exposure = next(iter(images.values()))
box: Box2I = exposure.getBBox()
boxCenter = box.getCenter()
try:
psf = exposure.psf.computeImage(boxCenter).array
except Exception:
psf = None
# assert for typing reasons
assert jointMask is not None
# Run any image level correction plugins
colorImage = np.zeros((*imageRArray.shape, 3))
colorImage[:, :, 0] = imageRArray
colorImage[:, :, 1] = imageGArray
colorImage[:, :, 2] = imageBArray
for plug in plugins.partial():
colorImage = plug(colorImage, jointMask, maskDict, self.config)
# Ignore type because Exposures do in fact have a bbox, but it's c++
# and not typed.
colorImage = lsstRGB(
colorImage[:, :, 0],
colorImage[:, :, 1],
colorImage[:, :, 2],
scaleLumKWargs=self.config.luminanceConfig.toDict(),
remapBoundsKwargs=self.config.imageRemappingConfig.toDict(),
scaleColorKWargs=self.config.colorConfig.toDict(),
**(self.config.localContrastConfig.toDict()),
cieWhitePoint=tuple(self.config.cieWhitePoint), # type: ignore
psf=psf if self.config.doPSFDeconcovlve else None,
brackets=list(self.config.exposureBrackets) if self.config.exposureBrackets else None,
doRemapGamut=self.config.doRemapGamut,
gamutMethod=self.config.gamutMethod,
)
# Find the dataset type and thus the maximum values as well
maxVal: int | float
match self.config.arrayType:
case "uint8":
dtype = np.uint8
maxVal = 255
case "uint16":
dtype = np.uint16
maxVal = 65535
case "half":
dtype = np.half
maxVal = 1.0
case "float":
dtype = np.float32
maxVal = 1.0
case _:
assert True, "This code path should be unreachable"
# lsstRGB returns an image in 0-1 scale it to the maximum value
colorImage *= maxVal # type: ignore
# pack the joint mask back into a mask object
lsstMask = Mask(width=jointMask.shape[1], height=jointMask.shape[0], planeDefs=maskDict)
lsstMask.array = jointMask # type: ignore
return Struct(outputRGB=colorImage.astype(dtype), outputRGBMask=lsstMask) # type: ignore
def runQuantum(
self,
butlerQC: QuantumContext,
inputRefs: InputQuantizedConnection,
outputRefs: OutputQuantizedConnection,
) -> None:
imageRefs: list[DatasetRef] = inputRefs.inputCoadds
sortedImages = self.makeInputsFromRefs(imageRefs, butlerQC)
outputs = self.run(sortedImages)
butlerQC.put(outputs, outputRefs)
def makeInputsFromRefs(
self, refs: Iterable[DatasetRef], butler: Butler | QuantumContext
) -> dict[str, Exposure]:
r
| lsst.pipe.tasks.prettyPictureMaker._task.results : `Struct` of `Mapping` of `str` to `ExposureF` |
| lsst.pipe.tasks.prettyPictureMaker._task.slices : `list` of `tuple` |