36import lsst.pipe.base.connectionTypes
as cT
41from .forcedMeasurement
import ForcedMeasurementTask
42from .applyApCorr
import ApplyApCorrTask
43from .catalogCalculation
import CatalogCalculationTask
44from ._id_generator
import DetectorVisitIdGeneratorConfig
46__all__ = (
"ForcedPhotCcdConfig",
"ForcedPhotCcdTask",
47 "ForcedPhotCcdFromDataFrameTask",
"ForcedPhotCcdFromDataFrameConfig")
51 dimensions=(
"instrument",
"visit",
"detector",
"skymap",
"tract"),
52 defaultTemplates={
"inputCoaddName":
"deep",
53 "inputName":
"calexp"}):
54 inputSchema = cT.InitInput(
55 doc=
"Schema for the input measurement catalogs.",
56 name=
"{inputCoaddName}Coadd_ref_schema",
57 storageClass=
"SourceCatalog",
59 outputSchema = cT.InitOutput(
60 doc=
"Schema for the output forced measurement catalogs.",
61 name=
"forced_src_schema",
62 storageClass=
"SourceCatalog",
65 doc=
"Input exposure to perform photometry on.",
67 storageClass=
"ExposureF",
68 dimensions=[
"instrument",
"visit",
"detector"],
71 doc=
"Catalog of shapes and positions at which to force photometry.",
72 name=
"{inputCoaddName}Coadd_ref",
73 storageClass=
"SourceCatalog",
74 dimensions=[
"skymap",
"tract",
"patch"],
79 doc=
"SkyMap dataset that defines the coordinate system of the reference catalog.",
80 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
81 storageClass=
"SkyMap",
82 dimensions=[
"skymap"],
85 doc=
"Input Sky Correction to be subtracted from the calexp if doApplySkyCorr=True",
87 storageClass=
"Background",
88 dimensions=(
"instrument",
"visit",
"detector"),
90 visitSummary = cT.Input(
91 doc=
"Input visit-summary catalog with updated calibration objects.",
92 name=
"finalVisitSummary",
93 storageClass=
"ExposureCatalog",
94 dimensions=(
"instrument",
"visit"),
97 doc=
"Output forced photometry catalog.",
99 storageClass=
"SourceCatalog",
100 dimensions=[
"instrument",
"visit",
"detector",
"skymap",
"tract"],
def __init__(self, *, config=None):
    """Build the connections and drop ``skyCorr`` when it is not needed.

    Parameters
    ----------
    config : optional
        Task configuration, forwarded to the parent constructor. Its
        ``doApplySkyCorr`` attribute is read unconditionally afterwards,
        so a real config object is required in practice even though the
        signature defaults to `None`.
    """
    super().__init__(config=config)
    # The sky-correction input is only requested when the config asks for
    # sky correction to be applied.
    if not config.doApplySkyCorr:
        self.inputs.remove("skyCorr")
109class ForcedPhotCcdConfig(pipeBase.PipelineTaskConfig,
110 pipelineConnections=ForcedPhotCcdConnections):
111 """Config class for forced measurement driver task."""
113 target=ForcedMeasurementTask,
114 doc=
"subtask to do forced measurement"
117 doc=
"coadd name: typically one of deep or goodSeeing",
124 doc=
"Run subtask to apply aperture corrections"
127 target=ApplyApCorrTask,
128 doc=
"Subtask to apply aperture corrections"
131 target=CatalogCalculationTask,
132 doc=
"Subtask to run catalogCalculation plugins on catalog"
137 doc=
"Apply sky correction?",
142 doc=
"Add photometric calibration variance to warp variance plane?",
146 doc=
"Where to obtain footprints to install in the measurement catalog, prior to measurement.",
148 "transformed":
"Transform footprints from the reference catalog (downgrades HeavyFootprints).",
149 "psf": (
"Use the scaled shape of the PSF at the position of each source (does not generate "
150 "HeavyFootprints)."),
153 default=
"transformed",
157 doc=
"Scaling factor to apply to the PSF shape when footprintSource='psf' (ignored otherwise).",
160 idGenerator = DetectorVisitIdGeneratorConfig.make_field()
162 def setDefaults(self):
164 super().setDefaults()
167 self.measurement.doReplaceWithNoise =
False
170 self.measurement.plugins.names = [
"base_PixelFlags",
171 "base_TransformedCentroid",
173 "base_LocalBackground",
174 "base_LocalPhotoCalib",
177 self.measurement.slots.shape =
None
180 self.catalogCalculation.plugins.names = []
183class ForcedPhotCcdTask(pipeBase.PipelineTask):
184 """A pipeline task for performing forced measurement on CCD images.
188 refSchema : `lsst.afw.table.Schema`, optional
189 The schema of the reference catalog, passed to the constructor of the
190 references subtask. Optional, but must be specified if ``initInputs``
191 is not; if both are specified, ``initInputs`` takes precedence.
193 Dictionary that can contain a key ``inputSchema`` containing the
194 schema. If present will override the value of ``refSchema``.
196 Keyword arguments are passed to the supertask constructor.
199 ConfigClass = ForcedPhotCcdConfig
200 _DefaultName =
"forcedPhotCcd"
203 def __init__(self, refSchema=None, initInputs=None, **kwargs):
204 super().__init__(**kwargs)
206 if initInputs
is not None:
207 refSchema = initInputs[
'inputSchema'].schema
209 if refSchema
is None:
210 raise ValueError(
"No reference schema provided.")
212 self.makeSubtask(
"measurement", refSchema=refSchema)
216 if self.config.doApCorr:
217 self.makeSubtask(
"applyApCorr", schema=self.measurement.schema)
218 self.makeSubtask(
'catalogCalculation', schema=self.measurement.schema)
221 def runQuantum(self, butlerQC, inputRefs, outputRefs):
222 inputs = butlerQC.get(inputRefs)
224 tract = butlerQC.quantum.dataId[
'tract']
225 skyMap = inputs.pop(
'skyMap')
226 inputs[
'refWcs'] = skyMap[tract].getWcs()
229 skyCorr = inputs.pop(
'skyCorr',
None)
231 inputs[
'exposure'] = self.prepareCalibratedExposure(
234 visitSummary=inputs.pop(
"visitSummary"),
237 inputs[
'refCat'] = self.mergeAndFilterReferences(inputs[
'exposure'], inputs[
'refCat'],
240 if inputs[
'refCat']
is None:
241 self.log.info(
"No WCS for exposure %s. No %s catalog will be written.",
242 butlerQC.quantum.dataId, outputRefs.measCat.datasetType.name)
244 inputs[
'measCat'], inputs[
'exposureId'] = self.generateMeasCat(inputRefs.exposure.dataId,
246 inputs[
'refCat'], inputs[
'refWcs'])
247 self.attachFootprints(inputs[
'measCat'], inputs[
'refCat'], inputs[
'exposure'], inputs[
'refWcs'])
248 outputs = self.run(**inputs)
249 butlerQC.put(outputs, outputRefs)
251 def prepareCalibratedExposure(self, exposure, skyCorr=None, visitSummary=None):
252 """Prepare a calibrated exposure and apply external calibrations
253 and sky corrections if so configured.
257 exposure : `lsst.afw.image.exposure.Exposure`
258 Input exposure to adjust calibrations.
259 skyCorr : `lsst.afw.math.backgroundList`, optional
260 Sky correction frame to apply if doApplySkyCorr=True.
261 visitSummary : `lsst.afw.table.ExposureCatalog`, optional
262 Exposure catalog with updated calibrations; any not-None calibration
263 objects attached will be used. These are applied first and may be
264 overridden by other arguments.
268 exposure : `lsst.afw.image.exposure.Exposure`
269 Exposure with adjusted calibrations.
271 detectorId = exposure.getInfo().getDetector().getId()
273 if visitSummary
is not None:
274 row = visitSummary.find(detectorId)
276 raise RuntimeError(f
"Detector id {detectorId} not found in visitSummary.")
277 if (photoCalib := row.getPhotoCalib())
is not None:
278 exposure.setPhotoCalib(photoCalib)
279 if (skyWcs := row.getWcs())
is not None:
280 exposure.setWcs(skyWcs)
281 if (psf := row.getPsf())
is not None:
283 if (apCorrMap := row.getApCorrMap())
is not None:
284 exposure.info.setApCorrMap(apCorrMap)
286 if skyCorr
is not None:
287 exposure.maskedImage -= skyCorr.getImage()
291 def mergeAndFilterReferences(self, exposure, refCats, refWcs):
292 """Filter reference catalog so that all sources are within the
293 boundaries of the exposure.
297 exposure : `lsst.afw.image.exposure.Exposure`
298 Exposure to generate the catalog for.
299 refCats : sequence of `lsst.daf.butler.DeferredDatasetHandle`
300 Handles for catalogs of shapes and positions at which to force
302 refWcs : `lsst.afw.image.SkyWcs`
303 Reference world coordinate system.
307 refSources : `lsst.afw.table.SourceCatalog`
308 Filtered catalog of forced sources to measure.
312 The majority of this code is based on the methods of
313 lsst.meas.algorithms.loadReferenceObjects.ReferenceObjectLoader
320 expWcs = exposure.getWcs()
322 self.log.info(
"Exposure has no WCS. Returning None for mergedRefCat.")
324 expRegion = exposure.getBBox(lsst.afw.image.PARENT)
326 expBoxCorners = expBBox.getCorners()
327 expSkyCorners = [expWcs.pixelToSky(corner).getVector()
for
328 corner
in expBoxCorners]
336 for refCat
in refCats:
337 refCat = refCat.get()
338 if mergedRefCat
is None:
341 for record
in refCat:
342 if (expPolygon.contains(record.getCoord().getVector())
and record.getParent()
344 record.setFootprint(record.getFootprint())
345 mergedRefCat.append(record)
346 containedIds.add(record.getId())
347 if mergedRefCat
is None:
348 raise RuntimeError(
"No reference objects for forced photometry.")
def generateMeasCat(self, dataId, exposure, refCat, refWcs):
    """Generate a measurement catalog.

    Parameters
    ----------
    dataId : `lsst.daf.butler.DataCoordinate`
        Butler data ID for this image, with ``{visit, detector}`` keys.
        Used to configure the ID generator.
    exposure : `lsst.afw.image.exposure.Exposure`
        Exposure to generate the catalog for.
    refCat : `lsst.afw.table.SourceCatalog`
        Catalog of shapes and positions at which to force photometry.
    refWcs : `lsst.afw.image.SkyWcs`
        Reference world coordinate system. Forwarded to the measurement
        subtask (documented there as not currently used).

    Returns
    -------
    measCat : `lsst.afw.table.SourceCatalog`
        Catalog of forced sources to measure.
    exposureId
        Unique binary id associated with the input exposure; this is the
        ``catalog_id`` of the ID generator built from ``dataId``.
    """
    # One ID generator per data ID: it supplies both the per-record ID
    # factory for the new catalog and the catalog-level ID returned below.
    id_generator = self.config.idGenerator.apply(dataId)
    measCat = self.measurement.generateMeasCat(
        exposure,
        refCat,
        refWcs,
        idFactory=id_generator.make_table_id_factory(),
    )
    return measCat, id_generator.catalog_id
379 def run(self, measCat, exposure, refCat, refWcs, exposureId=None):
380 """Perform forced measurement on a single exposure.
384 measCat : `lsst.afw.table.SourceCatalog`
385 The measurement catalog, based on the sources listed in the
387 exposure : `lsst.afw.image.Exposure`
388 The measurement image upon which to perform forced detection.
389 refCat : `lsst.afw.table.SourceCatalog`
390 The reference catalog of sources to measure.
391 refWcs : `lsst.afw.image.SkyWcs`
392 The WCS for the references.
394 Optional unique exposureId used for random seed in measurement
399 result : `lsst.pipe.base.Struct`
400 Structure with fields:
403 Catalog of forced measurement results
404 (`lsst.afw.table.SourceCatalog`).
406 self.measurement.run(measCat, exposure, refCat, refWcs, exposureId=exposureId)
407 if self.config.doApCorr:
408 apCorrMap = exposure.getInfo().getApCorrMap()
409 if apCorrMap
is None:
410 self.log.warning(
"Forced exposure image does not have valid aperture correction; skipping.")
412 self.applyApCorr.run(
416 self.catalogCalculation.run(measCat)
418 return pipeBase.Struct(measCat=measCat)
def attachFootprints(self, sources, refCat, exposure, refWcs):
    """Attach footprints to blank sources prior to measurements.

    Parameters
    ----------
    sources
        Measurement catalog whose records receive the footprints.
    refCat
        Reference catalog; only used when footprints are transformed
        from the reference catalog.
    exposure
        Exposure that will be measured.
    refWcs
        WCS of the reference catalog; only used when footprints are
        transformed from the reference catalog.

    Returns
    -------
    result
        Whatever the selected measurement-subtask method returns, or
        `None` when ``footprintSource`` is neither ``"transformed"`` nor
        ``"psf"``.

    Notes
    -----
    `~lsst.afw.detection.Footprint` objects for forced photometry must
    be in the pixel coordinate system of the image being measured, while
    the actual detections may start out in a different coordinate system.

    Subclasses of this class may implement this method to define how
    those `~lsst.afw.detection.Footprint` objects should be generated.

    This default implementation dispatches on the ``footprintSource``
    configuration parameter.
    """
    footprintSource = self.config.footprintSource
    if footprintSource == "transformed":
        return self.measurement.attachTransformedFootprints(sources, refCat, exposure, refWcs)
    elif footprintSource == "psf":
        return self.measurement.attachPsfShapeFootprints(
            sources,
            exposure,
            scaling=self.config.psfFootprintScaling,
        )
    # Any other value falls through and returns None, as before.
442class ForcedPhotCcdFromDataFrameConnections(PipelineTaskConnections,
443 dimensions=(
"instrument",
"visit",
"detector",
"skymap",
"tract"),
444 defaultTemplates={
"inputCoaddName":
"goodSeeing",
445 "inputName":
"calexp",
448 doc=
"Catalog of positions at which to force photometry.",
449 name=
"{inputCoaddName}Diff_fullDiaObjTable",
450 storageClass=
"DataFrame",
451 dimensions=[
"skymap",
"tract",
"patch"],
456 doc=
"Input exposure to perform photometry on.",
458 storageClass=
"ExposureF",
459 dimensions=[
"instrument",
"visit",
"detector"],
462 doc=
"Input Sky Correction to be subtracted from the calexp if doApplySkyCorr=True",
464 storageClass=
"Background",
465 dimensions=(
"instrument",
"visit",
"detector"),
467 visitSummary = cT.Input(
468 doc=
"Input visit-summary catalog with updated calibration objects.",
469 name=
"finalVisitSummary",
470 storageClass=
"ExposureCatalog",
471 dimensions=(
"instrument",
"visit"),
474 doc=
"SkyMap dataset that defines the coordinate system of the reference catalog.",
475 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
476 storageClass=
"SkyMap",
477 dimensions=[
"skymap"],
480 doc=
"Output forced photometry catalog.",
481 name=
"forced_src_diaObject",
482 storageClass=
"SourceCatalog",
483 dimensions=[
"instrument",
"visit",
"detector",
"skymap",
"tract"],
485 outputSchema = cT.InitOutput(
486 doc=
"Schema for the output forced measurement catalogs.",
487 name=
"forced_src_diaObject_schema",
488 storageClass=
"SourceCatalog",
def __init__(self, *, config=None):
    """Build the connections and drop ``skyCorr`` when it is not needed.

    Parameters
    ----------
    config : optional
        Task configuration, forwarded to the parent constructor. Its
        ``doApplySkyCorr`` attribute is read unconditionally afterwards,
        so a real config object is required in practice even though the
        signature defaults to `None`.
    """
    super().__init__(config=config)
    # The sky-correction input is only requested when the config asks for
    # sky correction to be applied.
    if not config.doApplySkyCorr:
        self.inputs.remove("skyCorr")
497class ForcedPhotCcdFromDataFrameConfig(ForcedPhotCcdConfig,
498 pipelineConnections=ForcedPhotCcdFromDataFrameConnections):
499 def setDefaults(self):
500 super().setDefaults()
501 self.footprintSource =
"psf"
502 self.measurement.doReplaceWithNoise =
False
505 self.measurement.plugins.names = [
"base_PixelFlags",
506 "base_TransformedCentroidFromCoord",
508 "base_LocalBackground",
509 "base_LocalPhotoCalib",
512 self.measurement.slots.shape =
None
515 self.catalogCalculation.plugins.names = []
517 self.measurement.copyColumns = {
'id':
'diaObjectId',
'coord_ra':
'coord_ra',
'coord_dec':
'coord_dec'}
518 self.measurement.slots.centroid =
"base_TransformedCentroidFromCoord"
519 self.measurement.slots.psfFlux =
"base_PsfFlux"
523 if self.footprintSource ==
"transformed":
524 raise ValueError(
"Cannot transform footprints from reference catalog, "
525 "because DataFrames can't hold footprints.")
528class ForcedPhotCcdFromDataFrameTask(ForcedPhotCcdTask):
529 """Force Photometry on a per-detector exposure with coords from a DataFrame
531 Uses input from a DataFrame instead of SourceCatalog
532 like the base class ForcedPhotCcd does.
533 Writes out a SourceCatalog so that the downstream
534 WriteForcedSourceTableTask can be reused with output from this Task.
536 _DefaultName =
"forcedPhotCcdFromDataFrame"
537 ConfigClass = ForcedPhotCcdFromDataFrameConfig
539 def __init__(self, refSchema=None, initInputs=None, **kwargs):
541 pipeBase.PipelineTask.__init__(self, **kwargs)
547 self.makeSubtask(
'catalogCalculation', schema=self.measurement.schema)
551 inputs = butlerQC.get(inputRefs)
553 tract = butlerQC.quantum.dataId[
"tract"]
554 skyMap = inputs.pop(
"skyMap")
555 inputs[
"refWcs"] = skyMap[tract].getWcs()
558 skyCorr = inputs.pop(
'skyCorr',
None)
560 inputs[
'exposure'] = self.prepareCalibratedExposure(
563 visitSummary=inputs.pop(
"visitSummary"),
566 self.log.info(
"Filtering ref cats: %s",
','.join([str(i.dataId)
for i
in inputs[
'refCat']]))
567 if inputs[
"exposure"].getWcs()
is not None:
568 refCat = self.df2RefCat([i.get(parameters={
"columns": [
'diaObjectId',
'ra',
'dec']})
569 for i
in inputs[
'refCat']],
570 inputs[
'exposure'].getBBox(), inputs[
'exposure'].getWcs())
571 inputs[
'refCat'] = refCat
573 inputs[
'measCat'], inputs[
'exposureId'] = self.generateMeasCat(
574 inputRefs.exposure.dataId, inputs[
'exposure'], inputs[
'refCat'], inputs[
'refWcs']
578 self.attachFootprints(inputs[
"measCat"], inputs[
"refCat"], inputs[
"exposure"], inputs[
"refWcs"])
579 outputs = self.run(**inputs)
581 butlerQC.put(outputs, outputRefs)
583 self.log.info(
"No WCS for %s. Skipping and no %s catalog will be written.",
584 butlerQC.quantum.dataId, outputRefs.measCat.datasetType.name)
587 """Convert list of DataFrames to reference catalog
589 Concatenate list of DataFrames presumably from multiple patches and
590 downselect rows that overlap the exposureBBox using the exposureWcs.
594 dfList : `list` of `pandas.DataFrame`
595 Each element contains diaObjects with ra/dec position in degrees
596 Columns 'diaObjectId', 'ra', 'dec' are expected
597 exposureBBox : `lsst.geom.Box2I`
598 Bounding box on which to select rows that overlap
599 exposureWcs : `lsst.afw.geom.SkyWcs`
600 World coordinate system to convert sky coords in ref cat to
601 pixel coords with which to compare with exposureBBox
605 refCat : `lsst.afw.table.SourceTable`
606 Source Catalog with minimal schema that overlaps exposureBBox
608 df = pd.concat(dfList)
611 mapping = exposureWcs.getTransform().getMapping()
612 x, y = mapping.applyInverse(np.array(df[[
'ra',
'dec']].values*2*np.pi/360).T)
614 refCat = self.df2SourceCat(df[inBBox])
618 """Create minimal schema SourceCatalog from a pandas DataFrame.
620 The forced measurement subtask expects this as input.
624 df : `pandas.DataFrame`
625 DiaObjects with locations and ids.
629 outputCatalog : `lsst.afw.table.SourceTable`
630 Output catalog with minimal schema.
634 outputCatalog.reserve(len(df))
636 for diaObjectId, ra, dec
in df[[
'ra',
'dec']].itertuples():
637 outputRecord = outputCatalog.addNew()
638 outputRecord.setId(diaObjectId)
static Schema makeMinimalSchema()
Return a minimal schema for Source tables and records.
static Key< RecordId > getParentKey()
Key for the parent ID.
A floating-point coordinate rectangle geometry.
Point in an unspecified spherical coordinate system.
ConvexPolygon is a closed convex polygon on the unit sphere.
df2RefCat(self, dfList, exposureBBox, exposureWcs)
runQuantum(self, butlerQC, inputRefs, outputRefs)