36import lsst.pipe.base.connectionTypes
as cT
41from .forcedMeasurement
import ForcedMeasurementTask
42from .applyApCorr
import ApplyApCorrTask
43from .catalogCalculation
import CatalogCalculationTask
44from ._id_generator
import DetectorVisitIdGeneratorConfig
46__all__ = (
"ForcedPhotCcdConfig",
"ForcedPhotCcdTask",
47 "ForcedPhotCcdFromDataFrameTask",
"ForcedPhotCcdFromDataFrameConfig")
51 dimensions=(
"instrument",
"visit",
"detector",
"skymap",
"tract"),
52 defaultTemplates={
"inputCoaddName":
"deep",
53 "inputName":
"calexp"}):
54 inputSchema = cT.InitInput(
55 doc=
"Schema for the input measurement catalogs.",
56 name=
"{inputCoaddName}Coadd_ref_schema",
57 storageClass=
"SourceCatalog",
59 outputSchema = cT.InitOutput(
60 doc=
"Schema for the output forced measurement catalogs.",
61 name=
"forced_src_schema",
62 storageClass=
"SourceCatalog",
65 doc=
"Input exposure to perform photometry on.",
67 storageClass=
"ExposureF",
68 dimensions=[
"instrument",
"visit",
"detector"],
71 doc=
"Catalog of shapes and positions at which to force photometry.",
72 name=
"{inputCoaddName}Coadd_ref",
73 storageClass=
"SourceCatalog",
74 dimensions=[
"skymap",
"tract",
"patch"],
79 doc=
"SkyMap dataset that defines the coordinate system of the reference catalog.",
80 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
81 storageClass=
"SkyMap",
82 dimensions=[
"skymap"],
85 doc=
"Input Sky Correction to be subtracted from the calexp if doApplySkyCorr=True",
87 storageClass=
"Background",
88 dimensions=(
"instrument",
"visit",
"detector"),
90 visitSummary = cT.Input(
91 doc=
"Input visit-summary catalog with updated calibration objects.",
92 name=
"finalVisitSummary",
93 storageClass=
"ExposureCatalog",
94 dimensions=(
"instrument",
"visit"),
97 doc=
"Output forced photometry catalog.",
99 storageClass=
"SourceCatalog",
100 dimensions=[
"instrument",
"visit",
"detector",
"skymap",
"tract"],
103 def __init__(self, *, config=None):
104 super().__init__(config=config)
105 if not config.doApplySkyCorr:
106 self.inputs.remove(
"skyCorr")
109class ForcedPhotCcdConfig(pipeBase.PipelineTaskConfig,
110 pipelineConnections=ForcedPhotCcdConnections):
111 """Config class for forced measurement driver task."""
113 target=ForcedMeasurementTask,
114 doc=
"subtask to do forced measurement"
117 doc=
"coadd name: typically one of deep or goodSeeing",
124 doc=
"Run subtask to apply aperture corrections"
127 target=ApplyApCorrTask,
128 doc=
"Subtask to apply aperture corrections"
131 target=CatalogCalculationTask,
132 doc=
"Subtask to run catalogCalculation plugins on catalog"
137 doc=
"Apply sky correction?",
142 doc=
"Add photometric calibration variance to warp variance plane?",
146 doc=
"Where to obtain footprints to install in the measurement catalog, prior to measurement.",
148 "transformed":
"Transform footprints from the reference catalog (downgrades HeavyFootprints).",
149 "psf": (
"Use the scaled shape of the PSF at the position of each source (does not generate "
150 "HeavyFootprints)."),
153 default=
"transformed",
157 doc=
"Scaling factor to apply to the PSF shape when footprintSource='psf' (ignored otherwise).",
160 idGenerator = DetectorVisitIdGeneratorConfig.make_field()
162 def setDefaults(self):
164 super().setDefaults()
167 self.measurement.doReplaceWithNoise =
False
170 self.measurement.plugins.names = [
"base_PixelFlags",
171 "base_TransformedCentroid",
173 "base_LocalBackground",
174 "base_LocalPhotoCalib",
177 self.measurement.slots.shape =
None
179 self.measurement.plugins[
'base_PixelFlags'].masksFpAnywhere = [
'STREAK']
180 self.measurement.plugins[
'base_PixelFlags'].masksFpCenter = [
'STREAK']
183 self.catalogCalculation.plugins.names = []
186class ForcedPhotCcdTask(pipeBase.PipelineTask):
187 """A pipeline task for performing forced measurement on CCD images.
191 refSchema : `lsst.afw.table.Schema`, optional
192 The schema of the reference catalog, passed to the constructor of the
193 references subtask. Optional, but must be specified if ``initInputs``
194 is not; if both are specified, ``initInputs`` takes precedence.
196 Dictionary that can contain a key ``inputSchema`` containing the
197 schema. If present will override the value of ``refSchema``.
199 Keyword arguments are passed to the supertask constructor.
202 ConfigClass = ForcedPhotCcdConfig
203 _DefaultName =
"forcedPhotCcd"
206 def __init__(self, refSchema=None, initInputs=None, **kwargs):
207 super().__init__(**kwargs)
209 if initInputs
is not None:
210 refSchema = initInputs[
'inputSchema'].schema
212 if refSchema
is None:
213 raise ValueError(
"No reference schema provided.")
215 self.makeSubtask(
"measurement", refSchema=refSchema)
219 if self.config.doApCorr:
220 self.makeSubtask(
"applyApCorr", schema=self.measurement.schema)
221 self.makeSubtask(
'catalogCalculation', schema=self.measurement.schema)
224 def runQuantum(self, butlerQC, inputRefs, outputRefs):
225 inputs = butlerQC.get(inputRefs)
227 tract = butlerQC.quantum.dataId[
'tract']
228 skyMap = inputs.pop(
'skyMap')
229 inputs[
'refWcs'] = skyMap[tract].getWcs()
232 skyCorr = inputs.pop(
'skyCorr',
None)
234 inputs[
'exposure'] = self.prepareCalibratedExposure(
237 visitSummary=inputs.pop(
"visitSummary"),
240 inputs[
'refCat'] = self.mergeAndFilterReferences(inputs[
'exposure'], inputs[
'refCat'],
243 if inputs[
'refCat']
is None:
244 self.log.info(
"No WCS for exposure %s. No %s catalog will be written.",
245 butlerQC.quantum.dataId, outputRefs.measCat.datasetType.name)
247 inputs[
'measCat'], inputs[
'exposureId'] = self.generateMeasCat(inputRefs.exposure.dataId,
249 inputs[
'refCat'], inputs[
'refWcs'])
250 self.attachFootprints(inputs[
'measCat'], inputs[
'refCat'], inputs[
'exposure'], inputs[
'refWcs'])
251 outputs = self.run(**inputs)
252 butlerQC.put(outputs, outputRefs)
254 def prepareCalibratedExposure(self, exposure, skyCorr=None, visitSummary=None):
255 """Prepare a calibrated exposure and apply external calibrations
256 and sky corrections if so configured.
260 exposure : `lsst.afw.image.exposure.Exposure`
261 Input exposure to adjust calibrations.
262 skyCorr : `lsst.afw.math.backgroundList`, optional
263 Sky correction frame to apply if doApplySkyCorr=True.
264 visitSummary : `lsst.afw.table.ExposureCatalog`, optional
265 Exposure catalog with update calibrations; any not-None calibration
266 objects attached will be used. These are applied first and may be
267 overridden by other arguments.
271 exposure : `lsst.afw.image.exposure.Exposure`
272 Exposure with adjusted calibrations.
274 detectorId = exposure.getInfo().getDetector().getId()
276 if visitSummary
is not None:
277 row = visitSummary.find(detectorId)
279 raise RuntimeError(f
"Detector id {detectorId} not found in visitSummary.")
280 if (photoCalib := row.getPhotoCalib())
is not None:
281 exposure.setPhotoCalib(photoCalib)
282 if (skyWcs := row.getWcs())
is not None:
283 exposure.setWcs(skyWcs)
284 if (psf := row.getPsf())
is not None:
286 if (apCorrMap := row.getApCorrMap())
is not None:
287 exposure.info.setApCorrMap(apCorrMap)
289 if skyCorr
is not None:
290 exposure.maskedImage -= skyCorr.getImage()
294 def mergeAndFilterReferences(self, exposure, refCats, refWcs):
295 """Filter reference catalog so that all sources are within the
296 boundaries of the exposure.
300 exposure : `lsst.afw.image.exposure.Exposure`
301 Exposure to generate the catalog for.
302 refCats : sequence of `lsst.daf.butler.DeferredDatasetHandle`
303 Handles for catalogs of shapes and positions at which to force
305 refWcs : `lsst.afw.image.SkyWcs`
306 Reference world coordinate system.
310 refSources : `lsst.afw.table.SourceCatalog`
311 Filtered catalog of forced sources to measure.
315 The majority of this code is based on the methods of
316 lsst.meas.algorithms.loadReferenceObjects.ReferenceObjectLoader
323 expWcs = exposure.getWcs()
325 self.log.info(
"Exposure has no WCS. Returning None for mergedRefCat.")
327 expRegion = exposure.getBBox(lsst.afw.image.PARENT)
329 expBoxCorners = expBBox.getCorners()
330 expSkyCorners = [expWcs.pixelToSky(corner).getVector()
for
331 corner
in expBoxCorners]
339 for refCat
in refCats:
340 refCat = refCat.get()
341 if mergedRefCat
is None:
344 for record
in refCat:
345 if (expPolygon.contains(record.getCoord().getVector())
and record.getParent()
347 record.setFootprint(record.getFootprint())
348 mergedRefCat.append(record)
349 containedIds.add(record.getId())
350 if mergedRefCat
is None:
351 raise RuntimeError(
"No reference objects for forced photometry.")
355 def generateMeasCat(self, dataId, exposure, refCat, refWcs):
356 """Generate a measurement catalog.
360 dataId : `lsst.daf.butler.DataCoordinate`
361 Butler data ID for this image, with ``{visit, detector}`` keys.
362 exposure : `lsst.afw.image.exposure.Exposure`
363 Exposure to generate the catalog for.
364 refCat : `lsst.afw.table.SourceCatalog`
365 Catalog of shapes and positions at which to force photometry.
366 refWcs : `lsst.afw.image.SkyWcs`
367 Reference world coordinate system.
368 This parameter is not currently used.
372 measCat : `lsst.afw.table.SourceCatalog`
373 Catalog of forced sources to measure.
375 Unique binary id associated with the input exposure
377 id_generator = self.config.idGenerator.apply(dataId)
378 measCat = self.measurement.generateMeasCat(exposure, refCat, refWcs,
379 idFactory=id_generator.make_table_id_factory())
380 return measCat, id_generator.catalog_id
382 def run(self, measCat, exposure, refCat, refWcs, exposureId=None):
383 """Perform forced measurement on a single exposure.
387 measCat : `lsst.afw.table.SourceCatalog`
388 The measurement catalog, based on the sources listed in the
390 exposure : `lsst.afw.image.Exposure`
391 The measurement image upon which to perform forced detection.
392 refCat : `lsst.afw.table.SourceCatalog`
393 The reference catalog of sources to measure.
394 refWcs : `lsst.afw.image.SkyWcs`
395 The WCS for the references.
397 Optional unique exposureId used for random seed in measurement
402 result : `lsst.pipe.base.Struct`
403 Structure with fields:
406 Catalog of forced measurement results
407 (`lsst.afw.table.SourceCatalog`).
409 self.measurement.run(measCat, exposure, refCat, refWcs, exposureId=exposureId)
410 if self.config.doApCorr:
411 apCorrMap = exposure.getInfo().getApCorrMap()
412 if apCorrMap
is None:
413 self.log.warning(
"Forced exposure image does not have valid aperture correction; skipping.")
415 self.applyApCorr.run(
419 self.catalogCalculation.run(measCat)
421 return pipeBase.Struct(measCat=measCat)
423 def attachFootprints(self, sources, refCat, exposure, refWcs):
424 """Attach footprints to blank sources prior to measurements.
428 `~lsst.afw.detection.Footprint` objects for forced photometry must
429 be in the pixel coordinate system of the image being measured, while
430 the actual detections may start out in a different coordinate system.
432 Subclasses of this class may implement this method to define how
433 those `~lsst.afw.detection.Footprint` objects should be generated.
435 This default implementation transforms depends on the
436 ``footprintSource`` configuration parameter.
438 if self.
config.footprintSource ==
"transformed":
439 return self.measurement.attachTransformedFootprints(sources, refCat, exposure, refWcs)
440 elif self.
config.footprintSource ==
"psf":
441 return self.measurement.attachPsfShapeFootprints(sources, exposure,
442 scaling=self.
config.psfFootprintScaling)
445class ForcedPhotCcdFromDataFrameConnections(PipelineTaskConnections,
446 dimensions=(
"instrument",
"visit",
"detector",
"skymap",
"tract"),
447 defaultTemplates={
"inputCoaddName":
"goodSeeing",
448 "inputName":
"calexp",
451 doc=
"Catalog of positions at which to force photometry.",
452 name=
"{inputCoaddName}Diff_fullDiaObjTable",
453 storageClass=
"DataFrame",
454 dimensions=[
"skymap",
"tract",
"patch"],
459 doc=
"Input exposure to perform photometry on.",
461 storageClass=
"ExposureF",
462 dimensions=[
"instrument",
"visit",
"detector"],
465 doc=
"Input Sky Correction to be subtracted from the calexp if doApplySkyCorr=True",
467 storageClass=
"Background",
468 dimensions=(
"instrument",
"visit",
"detector"),
470 visitSummary = cT.Input(
471 doc=
"Input visit-summary catalog with updated calibration objects.",
472 name=
"finalVisitSummary",
473 storageClass=
"ExposureCatalog",
474 dimensions=(
"instrument",
"visit"),
477 doc=
"Output forced photometry catalog.",
478 name=
"forced_src_diaObject",
479 storageClass=
"SourceCatalog",
480 dimensions=[
"instrument",
"visit",
"detector",
"skymap",
"tract"],
482 outputSchema = cT.InitOutput(
483 doc=
"Schema for the output forced measurement catalogs.",
484 name=
"forced_src_diaObject_schema",
485 storageClass=
"SourceCatalog",
488 def __init__(self, *, config=None):
489 super().__init__(config=config)
490 if not config.doApplySkyCorr:
491 self.inputs.remove(
"skyCorr")
494class ForcedPhotCcdFromDataFrameConfig(ForcedPhotCcdConfig,
495 pipelineConnections=ForcedPhotCcdFromDataFrameConnections):
496 def setDefaults(self):
497 super().setDefaults()
498 self.footprintSource =
"psf"
499 self.measurement.doReplaceWithNoise =
False
502 self.measurement.plugins.names = [
"base_PixelFlags",
503 "base_TransformedCentroidFromCoord",
505 "base_LocalBackground",
506 "base_LocalPhotoCalib",
509 self.measurement.slots.shape =
None
511 self.measurement.plugins[
'base_PixelFlags'].masksFpAnywhere = [
'STREAK']
512 self.measurement.plugins[
'base_PixelFlags'].masksFpCenter = [
'STREAK']
515 self.catalogCalculation.plugins.names = []
517 self.measurement.copyColumns = {
'id':
'diaObjectId',
'coord_ra':
'coord_ra',
'coord_dec':
'coord_dec'}
518 self.measurement.slots.centroid =
"base_TransformedCentroidFromCoord"
519 self.measurement.slots.psfFlux =
"base_PsfFlux"
523 if self.footprintSource ==
"transformed":
524 raise ValueError(
"Cannot transform footprints from reference catalog, "
525 "because DataFrames can't hold footprints.")
528class ForcedPhotCcdFromDataFrameTask(ForcedPhotCcdTask):
529 """Force Photometry on a per-detector exposure with coords from a DataFrame
531 Uses input from a DataFrame instead of SourceCatalog
532 like the base class ForcedPhotCcd does.
533 Writes out a SourceCatalog so that the downstream
534 WriteForcedSourceTableTask can be reused with output from this Task.
536 _DefaultName =
"forcedPhotCcdFromDataFrame"
537 ConfigClass = ForcedPhotCcdFromDataFrameConfig
539 def __init__(self, refSchema=None, initInputs=None, **kwargs):
541 pipeBase.PipelineTask.__init__(self, **kwargs)
547 self.makeSubtask(
'catalogCalculation', schema=self.measurement.schema)
551 inputs = butlerQC.get(inputRefs)
554 inputs[
'refWcs'] =
None
557 skyCorr = inputs.pop(
'skyCorr',
None)
559 inputs[
'exposure'] = self.prepareCalibratedExposure(
562 visitSummary=inputs.pop(
"visitSummary"),
565 self.log.info(
"Filtering ref cats: %s",
','.join([str(i.dataId)
for i
in inputs[
'refCat']]))
566 if inputs[
"exposure"].getWcs()
is not None:
567 refCat = self.df2RefCat([i.get(parameters={
"columns": [
'diaObjectId',
'ra',
'dec']})
568 for i
in inputs[
'refCat']],
569 inputs[
'exposure'].getBBox(), inputs[
'exposure'].getWcs())
570 inputs[
'refCat'] = refCat
572 inputs[
'measCat'], inputs[
'exposureId'] = self.generateMeasCat(
573 inputRefs.exposure.dataId, inputs[
'exposure'], inputs[
'refCat'], inputs[
'refWcs']
577 self.attachFootprints(inputs[
"measCat"], inputs[
"refCat"], inputs[
"exposure"], inputs[
"refWcs"])
578 outputs = self.run(**inputs)
580 butlerQC.put(outputs, outputRefs)
582 self.log.info(
"No WCS for %s. Skipping and no %s catalog will be written.",
583 butlerQC.quantum.dataId, outputRefs.measCat.datasetType.name)
586 """Convert list of DataFrames to reference catalog
588 Concatenate list of DataFrames presumably from multiple patches and
589 downselect rows that overlap the exposureBBox using the exposureWcs.
593 dfList : `list` of `pandas.DataFrame`
594 Each element containst diaObjects with ra/dec position in degrees
595 Columns 'diaObjectId', 'ra', 'dec' are expected
596 exposureBBox : `lsst.geom.Box2I`
597 Bounding box on which to select rows that overlap
598 exposureWcs : `lsst.afw.geom.SkyWcs`
599 World coordinate system to convert sky coords in ref cat to
600 pixel coords with which to compare with exposureBBox
604 refCat : `lsst.afw.table.SourceTable`
605 Source Catalog with minimal schema that overlaps exposureBBox
607 df = pd.concat(dfList)
610 mapping = exposureWcs.getTransform().getMapping()
611 x, y = mapping.applyInverse(np.array(df[[
'ra',
'dec']].values*2*np.pi/360).T)
613 refCat = self.df2SourceCat(df[inBBox])
617 """Create minimal schema SourceCatalog from a pandas DataFrame.
619 The forced measurement subtask expects this as input.
623 df : `pandas.DataFrame`
624 DiaObjects with locations and ids.
628 outputCatalog : `lsst.afw.table.SourceTable`
629 Output catalog with minimal schema.
633 outputCatalog.reserve(len(df))
635 for diaObjectId, ra, dec
in df[[
'ra',
'dec']].itertuples():
636 outputRecord = outputCatalog.addNew()
637 outputRecord.setId(diaObjectId)
static Schema makeMinimalSchema()
Return a minimal schema for Source tables and records.
static Key< RecordId > getParentKey()
Key for the parent ID.
A floating-point coordinate rectangle geometry.
Point in an unspecified spherical coordinate system.
ConvexPolygon is a closed convex polygon on the unit sphere.
df2RefCat(self, dfList, exposureBBox, exposureWcs)
runQuantum(self, butlerQC, inputRefs, outputRefs)