29from .references
import MultiBandReferencesTask
30from .forcedMeasurement
import ForcedMeasurementTask
31from .applyApCorr
import ApplyApCorrTask
32from .catalogCalculation
import CatalogCalculationTask
# Public symbols exported by this module.
__all__ = (
    "ForcedPhotCoaddConfig",
    "ForcedPhotCoaddTask")
38 """Get the psfCache setting into ForcedPhotCoaddTask"""
41 return pipeBase.ButlerInitializedTaskRunner.getTargetList(parsedCmd,
42 psfCache=parsedCmd.psfCache)
46 dimensions=(
"band",
"skymap",
"tract",
"patch"),
47 defaultTemplates={
"inputCoaddName":
"deep",
48 "outputCoaddName":
"deep"}):
49 inputSchema = pipeBase.connectionTypes.InitInput(
50 doc=
"Schema for the input measurement catalogs.",
51 name=
"{inputCoaddName}Coadd_ref_schema",
52 storageClass=
"SourceCatalog",
54 outputSchema = pipeBase.connectionTypes.InitOutput(
55 doc=
"Schema for the output forced measurement catalogs.",
56 name=
"{outputCoaddName}Coadd_forced_src_schema",
57 storageClass=
"SourceCatalog",
59 exposure = pipeBase.connectionTypes.Input(
60 doc=
"Input exposure to perform photometry on.",
61 name=
"{inputCoaddName}Coadd_calexp",
62 storageClass=
"ExposureF",
63 dimensions=[
"band",
"skymap",
"tract",
"patch"],
65 refCat = pipeBase.connectionTypes.Input(
66 doc=
"Catalog of shapes and positions at which to force photometry.",
67 name=
"{inputCoaddName}Coadd_ref",
68 storageClass=
"SourceCatalog",
69 dimensions=[
"skymap",
"tract",
"patch"],
71 refCatInBand = pipeBase.connectionTypes.Input(
72 doc=
"Catalog of shapes and positions in the band having forced photometry done",
73 name=
"{inputCoaddName}Coadd_meas",
74 storageClass=
"SourceCatalog",
75 dimensions=(
"band",
"skymap",
"tract",
"patch")
77 footprintCatInBand = pipeBase.connectionTypes.Input(
78 doc=
"Catalog of footprints to attach to sources",
79 name=
"{inputCoaddName}Coadd_deblendedFlux",
80 storageClass=
"SourceCatalog",
81 dimensions=(
"band",
"skymap",
"tract",
"patch")
83 scarletModels = pipeBase.connectionTypes.Input(
84 doc=
"Multiband scarlet models produced by the deblender",
85 name=
"{inputCoaddName}Coadd_scarletModelData",
86 storageClass=
"ScarletModelData",
87 dimensions=(
"tract",
"patch",
"skymap"),
89 refWcs = pipeBase.connectionTypes.Input(
90 doc=
"Reference world coordinate system.",
91 name=
"{inputCoaddName}Coadd.wcs",
93 dimensions=[
"band",
"skymap",
"tract",
"patch"],
95 measCat = pipeBase.connectionTypes.Output(
96 doc=
"Output forced photometry catalog.",
97 name=
"{outputCoaddName}Coadd_forced_src",
98 storageClass=
"SourceCatalog",
99 dimensions=[
"band",
"skymap",
"tract",
"patch"],
    def __init__(self, *, config=None):
        """Prune the footprint input connections this configuration never reads.

        Parameters
        ----------
        config : `ForcedPhotCoaddConfig`
            Task configuration; ``config.footprintDatasetName`` selects which
            footprint input dataset is required.
            NOTE(review): assumes ``config`` is not `None` -- confirm the
            middleware always supplies it.
        """
        super().__init__(config=config)
        # Only one of the two footprint inputs is ever used; remove the other
        # so the butler does not require that dataset to exist.
        if config.footprintDatasetName != "ScarletModelData":
            self.inputs.remove("scarletModels")
        if config.footprintDatasetName != "DeblendedFlux":
            self.inputs.remove("footprintCatInBand")
110class ForcedPhotCoaddConfig(pipeBase.PipelineTaskConfig,
111 pipelineConnections=ForcedPhotCoaddConnections):
113 target=MultiBandReferencesTask,
114 doc=
"subtask to retrieve reference source catalog"
117 target=ForcedMeasurementTask,
118 doc=
"subtask to do forced measurement"
121 doc=
"coadd name: typically one of deep or goodSeeing",
128 doc=
"Run subtask to apply aperture corrections"
131 target=ApplyApCorrTask,
132 doc=
"Subtask to apply aperture corrections"
135 target=CatalogCalculationTask,
136 doc=
"Subtask to run catalogCalculation plugins on catalog"
139 doc=
"Dataset (without coadd prefix) that should be used to obtain (Heavy)Footprints for sources. "
140 "Must have IDs that match those of the reference catalog."
141 "If None, Footprints will be generated by transforming the reference Footprints.",
143 default=
"ScarletModelData",
149 doc=
"Whether to use the deblender models as templates to re-distribute the flux "
150 "from the 'exposure' (True), or to perform measurements on the deblender model footprints. "
151 "If footprintDatasetName != 'ScarletModelData' then this field is ignored.")
155 doc=
"Whether to strip footprints from the output catalog before "
157 "This is usually done when using scarlet models to save disk space.")
161 doc=
"Should be set to True if fake sources have been inserted into the input data."
170 self.catalogCalculation.plugins.names = []
171 self.measurement.copyColumns[
"id"] =
"id"
172 self.measurement.copyColumns[
"parent"] =
"parent"
173 self.references.removePatchOverlaps =
False
174 self.measurement.plugins.names |= [
'base_InputCount',
'base_Variance']
175 self.measurement.plugins[
'base_PixelFlags'].masksFpAnywhere = [
'CLIPPED',
'SENSOR_EDGE',
176 'REJECTED',
'INEXACT_PSF']
177 self.measurement.plugins[
'base_PixelFlags'].masksFpCenter = [
'CLIPPED',
'SENSOR_EDGE',
178 'REJECTED',
'INEXACT_PSF']
182 if (self.measurement.doReplaceWithNoise
and self.footprintDatasetName
is not None
183 and self.references.removePatchOverlaps):
184 raise ValueError(
"Cannot use removePatchOverlaps=True with deblended footprints, as parent "
185 "sources may be rejected while their children are not.")
188class ForcedPhotCoaddTask(pipeBase.PipelineTask, pipeBase.CmdLineTask):
189 """A command-line driver for performing forced measurement on coadd images.
194 A Butler which will be passed to the references subtask to allow it to
195 load its schema from disk. Optional, but must be specified
if
196 ``refSchema``
is not;
if both are specified, ``refSchema`` takes
199 The schema of the reference catalog, passed to the constructor of the
200 references subtask. Optional, but must be specified
if ``butler``
is
201 not;
if both are specified, ``refSchema`` takes precedence.
203 Keyword arguments are passed to the supertask constructor.
206 ConfigClass = ForcedPhotCoaddConfig
207 RunnerClass = ForcedPhotCoaddRunner
208 _DefaultName = "forcedPhotCoadd"
209 dataPrefix =
"deepCoadd_"
    def __init__(self, butler=None, refSchema=None, initInputs=None, **kwds):
        """Initialize the task, constructing its measurement subtasks.

        Parameters
        ----------
        butler : optional
            Data butler, forwarded to the ``references`` subtask so it can
            load its schema from disk.
        refSchema : optional
            Schema for the reference catalog.  If not given, it is taken from
            ``initInputs`` or from the ``references`` subtask itself.
        initInputs : `dict`, optional
            Pipeline init inputs; when present,
            ``initInputs['inputSchema'].schema`` overrides ``refSchema``.
        **kwds
            Forwarded to the base-class constructor.
        """
        super().__init__(**kwds)

        # The schema delivered by the middleware takes precedence over any
        # explicitly supplied reference schema.
        if initInputs is not None:
            refSchema = initInputs['inputSchema'].schema

        self.makeSubtask("references", butler=butler, schema=refSchema)
        if refSchema is None:
            # Fall back to the schema loaded by the references subtask.
            refSchema = self.references.schema
        self.makeSubtask("measurement", refSchema=refSchema)

        # The downstream subtasks operate on the measurement task's schema.
        if self.config.doApCorr:
            self.makeSubtask("applyApCorr", schema=self.measurement.schema)
        self.makeSubtask('catalogCalculation', schema=self.measurement.schema)
228 def runQuantum(self, butlerQC, inputRefs, outputRefs):
229 inputs = butlerQC.get(inputRefs)
231 refCatInBand = inputs.pop(
'refCatInBand')
232 if self.config.footprintDatasetName ==
"ScarletModelData":
233 footprintData = inputs.pop(
"scarletModels")
234 elif self.config.footprintDatasetName ==
"DeblendedFlux":
235 footprintData = inputs.pop(
"footprintCatIndBand")
238 inputs[
'measCat'], inputs[
'exposureId'] = self.generateMeasCat(inputRefs.exposure.dataId,
245 outputs = self.run(**inputs)
247 if self.config.footprintDatasetName ==
"ScarletModelData" and self.config.doStripFootprints:
248 sources = outputs.measCat
249 for source
in sources[sources[
"parent"] != 0]:
250 source.setFootprint(
None)
251 butlerQC.put(outputs, outputRefs)
253 def generateMeasCat(self, exposureDataId, exposure, refCat, refCatInBand, refWcs, idPackerName,
255 """Generate a measurement catalog for Gen3.
259 exposureDataId : `DataId`
260 Butler dataId for this exposure.
262 Exposure to generate the catalog
for.
264 Catalog of shapes
and positions at which to force photometry.
266 Catalog of shapes
and position
in the band forced photometry
is
267 currently being performed
268 refWcs : `lsst.afw.image.SkyWcs`
269 Reference world coordinate system.
271 Type of ID packer to construct
from the registry.
273 Either the scarlet data models
or the deblended catalog
274 containing footprints.
275 If `footprintData`
is `
None` then the footprints contained
276 in `refCatInBand` are used.
281 Catalog of forced sources to measure.
283 Unique binary id associated
with the input exposure
288 Raised
if a footprint
with a given source id was
in the reference
289 catalog but
not in the reference catalog
in band (meaning there
290 was some sort of mismatch
in the two input catalogs)
292 exposureIdInfo = ExposureIdInfo.fromDataId(exposureDataId, idPackerName)
293 idFactory = exposureIdInfo.makeSourceIdFactory()
295 measCat = self.measurement.generateMeasCat(exposure, refCat, refWcs,
299 if self.config.footprintDatasetName ==
"ScarletModelData":
301 self._attachScarletFootprints(
303 modelData=footprintData,
305 band=exposureDataId[
"band"]
308 if self.config.footprintDatasetName
is None:
309 footprintCat = refCatInBand
311 footprintCat = footprintData
312 for srcRecord
in measCat:
313 fpRecord = footprintCat.find(srcRecord.getId())
315 raise LookupError(
"Cannot find Footprint for source {}; please check that {} "
316 "IDs are compatible with reference source IDs"
317 .
format(srcRecord.getId(), footprintCat))
318 srcRecord.setFootprint(fpRecord.getFootprint())
319 return measCat, exposureIdInfo.expId
    def runDataRef(self, dataRef, psfCache=None):
        """Perform forced measurement on a single coadd (Gen2 entry point).

        Parameters
        ----------
        dataRef : `lsst.daf.persistence.ButlerDataRef`
            Passed to the ``references`` subtask to obtain the reference WCS,
            to ``getExposure`` to read the measurement image, and to
            ``fetchReferences`` to load the reference catalog.
        psfCache : `int`, optional
            Size of PSF cache, or `None`.  The size of the PSF cache can have
            a significant effect upon the runtime for complicated PSF models.

        Notes
        -----
        Sources are generated with ``generateMeasCat`` in the ``measurement``
        subtask.  These are passed to ``measurement``'s ``run`` method, which
        fills the source catalog with the forced measurement results.  The
        sources are then passed to ``writeOutput``, which persists them.
        """
        refWcs = self.references.getWcs(dataRef)
        exposure = self.getExposure(dataRef)
        if psfCache is not None:
            exposure.getPsf().setCacheCapacity(psfCache)
        refCat = self.fetchReferences(dataRef, exposure)
        # The Gen3 packed coadd ID is used so IDs agree across middlewares.
        exposureId = coaddUtils.getGen3CoaddExposureId(dataRef, coaddName=self.config.coaddName,
                                                       includeBand=False, log=self.log)
        measCat = self.measurement.generateMeasCat(
            exposure, refCat, refWcs, idFactory=self.makeIdFactory(dataRef, exposureId=exposureId))
        self.log.info("Performing forced measurement on %s", dataRef.dataId)
        self.attachFootprints(measCat, refCat, exposure, refWcs, dataRef)
        forcedPhotResult = self.run(measCat, exposure, refCat, refWcs, exposureId=exposureId)
        self.writeOutput(dataRef, forcedPhotResult.measCat)
363 def run(self, measCat, exposure, refCat, refWcs, exposureId=None):
364 """Perform forced measurement on a single exposure.
369 The measurement catalog, based on the sources listed in the
372 The measurement image upon which to perform forced detection.
374 The reference catalog of sources to measure.
375 refWcs : `lsst.afw.image.SkyWcs`
376 The WCS
for the references.
378 Optional unique exposureId used
for random seed
in measurement
383 result : ~`lsst.pipe.base.Struct`
384 Structure
with fields:
387 Catalog of forced measurement results
390 self.measurement.run(measCat, exposure, refCat, refWcs, exposureId=exposureId)
391 if self.config.doApCorr:
392 self.applyApCorr.
run(
394 apCorrMap=exposure.getInfo().getApCorrMap()
396 self.catalogCalculation.
run(measCat)
398 return pipeBase.Struct(measCat=measCat)
    def makeIdFactory(self, dataRef, exposureId):
        """Create an object that generates globally unique source IDs.

        Parameters
        ----------
        dataRef : `lsst.daf.persistence.ButlerDataRef`
            Butler data reference; the "<coaddName>CoaddId_bits" dataset is
            read to determine how many bits the packed ID occupies.
        exposureId : `int`
            Packed identifier for the exposure (coadd) being measured.

        Returns
        -------
        idFactory
            Source-ID factory produced by `ExposureIdInfo`.
        """
        exposureIdInfo = ExposureIdInfo(exposureId, dataRef.get(self.config.coaddName + "CoaddId_bits"))
        return exposureIdInfo.makeSourceIdFactory()
423 """Return an iterable of reference sources which overlap the exposure.
428 Butler data reference corresponding to the image to be measured;
429 should have tract, patch, and filter keys.
436 All work
is delegated to the references subtask; see
437 `CoaddSrcReferencesTask`
for information about the default behavior.
439 skyMap = dataRef.get(self.dataPrefix + "skyMap", immediate=
True)
440 tractInfo = skyMap[dataRef.dataId[
"tract"]]
441 patch = tuple(
int(v)
for v
in dataRef.dataId[
"patch"].split(
","))
442 patchInfo = tractInfo.getPatchInfo(patch)
444 references.extend(self.references.fetchInPatches(dataRef, patchList=[patchInfo]))
448 r"""Attach Footprints to source records.
450 For coadd forced photometry, we use the deblended "heavy"
452 of the same band - because we
've guaranteed that the peaks (and hence
453 child sources) will be consistent across all bands before we get to
454 measurement, this should yield reasonable deblending
for most sources.
455 It
's most likely limitation is that it will not provide good flux
456 upper limits for sources that were
not detected
in this band but were
457 blended
with sources that were.
459 if self.config.footprintDatasetName
is None:
460 return self.measurement.attachTransformedFootprints(sources, refCat, exposure, refWcs)
462 self.log.
info(
"Loading deblended footprints for sources from %s, %s",
463 self.config.footprintDatasetName, dataRef.dataId)
465 if self.config.footprintDatasetName ==
"ScarletModelData":
467 dataModel = dataRef.get(
"%sCoadd_%s" % (self.config.coaddName, self.config.footprintDatasetName),
469 self._attachScarletFootprints(refCat, dataModel, exposure, dataRef.dataId[
"band"])
471 fpCat = dataRef.get(
"%sCoadd_%s" % (self.config.coaddName, self.config.footprintDatasetName),
473 for refRecord, srcRecord
in zip(refCat, sources):
474 fpRecord = fpCat.find(refRecord.getId())
476 raise LookupError(
"Cannot find Footprint for source %s; please check that %sCoadd_%s "
477 "IDs are compatible with reference source IDs" %
478 (srcRecord.getId(), self.config.coaddName,
479 self.config.footprintDatasetName))
480 srcRecord.setFootprint(fpRecord.getFootprint())
482 def _attachScarletFootprints(self, catalog, modelData, exposure, band):
483 """Attach scarlet models as HeavyFootprints
485 if self.config.doConserveFlux:
486 redistributeImage = exposure.image
488 redistributeImage =
None
490 modelData.updateCatalogFootprints(
493 psfModel=exposure.getPsf(),
494 redistributeImage=redistributeImage,
495 removeScarletData=
True,
496 updateFluxColumns=
False,
500 """Read input exposure on which measurement will be performed.
505 Butler data reference.
507 if self.config.hasFakes:
508 name =
"fakes_" + self.config.coaddName +
"Coadd_calexp"
510 name = self.config.coaddName +
"Coadd_calexp"
512 return dataRef.get(name)
if dataRef.datasetExists(name)
else None
515 """Write forced source table
520 Butler data reference. The forced_src dataset (with
521 self.dataPrefix prepended)
is all that will be modified.
523 Catalog of sources to save.
525 dataRef.put(sources, self.dataPrefix + "forced_src", flags=lsst.afw.table.SOURCE_IO_NO_FOOTPRINTS)
528 """The schema catalogs that will be used by this task.
532 schemaCatalogs : `dict`
533 Dictionary mapping dataset type to schema catalog.
537 There is only one schema
for each type of forced measurement. The
538 dataset type
for this measurement
is defined
in the mapper.
541 catalog.getTable().setMetadata(self.measurement.algMetadata)
542 datasetType = self.dataPrefix + "forced_src"
543 return {datasetType: catalog}
545 def _getConfigName(self):
547 return self.dataPrefix +
"forced_config"
549 def _getMetadataName(self):
551 return self.dataPrefix +
"forced_metadata"
554 def _makeArgumentParser(cls):
555 parser = pipeBase.ArgumentParser(name=cls._DefaultName)
556 parser.add_id_argument(
"--id",
"deepCoadd_forced_src", help=
"data ID, with raw CCD keys + tract",
558 parser.add_argument(
"--psfCache", type=int, default=100, help=
"Size of CoaddPsf cache")
A class to contain the data, WCS, and other information needed to describe an image of the sky.
Defines the fields and offsets for a table.
def getTargetList(parsedCmd, **kwargs)
def run(self, coaddExposures, bbox, wcs, dataIds, **kwargs)
def attachFootprints(self, sources, refCat, exposure, refWcs, dataRef)
def writeOutput(self, dataRef, sources)
def fetchReferences(self, dataRef, exposure)
def getExposure(self, dataRef)
def getSchemaCatalogs(self)
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)