"""Build star observations for input to FGCM using sourceTable_visit.

This task finds all the visits and sourceTable_visits in a repository (or a
subset based on command line parameters) and extracts all the potential
calibration stars for input into fgcm.  This task additionally uses fgcm to
match star observations into unique stars, and performs as much cleaning of the
input catalog as possible.
"""
44 from .fgcmBuildStarsBase
import FgcmBuildStarsConfigBase, FgcmBuildStarsRunner, FgcmBuildStarsBaseTask
45 from .utilities
import computeApproxPixelAreaFields, computeApertureRadiusFromDataRef
46 from .utilities
import lookupStaticCalibrations
# Public API of this module: the task and its config class.
__all__ = ['FgcmBuildStarsTableConfig', 'FgcmBuildStarsTableTask']
52 dimensions=(
"instrument",),
54 camera = connectionTypes.PrerequisiteInput(
55 doc=
"Camera instrument",
57 storageClass=
"Camera",
58 dimensions=(
"instrument",),
59 lookupFunction=lookupStaticCalibrations,
63 fgcmLookUpTable = connectionTypes.PrerequisiteInput(
64 doc=(
"Atmosphere + instrument look-up-table for FGCM throughput and "
65 "chromatic corrections."),
66 name=
"fgcmLookUpTable",
67 storageClass=
"Catalog",
68 dimensions=(
"instrument",),
72 sourceSchema = connectionTypes.PrerequisiteInput(
73 doc=
"Schema for source catalogs",
75 storageClass=
"SourceCatalog",
79 refCat = connectionTypes.PrerequisiteInput(
80 doc=
"Reference catalog to use for photometric calibration",
82 storageClass=
"SimpleCatalog",
83 dimensions=(
"skypix",),
88 sourceTable_visit = connectionTypes.Input(
89 doc=
"Source table in parquet format, per visit",
90 name=
"sourceTable_visit",
91 storageClass=
"DataFrame",
92 dimensions=(
"instrument",
"visit"),
97 visitSummary = connectionTypes.Input(
98 doc=(
"Per-visit consolidated exposure metadata. These catalogs use "
99 "detector id for the id and must be sorted for fast lookups of a "
102 storageClass=
"ExposureCatalog",
103 dimensions=(
"instrument",
"visit"),
108 background = connectionTypes.Input(
109 doc=
"Calexp background model",
110 name=
"calexpBackground",
111 storageClass=
"Background",
112 dimensions=(
"instrument",
"visit",
"detector"),
117 fgcmVisitCatalog = connectionTypes.Output(
118 doc=
"Catalog of visit information for fgcm",
119 name=
"fgcmVisitCatalog",
120 storageClass=
"Catalog",
121 dimensions=(
"instrument",),
124 fgcmStarObservations = connectionTypes.Output(
125 doc=
"Catalog of star observations for fgcm",
126 name=
"fgcmStarObservations",
127 storageClass=
"Catalog",
128 dimensions=(
"instrument",),
131 fgcmStarIds = connectionTypes.Output(
132 doc=
"Catalog of fgcm calibration star IDs",
134 storageClass=
"Catalog",
135 dimensions=(
"instrument",),
138 fgcmStarIndices = connectionTypes.Output(
139 doc=
"Catalog of fgcm calibration star indices",
140 name=
"fgcmStarIndices",
141 storageClass=
"Catalog",
142 dimensions=(
"instrument",),
145 fgcmReferenceStars = connectionTypes.Output(
146 doc=
"Catalog of fgcm-matched reference stars",
147 name=
"fgcmReferenceStars",
148 storageClass=
"Catalog",
149 dimensions=(
"instrument",),
155 if not config.doReferenceMatches:
156 self.prerequisiteInputs.remove(
"refCat")
157 self.prerequisiteInputs.remove(
"fgcmLookUpTable")
159 if not config.doModelErrorsWithBackground:
160 self.inputs.remove(
"background")
162 if not config.doReferenceMatches:
163 self.outputs.remove(
"fgcmReferenceStars")
167 pipelineConnections=FgcmBuildStarsTableConnections):
168 """Config for FgcmBuildStarsTableTask"""
170 referenceCCD = pexConfig.Field(
171 doc=
"Reference CCD for checking PSF and background",
193 sourceSelector.flags.bad = [
'PixelFlags_edge',
194 'PixelFlags_interpolatedCenter',
195 'PixelFlags_saturatedCenter',
196 'PixelFlags_crCenter',
198 'PixelFlags_interpolated',
199 'PixelFlags_saturated',
205 sourceSelector.flags.bad.append(localBackgroundFlagName)
210 sourceSelector.isolated.parentName =
'parentSourceId'
211 sourceSelector.isolated.nChildName =
'Deblend_nChild'
213 sourceSelector.unresolved.name =
'extendedness'
218 Build stars for the FGCM global calibration, using sourceTable_visit catalogs.
220 ConfigClass = FgcmBuildStarsTableConfig
221 RunnerClass = FgcmBuildStarsRunner
222 _DefaultName =
"fgcmBuildStarsTable"
224 canMultiprocess =
False
227 inputRefDict = butlerQC.get(inputRefs)
229 sourceTableRefs = inputRefDict[
'sourceTable_visit']
231 self.log.
info(
"Running with %d sourceTable_visit dataRefs",
232 len(sourceTableRefs))
234 sourceTableDataRefDict = {sourceTableRef.dataId[
'visit']: sourceTableRef
for
235 sourceTableRef
in sourceTableRefs}
237 if self.config.doReferenceMatches:
239 lutDataRef = inputRefDict[
'fgcmLookUpTable']
242 refConfig = self.config.fgcmLoadReferenceCatalog.refObjLoader
244 for ref
in inputRefs.refCat],
245 refCats=butlerQC.get(inputRefs.refCat),
248 self.makeSubtask(
'fgcmLoadReferenceCatalog', refObjLoader=refObjLoader)
254 calibFluxApertureRadius =
None
255 if self.config.doSubtractLocalBackground:
258 self.config.instFluxField)
259 except RuntimeError
as e:
260 raise RuntimeError(
"Could not determine aperture radius from %s. "
261 "Cannot use doSubtractLocalBackground." %
262 (self.config.instFluxField))
from e
264 visitSummaryRefs = inputRefDict[
'visitSummary']
265 visitSummaryDataRefDict = {visitSummaryRef.dataId[
'visit']: visitSummaryRef
for
266 visitSummaryRef
in visitSummaryRefs}
268 camera = inputRefDict[
'camera']
269 groupedDataRefs = self.
_groupDataRefs_groupDataRefs(sourceTableDataRefDict,
270 visitSummaryDataRefDict)
272 if self.config.doModelErrorsWithBackground:
273 bkgRefs = inputRefDict[
'background']
274 bkgDataRefDict = {(bkgRef.dataId.byName()[
'visit'],
275 bkgRef.dataId.byName()[
'detector']): bkgRef
for
278 bkgDataRefDict =
None
283 bkgDataRefDict=bkgDataRefDict,
284 visitCatDataRef=
None,
287 rad = calibFluxApertureRadius
288 sourceSchemaDataRef = inputRefDict[
'sourceSchema']
293 calibFluxApertureRadius=rad,
295 visitCatDataRef=
None,
298 butlerQC.put(visitCat, outputRefs.fgcmVisitCatalog)
299 butlerQC.put(fgcmStarObservationCat, outputRefs.fgcmStarObservations)
301 fgcmStarIdCat, fgcmStarIndicesCat, fgcmRefCat = self.
fgcmMatchStarsfgcmMatchStars(visitCat,
302 fgcmStarObservationCat,
303 lutDataRef=lutDataRef)
305 butlerQC.put(fgcmStarIdCat, outputRefs.fgcmStarIds)
306 butlerQC.put(fgcmStarIndicesCat, outputRefs.fgcmStarIndices)
307 if fgcmRefCat
is not None:
308 butlerQC.put(fgcmRefCat, outputRefs.fgcmReferenceStars)
@classmethod
def _makeArgumentParser(cls):
    """Create an argument parser.

    Returns
    -------
    parser : `pipeBase.ArgumentParser`
        Parser constructed with ``name=cls._DefaultName`` and a single
        ``--id`` argument registered for the ``sourceTable_visit``
        dataset type.
    """
    # NOTE(review): the listing this block was recovered from did not show
    # the decorator or the ``return`` line; both are restored here on the
    # basis of the ``cls`` signature and the docstring -- confirm.
    parser = pipeBase.ArgumentParser(name=cls._DefaultName)
    parser.add_id_argument("--id", "sourceTable_visit", help="Data ID, e.g. --id visit=6789")

    return parser
def _groupDataRefs(self, sourceTableDataRefDict, visitSummaryDataRefDict):
    """Group sourceTable and visitSummary dataRefs (gen3 only).

    Parameters
    ----------
    sourceTableDataRefDict : `dict` [`int`, `str`]
        Dict of source tables, keyed by visit.
    visitSummaryDataRefDict : `dict` [`int`, `str`]
        Dict of visit summary catalogs, keyed by visit.

    Returns
    -------
    groupedDataRefs : `dict` [`int`, `list`]
        Dictionary with sorted visit keys, and `list`s with
        `lsst.daf.butler.DeferredDataSetHandle`.  The first
        item in the list will be the visitSummary ref, and
        the second will be the source table ref.

    Raises
    ------
    KeyError
        Raised if a visit in ``sourceTableDataRefDict`` has no entry in
        ``visitSummaryDataRefDict``.
    """
    groupedDataRefs = collections.defaultdict(list)

    # Insert in ascending visit order so that iteration over the result
    # is sorted by visit.
    for visit in sorted(sourceTableDataRefDict):
        groupedDataRefs[visit] = [visitSummaryDataRefDict[visit],
                                  sourceTableDataRefDict[visit]]

    return groupedDataRefs
def _findAndGroupDataRefsGen2(self, butler, camera, dataRefs):
    """Group input dataRefs by visit, pairing each with one calexp dataRef.

    Parameters
    ----------
    butler : object providing ``dataRef(datasetType, dataId=...)``
        Used to build ``calexp`` dataRefs.
    camera : iterable of detectors
        Each detector must provide ``getId()``.
    dataRefs : iterable of dataRefs
        Each must expose ``dataId`` indexable by
        ``self.config.visitDataRefName``.

    Returns
    -------
    groupedDataRefs : `dict` [visit, `list`]
        Dictionary sorted by visit key.  For every input dataRef, the first
        existing ``calexp`` dataRef found for that visit (if any) is
        appended to the visit's list, followed by the input dataRef itself.
    """
    # NOTE(review): the listing this block was recovered from omitted the
    # ``ccdIds`` initialisation, the inner ``for ccdId`` loop header, the
    # ``try``/``except RuntimeError`` around ``butler.dataRef`` and the
    # ``continue``/``break`` statements.  They are restored here from the
    # visible uses of ``ccdId``/``calexpRef`` -- confirm against upstream.
    self.log.info("Grouping dataRefs by %s", (self.config.visitDataRefName))

    ccdIds = []
    for detector in camera:
        ccdIds.append(detector.getId())
    # Try the preferred reference CCD first.  It then appears twice in the
    # list, which is harmless because only the first existing calexp is used.
    ccdIds.insert(0, self.config.referenceCCD)

    groupedDataRefs = collections.defaultdict(list)
    for dataRef in dataRefs:
        visit = dataRef.dataId[self.config.visitDataRefName]

        # Find one existing calexp for this visit.
        for ccdId in ccdIds:
            try:
                calexpRef = butler.dataRef('calexp', dataId={self.config.visitDataRefName: visit,
                                                             self.config.ccdDataRefName: ccdId})
            except RuntimeError:
                # No such dataRef; try the next detector.
                continue

            if not calexpRef.datasetExists():
                continue

            # Only one calexp per dataRef is needed.
            groupedDataRefs[visit].append(calexpRef)
            break

        # The input dataRef always goes last in the visit's list.
        groupedDataRefs[visit].append(dataRef)

    # Return a plain dict ordered by visit.
    return dict(sorted(groupedDataRefs.items()))
393 calibFluxApertureRadius=None,
394 visitCatDataRef=None,
397 startTime = time.time()
403 if (visitCatDataRef
is not None and starObsDataRef
is None
404 or visitCatDataRef
is None and starObsDataRef
is not None):
405 self.log.
warn(
"Only one of visitCatDataRef and starObsDataRef are set, so "
406 "no checkpoint files will be persisted.")
408 if self.config.doSubtractLocalBackground
and calibFluxApertureRadius
is None:
409 raise RuntimeError(
"Must set calibFluxApertureRadius if doSubtractLocalBackground is True.")
413 sourceSchema = sourceSchemaDataRef.get().schema
415 outputSchema = sourceMapper.getOutputSchema()
419 for ccdIndex, detector
in enumerate(camera):
420 ccdMapping[detector.getId()] = ccdIndex
424 if inStarObsCat
is not None:
425 fullCatalog = inStarObsCat
426 comp1 = fullCatalog.schema.compare(outputSchema, outputSchema.EQUAL_KEYS)
427 comp2 = fullCatalog.schema.compare(outputSchema, outputSchema.EQUAL_NAMES)
428 if not comp1
or not comp2:
429 raise RuntimeError(
"Existing fgcmStarObservations file found with mismatched schema.")
433 visitKey = outputSchema[
'visit'].asKey()
434 ccdKey = outputSchema[
'ccd'].asKey()
435 instMagKey = outputSchema[
'instMag'].asKey()
436 instMagErrKey = outputSchema[
'instMagErr'].asKey()
439 if self.config.doSubtractLocalBackground:
440 localBackgroundArea = np.pi*calibFluxApertureRadius**2.
447 for counter, visit
in enumerate(visitCat):
449 if visit[
'sources_read']:
452 expTime = visit[
'exptime']
454 dataRef = groupedDataRefs[visit[
'visit']][-1]
457 srcTable = dataRef.get()
458 df = srcTable.toDataFrame(columns)
460 df = dataRef.get(parameters={
'columns': columns})
462 goodSrc = self.sourceSelector.selectSources(df)
466 if self.config.doSubtractLocalBackground:
467 localBackground = localBackgroundArea*df[self.config.localBackgroundFluxField].values
468 use, = np.where((goodSrc.selected)
469 & ((df[self.config.instFluxField].values - localBackground) > 0.0))
471 use, = np.where(goodSrc.selected)
474 tempCat.resize(use.size)
476 tempCat[
'ra'][:] = np.deg2rad(df[
'ra'].values[use])
477 tempCat[
'dec'][:] = np.deg2rad(df[
'decl'].values[use])
478 tempCat[
'x'][:] = df[
'x'].values[use]
479 tempCat[
'y'][:] = df[
'y'].values[use]
482 tempCat[visitKey][:] = df[
'visit'].values[use]
483 tempCat[ccdKey][:] = df[
'ccd'].values[use]
484 tempCat[
'psf_candidate'] = df[
'Calib_psf_candidate'].values[use]
486 if self.config.doSubtractLocalBackground:
500 tempCat[
'deltaMagBkg'] = (-2.5*np.log10(df[self.config.instFluxField].values[use]
501 - localBackground[use]) -
502 -2.5*np.log10(df[self.config.instFluxField].values[use]))
504 tempCat[
'deltaMagBkg'][:] = 0.0
507 for detector
in camera:
508 ccdId = detector.getId()
510 use2 = (tempCat[ccdKey] == ccdId)
511 tempCat[
'jacobian'][use2] = approxPixelAreaFields[ccdId].evaluate(tempCat[
'x'][use2],
513 scaledInstFlux = (df[self.config.instFluxField].values[use[use2]]
514 * visit[
'scaling'][ccdMapping[ccdId]])
515 tempCat[instMagKey][use2] = (-2.5*np.log10(scaledInstFlux) + 2.5*np.log10(expTime))
519 tempCat[instMagErrKey][:] = k*(df[self.config.instFluxField +
'Err'].values[use]
520 / df[self.config.instFluxField].values[use])
523 if self.config.doApplyWcsJacobian:
524 tempCat[instMagKey][:] -= 2.5*np.log10(tempCat[
'jacobian'][:])
526 fullCatalog.extend(tempCat)
529 with np.warnings.catch_warnings():
531 np.warnings.simplefilter(
"ignore")
533 instMagIn = -2.5*np.log10(df[self.config.apertureInnerInstFluxField].values[use])
534 instMagErrIn = k*(df[self.config.apertureInnerInstFluxField +
'Err'].values[use]
535 / df[self.config.apertureInnerInstFluxField].values[use])
536 instMagOut = -2.5*np.log10(df[self.config.apertureOuterInstFluxField].values[use])
537 instMagErrOut = k*(df[self.config.apertureOuterInstFluxField +
'Err'].values[use]
538 / df[self.config.apertureOuterInstFluxField].values[use])
540 ok = (np.isfinite(instMagIn) & np.isfinite(instMagErrIn)
541 & np.isfinite(instMagOut) & np.isfinite(instMagErrOut))
543 visit[
'deltaAper'] = np.median(instMagIn[ok] - instMagOut[ok])
544 visit[
'sources_read'] =
True
546 self.log.
info(
" Found %d good stars in visit %d (deltaAper = %0.3f)",
547 use.size, visit[
'visit'], visit[
'deltaAper'])
549 if ((counter % self.config.nVisitsPerCheckpoint) == 0
550 and starObsDataRef
is not None and visitCatDataRef
is not None):
553 starObsDataRef.put(fullCatalog)
554 visitCatDataRef.put(visitCat)
556 self.log.
info(
"Found all good star observations in %.2f s" %
557 (time.time() - startTime))
def _get_sourceTable_visit_columns(self):
    """
    Get the sourceTable_visit columns from the config.

    Returns
    -------
    columns : `list`
       List of columns to read from sourceTable_visit
    """
    # Fixed identification/position columns, followed by the configured
    # PSF-candidate flag and each configured flux field with its 'Err'
    # companion.
    columns = ['visit', 'ccd',
               'ra', 'decl', 'x', 'y', self.config.psfCandidateName,
               self.config.instFluxField, self.config.instFluxField + 'Err',
               self.config.apertureInnerInstFluxField, self.config.apertureInnerInstFluxField + 'Err',
               self.config.apertureOuterInstFluxField, self.config.apertureOuterInstFluxField + 'Err']

    # Add only the columns needed by the enabled source-selector stages.
    selectorConfig = self.sourceSelector.config
    if selectorConfig.doFlags:
        columns.extend(selectorConfig.flags.bad)
    if selectorConfig.doUnresolved:
        columns.append(selectorConfig.unresolved.name)
    if selectorConfig.doIsolated:
        columns.append(selectorConfig.isolated.parentName)
        columns.append(selectorConfig.isolated.nChildName)
    if self.config.doSubtractLocalBackground:
        columns.append(self.config.localBackgroundFluxField)

    return columns
def fgcmMatchStars(self, visitCat, obsCat, lutDataRef=None)
def _makeSourceMapper(self, sourceSchema)
def fgcmMakeVisitCatalog(self, camera, groupedDataRefs, bkgDataRefDict=None, visitCatDataRef=None, inVisitCat=None)
def fgcmMakeAllStarObservations(self, groupedDataRefs, visitCat, sourceSchemaDataRef, camera, calibFluxApertureRadius=None, visitCatDataRef=None, starObsDataRef=None, inStarObsCat=None)
apertureOuterInstFluxField
apertureInnerInstFluxField
doSubtractLocalBackground
apertureInnerInstFluxField
apertureOuterInstFluxField
def __init__(self, *config=None)
def runQuantum(self, butlerQC, inputRefs, outputRefs)
def _get_sourceTable_visit_columns(self)
def fgcmMakeAllStarObservations(self, groupedDataRefs, visitCat, sourceSchemaDataRef, camera, calibFluxApertureRadius=None, visitCatDataRef=None, starObsDataRef=None, inStarObsCat=None)
def _groupDataRefs(self, sourceTableDataRefDict, visitSummaryDataRefDict)
std::shared_ptr< FrameSet > append(FrameSet const &first, FrameSet const &second)
Construct a FrameSet that performs two transformations in series.
def computeApertureRadiusFromDataRef(dataRef, fluxField)
def computeApproxPixelAreaFields(camera)
Fit spatial kernel using approximate fluxes for candidates, and solving a linear system of equations.