39import lsst.pipe.base.connectionTypes
as cT
44from .forcedMeasurement
import ForcedMeasurementTask
45from .applyApCorr
import ApplyApCorrTask
46from .catalogCalculation
import CatalogCalculationTask
47from ._id_generator
import DetectorVisitIdGeneratorConfig
# Public names exported by a star-import of this module.
__all__ = (
    "ForcedPhotCcdConfig",
    "ForcedPhotCcdTask",
    "ForcedPhotCcdFromDataFrameTask",
    "ForcedPhotCcdFromDataFrameConfig",
)
54 dimensions=(
"instrument",
"visit",
"detector",
"skymap",
"tract"),
55 defaultTemplates={
"inputCoaddName":
"deep",
56 "inputName":
"calexp"}):
57 inputSchema = cT.InitInput(
58 doc=
"Schema for the input measurement catalogs.",
59 name=
"{inputCoaddName}Coadd_ref_schema",
60 storageClass=
"SourceCatalog",
62 outputSchema = cT.InitOutput(
63 doc=
"Schema for the output forced measurement catalogs.",
64 name=
"forced_src_schema",
65 storageClass=
"SourceCatalog",
68 doc=
"Input exposure to perform photometry on.",
70 storageClass=
"ExposureF",
71 dimensions=[
"instrument",
"visit",
"detector"],
74 doc=
"Catalog of shapes and positions at which to force photometry.",
75 name=
"{inputCoaddName}Coadd_ref",
76 storageClass=
"SourceCatalog",
77 dimensions=[
"skymap",
"tract",
"patch"],
82 doc=
"SkyMap dataset that defines the coordinate system of the reference catalog.",
83 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
84 storageClass=
"SkyMap",
85 dimensions=[
"skymap"],
88 doc=
"Input Sky Correction to be subtracted from the calexp if doApplySkyCorr=True",
90 storageClass=
"Background",
91 dimensions=(
"instrument",
"visit",
"detector"),
93 visitSummary = cT.Input(
94 doc=
"Input visit-summary catalog with updated calibration objects.",
95 name=
"finalVisitSummary",
96 storageClass=
"ExposureCatalog",
97 dimensions=(
"instrument",
"visit"),
100 doc=
"Output forced photometry catalog.",
102 storageClass=
"SourceCatalog",
103 dimensions=[
"instrument",
"visit",
"detector",
"skymap",
"tract"],
106 def __init__(self, *, config=None):
107 super().__init__(config=config)
108 if not config.doApplySkyCorr:
110 if not config.useVisitSummary:
111 del self.visitSummary
112 if config.refCatStorageClass !=
"SourceCatalog":
116 self.refCat = dataclasses.replace(self.refCat, storageClass=config.refCatStorageClass)
119class ForcedPhotCcdConfig(pipeBase.PipelineTaskConfig,
120 pipelineConnections=ForcedPhotCcdConnections):
121 """Config class for forced measurement driver task."""
123 target=ForcedMeasurementTask,
124 doc=
"subtask to do forced measurement"
127 doc=
"coadd name: typically one of deep or goodSeeing",
134 doc=
"Run subtask to apply aperture corrections"
137 target=ApplyApCorrTask,
138 doc=
"Subtask to apply aperture corrections"
141 target=CatalogCalculationTask,
142 doc=
"Subtask to run catalogCalculation plugins on catalog"
147 doc=
"Apply sky correction?",
153 "Use updated WCS, PhotoCalib, ApCorr, and PSF from visit summary? "
154 "This should be False if and only if the input image already has the best-available calibration "
161 "SourceCatalog":
"Read an lsst.afw.table.SourceCatalog.",
162 "DataFrame":
"Read a pandas.DataFrame.",
163 "ArrowAstropy":
"Read an astropy.table.Table saved to Parquet.",
165 default=
"SourceCatalog",
167 "The butler storage class for the refCat connection. "
168 "If set to something other than 'SourceCatalog', the "
169 "'inputSchema' connection will be ignored."
174 default=
"diaObjectId",
176 "Name of the column that provides the object ID from the refCat connection. "
177 "measurement.copyColumns['id'] must be set to this value as well."
178 "Ignored if refCatStorageClass='SourceCatalog'."
185 "Name of the column that provides the right ascension (in floating-point degrees) from the "
186 "refCat connection. "
187 "Ignored if refCatStorageClass='SourceCatalog'."
194 "Name of the column that provides the declination (in floating-point degrees) from the "
195 "refCat connection. "
196 "Ignored if refCatStorageClass='SourceCatalog'."
202 doc=
"Add photometric calibration variance to warp variance plane?",
206 doc=
"Where to obtain footprints to install in the measurement catalog, prior to measurement.",
208 "transformed":
"Transform footprints from the reference catalog (downgrades HeavyFootprints).",
209 "psf": (
"Use the scaled shape of the PSF at the position of each source (does not generate "
210 "HeavyFootprints)."),
213 default=
"transformed",
217 doc=
"Scaling factor to apply to the PSF shape when footprintSource='psf' (ignored otherwise).",
220 idGenerator = DetectorVisitIdGeneratorConfig.make_field()
222 def setDefaults(self):
224 super().setDefaults()
227 self.measurement.doReplaceWithNoise =
False
230 self.measurement.plugins.names = [
"base_PixelFlags",
231 "base_TransformedCentroid",
233 "base_LocalBackground",
234 "base_LocalPhotoCalib",
237 self.measurement.slots.psfFlux =
"base_PsfFlux"
238 self.measurement.slots.shape =
None
241 self.catalogCalculation.plugins.names = []
# Extra consistency checks that only apply when the reference catalog is
# not read as a SourceCatalog.
if self.refCatStorageClass != "SourceCatalog":
    if self.footprintSource == "transformed":
        # The storage class is an option of this config object, so it is
        # read from ``self`` directly, exactly as the other references in
        # this check do. Reading it through ``self.config`` would fail on
        # the attribute lookup before the intended ValueError was raised.
        raise ValueError(
            "Cannot transform footprints from reference catalog, because "
            f"{self.refCatStorageClass} datasets can't hold footprints."
        )
    # The ID column copied by the measurement subtask has to be the one
    # named by ``refCatIdColumn``.
    if self.measurement.copyColumns["id"] != self.refCatIdColumn:
        raise ValueError(
            f"measurement.copyColumns['id'] should be set to {self.refCatIdColumn} "
            f"(refCatIdColumn) when refCatStorageClass={self.refCatStorageClass}."
        )
def configureParquetRefCat(self, refCatStorageClass: str = "ArrowAstropy"):
    """Switch this config to a Parquet-based reference catalog type.

    Sets ``refCatStorageClass`` and reconfigures ``footprintSource`` and
    the measurement subtask accordingly.

    Parameters
    ----------
    refCatStorageClass : `str`, optional
        Value assigned to the ``refCatStorageClass`` option.
    """
    meas = self.measurement

    self.refCatStorageClass = refCatStorageClass
    self.footprintSource = "psf"
    meas.doReplaceWithNoise = False

    # Replace the base_TransformedCentroid plugin with
    # base_TransformedCentroidFromCoord, and point the centroid slot at it.
    meas.plugins.names -= {"base_TransformedCentroid"}
    meas.plugins.names |= {"base_TransformedCentroidFromCoord"}
    meas.slots.centroid = "base_TransformedCentroidFromCoord"

    # Copy the ID from the configured column; stop copying deblend_nChild
    # (a no-op when that key is absent).
    meas.copyColumns["id"] = self.refCatIdColumn
    meas.copyColumns.pop("deblend_nChild", None)
269class ForcedPhotCcdTask(pipeBase.PipelineTask):
270 """A pipeline task for performing forced measurement on CCD images.
274 refSchema : `lsst.afw.table.Schema`, optional
275 The schema of the reference catalog, passed to the constructor of the
276 references subtask. Optional, but must be specified if ``initInputs``
277 is not; if both are specified, ``initInputs`` takes precedence.
279 Dictionary that can contain a key ``inputSchema`` containing the
280 schema. If present will override the value of ``refSchema``.
282 Keyword arguments are passed to the supertask constructor.
285 ConfigClass = ForcedPhotCcdConfig
286 _DefaultName =
"forcedPhotCcd"
289 def __init__(self, refSchema=None, initInputs=None, **kwargs):
290 super().__init__(**kwargs)
293 refSchema = initInputs[
'inputSchema'].schema
295 if refSchema
is None:
298 self.makeSubtask(
"measurement", refSchema=refSchema)
302 if self.config.doApCorr:
303 self.makeSubtask(
"applyApCorr", schema=self.measurement.schema)
304 self.makeSubtask(
'catalogCalculation', schema=self.measurement.schema)
307 def runQuantum(self, butlerQC, inputRefs, outputRefs):
308 inputs = butlerQC.get(inputRefs)
310 tract = butlerQC.quantum.dataId[
'tract']
311 skyMap = inputs.pop(
'skyMap')
312 inputs[
'refWcs'] = skyMap[tract].getWcs()
315 skyCorr = inputs.pop(
'skyCorr',
None)
317 inputs[
'exposure'] = self.prepareCalibratedExposure(
320 visitSummary=inputs.pop(
"visitSummary",
None),
323 if inputs[
"exposure"].getWcs()
is None:
324 raise NoWorkFound(
"Exposure has no WCS.")
326 match self.config.refCatStorageClass:
327 case
"SourceCatalog":
328 prepFunc = self._prepSourceCatalogRefCat
330 prepFunc = self._prepDataFrameRefCat
332 prepFunc = self._prepArrowAstropyRefCat
334 raise AssertionError(
"Configuration should not have passed validation.")
335 self.log.info(
"Filtering ref cats: %s",
','.join([str(i.dataId)
for i
in inputs[
'refCat']]))
336 inputs[
'refCat'] = prepFunc(
338 inputs[
'exposure'].getBBox(),
339 inputs[
'exposure'].getWcs(),
343 inputs[
'measCat'], inputs[
'exposureId'] = self.generateMeasCat(
344 inputRefs.exposure.dataId, inputs[
'exposure'], inputs[
'refCat'], inputs[
'refWcs']
348 self.attachFootprints(inputs[
"measCat"], inputs[
"refCat"], inputs[
"exposure"], inputs[
"refWcs"])
349 outputs = self.run(**inputs)
350 butlerQC.put(outputs, outputRefs)
352 def prepareCalibratedExposure(self, exposure, skyCorr=None, visitSummary=None):
353 """Prepare a calibrated exposure and apply external calibrations
354 and sky corrections if so configured.
358 exposure : `lsst.afw.image.exposure.Exposure`
359 Input exposure to adjust calibrations.
360 skyCorr : `lsst.afw.math.BackgroundList`, optional
361 Sky correction frame to apply if doApplySkyCorr=True.
362 visitSummary : `lsst.afw.table.ExposureCatalog`, optional
363 Exposure catalog with updated calibrations; any not-None calibration
364 objects attached will be used. These are applied first and may be
365 overridden by other arguments.
369 exposure : `lsst.afw.image.exposure.Exposure`
370 Exposure with adjusted calibrations.
372 detectorId = exposure.getInfo().getDetector().getId()
374 if visitSummary
is not None:
375 row = visitSummary.find(detectorId)
377 raise RuntimeError(f
"Detector id {detectorId} not found in visitSummary.")
378 if (photoCalib := row.getPhotoCalib())
is not None:
379 exposure.setPhotoCalib(photoCalib)
380 if (skyWcs := row.getWcs())
is not None:
381 exposure.setWcs(skyWcs)
382 if (psf := row.getPsf())
is not None:
384 if (apCorrMap := row.getApCorrMap())
is not None:
385 exposure.info.setApCorrMap(apCorrMap)
387 if skyCorr
is not None:
388 exposure.maskedImage -= skyCorr.getImage()
def generateMeasCat(self, dataId, exposure, refCat, refWcs):
    """Create the catalog of sources to be measured on one exposure.

    Parameters
    ----------
    dataId : `lsst.daf.butler.DataCoordinate`
        Butler data ID for this image, with ``{visit, detector}`` keys.
        It is handed to the configured ID generator.
    exposure : `lsst.afw.image.exposure.Exposure`
        Exposure to generate the catalog for.
    refCat : `lsst.afw.table.SourceCatalog`
        Catalog of shapes and positions at which to force photometry.
    refWcs : `lsst.afw.image.SkyWcs`
        Reference world coordinate system; forwarded unchanged to the
        measurement subtask's ``generateMeasCat``.

    Returns
    -------
    measCat : `lsst.afw.table.SourceCatalog`
        Catalog of forced sources to measure.
    exposureId : `int`
        Unique binary id associated with the input exposure (the ID
        generator's ``catalog_id``).
    """
    idGen = self.config.idGenerator.apply(dataId)
    # Record IDs in the new catalog come from the same generator that
    # supplies the catalog-level ID returned below.
    tableIdFactory = idGen.make_table_id_factory()
    catalog = self.measurement.generateMeasCat(
        exposure, refCat, refWcs, idFactory=tableIdFactory
    )
    return catalog, idGen.catalog_id
419 def run(self, measCat, exposure, refCat, refWcs, exposureId=None):
420 """Perform forced measurement on a single exposure.
424 measCat : `lsst.afw.table.SourceCatalog`
425 The measurement catalog, based on the sources listed in the
427 exposure : `lsst.afw.image.Exposure`
428 The measurement image upon which to perform forced detection.
429 refCat : `lsst.afw.table.SourceCatalog`
430 The reference catalog of sources to measure.
431 refWcs : `lsst.afw.image.SkyWcs`
432 The WCS for the references.
434 Optional unique exposureId used for random seed in measurement
439 result : `lsst.pipe.base.Struct`
440 Structure with fields:
443 Catalog of forced measurement results
444 (`lsst.afw.table.SourceCatalog`).
446 self.measurement.run(measCat, exposure, refCat, refWcs, exposureId=exposureId)
447 if self.config.doApCorr:
448 apCorrMap = exposure.getInfo().getApCorrMap()
449 if apCorrMap
is None:
450 self.log.warning(
"Forced exposure image does not have valid aperture correction; skipping.")
452 self.applyApCorr.run(
456 self.catalogCalculation.run(measCat)
458 return pipeBase.Struct(measCat=measCat)
460 def attachFootprints(self, sources, refCat, exposure, refWcs):
461 """Attach footprints to blank sources prior to measurements.
465 `~lsst.afw.detection.Footprint` objects for forced photometry must
466 be in the pixel coordinate system of the image being measured, while
467 the actual detections may start out in a different coordinate system.
469 Subclasses of this class may implement this method to define how
470 those `~lsst.afw.detection.Footprint` objects should be generated.
472 This default implementation transforms depends on the
473 ``footprintSource`` configuration parameter.
475 if self.config.footprintSource ==
"transformed":
476 return self.measurement.attachTransformedFootprints(sources, refCat, exposure, refWcs)
477 elif self.config.footprintSource ==
"psf":
478 return self.measurement.attachPsfShapeFootprints(sources, exposure,
479 scaling=self.config.psfFootprintScaling)
481 def _prepSourceCatalogRefCat(self, refCatHandles, exposureBBox, exposureWcs):
482 """Prepare a merged, filtered reference catalog from SourceCatalog
487 refCatHandles : sequence of `lsst.daf.butler.DeferredDatasetHandle`
488 Handles for catalogs of shapes and positions at which to force
490 exposureBBox : `lsst.geom.Box2I`
491 Bounding box on which to select rows that overlap
492 exposureWcs : `lsst.afw.geom.SkyWcs`
493 World coordinate system to convert sky coords in ref cat to
494 pixel coords with which to compare with exposureBBox
498 refSources : `lsst.afw.table.SourceCatalog`
499 Filtered catalog of forced sources to measure.
503 The majority of this code is based on the methods of
504 lsst.meas.algorithms.loadReferenceObjects.ReferenceObjectLoader
512 expBoxCorners = expBBox.getCorners()
513 expSkyCorners = [exposureWcs.pixelToSky(corner).getVector()
for corner
in expBoxCorners]
521 for refCat
in refCatHandles:
522 refCat = refCat.get()
523 if mergedRefCat
is None:
526 for record
in refCat:
528 expPolygon.contains(record.getCoord().getVector())
and record.getParent()
531 record.setFootprint(record.getFootprint())
532 mergedRefCat.append(record)
533 containedIds.add(record.getId())
534 if mergedRefCat
is None:
535 raise RuntimeError(
"No reference objects for forced photometry.")
539 def _prepDataFrameRefCat(self, refCatHandles, exposureBBox, exposureWcs):
540 """Prepare a merged, filtered reference catalog from DataFrame
545 refCatHandles : sequence of `lsst.daf.butler.DeferredDatasetHandle`
546 Handles for catalogs of shapes and positions at which to force
548 exposureBBox : `lsst.geom.Box2I`
549 Bounding box on which to select rows that overlap
550 exposureWcs : `lsst.afw.geom.SkyWcs`
551 World coordinate system to convert sky coords in ref cat to
552 pixel coords with which to compare with exposureBBox
556 refCat : `lsst.afw.table.SourceTable`
557 Source Catalog with minimal schema that overlaps exposureBBox
563 self.config.refCatIdColumn,
564 self.config.refCatRaColumn,
565 self.config.refCatDecColumn,
569 for i
in refCatHandles
571 df = pd.concat(dfList)
574 mapping = exposureWcs.getTransform().getMapping()
575 x, y = mapping.applyInverse(
576 np.array(df[[self.config.refCatRaColumn, self.config.refCatDecColumn]].values*2*np.pi/360).T
579 refCat = self._makeMinimalSourceCatalogFromDataFrame(df[inBBox])
582 def _prepArrowAstropyRefCat(self, refCatHandles, exposureBBox, exposureWcs):
583 """Prepare a merged, filtered reference catalog from ArrowAstropy
588 refCatHandles : sequence of `lsst.daf.butler.DeferredDatasetHandle`
589 Handles for catalogs of shapes and positions at which to force
591 exposureBBox : `lsst.geom.Box2I`
592 Bounding box on which to select rows that overlap
593 exposureWcs : `lsst.afw.geom.SkyWcs`
594 World coordinate system to convert sky coords in ref cat to
595 pixel coords with which to compare with exposureBBox
599 refCat : `lsst.afw.table.SourceTable`
600 Source Catalog with minimal schema that overlaps exposureBBox
606 self.config.refCatIdColumn,
607 self.config.refCatRaColumn,
608 self.config.refCatDecColumn,
612 for i
in refCatHandles
614 full_table = astropy.table.vstack(table_list)
617 mapping = exposureWcs.getTransform().getMapping()
618 ra_dec_rad = np.zeros((2, len(full_table)), dtype=float)
619 ra_dec_rad[0, :] = full_table[self.config.refCatRaColumn]
620 ra_dec_rad[1, :] = full_table[self.config.refCatDecColumn]
621 ra_dec_rad *= np.pi/180.0
622 x, y = mapping.applyInverse(ra_dec_rad)
624 refCat = self._makeMinimalSourceCatalogFromAstropy(full_table[inBBox])
627 def _makeMinimalSourceCatalogFromDataFrame(self, df):
628 """Create minimal schema SourceCatalog from a pandas DataFrame.
630 The forced measurement subtask expects this as input.
634 df : `pandas.DataFrame`
635 Table with locations and ids.
639 outputCatalog : `lsst.afw.table.SourceTable`
640 Output catalog with minimal schema.
644 outputCatalog.reserve(len(df))
646 for objectId, ra, dec
in df[[
'ra',
'dec']].itertuples():
647 outputRecord = outputCatalog.addNew()
648 outputRecord.setId(objectId)
652 def _makeMinimalSourceCatalogFromAstropy(self, table):
653 """Create minimal schema SourceCatalog from an Astropy Table.
655 The forced measurement subtask expects this as input.
659 table : `astropy.table.Table`
660 Table with locations and ids.
664 outputCatalog : `lsst.afw.table.SourceTable`
665 Output catalog with minimal schema.
669 outputCatalog.reserve(len(table))
672 outputRecord = outputCatalog.addNew()
673 outputRecord.setId(objectId)
679 dimensions=(
"instrument",
"visit",
"detector",
"skymap",
"tract"),
680 defaultTemplates={
"inputCoaddName":
"goodSeeing",
681 "inputName":
"calexp",
class ForcedPhotCcdFromDataFrameConfig(
    ForcedPhotCcdConfig,
    pipelineConnections=ForcedPhotCcdFromDataFrameConnections,
):
    """Config for forced photometry that reads its reference catalog as a
    DataFrame.
    """

    def setDefaults(self):
        """Apply the base defaults, switch the reference catalog to the
        ``"DataFrame"`` storage class, and rename the connections.
        """
        super().setDefaults()
        self.configureParquetRefCat("DataFrame")

        # Dataset type names for the reference catalog input and the
        # two outputs.
        connections = self.connections
        connections.refCat = "{inputCoaddName}Diff_fullDiaObjTable"
        connections.measCat = "forced_src_diaObject"
        connections.outputSchema = "forced_src_diaObject_schema"
class ForcedPhotCcdFromDataFrameTask(ForcedPhotCcdTask):
    """Forced photometry on a per-detector exposure, with coordinates
    taken from a DataFrame.

    The reference positions come from a DataFrame rather than the
    SourceCatalog used by the base class ForcedPhotCcd. The output is
    still a SourceCatalog, so that the downstream
    WriteForcedSourceTableTask can be reused with output from this Task.
    """

    ConfigClass = ForcedPhotCcdFromDataFrameConfig
    _DefaultName = "forcedPhotCcdFromDataFrame"
Tag types used to declare specialized field types.
static Schema makeMinimalSchema()
Return a minimal schema for Source tables and records.
static Key< RecordId > getParentKey()
Key for the parent ID.
A floating-point coordinate rectangle geometry.
Point in an unspecified spherical coordinate system.
ConvexPolygon is a closed convex polygon on the unit sphere.