cycleNumber = pexConfig.Field(
doc="Final fit cycle from FGCM fit",
dtype=int,
default=None,
)
physicalFilterMap = pexConfig.DictField(
doc="Mapping from 'physicalFilter' to band.",
keytype=str,
itemtype=str,
default={},
)
# The following fields refer to calibrating from a reference
# catalog, but in the future this might need to be expanded
doReferenceCalibration = pexConfig.Field(
doc=("Transfer 'absolute' calibration from reference catalog? "
"This afterburner step is unnecessary if reference stars "
"were used in the full fit in FgcmFitCycleTask."),
dtype=bool,
default=False,
)
doRefcatOutput = pexConfig.Field(
doc="Output standard stars in reference catalog format",
dtype=bool,
default=True,
)
doAtmosphereOutput = pexConfig.Field(
doc="Output atmospheres in transmission_atmosphere_fgcm format",
dtype=bool,
default=True,
)
doZeropointOutput = pexConfig.Field(
doc="Output zeropoints in fgcm_photoCalib format",
dtype=bool,
default=True,
)
doComposeWcsJacobian = pexConfig.Field(
doc="Compose Jacobian of WCS with fgcm calibration for output photoCalib?",
dtype=bool,
default=True,
)
doApplyMeanChromaticCorrection = pexConfig.Field(
doc="Apply the mean chromatic correction to the zeropoints?",
dtype=bool,
default=True,
)
refObjLoader = pexConfig.ConfigurableField(
target=LoadIndexedReferenceObjectsTask,
doc="reference object loader for 'absolute' photometric calibration",
)
photoCal = pexConfig.ConfigurableField(
target=PhotoCalTask,
doc="task to perform 'absolute' calibration",
)
referencePixelizationNside = pexConfig.Field(
doc="Healpix nside to pixelize catalog to compare to reference catalog",
dtype=int,
default=64,
)
referencePixelizationMinStars = pexConfig.Field(
doc=("Minimum number of stars per healpix pixel to select for comparison"
"to the specified reference catalog"),
dtype=int,
default=200,
)
referenceMinMatch = pexConfig.Field(
doc="Minimum number of stars matched to reference catalog to be used in statistics",
dtype=int,
default=50,
)
referencePixelizationNPixels = pexConfig.Field(
doc=("Number of healpix pixels to sample to do comparison. "
"Doing too many will take a long time and not yield any more "
"precise results because the final number is the median offset "
"(per band) from the set of pixels."),
dtype=int,
default=100,
)
datasetConfig = pexConfig.ConfigField(
dtype=DatasetConfig,
doc="Configuration for writing/reading ingested catalog",
)
def setDefaults(self):
pexConfig.Config.setDefaults(self)
# In order to transfer the "absolute" calibration from a reference
# catalog to the relatively calibrated FGCM standard stars (one number
# per band), we use the PhotoCalTask to match stars in a sample of healpix
# pixels. These basic settings ensure that only well-measured, good stars
# from the source and reference catalogs are used for the matching.
# applyColorTerms needs to be False if doReferenceCalibration is False,
# as is the new default after DM-16702
self.photoCal.applyColorTerms = False
self.photoCal.fluxField = 'instFlux'
self.photoCal.magErrFloor = 0.003
self.photoCal.match.referenceSelection.doSignalToNoise = True
self.photoCal.match.referenceSelection.signalToNoise.minimum = 10.0
self.photoCal.match.sourceSelection.doSignalToNoise = True
self.photoCal.match.sourceSelection.signalToNoise.minimum = 10.0
self.photoCal.match.sourceSelection.signalToNoise.fluxField = 'instFlux'
self.photoCal.match.sourceSelection.signalToNoise.errField = 'instFluxErr'
self.photoCal.match.sourceSelection.doFlags = True
self.photoCal.match.sourceSelection.flags.good = []
self.photoCal.match.sourceSelection.flags.bad = ['flag_badStar']
self.photoCal.match.sourceSelection.doUnresolved = False
self.datasetConfig.ref_dataset_name = 'fgcm_stars'
self.datasetConfig.format_version = 1
def validate(self):
super().validate()
# Force the connections to conform with cycleNumber
self.connections.cycleNumber = str(self.cycleNumber)
class FgcmOutputProductsRunner(pipeBase.ButlerInitializedTaskRunner):
@staticmethod
def getTargetList(parsedCmd):
return [parsedCmd.butler]
def __call__(self, butler):
task = self.TaskClass(butler=butler, config=self.config, log=self.log)
exitStatus = 0
if self.doRaise:
results = task.runDataRef(butler)
else:
try:
results = task.runDataRef(butler)
except Exception as e:
exitStatus = 1
task.log.fatal("Failed: %s" % e)
if not isinstance(e, pipeBase.TaskError):
traceback.print_exc(file=sys.stderr)
task.writeMetadata(butler)
if self.doReturnResults:
# The results here are the zeropoint offsets for each band
return [pipeBase.Struct(exitStatus=exitStatus,
results=results)]
else:
return [pipeBase.Struct(exitStatus=exitStatus)]
def run(self, parsedCmd):
resultList = []
if self.precall(parsedCmd):
targetList = self.getTargetList(parsedCmd)
# make sure that we only get 1
resultList = self(targetList[0])
return resultList
class FgcmOutputProductsTask(pipeBase.PipelineTask, pipeBase.CmdLineTask):
ConfigClass = FgcmOutputProductsConfig
RunnerClass = FgcmOutputProductsRunner
_DefaultName = "fgcmOutputProducts"
def __init__(self, butler=None, **kwargs):
super().__init__(**kwargs)
# no saving of metadata for now
def _getMetadataName(self):
return None
def runQuantum(self, butlerQC, inputRefs, outputRefs):
dataRefDict = {}
dataRefDict['camera'] = butlerQC.get(inputRefs.camera)
dataRefDict['fgcmLookUpTable'] = butlerQC.get(inputRefs.fgcmLookUpTable)
dataRefDict['fgcmVisitCatalog'] = butlerQC.get(inputRefs.fgcmVisitCatalog)
dataRefDict['fgcmStandardStars'] = butlerQC.get(inputRefs.fgcmStandardStars)
if self.config.doZeropointOutput:
dataRefDict['fgcmZeropoints'] = butlerQC.get(inputRefs.fgcmZeropoints)
photoCalibRefDict = {photoCalibRef.dataId.byName()['visit']:
photoCalibRef for photoCalibRef in outputRefs.fgcmPhotoCalib}
if self.config.doAtmosphereOutput:
dataRefDict['fgcmAtmosphereParameters'] = butlerQC.get(inputRefs.fgcmAtmosphereParameters)
atmRefDict = {atmRef.dataId.byName()['visit']: atmRef for
atmRef in outputRefs.fgcmTransmissionAtmosphere}
if self.config.doReferenceCalibration:
refConfig = self.config.refObjLoader
self.refObjLoader = ReferenceObjectLoader(dataIds=[ref.datasetRef.dataId
for ref in inputRefs.refCat],
refCats=butlerQC.get(inputRefs.refCat),
config=refConfig,
log=self.log)
else:
self.refObjLoader = None
struct = self.run(dataRefDict, self.config.physicalFilterMap, returnCatalogs=True)
# Output the photoCalib exposure catalogs
if struct.photoCalibCatalogs is not None:
self.log.info("Outputting photoCalib catalogs.")
for visit, expCatalog in struct.photoCalibCatalogs:
butlerQC.put(expCatalog, photoCalibRefDict[visit])
self.log.info("Done outputting photoCalib catalogs.")
# Output the atmospheres
if struct.atmospheres is not None:
self.log.info("Outputting atmosphere transmission files.")
for visit, atm in struct.atmospheres:
butlerQC.put(atm, atmRefDict[visit])
self.log.info("Done outputting atmosphere files.")
if self.config.doReferenceCalibration:
# Turn offset into simple catalog for persistence if necessary
schema = afwTable.Schema()
schema.addField('offset', type=np.float64,
doc="Post-process calibration offset (mag)")
offsetCat = afwTable.BaseCatalog(schema)
offsetCat.resize(len(struct.offsets))
offsetCat['offset'][:] = struct.offsets
butlerQC.put(offsetCat, outputRefs.fgcmOffsets)
return
@pipeBase.timeMethod
def runDataRef(self, butler):
if self.config.doReferenceCalibration:
# We need the ref obj loader to get the flux field
self.makeSubtask("refObjLoader", butler=butler)
# Check to make sure that the fgcmBuildStars config exists, to retrieve
# the visit and ccd dataset tags
if not butler.datasetExists('fgcmBuildStarsTable_config') and \
not butler.datasetExists('fgcmBuildStars_config'):
raise RuntimeError("Cannot find fgcmBuildStarsTable_config or fgcmBuildStars_config, "
"which is prereq for fgcmOutputProducts")
if butler.datasetExists('fgcmBuildStarsTable_config'):
fgcmBuildStarsConfig = butler.get('fgcmBuildStarsTable_config')
else:
fgcmBuildStarsConfig = butler.get('fgcmBuildStars_config')
visitDataRefName = fgcmBuildStarsConfig.visitDataRefName
ccdDataRefName = fgcmBuildStarsConfig.ccdDataRefName
physicalFilterMap = fgcmBuildStarsConfig.physicalFilterMap
if self.config.doComposeWcsJacobian and not fgcmBuildStarsConfig.doApplyWcsJacobian:
raise RuntimeError("Cannot compose the WCS jacobian if it hasn't been applied "
"in fgcmBuildStarsTask.")
if not self.config.doComposeWcsJacobian and fgcmBuildStarsConfig.doApplyWcsJacobian:
self.log.warning("Jacobian was applied in build-stars but doComposeWcsJacobian is not set.")
# And make sure that the atmosphere was output properly
if (self.config.doAtmosphereOutput
and not butler.datasetExists('fgcmAtmosphereParameters', fgcmcycle=self.config.cycleNumber)):
raise RuntimeError(f"Atmosphere parameters are missing for cycle {self.config.cycleNumber}.")
if not butler.datasetExists('fgcmStandardStars',
fgcmcycle=self.config.cycleNumber):
raise RuntimeError("Standard stars are missing for cycle %d." %
(self.config.cycleNumber))
if (self.config.doZeropointOutput
and (not butler.datasetExists('fgcmZeropoints', fgcmcycle=self.config.cycleNumber))):
raise RuntimeError("Zeropoints are missing for cycle %d." %
(self.config.cycleNumber))
dataRefDict = {}
# This is the _actual_ camera
dataRefDict['camera'] = butler.get('camera')
dataRefDict['fgcmLookUpTable'] = butler.dataRef('fgcmLookUpTable')
dataRefDict['fgcmVisitCatalog'] = butler.dataRef('fgcmVisitCatalog')
dataRefDict['fgcmStandardStars'] = butler.dataRef('fgcmStandardStars',
fgcmcycle=self.config.cycleNumber)
if self.config.doZeropointOutput:
dataRefDict['fgcmZeropoints'] = butler.dataRef('fgcmZeropoints',
fgcmcycle=self.config.cycleNumber)
if self.config.doAtmosphereOutput:
dataRefDict['fgcmAtmosphereParameters'] = butler.dataRef('fgcmAtmosphereParameters',
fgcmcycle=self.config.cycleNumber)
struct = self.run(dataRefDict, physicalFilterMap, butler=butler, returnCatalogs=False)
if struct.photoCalibs is not None:
self.log.info("Outputting photoCalib files.")
for visit, detector, physicalFilter, photoCalib in struct.photoCalibs:
butler.put(photoCalib, 'fgcm_photoCalib',
dataId={visitDataRefName: visit,
ccdDataRefName: detector,
'filter': physicalFilter})
self.log.info("Done outputting photoCalib files.")
if struct.atmospheres is not None:
self.log.info("Outputting atmosphere transmission files.")
for visit, atm in struct.atmospheres:
butler.put(atm, "transmission_atmosphere_fgcm",
dataId={visitDataRefName: visit})
self.log.info("Done outputting atmosphere transmissions.")
return pipeBase.Struct(offsets=struct.offsets)
def run(self, dataRefDict, physicalFilterMap, returnCatalogs=True, butler=None):
stdCat = dataRefDict['fgcmStandardStars'].get()
md = stdCat.getMetadata()
bands = md.getArray('BANDS')
if self.config.doReferenceCalibration:
lutCat = dataRefDict['fgcmLookUpTable'].get()
offsets = self._computeReferenceOffsets(stdCat, lutCat, physicalFilterMap, bands)
else:
offsets = np.zeros(len(bands))
# This is Gen2 only, and requires the butler.
if self.config.doRefcatOutput and butler is not None:
self._outputStandardStars(butler, stdCat, offsets, bands, self.config.datasetConfig)
del stdCat
if self.config.doZeropointOutput:
zptCat = dataRefDict['fgcmZeropoints'].get()
visitCat = dataRefDict['fgcmVisitCatalog'].get()
pcgen = self._outputZeropoints(dataRefDict['camera'], zptCat, visitCat, offsets, bands,
physicalFilterMap, returnCatalogs=returnCatalogs)
else:
pcgen = None
if self.config.doAtmosphereOutput:
atmCat = dataRefDict['fgcmAtmosphereParameters'].get()
atmgen = self._outputAtmospheres(dataRefDict, atmCat)
else:
atmgen = None
retStruct = pipeBase.Struct(offsets=offsets,
atmospheres=atmgen)
if returnCatalogs:
retStruct.photoCalibCatalogs = pcgen
else:
retStruct.photoCalibs = pcgen
return retStruct
def generateTractOutputProducts(self, dataRefDict, tract,
visitCat, zptCat, atmCat, stdCat,
fgcmBuildStarsConfig,
returnCatalogs=True,
butler=None):
physicalFilterMap = fgcmBuildStarsConfig.physicalFilterMap
md = stdCat.getMetadata()
bands = md.getArray('BANDS')
if self.config.doComposeWcsJacobian and not fgcmBuildStarsConfig.doApplyWcsJacobian:
raise RuntimeError("Cannot compose the WCS jacobian if it hasn't been applied "
"in fgcmBuildStarsTask.")
if not self.config.doComposeWcsJacobian and fgcmBuildStarsConfig.doApplyWcsJacobian:
self.log.warning("Jacobian was applied in build-stars but doComposeWcsJacobian is not set.")
if self.config.doReferenceCalibration:
lutCat = dataRefDict['fgcmLookUpTable'].get()
offsets = self._computeReferenceOffsets(stdCat, lutCat, bands, physicalFilterMap)
else:
offsets = np.zeros(len(bands))
if self.config.doRefcatOutput and butler is not None:
# Create a special config that has the tract number in it
datasetConfig = copy.copy(self.config.datasetConfig)
datasetConfig.ref_dataset_name = '%s_%d' % (self.config.datasetConfig.ref_dataset_name,
tract)
self._outputStandardStars(butler, stdCat, offsets, bands, datasetConfig)
if self.config.doZeropointOutput:
pcgen = self._outputZeropoints(dataRefDict['camera'], zptCat, visitCat, offsets, bands,
physicalFilterMap, returnCatalogs=returnCatalogs)
else:
pcgen = None
if self.config.doAtmosphereOutput:
atmgen = self._outputAtmospheres(dataRefDict, atmCat)
else:
atmgen = None
retStruct = pipeBase.Struct(offsets=offsets,
atmospheres=atmgen)
if returnCatalogs:
retStruct.photoCalibCatalogs = pcgen
else:
retStruct.photoCalibs = pcgen
return retStruct
def _computeReferenceOffsets(self, stdCat, lutCat, physicalFilterMap, bands):
# Only use stars that are observed in all the bands that were actually used
# This will ensure that we use the same healpix pixels for the absolute
# calibration of each band.
minObs = stdCat['ngood'].min(axis=1)
goodStars = (minObs >= 1)
stdCat = stdCat[goodStars]
self.log.info("Found %d stars with at least 1 good observation in each band" %
(len(stdCat)))
# Associate each band with the appropriate physicalFilter and make
# filterLabels
filterLabels = []
lutPhysicalFilters = lutCat[0]['physicalFilters'].split(',')
lutStdPhysicalFilters = lutCat[0]['stdPhysicalFilters'].split(',')
physicalFilterMapBands = list(physicalFilterMap.values())
physicalFilterMapFilters = list(physicalFilterMap.keys())
for band in bands:
# Find a physical filter associated from the band by doing
# a reverse lookup on the physicalFilterMap dict
physicalFilterMapIndex = physicalFilterMapBands.index(band)
physicalFilter = physicalFilterMapFilters[physicalFilterMapIndex]
# Find the appropriate fgcm standard physicalFilter
lutPhysicalFilterIndex = lutPhysicalFilters.index(physicalFilter)
stdPhysicalFilter = lutStdPhysicalFilters[lutPhysicalFilterIndex]
filterLabels.append(afwImage.FilterLabel(band=band,
physical=stdPhysicalFilter))
# We have to make a table for each pixel with flux/fluxErr
# This is a temporary table generated for input to the photoCal task.
# These fluxes are not instFlux (they are top-of-the-atmosphere approximate and
# have had chromatic corrections applied to get to the standard system
# specified by the atmosphere/instrumental parameters), nor are they
# in Jansky (since they don't have a proper absolute calibration: the overall
# zeropoint is estimated from the telescope size, etc.)
sourceMapper = afwTable.SchemaMapper(stdCat.schema)
sourceMapper.addMinimalSchema(afwTable.SimpleTable.makeMinimalSchema())
sourceMapper.editOutputSchema().addField('instFlux', type=np.float64,
doc="instrumental flux (counts)")
sourceMapper.editOutputSchema().addField('instFluxErr', type=np.float64,
doc="instrumental flux error (counts)")
badStarKey = sourceMapper.editOutputSchema().addField('flag_badStar',
type='Flag',
doc="bad flag")
# Split up the stars
# Note that there is an assumption here that the ra/dec coords stored
# on-disk are in radians, and therefore that starObs['coord_ra'] /
# starObs['coord_dec'] return radians when used as an array of numpy float64s.
theta = np.pi/2. - stdCat['coord_dec']
phi = stdCat['coord_ra']
ipring = hp.ang2pix(self.config.referencePixelizationNside, theta, phi)
h, rev = esutil.stat.histogram(ipring, rev=True)
gdpix, = np.where(h >= self.config.referencePixelizationMinStars)
self.log.info("Found %d pixels (nside=%d) with at least %d good stars" %
(gdpix.size,
self.config.referencePixelizationNside,
self.config.referencePixelizationMinStars))
if gdpix.size < self.config.referencePixelizationNPixels:
self.log.warning("Found fewer good pixels (%d) than preferred in configuration (%d)" %
(gdpix.size, self.config.referencePixelizationNPixels))
else:
# Sample out the pixels we want to use
gdpix = np.random.choice(gdpix, size=self.config.referencePixelizationNPixels, replace=False)
results = np.zeros(gdpix.size, dtype=[('hpix', 'i4'),
('nstar', 'i4', len(bands)),
('nmatch', 'i4', len(bands)),
('zp', 'f4', len(bands)),
('zpErr', 'f4', len(bands))])
results['hpix'] = ipring[rev[rev[gdpix]]]
# We need a boolean index to deal with catalogs...
selected = np.zeros(len(stdCat), dtype=bool)
refFluxFields = [None]*len(bands)
for p_index, pix in enumerate(gdpix):
i1a = rev[rev[pix]: rev[pix + 1]]
# the stdCat afwTable can only be indexed with boolean arrays,
# and not numpy index arrays (see DM-16497). This little trick
# converts the index array into a boolean array
selected[:] = False
selected[i1a] = True
for b_index, filterLabel in enumerate(filterLabels):
struct = self._computeOffsetOneBand(sourceMapper, badStarKey, b_index,
filterLabel, stdCat,
selected, refFluxFields)
results['nstar'][p_index, b_index] = len(i1a)
results['nmatch'][p_index, b_index] = len(struct.arrays.refMag)
results['zp'][p_index, b_index] = struct.zp
results['zpErr'][p_index, b_index] = struct.sigma
# And compute the summary statistics
offsets = np.zeros(len(bands))
for b_index, band in enumerate(bands):
# make configurable
ok, = np.where(results['nmatch'][:, b_index] >= self.config.referenceMinMatch)
offsets[b_index] = np.median(results['zp'][ok, b_index])
# use median absolute deviation to estimate Normal sigma
# see https://en.wikipedia.org/wiki/Median_absolute_deviation
madSigma = 1.4826*np.median(np.abs(results['zp'][ok, b_index] - offsets[b_index]))
self.log.info("Reference catalog offset for %s band: %.12f +/- %.12f",
band, offsets[b_index], madSigma)
return offsets
def _computeOffsetOneBand(self, sourceMapper, badStarKey,
b_index, filterLabel, stdCat, selected, refFluxFields):
Definition at line 899 of file fgcmOutputProducts.py.