39import lsst.pipe.base.connectionTypes
as cT
44from .forcedMeasurement
import ForcedMeasurementTask
45from .applyApCorr
import ApplyApCorrTask
46from .catalogCalculation
import CatalogCalculationTask
47from ._id_generator
import DetectorVisitIdGeneratorConfig
49__all__ = (
"ForcedPhotCcdConfig",
"ForcedPhotCcdTask",
50 "ForcedPhotCcdFromDataFrameTask",
"ForcedPhotCcdFromDataFrameConfig")
54 dimensions=(
"instrument",
"visit",
"detector",
"skymap",
"tract"),
55 defaultTemplates={
"inputCoaddName":
"deep",
56 "inputName":
"calexp"}):
57 inputSchema = cT.InitInput(
58 doc=
"Schema for the input measurement catalogs.",
59 name=
"{inputCoaddName}Coadd_ref_schema",
60 storageClass=
"SourceCatalog",
62 outputSchema = cT.InitOutput(
63 doc=
"Schema for the output forced measurement catalogs.",
64 name=
"forced_src_schema",
65 storageClass=
"SourceCatalog",
68 doc=
"Input exposure to perform photometry on.",
70 storageClass=
"ExposureF",
71 dimensions=[
"instrument",
"visit",
"detector"],
74 doc=
"Catalog of shapes and positions at which to force photometry.",
75 name=
"{inputCoaddName}Coadd_ref",
76 storageClass=
"SourceCatalog",
77 dimensions=[
"skymap",
"tract",
"patch"],
82 doc=
"SkyMap dataset that defines the coordinate system of the reference catalog.",
83 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
84 storageClass=
"SkyMap",
85 dimensions=[
"skymap"],
88 doc=
"Input Sky Correction to be subtracted from the calexp if doApplySkyCorr=True",
90 storageClass=
"Background",
91 dimensions=(
"instrument",
"visit",
"detector"),
93 visitSummary = cT.Input(
94 doc=
"Input visit-summary catalog with updated calibration objects.",
95 name=
"finalVisitSummary",
96 storageClass=
"ExposureCatalog",
97 dimensions=(
"instrument",
"visit"),
100 doc=
"Output forced photometry catalog.",
102 storageClass=
"SourceCatalog",
103 dimensions=[
"instrument",
"visit",
"detector",
"skymap",
"tract"],
106 def __init__(self, *, config=None):
107 super().__init__(config=config)
108 if not config.doApplySkyCorr:
110 if not config.useVisitSummary:
111 del self.visitSummary
112 if config.refCatStorageClass !=
"SourceCatalog":
116 self.refCat = dataclasses.replace(self.refCat, storageClass=config.refCatStorageClass)
119class ForcedPhotCcdConfig(pipeBase.PipelineTaskConfig,
120 pipelineConnections=ForcedPhotCcdConnections):
121 """Config class for forced measurement driver task."""
123 target=ForcedMeasurementTask,
124 doc=
"subtask to do forced measurement"
127 doc=
"coadd name: typically one of deep or goodSeeing",
134 doc=
"Run subtask to apply aperture corrections"
137 target=ApplyApCorrTask,
138 doc=
"Subtask to apply aperture corrections"
141 target=CatalogCalculationTask,
142 doc=
"Subtask to run catalogCalculation plugins on catalog"
147 doc=
"Apply sky correction?",
153 "Use updated WCS, PhotoCalib, ApCorr, and PSF from visit summary? "
154 "This should be False if and only if the input image already has the best-available calibration "
161 "SourceCatalog":
"Read an lsst.afw.table.SourceCatalog.",
162 "DataFrame":
"Read a pandas.DataFrame.",
163 "ArrowAstropy":
"Read an astropy.table.Table saved to Parquet.",
165 default=
"SourceCatalog",
167 "The butler storage class for the refCat connection. "
168 "If set to something other than 'SourceCatalog', the "
169 "'inputSchema' connection will be ignored."
174 default=
"diaObjectId",
# Adjacent literals are concatenated verbatim, so every sentence but the
# last needs its own trailing space to keep the rendered doc readable.
"Name of the column that provides the object ID from the refCat connection. "
"measurement.copyColumns['id'] must be set to this value as well. "
"Ignored if refCatStorageClass='SourceCatalog'."
185 "Name of the column that provides the right ascension (in floating-point degrees) from the "
186 "refCat connection. "
187 "Ignored if refCatStorageClass='SourceCatalog'."
194 "Name of the column that provides the declination (in floating-point degrees) from the "
195 "refCat connection. "
196 "Ignored if refCatStorageClass='SourceCatalog'."
203 doc=
"Add photometric calibration variance to warp variance plane?",
204 deprecated=
"Deprecated and unused; will be removed after v29.",
208 doc=
"Where to obtain footprints to install in the measurement catalog, prior to measurement.",
210 "transformed":
"Transform footprints from the reference catalog (downgrades HeavyFootprints).",
211 "psf": (
"Use the scaled shape of the PSF at the position of each source (does not generate "
212 "HeavyFootprints)."),
215 default=
"transformed",
219 doc=
"Scaling factor to apply to the PSF shape when footprintSource='psf' (ignored otherwise).",
222 idGenerator = DetectorVisitIdGeneratorConfig.make_field()
224 def setDefaults(self):
226 super().setDefaults()
229 self.measurement.doReplaceWithNoise =
False
232 self.measurement.plugins.names = [
"base_PixelFlags",
233 "base_TransformedCentroid",
235 "base_LocalBackground",
236 "base_LocalPhotoCalib",
239 self.measurement.slots.psfFlux =
"base_PsfFlux"
240 self.measurement.slots.shape =
None
243 self.catalogCalculation.plugins.names = []
247 if self.refCatStorageClass !=
"SourceCatalog":
248 if self.footprintSource ==
"transformed":
# This code runs on the config object itself, which has no ``config``
# attribute; read the storage class straight from ``self`` so the intended
# ValueError is raised instead of an AttributeError from the f-string.
raise ValueError(
    "Cannot transform footprints from reference catalog, because "
    f"{self.refCatStorageClass} datasets can't hold footprints."
)
251 if self.measurement.copyColumns[
"id"] != self.refCatIdColumn:
253 f
"measurement.copyColumns['id'] should be set to {self.refCatIdColumn} "
254 f
"(refCatIdColumn) when refCatStorageClass={self.refCatStorageClass}."
def configureParquetRefCat(self, refCatStorageClass: str = "ArrowAstropy"):
    """Switch the reference catalog to a Parquet-backed storage class.

    Besides recording the storage class, this adjusts the footprint source
    and the measurement subtask configuration to match: footprints come
    from the PSF shape, the centroid plugin and slot are swapped to the
    coordinate-based variant, and the copied ID column follows
    ``refCatIdColumn``.

    Parameters
    ----------
    refCatStorageClass : `str`, optional
        Value assigned to ``refCatStorageClass``.
    """
    coordCentroid = "base_TransformedCentroidFromCoord"

    self.refCatStorageClass = refCatStorageClass
    self.footprintSource = "psf"

    meas = self.measurement
    meas.doReplaceWithNoise = False

    # Replace the footprint-based centroid plugin with the coord-based one,
    # and point the centroid slot at it.
    meas.plugins.names -= {"base_TransformedCentroid"}
    meas.plugins.names |= {coordCentroid}
    meas.slots.centroid = coordCentroid

    # The ID is copied from the configured column; "deblend_nChild" is
    # dropped from the copied columns if it is present.
    meas.copyColumns["id"] = self.refCatIdColumn
    meas.copyColumns.pop("deblend_nChild", None)
271class ForcedPhotCcdTask(pipeBase.PipelineTask):
272 """A pipeline task for performing forced measurement on CCD images.
276 refSchema : `lsst.afw.table.Schema`, optional
277 The schema of the reference catalog, passed to the constructor of the
278 references subtask. Optional, but must be specified if ``initInputs``
279 is not; if both are specified, ``initInputs`` takes precedence.
281 Dictionary that can contain a key ``inputSchema`` containing the
282 schema. If present will override the value of ``refSchema``.
284 Keyword arguments are passed to the supertask constructor.
287 ConfigClass = ForcedPhotCcdConfig
288 _DefaultName =
"forcedPhotCcd"
291 def __init__(self, refSchema=None, initInputs=None, **kwargs):
292 super().__init__(**kwargs)
295 refSchema = initInputs[
'inputSchema'].schema
297 if refSchema
is None:
300 self.makeSubtask(
"measurement", refSchema=refSchema)
304 if self.config.doApCorr:
305 self.makeSubtask(
"applyApCorr", schema=self.measurement.schema)
306 self.makeSubtask(
'catalogCalculation', schema=self.measurement.schema)
309 def runQuantum(self, butlerQC, inputRefs, outputRefs):
310 inputs = butlerQC.get(inputRefs)
312 tract = butlerQC.quantum.dataId[
'tract']
313 skyMap = inputs.pop(
'skyMap')
314 inputs[
'refWcs'] = skyMap[tract].getWcs()
317 skyCorr = inputs.pop(
'skyCorr',
None)
319 inputs[
'exposure'] = self.prepareCalibratedExposure(
322 visitSummary=inputs.pop(
"visitSummary",
None),
325 if inputs[
"exposure"].getWcs()
is None:
326 raise NoWorkFound(
"Exposure has no WCS.")
328 match self.config.refCatStorageClass:
329 case
"SourceCatalog":
330 prepFunc = self._prepSourceCatalogRefCat
332 prepFunc = self._prepDataFrameRefCat
334 prepFunc = self._prepArrowAstropyRefCat
336 raise AssertionError(
"Configuration should not have passed validation.")
337 self.log.info(
"Filtering ref cats: %s",
','.join([str(i.dataId)
for i
in inputs[
'refCat']]))
338 inputs[
'refCat'] = prepFunc(
340 inputs[
'exposure'].getBBox(),
341 inputs[
'exposure'].getWcs(),
345 inputs[
'measCat'], inputs[
'exposureId'] = self.generateMeasCat(
346 inputRefs.exposure.dataId, inputs[
'exposure'], inputs[
'refCat'], inputs[
'refWcs']
350 self.attachFootprints(inputs[
"measCat"], inputs[
"refCat"], inputs[
"exposure"], inputs[
"refWcs"])
351 outputs = self.run(**inputs)
352 butlerQC.put(outputs, outputRefs)
354 def prepareCalibratedExposure(self, exposure, skyCorr=None, visitSummary=None):
355 """Prepare a calibrated exposure and apply external calibrations
356 and sky corrections if so configured.
360 exposure : `lsst.afw.image.exposure.Exposure`
361 Input exposure to adjust calibrations.
362 skyCorr : `lsst.afw.math.BackgroundList`, optional
363 Sky correction frame to apply if doApplySkyCorr=True.
364 visitSummary : `lsst.afw.table.ExposureCatalog`, optional
365 Exposure catalog with updated calibrations; any not-None calibration
366 objects attached will be used. These are applied first and may be
367 overridden by other arguments.
371 exposure : `lsst.afw.image.exposure.Exposure`
372 Exposure with adjusted calibrations.
374 detectorId = exposure.getInfo().getDetector().getId()
376 if visitSummary
is not None:
377 row = visitSummary.find(detectorId)
379 raise RuntimeError(f
"Detector id {detectorId} not found in visitSummary.")
380 if (photoCalib := row.getPhotoCalib())
is not None:
381 exposure.setPhotoCalib(photoCalib)
382 if (skyWcs := row.getWcs())
is not None:
383 exposure.setWcs(skyWcs)
384 if (psf := row.getPsf())
is not None:
386 if (apCorrMap := row.getApCorrMap())
is not None:
387 exposure.info.setApCorrMap(apCorrMap)
389 if skyCorr
is not None:
390 exposure.maskedImage -= skyCorr.getImage()
def generateMeasCat(self, dataId, exposure, refCat, refWcs):
    """Build the catalog of forced sources to be measured on an exposure.

    Parameters
    ----------
    dataId : `lsst.daf.butler.DataCoordinate`
        Butler data ID for this image, with ``{visit, detector}`` keys.
        It is passed to the configured ID generator.
    exposure : `lsst.afw.image.exposure.Exposure`
        Exposure to generate the catalog for.
    refCat : `lsst.afw.table.SourceCatalog`
        Catalog of shapes and positions at which to force photometry.
    refWcs : `lsst.afw.image.SkyWcs`
        Reference world coordinate system; forwarded unchanged to the
        measurement subtask.

    Returns
    -------
    measCat : `lsst.afw.table.SourceCatalog`
        Catalog of forced sources to measure.
    exposureId
        Unique binary id associated with the input exposure, taken from
        the ID generator's ``catalog_id``.
    """
    idGen = self.config.idGenerator.apply(dataId)
    # Record IDs in the new catalog are drawn from the same generator that
    # supplies the catalog-level ID returned below.
    tableIdFactory = idGen.make_table_id_factory()
    catalog = self.measurement.generateMeasCat(exposure, refCat, refWcs, idFactory=tableIdFactory)
    return catalog, idGen.catalog_id
421 def run(self, measCat, exposure, refCat, refWcs, exposureId=None):
422 """Perform forced measurement on a single exposure.
426 measCat : `lsst.afw.table.SourceCatalog`
427 The measurement catalog, based on the sources listed in the
429 exposure : `lsst.afw.image.Exposure`
430 The measurement image upon which to perform forced detection.
431 refCat : `lsst.afw.table.SourceCatalog`
432 The reference catalog of sources to measure.
433 refWcs : `lsst.afw.image.SkyWcs`
434 The WCS for the references.
436 Optional unique exposureId used for random seed in measurement
441 result : `lsst.pipe.base.Struct`
442 Structure with fields:
445 Catalog of forced measurement results
446 (`lsst.afw.table.SourceCatalog`).
448 self.measurement.run(measCat, exposure, refCat, refWcs, exposureId=exposureId)
449 if self.config.doApCorr:
450 apCorrMap = exposure.getInfo().getApCorrMap()
451 if apCorrMap
is None:
452 self.log.warning(
"Forced exposure image does not have valid aperture correction; skipping.")
454 self.applyApCorr.run(
458 self.catalogCalculation.run(measCat)
460 return pipeBase.Struct(measCat=measCat)
def attachFootprints(self, sources, refCat, exposure, refWcs):
    """Attach footprints to blank sources prior to measurements.

    Notes
    -----
    `~lsst.afw.detection.Footprint` objects for forced photometry must
    be in the pixel coordinate system of the image being measured, while
    the actual detections may start out in a different coordinate system.

    Subclasses of this class may implement this method to define how
    those `~lsst.afw.detection.Footprint` objects should be generated.

    This default implementation dispatches on the ``footprintSource``
    configuration parameter: ``"transformed"`` delegates to the measurement
    subtask's ``attachTransformedFootprints`` and ``"psf"`` to its
    ``attachPsfShapeFootprints`` (scaled by ``psfFootprintScaling``).
    Any other value does nothing and returns `None`.
    """
    footprintSource = self.config.footprintSource

    if footprintSource == "transformed":
        return self.measurement.attachTransformedFootprints(sources, refCat, exposure, refWcs)

    if footprintSource == "psf":
        psfScaling = self.config.psfFootprintScaling
        return self.measurement.attachPsfShapeFootprints(sources, exposure, scaling=psfScaling)

    return None
483 def _prepSourceCatalogRefCat(self, refCatHandles, exposureBBox, exposureWcs):
484 """Prepare a merged, filtered reference catalog from SourceCatalog
489 refCatHandles : sequence of `lsst.daf.butler.DeferredDatasetHandle`
490 Handles for catalogs of shapes and positions at which to force
492 exposureBBox : `lsst.geom.Box2I`
493 Bounding box on which to select rows that overlap
494 exposureWcs : `lsst.afw.geom.SkyWcs`
495 World coordinate system to convert sky coords in ref cat to
496 pixel coords with which to compare with exposureBBox
500 refSources : `lsst.afw.table.SourceCatalog`
501 Filtered catalog of forced sources to measure.
505 The majority of this code is based on the methods of
506 lsst.meas.algorithms.loadReferenceObjects.ReferenceObjectLoader
514 expBoxCorners = expBBox.getCorners()
515 expSkyCorners = [exposureWcs.pixelToSky(corner).getVector()
for corner
in expBoxCorners]
523 for refCat
in refCatHandles:
524 refCat = refCat.get()
525 if mergedRefCat
is None:
528 for record
in refCat:
530 expPolygon.contains(record.getCoord().getVector())
and record.getParent()
533 record.setFootprint(record.getFootprint())
534 mergedRefCat.append(record)
535 containedIds.add(record.getId())
536 if mergedRefCat
is None:
537 raise RuntimeError(
"No reference objects for forced photometry.")
541 def _prepDataFrameRefCat(self, refCatHandles, exposureBBox, exposureWcs):
542 """Prepare a merged, filtered reference catalog from DataFrame
547 refCatHandles : sequence of `lsst.daf.butler.DeferredDatasetHandle`
548 Handles for catalogs of shapes and positions at which to force
550 exposureBBox : `lsst.geom.Box2I`
551 Bounding box on which to select rows that overlap
552 exposureWcs : `lsst.afw.geom.SkyWcs`
553 World coordinate system to convert sky coords in ref cat to
554 pixel coords with which to compare with exposureBBox
558 refCat : `lsst.afw.table.SourceTable`
559 Source Catalog with minimal schema that overlaps exposureBBox
565 self.config.refCatIdColumn,
566 self.config.refCatRaColumn,
567 self.config.refCatDecColumn,
571 for i
in refCatHandles
573 df = pd.concat(dfList)
576 mapping = exposureWcs.getTransform().getMapping()
577 x, y = mapping.applyInverse(
578 np.array(df[[self.config.refCatRaColumn, self.config.refCatDecColumn]].values*2*np.pi/360).T
581 refCat = self._makeMinimalSourceCatalogFromDataFrame(df[inBBox])
584 def _prepArrowAstropyRefCat(self, refCatHandles, exposureBBox, exposureWcs):
585 """Prepare a merged, filtered reference catalog from ArrowAstropy
590 refCatHandles : sequence of `lsst.daf.butler.DeferredDatasetHandle`
591 Handles for catalogs of shapes and positions at which to force
593 exposureBBox : `lsst.geom.Box2I`
594 Bounding box on which to select rows that overlap
595 exposureWcs : `lsst.afw.geom.SkyWcs`
596 World coordinate system to convert sky coords in ref cat to
597 pixel coords with which to compare with exposureBBox
601 refCat : `lsst.afw.table.SourceTable`
602 Source Catalog with minimal schema that overlaps exposureBBox
608 self.config.refCatIdColumn,
609 self.config.refCatRaColumn,
610 self.config.refCatDecColumn,
614 for i
in refCatHandles
616 full_table = astropy.table.vstack(table_list)
619 mapping = exposureWcs.getTransform().getMapping()
620 ra_dec_rad = np.zeros((2, len(full_table)), dtype=float)
621 ra_dec_rad[0, :] = full_table[self.config.refCatRaColumn]
622 ra_dec_rad[1, :] = full_table[self.config.refCatDecColumn]
623 ra_dec_rad *= np.pi/180.0
624 x, y = mapping.applyInverse(ra_dec_rad)
626 refCat = self._makeMinimalSourceCatalogFromAstropy(full_table[inBBox])
629 def _makeMinimalSourceCatalogFromDataFrame(self, df):
630 """Create minimal schema SourceCatalog from a pandas DataFrame.
632 The forced measurement subtask expects this as input.
636 df : `pandas.DataFrame`
637 Table with locations and ids.
641 outputCatalog : `lsst.afw.table.SourceTable`
642 Output catalog with minimal schema.
646 outputCatalog.reserve(len(df))
648 for objectId, ra, dec
in df[[
'ra',
'dec']].itertuples():
649 outputRecord = outputCatalog.addNew()
650 outputRecord.setId(objectId)
654 def _makeMinimalSourceCatalogFromAstropy(self, table):
655 """Create minimal schema SourceCatalog from an Astropy Table.
657 The forced measurement subtask expects this as input.
661 table : `astropy.table.Table`
662 Table with locations and ids.
666 outputCatalog : `lsst.afw.table.SourceTable`
667 Output catalog with minimal schema.
671 outputCatalog.reserve(len(table))
673 for objectId, ra, dec
in table.iterrows():
674 outputRecord = outputCatalog.addNew()
675 outputRecord.setId(objectId)
681 dimensions=(
"instrument",
"visit",
"detector",
"skymap",
"tract"),
682 defaultTemplates={
"inputCoaddName":
"goodSeeing",
683 "inputName":
"calexp",
class ForcedPhotCcdFromDataFrameConfig(ForcedPhotCcdConfig,
                                       pipelineConnections=ForcedPhotCcdFromDataFrameConnections):
    """Config for forced photometry whose reference catalog is a DataFrame.

    Identical to `ForcedPhotCcdConfig` apart from its defaults: the
    reference catalog is configured for the ``"DataFrame"`` storage class
    and three connections are renamed.
    """

    def setDefaults(self):
        super().setDefaults()
        self.configureParquetRefCat("DataFrame")
        # Connection attribute -> dataset type name used by this variant.
        connectionDefaults = (
            ("refCat", "{inputCoaddName}Diff_fullDiaObjTable"),
            ("outputSchema", "forced_src_diaObject_schema"),
            ("measCat", "forced_src_diaObject"),
        )
        for connectionName, datasetTypeName in connectionDefaults:
            setattr(self.connections, connectionName, datasetTypeName)
class ForcedPhotCcdFromDataFrameTask(ForcedPhotCcdTask):
    """Forced photometry on a per-detector exposure at DataFrame positions.

    The reference positions come from a DataFrame rather than from the
    SourceCatalog that the base class `ForcedPhotCcdTask` reads.
    The output is still written as a SourceCatalog, so the downstream
    WriteForcedSourceTableTask can be reused with output from this Task.
    """

    ConfigClass = ForcedPhotCcdFromDataFrameConfig
    _DefaultName = "forcedPhotCcdFromDataFrame"
static Schema makeMinimalSchema()
Return a minimal schema for Source tables and records.
static Key< RecordId > getParentKey()
Key for the parent ID.
A floating-point coordinate rectangle geometry.
Point in an unspecified spherical coordinate system.
ConvexPolygon is a closed convex polygon on the unit sphere.