23 """Build star observations for input to FGCM using sourceTable_visit.
25 This task finds all the visits and sourceTable_visits in a repository (or a
26 subset based on command line parameters) and extracts all the potential
27 calibration stars for input into fgcm. This task additionally uses fgcm to
28 match star observations into unique stars, and performs as much cleaning of the
29 input catalog as possible.
44 from .fgcmBuildStarsBase
import FgcmBuildStarsConfigBase, FgcmBuildStarsRunner, FgcmBuildStarsBaseTask
45 from .utilities
import computeApproxPixelAreaFields, computeApertureRadiusFromDataRef
46 from .utilities
import lookupStaticCalibrations
48 __all__ = [
'FgcmBuildStarsTableConfig',
'FgcmBuildStarsTableTask']
52 dimensions=(
"instrument",),
54 camera = connectionTypes.PrerequisiteInput(
55 doc=
"Camera instrument",
57 storageClass=
"Camera",
58 dimensions=(
"instrument",),
59 lookupFunction=lookupStaticCalibrations,
63 fgcmLookUpTable = connectionTypes.PrerequisiteInput(
64 doc=(
"Atmosphere + instrument look-up-table for FGCM throughput and "
65 "chromatic corrections."),
66 name=
"fgcmLookUpTable",
67 storageClass=
"Catalog",
68 dimensions=(
"instrument",),
72 sourceSchema = connectionTypes.InitInput(
73 doc=
"Schema for source catalogs",
75 storageClass=
"SourceCatalog",
78 refCat = connectionTypes.PrerequisiteInput(
79 doc=
"Reference catalog to use for photometric calibration",
81 storageClass=
"SimpleCatalog",
82 dimensions=(
"skypix",),
87 sourceTable_visit = connectionTypes.Input(
88 doc=
"Source table in parquet format, per visit",
89 name=
"sourceTable_visit",
90 storageClass=
"DataFrame",
91 dimensions=(
"instrument",
"visit"),
96 visitSummary = connectionTypes.Input(
97 doc=(
"Per-visit consolidated exposure metadata. These catalogs use "
98 "detector id for the id and must be sorted for fast lookups of a "
101 storageClass=
"ExposureCatalog",
102 dimensions=(
"instrument",
"visit"),
107 background = connectionTypes.Input(
108 doc=
"Calexp background model",
109 name=
"calexpBackground",
110 storageClass=
"Background",
111 dimensions=(
"instrument",
"visit",
"detector"),
116 fgcmVisitCatalog = connectionTypes.Output(
117 doc=
"Catalog of visit information for fgcm",
118 name=
"fgcmVisitCatalog",
119 storageClass=
"Catalog",
120 dimensions=(
"instrument",),
123 fgcmStarObservations = connectionTypes.Output(
124 doc=
"Catalog of star observations for fgcm",
125 name=
"fgcmStarObservations",
126 storageClass=
"Catalog",
127 dimensions=(
"instrument",),
130 fgcmStarIds = connectionTypes.Output(
131 doc=
"Catalog of fgcm calibration star IDs",
133 storageClass=
"Catalog",
134 dimensions=(
"instrument",),
137 fgcmStarIndices = connectionTypes.Output(
138 doc=
"Catalog of fgcm calibration star indices",
139 name=
"fgcmStarIndices",
140 storageClass=
"Catalog",
141 dimensions=(
"instrument",),
144 fgcmReferenceStars = connectionTypes.Output(
145 doc=
"Catalog of fgcm-matched reference stars",
146 name=
"fgcmReferenceStars",
147 storageClass=
"Catalog",
148 dimensions=(
"instrument",),
154 if not config.doReferenceMatches:
155 self.prerequisiteInputs.remove(
"refCat")
156 self.prerequisiteInputs.remove(
"fgcmLookUpTable")
158 if not config.doModelErrorsWithBackground:
159 self.inputs.remove(
"background")
161 if not config.doReferenceMatches:
162 self.outputs.remove(
"fgcmReferenceStars")
166 pipelineConnections=FgcmBuildStarsTableConnections):
167 """Config for FgcmBuildStarsTableTask"""
169 referenceCCD = pexConfig.Field(
170 doc=
"Reference CCD for checking PSF and background",
192 sourceSelector.flags.bad = [
'pixelFlags_edge',
193 'pixelFlags_interpolatedCenter',
194 'pixelFlags_saturatedCenter',
195 'pixelFlags_crCenter',
197 'pixelFlags_interpolated',
198 'pixelFlags_saturated',
204 sourceSelector.flags.bad.append(localBackgroundFlagName)
209 sourceSelector.isolated.parentName =
'parentSourceId'
210 sourceSelector.isolated.nChildName =
'deblend_nChild'
212 sourceSelector.unresolved.name =
'extendedness'
217 Build stars for the FGCM global calibration, using sourceTable_visit catalogs.
219 ConfigClass = FgcmBuildStarsTableConfig
220 RunnerClass = FgcmBuildStarsRunner
221 _DefaultName =
"fgcmBuildStarsTable"
223 canMultiprocess =
False
226 super().
__init__(initInputs=initInputs, **kwargs)
227 if initInputs
is not None:
231 inputRefDict = butlerQC.get(inputRefs)
233 sourceTableRefs = inputRefDict[
'sourceTable_visit']
235 self.log.
info(
"Running with %d sourceTable_visit dataRefs",
236 len(sourceTableRefs))
238 sourceTableDataRefDict = {sourceTableRef.dataId[
'visit']: sourceTableRef
for
239 sourceTableRef
in sourceTableRefs}
241 if self.config.doReferenceMatches:
243 lutDataRef = inputRefDict[
'fgcmLookUpTable']
246 refConfig = self.config.fgcmLoadReferenceCatalog.refObjLoader
248 for ref
in inputRefs.refCat],
249 refCats=butlerQC.get(inputRefs.refCat),
252 self.makeSubtask(
'fgcmLoadReferenceCatalog', refObjLoader=refObjLoader)
258 calibFluxApertureRadius =
None
259 if self.config.doSubtractLocalBackground:
262 self.config.instFluxField)
263 except RuntimeError
as e:
264 raise RuntimeError(
"Could not determine aperture radius from %s. "
265 "Cannot use doSubtractLocalBackground." %
266 (self.config.instFluxField))
from e
268 visitSummaryRefs = inputRefDict[
'visitSummary']
269 visitSummaryDataRefDict = {visitSummaryRef.dataId[
'visit']: visitSummaryRef
for
270 visitSummaryRef
in visitSummaryRefs}
272 camera = inputRefDict[
'camera']
273 groupedDataRefs = self.
_groupDataRefs_groupDataRefs(sourceTableDataRefDict,
274 visitSummaryDataRefDict)
276 if self.config.doModelErrorsWithBackground:
277 bkgRefs = inputRefDict[
'background']
278 bkgDataRefDict = {(bkgRef.dataId.byName()[
'visit'],
279 bkgRef.dataId.byName()[
'detector']): bkgRef
for
282 bkgDataRefDict =
None
287 bkgDataRefDict=bkgDataRefDict,
288 visitCatDataRef=
None,
291 rad = calibFluxApertureRadius
297 calibFluxApertureRadius=rad,
299 visitCatDataRef=
None,
302 butlerQC.put(visitCat, outputRefs.fgcmVisitCatalog)
303 butlerQC.put(fgcmStarObservationCat, outputRefs.fgcmStarObservations)
305 fgcmStarIdCat, fgcmStarIndicesCat, fgcmRefCat = self.
fgcmMatchStarsfgcmMatchStars(visitCat,
306 fgcmStarObservationCat,
307 lutDataRef=lutDataRef)
309 butlerQC.put(fgcmStarIdCat, outputRefs.fgcmStarIds)
310 butlerQC.put(fgcmStarIndicesCat, outputRefs.fgcmStarIndices)
311 if fgcmRefCat
is not None:
312 butlerQC.put(fgcmRefCat, outputRefs.fgcmReferenceStars)
315 def _makeArgumentParser(cls):
316 """Create an argument parser"""
317 parser = pipeBase.ArgumentParser(name=cls.
_DefaultName_DefaultName)
318 parser.add_id_argument(
"--id",
"sourceTable_visit", help=
"Data ID, e.g. --id visit=6789")
322 def _groupDataRefs(self, sourceTableDataRefDict, visitSummaryDataRefDict):
323 """Group sourceTable and visitSummary dataRefs (gen3 only).
327 sourceTableDataRefDict : `dict` [`int`, `str`]
328 Dict of source tables, keyed by visit.
329 visitSummaryDataRefDict : `dict` [int, `str`]
330 Dict of visit summary catalogs, keyed by visit.
334 groupedDataRefs : `dict` [`int`, `list`]
335 Dictionary with sorted visit keys, and `list`s with
336 `lsst.daf.butler.DeferredDataSetHandle`. The first
337 item in the list will be the visitSummary ref, and
338 the second will be the source table ref.
340 groupedDataRefs = collections.defaultdict(list)
341 visits = sorted(sourceTableDataRefDict.keys())
344 groupedDataRefs[visit] = [visitSummaryDataRefDict[visit],
345 sourceTableDataRefDict[visit]]
347 return groupedDataRefs
349 def _findAndGroupDataRefsGen2(self, butler, camera, dataRefs):
350 self.log.
info(
"Grouping dataRefs by %s", (self.config.visitDataRefName))
353 for detector
in camera:
354 ccdIds.append(detector.getId())
358 ccdIds.insert(0, self.config.referenceCCD)
365 groupedDataRefs = collections.defaultdict(list)
366 for dataRef
in dataRefs:
367 visit = dataRef.dataId[self.config.visitDataRefName]
373 calexpRef = butler.dataRef(
'calexp', dataId={self.config.visitDataRefName: visit,
374 self.config.ccdDataRefName: ccdId})
380 if not calexpRef.datasetExists():
385 groupedDataRefs[visit].
append(calexpRef)
389 groupedDataRefs[visit].
append(dataRef)
392 return dict(sorted(groupedDataRefs.items()))
397 calibFluxApertureRadius=None,
398 visitCatDataRef=None,
401 startTime = time.time()
407 if (visitCatDataRef
is not None and starObsDataRef
is None
408 or visitCatDataRef
is None and starObsDataRef
is not None):
409 self.log.
warning(
"Only one of visitCatDataRef and starObsDataRef are set, so "
410 "no checkpoint files will be persisted.")
412 if self.config.doSubtractLocalBackground
and calibFluxApertureRadius
is None:
413 raise RuntimeError(
"Must set calibFluxApertureRadius if doSubtractLocalBackground is True.")
418 outputSchema = sourceMapper.getOutputSchema()
422 for ccdIndex, detector
in enumerate(camera):
423 ccdMapping[detector.getId()] = ccdIndex
427 if inStarObsCat
is not None:
428 fullCatalog = inStarObsCat
429 comp1 = fullCatalog.schema.compare(outputSchema, outputSchema.EQUAL_KEYS)
430 comp2 = fullCatalog.schema.compare(outputSchema, outputSchema.EQUAL_NAMES)
431 if not comp1
or not comp2:
432 raise RuntimeError(
"Existing fgcmStarObservations file found with mismatched schema.")
436 visitKey = outputSchema[
'visit'].asKey()
437 ccdKey = outputSchema[
'ccd'].asKey()
438 instMagKey = outputSchema[
'instMag'].asKey()
439 instMagErrKey = outputSchema[
'instMagErr'].asKey()
442 if self.config.doSubtractLocalBackground:
443 localBackgroundArea = np.pi*calibFluxApertureRadius**2.
449 for counter, visit
in enumerate(visitCat):
451 if visit[
'sources_read']:
454 expTime = visit[
'exptime']
456 dataRef = groupedDataRefs[visit[
'visit']][-1]
459 srcTable = dataRef.get()
462 df = srcTable.toDataFrame(columns)
465 inColumns = dataRef.get(component=
'columns')
467 df = dataRef.get(parameters={
'columns': columns})
469 goodSrc = self.sourceSelector.selectSources(df)
473 if self.config.doSubtractLocalBackground:
474 localBackground = localBackgroundArea*df[self.config.localBackgroundFluxField].values
475 use, = np.where((goodSrc.selected)
476 & ((df[self.config.instFluxField].values - localBackground) > 0.0))
478 use, = np.where(goodSrc.selected)
481 tempCat.resize(use.size)
483 tempCat[
'ra'][:] = np.deg2rad(df[
'ra'].values[use])
484 tempCat[
'dec'][:] = np.deg2rad(df[
'decl'].values[use])
485 tempCat[
'x'][:] = df[
'x'].values[use]
486 tempCat[
'y'][:] = df[
'y'].values[use]
488 tempCat[visitKey][:] = df[
'visit'].values[use]
489 tempCat[ccdKey][:] = df[detColumn].values[use]
490 tempCat[
'psf_candidate'] = df[self.config.psfCandidateName].values[use]
492 if self.config.doSubtractLocalBackground:
506 tempCat[
'deltaMagBkg'] = (-2.5*np.log10(df[self.config.instFluxField].values[use]
507 - localBackground[use]) -
508 -2.5*np.log10(df[self.config.instFluxField].values[use]))
510 tempCat[
'deltaMagBkg'][:] = 0.0
513 for detector
in camera:
514 ccdId = detector.getId()
516 use2 = (tempCat[ccdKey] == ccdId)
517 tempCat[
'jacobian'][use2] = approxPixelAreaFields[ccdId].evaluate(tempCat[
'x'][use2],
519 scaledInstFlux = (df[self.config.instFluxField].values[use[use2]]
520 * visit[
'scaling'][ccdMapping[ccdId]])
521 tempCat[instMagKey][use2] = (-2.5*np.log10(scaledInstFlux) + 2.5*np.log10(expTime))
525 tempCat[instMagErrKey][:] = k*(df[self.config.instFluxField +
'Err'].values[use]
526 / df[self.config.instFluxField].values[use])
529 if self.config.doApplyWcsJacobian:
530 tempCat[instMagKey][:] -= 2.5*np.log10(tempCat[
'jacobian'][:])
532 fullCatalog.extend(tempCat)
535 with np.warnings.catch_warnings():
537 np.warnings.simplefilter(
"ignore")
539 instMagIn = -2.5*np.log10(df[self.config.apertureInnerInstFluxField].values[use])
540 instMagErrIn = k*(df[self.config.apertureInnerInstFluxField +
'Err'].values[use]
541 / df[self.config.apertureInnerInstFluxField].values[use])
542 instMagOut = -2.5*np.log10(df[self.config.apertureOuterInstFluxField].values[use])
543 instMagErrOut = k*(df[self.config.apertureOuterInstFluxField +
'Err'].values[use]
544 / df[self.config.apertureOuterInstFluxField].values[use])
546 ok = (np.isfinite(instMagIn) & np.isfinite(instMagErrIn)
547 & np.isfinite(instMagOut) & np.isfinite(instMagErrOut))
549 visit[
'deltaAper'] = np.median(instMagIn[ok] - instMagOut[ok])
550 visit[
'sources_read'] =
True
552 self.log.
info(
" Found %d good stars in visit %d (deltaAper = %0.3f)",
553 use.size, visit[
'visit'], visit[
'deltaAper'])
555 if ((counter % self.config.nVisitsPerCheckpoint) == 0
556 and starObsDataRef
is not None and visitCatDataRef
is not None):
559 starObsDataRef.put(fullCatalog)
560 visitCatDataRef.put(visitCat)
562 self.log.
info(
"Found all good star observations in %.2f s" %
563 (time.time() - startTime))
567 def _get_sourceTable_visit_columns(self, inColumns):
569 Get the sourceTable_visit columns from the config.
574 List of columns available in the sourceTable_visit
579 List of columns to read from sourceTable_visit.
580 detectorColumn : `str`
581 Name of the detector column.
583 if 'detector' in inColumns:
585 detectorColumn =
'detector'
588 detectorColumn =
'ccd'
590 columns = [
'visit', detectorColumn,
591 'ra',
'decl',
'x',
'y', self.config.psfCandidateName,
592 self.config.instFluxField, self.config.instFluxField +
'Err',
593 self.config.apertureInnerInstFluxField, self.config.apertureInnerInstFluxField +
'Err',
594 self.config.apertureOuterInstFluxField, self.config.apertureOuterInstFluxField +
'Err']
595 if self.sourceSelector.config.doFlags:
596 columns.extend(self.sourceSelector.config.flags.bad)
597 if self.sourceSelector.config.doUnresolved:
598 columns.append(self.sourceSelector.config.unresolved.name)
599 if self.sourceSelector.config.doIsolated:
600 columns.append(self.sourceSelector.config.isolated.parentName)
601 columns.append(self.sourceSelector.config.isolated.nChildName)
602 if self.config.doSubtractLocalBackground:
603 columns.append(self.config.localBackgroundFluxField)
605 return columns, detectorColumn
def fgcmMatchStars(self, visitCat, obsCat, lutDataRef=None)
def fgcmMakeAllStarObservations(self, groupedDataRefs, visitCat, sourceSchema, camera, calibFluxApertureRadius=None, visitCatDataRef=None, starObsDataRef=None, inStarObsCat=None)
def _makeSourceMapper(self, sourceSchema)
def fgcmMakeVisitCatalog(self, camera, groupedDataRefs, bkgDataRefDict=None, visitCatDataRef=None, inVisitCat=None)
apertureOuterInstFluxField
apertureInnerInstFluxField
doSubtractLocalBackground
apertureInnerInstFluxField
apertureOuterInstFluxField
def __init__(self, *config=None)
def runQuantum(self, butlerQC, inputRefs, outputRefs)
def fgcmMakeAllStarObservations(self, groupedDataRefs, visitCat, sourceSchema, camera, calibFluxApertureRadius=None, visitCatDataRef=None, starObsDataRef=None, inStarObsCat=None)
def _get_sourceTable_visit_columns(self, inColumns)
def __init__(self, initInputs=None, **kwargs)
def _groupDataRefs(self, sourceTableDataRefDict, visitSummaryDataRefDict)
std::shared_ptr< FrameSet > append(FrameSet const &first, FrameSet const &second)
Construct a FrameSet that performs two transformations in series.
def computeApertureRadiusFromDataRef(dataRef, fluxField)
def computeApproxPixelAreaFields(camera)
Fit spatial kernel using approximate fluxes for candidates, and solving a linear system of equations.