22"""Pipeline for running DiaSource association in a DRP context.
25__all__ = [
"DrpAssociationPipeTask",
26 "DrpAssociationPipeConfig",
27 "DrpAssociationPipeConnections"]
29import astropy.table
as tb
41from .coaddBase
import makeSkyInfo
42from .simpleAssociation
import SimpleAssociationTask
46 dimensions=(
"tract",
"patch",
"skymap"),
47 defaultTemplates={
"coaddName":
"deep",
50 diaSourceTables = pipeBase.connectionTypes.Input(
51 doc=
"Set of catalogs of calibrated DiaSources.",
52 name=
"{fakesType}{coaddName}Diff_diaSrcTable",
53 storageClass=
"ArrowAstropy",
54 dimensions=(
"instrument",
"visit",
"detector"),
58 ssObjectTableRefs = pipeBase.connectionTypes.Input(
59 doc=
"Reference to catalogs of SolarSolarSystem objects expected to be "
60 "observable in each (visit, detector).",
61 name=
"preloaded_ss_object_visit",
62 storageClass=
"ArrowAstropy",
63 dimensions=(
"instrument",
"visit"),
68 skyMap = pipeBase.connectionTypes.Input(
69 doc=
"Input definition of geometry/bbox and projection/wcs for coadded "
71 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
72 storageClass=
"SkyMap",
73 dimensions=(
"skymap", ),
75 finalVisitSummaryRefs = pipeBase.connectionTypes.Input(
76 doc=
"Reference to finalVisitSummary of each exposure, containing "
77 "visitInfo, bbox, and wcs.",
78 name=
"finalVisitSummary",
79 storageClass=
"ExposureCatalog",
80 dimensions=(
"instrument",
"visit"),
84 assocDiaSourceTable = pipeBase.connectionTypes.Output(
85 doc=
"Catalog of DiaSources covering the patch and associated with a "
87 name=
"{fakesType}{coaddName}Diff_assocDiaSrcTable",
88 storageClass=
"DataFrame",
89 dimensions=(
"tract",
"patch"),
91 associatedSsSources = pipeBase.connectionTypes.Output(
92 doc=
"Optional output storing ssSource data computed during association.",
93 name=
"{fakesType}{coaddName}Diff_assocSsSrcTable",
94 storageClass=
"ArrowAstropy",
95 dimensions=(
"tract",
"patch"),
97 unassociatedSsObjects = pipeBase.connectionTypes.Output(
98 doc=
"Expected locations of ssObjects with no associated source.",
99 name=
"{fakesType}{coaddName}Diff_unassocSsObjTable",
100 storageClass=
"ArrowAstropy",
101 dimensions=(
"tract",
"patch"),
103 diaObjectTable = pipeBase.connectionTypes.Output(
104 doc=
"Catalog of DiaObjects created from spatially associating "
106 name=
"{fakesType}{coaddName}Diff_diaObjTable",
107 storageClass=
"DataFrame",
108 dimensions=(
"tract",
"patch"),
111 def __init__(self, *, config=None):
112 super().__init__(config=config)
114 if not config.doSolarSystemAssociation:
115 del self.ssObjectTableRefs
116 del self.associatedSsSources
117 del self.unassociatedSsObjects
118 del self.finalVisitSummaryRefs
121class DrpAssociationPipeConfig(
122 pipeBase.PipelineTaskConfig,
123 pipelineConnections=DrpAssociationPipeConnections):
124 associator = pexConfig.ConfigurableField(
125 target=SimpleAssociationTask,
126 doc=
"Task used to associate DiaSources with DiaObjects.",
128 solarSystemAssociator = pexConfig.ConfigurableField(
129 target=SolarSystemAssociationTask,
130 doc=
"Task used to associate DiaSources with SolarSystemObjects.",
132 doAddDiaObjectCoords = pexConfig.Field(
135 doc=
"Do pull diaObject's average coordinate as coord_ra and coord_dec"
136 "Duplicates information, but needed for bulk ingest into qserv."
138 doWriteEmptyTables = pexConfig.Field(
141 doc=
"If True, construct and write out empty diaSource and diaObject "
142 "tables. If False, raise NoWorkFound"
144 doSolarSystemAssociation = pexConfig.Field(
147 doc=
"Process SolarSystem objects through the pipeline.",
149 idGenerator = SkyMapIdGeneratorConfig.make_field()
152class DrpAssociationPipeTask(pipeBase.PipelineTask):
153 """Driver pipeline for loading DiaSource catalogs in a patch/tract
154 region and associating them.
156 ConfigClass = DrpAssociationPipeConfig
157 _DefaultName =
"drpAssociation"
159 def __init__(self, **kwargs):
160 super().__init__(**kwargs)
161 self.makeSubtask(
'associator')
162 if self.config.doSolarSystemAssociation:
163 self.makeSubtask(
"solarSystemAssociator")
165 def runQuantum(self, butlerQC, inputRefs, outputRefs):
166 inputs = butlerQC.get(inputRefs)
168 inputs[
"tractId"] = butlerQC.quantum.dataId[
"tract"]
169 inputs[
"patchId"] = butlerQC.quantum.dataId[
"patch"]
170 inputs[
"idGenerator"] = self.config.idGenerator.apply(butlerQC.quantum.dataId)
171 if not self.config.doSolarSystemAssociation:
172 inputs[
"ssObjectTableRefs"] = []
173 inputs[
"finalVisitSummaryRefs"] = []
174 outputs = self.run(**inputs)
175 butlerQC.put(outputs, outputRefs)
181 finalVisitSummaryRefs,
185 """Trim DiaSources to the current Patch and run association.
187 Takes in the set of DiaSource catalogs that covers the current patch,
188 trims them to the dimensions of the patch, and [TODO: eventually]
189 runs association on the concatenated DiaSource Catalog.
193 diaSourceTables : `list` of `lsst.daf.butler.DeferredDatasetHandle`
194 Set of DiaSource catalogs potentially covering this patch/tract.
195 ssObjectTableRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
196 Set of known SSO ephemerides potentially covering this patch/tract.
197 skyMap : `lsst.skymap.BaseSkyMap`
198 SkyMap defining the patch/tract
199 finalVisitSummaryRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
200 Reference to finalVisitSummary of each exposure potentially
201 covering this patch/tract, which contain visitInfo, bbox, and wcs
203 Id of current tract being processed.
205 Id of current patch being processed.
206 idGenerator : `lsst.meas.base.IdGenerator`, optional
207 Object that generates Object IDs and random number generator seeds.
211 output : `lsst.pipe.base.Struct`
212 Results struct with attributes:
214 ``assocDiaSourceTable``
215 Table of DiaSources with updated value for diaObjectId.
218 Table of DiaObjects from matching DiaSources
219 (`pandas.DataFrame`).
221 self.log.info(
"Running DPR Association on patch %i, tract %i...",
224 skyInfo = makeSkyInfo(skyMap, tractId, patchId)
227 innerPatchBox =
geom.Box2D(skyInfo.patchInfo.getInnerBBox())
228 outerPatchBox =
geom.Box2D(skyInfo.patchInfo.getOuterBBox())
229 innerTractSkyRegion = skyInfo.tractInfo.getInnerSkyRegion()
234 finalVisitSummaryIdDict =
prepareCatalogDict(finalVisitSummaryRefs, useVisitDetector=
False)
236 diaSourceHistory, ssSourceHistory, unassociatedSsObjectHistory = [], [], []
237 nSsSrc, nSsObj = 0, 0
238 visits = set([v
for v, _
in diaIdDict.keys()])
242 visitSummary = finalVisitSummaryIdDict[visit].get()
if visit
in finalVisitSummaryIdDict
else None
243 ssCat = ssObjectIdDict[visit].get()
if visit
in ssObjectIdDict
else None
244 detectors = [det
for (v, det)
in diaIdDict.keys()
if v == visit]
245 for detector
in detectors:
246 diaCat = diaIdDict[(visit, detector)].get()
247 nDiaSrcIn = len(diaCat)
248 if (ssCat
is not None)
and (visitSummary
is not None):
249 ssoAssocResult = self.runSolarSystemAssociation(diaCat,
251 visitSummary=visitSummary,
252 patchBbox=innerPatchBox,
253 patchWcs=skyInfo.wcs,
254 innerTractSkyRegion=innerTractSkyRegion,
259 nSsSrc = len(ssoAssocResult.associatedSsSources)
260 nSsObj = len(ssoAssocResult.unassociatedSsObjects)
264 diaCat = ssoAssocResult.unassociatedDiaSources
266 nSsSrc, nSsObj = 0, 0
272 diaInPatch = self._trimToPatch(diaCat.to_pandas(), outerPatchBox, skyInfo.wcs)
274 nDiaSrc = diaInPatch.sum()
277 "Read DiaSource catalog of length %i from visit %i, "
278 "detector %i. Found %i sources within the patch/tract "
279 "footprint, including %i associated with SSOs.",
280 nDiaSrcIn, visit, detector, nDiaSrc + nSsSrc, nSsSrc)
283 diaSourceHistory.append(diaCat[diaInPatch])
285 ssSourceHistory.append(ssoAssocResult.associatedSsSources)
287 unassociatedSsObjectHistory.append(ssoAssocResult.unassociatedSsObjects)
291 diaSourceHistoryCat = self._stackCatalogs(diaSourceHistory)
292 if self.config.doSolarSystemAssociation:
293 ssSourceHistoryCat = self._stackCatalogs(ssSourceHistory, remove_columns=[
'ra',
'dec'])
294 nSsSrcTotal = len(ssSourceHistoryCat)
if ssSourceHistoryCat
else 0
295 unassociatedSsObjectHistoryCat = self._stackCatalogs(unassociatedSsObjectHistory)
296 nSsObjTotal = len(unassociatedSsObjectHistoryCat)
if unassociatedSsObjectHistoryCat
else 0
297 self.log.info(
"Found %i ssSources and %i missing ssObjects in patch %i, tract %i",
298 nSsSrcTotal, nSsObjTotal, patchId, tractId)
300 ssSourceHistoryCat =
None
301 unassociatedSsObjectHistoryCat =
None
303 if (
not diaSourceHistory)
and (
not ssSourceHistory):
304 if not self.config.doWriteEmptyTables:
305 raise pipeBase.NoWorkFound(
"Found no overlapping DIASources to associate.")
307 self.log.info(
"Found %i DiaSources overlapping patch %i, tract %i",
308 len(diaSourceHistoryCat), patchId, tractId)
310 diaSourceTable = diaSourceHistoryCat.to_pandas()
311 diaSourceTable.set_index(
"diaSourceId", drop=
False)
313 assocResult = self.associator.run(diaSourceTable, idGenerator=idGenerator)
317 objectsInTractPatch = self._trimToPatch(assocResult.diaObjects,
320 innerTractSkyRegion=innerTractSkyRegion)
321 diaObjects = assocResult.diaObjects[objectsInTractPatch]
326 assocDiaSources = self.dropDiaSourceByDiaObjectId(assocResult.diaObjects[~objectsInTractPatch].index,
327 assocResult.assocDiaSources)
329 self.log.info(
"Associated DiaSources into %i DiaObjects", len(diaObjects))
331 if self.config.doAddDiaObjectCoords:
332 assocDiaSources = self._addDiaObjectCoords(diaObjects, assocDiaSources)
334 return pipeBase.Struct(
335 diaObjectTable=diaObjects,
336 assocDiaSourceTable=assocDiaSources,
337 associatedSsSources=ssSourceHistoryCat,
338 unassociatedSsObjects=unassociatedSsObjectHistoryCat,
342 """Stack a list of catalogs.
346 catalogs : `list` of `astropy.table.Table`
347 Input catalogs with the same columns to be combined.
348 remove_columns : `list` of `str` or None, optional
349 List of column names to drop from the tables before stacking.
353 `astropy.table.Table`
354 The combined catalog.
357 sourceHistory = tb.vstack(catalogs)
358 if remove_columns
is not None:
359 sourceHistory.remove_columns(remove_columns)
372 """Run Solar System object association and filter the results.
376 diaCat : `pandas.DataFrame`
377 Catalog of detected diaSources on the image difference.
378 ssCat : `astropy.table.Table`
379 Catalog of predicted coordinates of known Solar System objects.
380 visitSummary : `lsst.afw.table.ExposureCatalog`
381 Table of calibration and metadata for all detectors in a visit.
382 patchBbox : `lsst.geom.Box2D`
383 Bounding box of the patch.
384 patchWcs : `lsst.geom.SkyWcs`
385 Wcs of the tract containing the patch.
386 innerTractSkyRegion : `lsst.sphgeom.Box`
387 Region defining the inner non-overlapping part of a tract.
389 Detector number of the science exposure.
391 Visit number of the science exposure.
395 ssoAssocResult : `lsst.pipe.base.Struct`
396 Results struct with attributes:
398 ``associatedSsSources``
399 Table of DiaSources associated with Solar System objects.
400 (`astropy.table.Table`)
401 ``associatedSsDiaSources``
402 Table of Solar System objects associated with DiaSources.
403 (`astropy.table.Table`).
404 ``unassociatedSsObjects``
405 Table of Solar System objects in the patch not associated with
406 any DiaSource (`astropy.table.Table`).
407 ``unassociatedDiaSources``
408 Table of DiaSources not associated with any Solar System object
409 (`astropy.table.Table`).
412 ssoAssocResult = self.solarSystemAssociator.run(
415 visitInfo=visitSummary.find(detector).visitInfo,
416 bbox=visitSummary.find(detector).getBBox(),
417 wcs=visitSummary.find(detector).wcs,
420 ssInTractPatch = self._trimToPatch(ssoAssocResult.associatedSsSources.to_pandas(),
423 innerTractSkyRegion=innerTractSkyRegion)
424 associatedSsSources = ssoAssocResult.associatedSsSources[ssInTractPatch].copy()
425 assocDiaSrcIds = set(associatedSsSources[
'diaSourceId'])
426 diaSrcMask = [diaId
in assocDiaSrcIds
for diaId
in ssoAssocResult.ssoAssocDiaSources[
'diaSourceId']]
427 associatedSsDiaSources = ssoAssocResult.ssoAssocDiaSources[np.array(diaSrcMask)]
429 ssObjInTractPatch = self._trimToPatch(ssoAssocResult.unassociatedSsObjects.to_pandas(),
432 innerTractSkyRegion=innerTractSkyRegion)
433 unassociatedSsObjects = ssoAssocResult.unassociatedSsObjects[ssObjInTractPatch].copy()
436 if len(unassociatedSsObjects) > 0:
437 unassociatedSsObjects[
'visit'] = visit
438 unassociatedSsObjects[
'detector'] = detector
440 return pipeBase.Struct(
441 associatedSsSources=associatedSsSources,
442 associatedSsDiaSources=associatedSsDiaSources,
443 unassociatedSsObjects=unassociatedSsObjects,
444 unassociatedDiaSources=ssoAssocResult.unAssocDiaSources
448 obj = objects[[
'ra',
'dec']].rename(columns={
"ra":
"coord_ra",
"dec":
"coord_dec"})
449 df = pd.merge(sources.reset_index(), obj, left_on=
'diaObjectId', right_index=
True,
450 how=
'inner').set_index(
'diaSourceId')
454 """Create generator testing if a set of DiaSources are in the
459 cat : `pandas.DataFrame`
460 Catalog of DiaSources to test within patch/tract.
461 patchBox : `lsst.geom.Box2D`
462 Bounding box of the patch.
463 wcs : `lsst.geom.SkyWcs`
465 innerTractSkyRegion : `lsst.sphgeom.Box`, optional
466 Region defining the inner non-overlapping part of a tract.
470 isInPatch : `numpy.ndarray`, (N,)
471 Booleans representing if the DiaSources are contained within the
472 current patch and tract.
474 isInPatch = np.zeros(len(cat), dtype=bool)
476 for idx, row
in cat.reset_index().iterrows():
478 pxCoord = wcs.skyToPixel(spPoint)
479 ra_rad = np.deg2rad(row[
"ra"])
480 dec_rad = np.deg2rad(row[
"dec"])
481 isInPatch[idx] = patchBox.contains(pxCoord)
483 if innerTractSkyRegion
is not None:
484 isInPatch[idx] &= innerTractSkyRegion.contains(ra_rad, dec_rad)
489 """Drop diaSources with diaObject IDs in the supplied list.
493 droppedDiaObjectIds : `pandas.DataFrame`
494 DiaObjectIds to match and drop from the list of diaSources.
495 diaSources : `pandas.DataFrame`
496 Catalog of diaSources to check and filter.
500 filteredDiaSources : `pandas.DataFrame`
501 The input diaSources with any rows matching the listed diaObjectIds
504 toDrop = diaSources[
'diaObjectId'].isin(droppedDiaObjectIds)
507 return diaSources.loc[~toDrop].copy()
511 """Prepare lookup tables of the data references.
515 dataRefList : `list` of `lsst.daf.butler.DeferredDatasetHandle`
516 The data references to make a lookup table for.
517 useVisitDetector : `bool`, optional
518 Use both visit and detector in the dict key? If False, use only visit.
522 `dict` of `lsst.daf.butler.DeferredDatasetHandle`
523 Lookup table of the data references by visit (and optionally detector)
528 for dataRef
in dataRefList:
529 dataDict[(dataRef.dataId[
"visit"], dataRef.dataId[
"detector"])] = dataRef
531 for dataRef
in dataRefList:
532 dataDict[dataRef.dataId[
"visit"]] = dataRef
A floating-point coordinate rectangle geometry.
Point in an unspecified spherical coordinate system.
_addDiaObjectCoords(self, objects, sources)
_stackCatalogs(self, catalogs, remove_columns=None, empty=None)
_trimToPatch(self, cat, patchBox, wcs, innerTractSkyRegion=None)
prepareCatalogDict(dataRefList, useVisitDetector=True)
runSolarSystemAssociation(self, diaCat, ssCat, visitSummary, patchBbox, patchWcs, innerTractSkyRegion, detector, visit)
dropDiaSourceByDiaObjectId(self, droppedDiaObjectIds, diaSources)