22"""Pipeline for running DiaSource association in a DRP context.
25__all__ = [
"DrpAssociationPipeTask",
26 "DrpAssociationPipeConfig",
27 "DrpAssociationPipeConnections"]
29import astropy.table
as tb
41from .coaddBase
import makeSkyInfo
42from .simpleAssociation
import SimpleAssociationTask
46 dimensions=(
"tract",
"patch",
"skymap"),
47 defaultTemplates={
"coaddName":
"deep",
50 diaSourceTables = pipeBase.connectionTypes.Input(
51 doc=
"Set of catalogs of calibrated DiaSources.",
52 name=
"{fakesType}{coaddName}Diff_diaSrcTable",
53 storageClass=
"DataFrame",
54 dimensions=(
"instrument",
"visit",
"detector"),
58 ssObjectTableRefs = pipeBase.connectionTypes.Input(
59 doc=
"Reference to catalogs of SolarSolarSystem objects expected to be "
60 "observable in each (visit, detector).",
61 name=
"preloaded_DRP_SsObjects",
62 storageClass=
"ArrowAstropy",
63 dimensions=(
"instrument",
"visit",
"detector"),
68 skyMap = pipeBase.connectionTypes.Input(
69 doc=
"Input definition of geometry/bbox and projection/wcs for coadded "
71 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
72 storageClass=
"SkyMap",
73 dimensions=(
"skymap", ),
75 finalVisitSummaryRefs = pipeBase.connectionTypes.Input(
76 doc=
"Reference to finalVisitSummary of each exposure, containing "
77 "visitInfo, bbox, and wcs.",
78 name=
"finalVisitSummary",
79 storageClass=
"ExposureCatalog",
80 dimensions=(
"instrument",
"visit"),
84 assocDiaSourceTable = pipeBase.connectionTypes.Output(
85 doc=
"Catalog of DiaSources covering the patch and associated with a "
87 name=
"{fakesType}{coaddName}Diff_assocDiaSrcTable",
88 storageClass=
"DataFrame",
89 dimensions=(
"tract",
"patch"),
91 associatedSsSources = pipeBase.connectionTypes.Output(
92 doc=
"Optional output storing ssSource data computed during association.",
93 name=
"{fakesType}{coaddName}Diff_assocSsSrcTable",
94 storageClass=
"ArrowAstropy",
95 dimensions=(
"tract",
"patch"),
97 unassociatedSsObjects = pipeBase.connectionTypes.Output(
98 doc=
"Expected locations of ssObjects with no associated source.",
99 name=
"{fakesType}{coaddName}Diff_unassocSsObjTable",
100 storageClass=
"ArrowAstropy",
101 dimensions=(
"tract",
"patch"),
103 diaObjectTable = pipeBase.connectionTypes.Output(
104 doc=
"Catalog of DiaObjects created from spatially associating "
106 name=
"{fakesType}{coaddName}Diff_diaObjTable",
107 storageClass=
"DataFrame",
108 dimensions=(
"tract",
"patch"),
111 def __init__(self, *, config=None):
112 super().__init__(config=config)
114 if not config.doSolarSystemAssociation:
115 del self.ssObjectTableRefs
116 del self.associatedSsSources
117 del self.unassociatedSsObjects
118 del self.finalVisitSummaryRefs
121class DrpAssociationPipeConfig(
122 pipeBase.PipelineTaskConfig,
123 pipelineConnections=DrpAssociationPipeConnections):
124 associator = pexConfig.ConfigurableField(
125 target=SimpleAssociationTask,
126 doc=
"Task used to associate DiaSources with DiaObjects.",
128 solarSystemAssociator = pexConfig.ConfigurableField(
129 target=SolarSystemAssociationTask,
130 doc=
"Task used to associate DiaSources with SolarSystemObjects.",
132 doAddDiaObjectCoords = pexConfig.Field(
135 doc=
"Do pull diaObject's average coordinate as coord_ra and coord_dec"
136 "Duplicates information, but needed for bulk ingest into qserv."
138 doWriteEmptyTables = pexConfig.Field(
141 doc=
"If True, construct and write out empty diaSource and diaObject "
142 "tables. If False, raise NoWorkFound"
144 doSolarSystemAssociation = pexConfig.Field(
147 doc=
"Process SolarSystem objects through the pipeline.",
149 idGenerator = SkyMapIdGeneratorConfig.make_field()
152class DrpAssociationPipeTask(pipeBase.PipelineTask):
153 """Driver pipeline for loading DiaSource catalogs in a patch/tract
154 region and associating them.
156 ConfigClass = DrpAssociationPipeConfig
157 _DefaultName =
"drpAssociation"
159 def __init__(self, **kwargs):
160 super().__init__(**kwargs)
161 self.makeSubtask(
'associator')
162 if self.config.doSolarSystemAssociation:
163 self.makeSubtask(
"solarSystemAssociator")
165 def runQuantum(self, butlerQC, inputRefs, outputRefs):
166 inputs = butlerQC.get(inputRefs)
168 inputs[
"tractId"] = butlerQC.quantum.dataId[
"tract"]
169 inputs[
"patchId"] = butlerQC.quantum.dataId[
"patch"]
170 inputs[
"idGenerator"] = self.config.idGenerator.apply(butlerQC.quantum.dataId)
171 if not self.config.doSolarSystemAssociation:
172 inputs[
"ssObjectTableRefs"] = []
173 inputs[
"finalVisitSummaryRefs"] = []
174 outputs = self.run(**inputs)
175 butlerQC.put(outputs, outputRefs)
181 finalVisitSummaryRefs,
185 """Trim DiaSources to the current Patch and run association.
187 Takes in the set of DiaSource catalogs that covers the current patch,
188 trims them to the dimensions of the patch, and [TODO: eventually]
189 runs association on the concatenated DiaSource Catalog.
193 diaSourceTables : `list` of `lsst.daf.butler.DeferredDatasetHandle`
194 Set of DiaSource catalogs potentially covering this patch/tract.
195 ssObjectTableRefs: `list` of `lsst.daf.butler.DeferredDatasetHandle`
196 Set of known SSO ephemerides potentially covering this patch/tract.
197 skyMap : `lsst.skymap.BaseSkyMap`
198 SkyMap defining the patch/tract
199 visitInfoRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
200 Reference to finalVisitSummary of each exposure potentially
201 covering this patch/tract, which contain visitInfo, bbox, and wcs
203 Id of current tract being processed.
205 Id of current patch being processed.
206 idGenerator : `lsst.meas.base.IdGenerator`, optional
207 Object that generates Object IDs and random number generator seeds.
211 output : `lsst.pipe.base.Struct`
212 Results struct with attributes:
214 ``assocDiaSourceTable``
215 Table of DiaSources with updated value for diaObjectId.
218 Table of DiaObjects from matching DiaSources
219 (`pandas.DataFrame`).
221 self.log.info(
"Running DPR Association on patch %i, tract %i...",
224 skyInfo = makeSkyInfo(skyMap, tractId, patchId)
227 innerPatchBox =
geom.Box2D(skyInfo.patchInfo.getInnerBBox())
228 innerTractSkyRegion = skyInfo.tractInfo.getInnerSkyRegion()
230 def visitDetectorPair(dataRef):
231 return (dataRef.dataId[
"visit"], dataRef.dataId[
"detector"])
234 diaIdDict, ssObjectIdDict, finalVisitSummaryIdDict = {}, {}, {}
235 for diaCatRef
in diaSourceTables:
236 diaIdDict[visitDetectorPair(diaCatRef)] = diaCatRef
237 if self.config.doSolarSystemAssociation:
238 for ssCatRef
in ssObjectTableRefs:
239 ssObjectIdDict[visitDetectorPair(ssCatRef)] = ssCatRef
240 for finalVisitSummaryRef
in finalVisitSummaryRefs:
241 finalVisitSummaryIdDict[finalVisitSummaryRef.dataId[
"visit"]] = finalVisitSummaryRef
243 diaSourceHistory, ssSourceHistory, unassociatedSsObjectHistory = [], [], []
244 for visit, detector
in diaIdDict:
245 diaCatRef = diaIdDict[(visit, detector)]
246 diaCat = diaCatRef.get()
247 associatedSsSources, unassociatedSsObjects =
None,
None
248 nSsSrc, nSsObj = 0, 0
250 if all([(visit, detector)
in ssObjectIdDict
and visit
in finalVisitSummaryIdDict]):
252 ssCat = ssObjectIdDict[(visit, detector)].get()
253 finalVisitSummary = finalVisitSummaryIdDict[visit].get()
255 visitInfo = finalVisitSummary.find(detector).visitInfo
256 bbox = finalVisitSummary.find(detector).getBBox()
257 wcs = finalVisitSummary.find(detector).wcs
258 ssoAssocResult = self.solarSystemAssociator.run(
259 tb.Table.from_pandas(diaCat),
266 associatedSsSources = ssoAssocResult.associatedSsSources
267 associatedSsDiaSources = ssoAssocResult.ssoAssocDiaSources
268 ssInTractPatch = self._trimToPatch(associatedSsSources.to_pandas(),
272 associatedSsSources = associatedSsSources[ssInTractPatch]
273 assocDiaSrcIds = set(associatedSsSources[
'diaSourceId'])
274 diaSrcMask = [diaId
in assocDiaSrcIds
for diaId
in associatedSsDiaSources[
'diaSourceId']]
275 associatedSsDiaSources = associatedSsDiaSources[np.array(diaSrcMask)]
277 unassociatedSsObjects = ssoAssocResult.unassociatedSsObjects
278 ssObjInTractPatch = self._trimToPatch(unassociatedSsObjects.to_pandas(),
282 unassociatedSsObjects = unassociatedSsObjects[ssObjInTractPatch]
283 nSsSrc = ssInTractPatch.sum()
284 nSsObj = ssObjInTractPatch.sum()
285 if len(ssoAssocResult.unAssocDiaSources) > 0:
286 diaCat = ssoAssocResult.unAssocDiaSources.to_pandas()
288 diaCat = pd.DataFrame(columns=ssoAssocResult.unAssocDiaSources.columns)
290 diaInTractPatch = self._trimToPatch(diaCat,
294 diaCat = diaCat[diaInTractPatch]
296 nDiaSrc = diaInTractPatch.sum()
299 "Read DiaSource catalog of length %i from visit %i, "
300 "detector %i. Found %i sources within the patch/tract "
301 "footprint, including %i associated with SSOs.",
302 len(diaCat), diaCatRef.dataId[
"visit"],
303 diaCatRef.dataId[
"detector"], nDiaSrc + nSsSrc, nSsSrc)
306 diaSourceHistory.append(diaCat)
308 ssSourceHistory.append(associatedSsSources)
309 diaSourceHistory.append(associatedSsDiaSources.to_pandas())
311 unassociatedSsObjects[
'visit'] = visit
312 unassociatedSsObjects[
'detector'] = detector
313 unassociatedSsObjectHistory.append(unassociatedSsObjects)
316 diaSourceHistoryCat = pd.concat(diaSourceHistory)
318 diaSourceHistoryCat = diaCat.drop(diaCat.index)
319 if self.config.doSolarSystemAssociation:
320 nSsSrc, nSsObj = 0, 0
322 ssSourceHistoryCat = tb.vstack(ssSourceHistory)
323 ssSourceHistoryCat.remove_columns([
'ra',
'dec'])
324 nSsSrc = len(ssSourceHistoryCat)
326 ssSourceHistoryCat = associatedSsSources
327 if unassociatedSsObjectHistory:
328 unassociatedSsObjectHistoryCat = tb.vstack(unassociatedSsObjectHistory)
329 nSsObj = len(unassociatedSsObjectHistoryCat)
331 unassociatedSsObjectHistoryCat = unassociatedSsObjects
332 self.log.info(
"Found %i ssSources and %i missing ssObjects in patch %i, tract %i",
333 nSsSrc, nSsObj, patchId, tractId)
335 ssSourceHistoryCat =
None
336 unassociatedSsObjectHistoryCat =
None
338 if (
not diaSourceHistory)
and not (self.config.doSolarSystemAssociation
and ssSourceHistory):
339 if not self.config.doWriteEmptyTables:
340 raise pipeBase.NoWorkFound(
"Found no overlapping DIASources to associate.")
342 self.log.info(
"Found %i DiaSources overlapping patch %i, tract %i",
343 len(diaSourceHistoryCat), patchId, tractId)
345 assocResult = self.associator.run(diaSourceHistoryCat, idGenerator=idGenerator)
347 self.log.info(
"Associated DiaSources into %i DiaObjects",
348 len(assocResult.diaObjects))
350 if self.config.doAddDiaObjectCoords:
351 assocResult.assocDiaSources = self._addDiaObjectCoords(assocResult.diaObjects,
352 assocResult.assocDiaSources)
354 return pipeBase.Struct(
355 diaObjectTable=assocResult.diaObjects,
356 assocDiaSourceTable=assocResult.assocDiaSources,
357 associatedSsSources=ssSourceHistoryCat,
358 unassociatedSsObjects=unassociatedSsObjectHistoryCat,
362 obj = objects[[
'ra',
'dec']].rename(columns={
"ra":
"coord_ra",
"dec":
"coord_dec"})
363 df = pd.merge(sources.reset_index(), obj, left_on=
'diaObjectId', right_index=
True,
364 how=
'inner').set_index(
'diaSourceId')
368 """Create generator testing if a set of DiaSources are in the
373 cat : `pandas.DataFrame`
374 Catalog of DiaSources to test within patch/tract.
375 innerPatchBox : `lsst.geom.Box2D`
376 Bounding box of the patch.
377 wcs : `lsst.geom.SkyWcs`
382 isInPatch : `numpy.ndarray`, (N,)
383 Booleans representing if the DiaSources are contained within the
384 current patch and tract.
386 isInPatch = np.zeros(len(cat), dtype=bool)
388 for idx, row
in cat.iterrows():
390 pxCoord = wcs.skyToPixel(spPoint)
391 ra_rad = np.deg2rad(row[
"ra"])
392 dec_rad = np.deg2rad(row[
"dec"])
394 isInPatch[idx] = innerPatchBox.contains(pxCoord)
and innerTractSkyRegion.contains(ra_rad, dec_rad)
A floating-point coordinate rectangle geometry.
Point in an unspecified spherical coordinate system.
_addDiaObjectCoords(self, objects, sources)
_trimToPatch(self, cat, innerPatchBox, innerTractSkyRegion, wcs)