LSST Applications g0f08755f38+82efc23009,g12f32b3c4e+e7bdf1200e,g1653933729+a8ce1bb630,g1a0ca8cf93+50eff2b06f,g28da252d5a+52db39f6a5,g2bbee38e9b+37c5a29d61,g2bc492864f+37c5a29d61,g2cdde0e794+c05ff076ad,g3156d2b45e+41e33cbcdc,g347aa1857d+37c5a29d61,g35bb328faa+a8ce1bb630,g3a166c0a6a+37c5a29d61,g3e281a1b8c+fb992f5633,g414038480c+7f03dfc1b0,g41af890bb2+11b950c980,g5fbc88fb19+17cd334064,g6b1c1869cb+12dd639c9a,g781aacb6e4+a8ce1bb630,g80478fca09+72e9651da0,g82479be7b0+04c31367b4,g858d7b2824+82efc23009,g9125e01d80+a8ce1bb630,g9726552aa6+8047e3811d,ga5288a1d22+e532dc0a0b,gae0086650b+a8ce1bb630,gb58c049af0+d64f4d3760,gc28159a63d+37c5a29d61,gcf0d15dbbd+2acd6d4d48,gd7358e8bfb+778a810b6e,gda3e153d99+82efc23009,gda6a2b7d83+2acd6d4d48,gdaeeff99f8+1711a396fd,ge2409df99d+6b12de1076,ge79ae78c31+37c5a29d61,gf0baf85859+d0a5978c5a,gf3967379c6+4954f8c433,gfb92a5be7c+82efc23009,gfec2e1e490+2aaed99252,w.2024.46
LSST Data Management Base Package
Loading...
Searching...
No Matches
diaCalculation.py
Go to the documentation of this file.
1# This file is part of ap_association.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21
22from collections import namedtuple
23import numpy as np
24import pandas as pd
25
26from .catalogCalculation import (CatalogCalculationPluginConfig,
27 CatalogCalculationPlugin,
28 CatalogCalculationConfig,
29 CatalogCalculationTask,
30 CCContext)
31from .pluginsBase import BasePlugin
32from .pluginRegistry import (PluginRegistry, PluginMap)
33import lsst.pipe.base
34from lsst.utils.timer import timeMethod
35
# Enforce an error for unsafe column/array value setting in pandas.
pd.options.mode.chained_assignment = 'raise'
38
# Public API of this module.
__all__ = ("DiaObjectCalculationPlugin", "DiaObjectCalculationPluginConfig",
           "DiaObjectCalculationTask", "DiaObjectCalculationConfig")
41
42
class DiaObjectCalculationPluginConfig(CatalogCalculationPluginConfig):
    """Default configuration class for DIA catalog calculation plugins.

    Adds no fields beyond those inherited from
    ``CatalogCalculationPluginConfig``.
    """
47
48
class DiaObjectCalculationPlugin(CatalogCalculationPlugin):
    """Base class for DIA catalog calculation plugins.

    Task follows CatalogCalculationPlugin with modifications for use in AP.

    Parameters
    ----------
    config : `DiaObjectCalculationPlugin.ConfigClass`
        Plugin configuration.
    name : `str`
        The string the plugin was registered with.
    metadata : `lsst.daf.base.PropertySet`
        Plugin metadata that will be attached to the output catalog.
        Accepted for interface compatibility; this base class only forwards
        ``config`` and ``name`` to `BasePlugin` and does not store it.
    """

    ConfigClass = DiaObjectCalculationPluginConfig

    registry = PluginRegistry(DiaObjectCalculationPluginConfig)
    """List of available plugins (`PluginRegistry`).
    """

    FLUX_MOMENTS_CALCULATED = 5.0
    """Add order after flux means and stds are calculated.
    """

    plugType = 'single'
    """Does the plugin operate on a single source or the whole catalog (`str`)?
    If the plugin operates on a single source at a time, this should be set to
    ``"single"``; if it expects the whole catalog, to ``"multi"``. If the
    plugin is of type ``"multi"``, the `fail` method must be implemented to
    accept the whole catalog. If the plugin is of type ``"single"``, `fail`
    should accept a single source record.
    """

    # NOTE: these lists are shared class attributes; subclasses should
    # reassign them rather than mutate them in place.
    inputCols = []
    """DiaObject column names required by the plugin in order to run and
    complete its calculation. DiaCalculationTask should raise an error if a
    plugin is instantiated without the needed column available. Input columns
    should be defined in the DPDD/cat/Apdb schema. Filter dependent columns
    should be specified without the filter name prepended to them, e.g.
    ``psfFluxMean`` instead of ``u_psfFluxMean``.
    """
    outputCols = []
    """DiaObject column names output by the plugin. DiaCalculationTask should
    raise an error if another plugin that is run outputs to the same column.
    Output columns should be defined in the DPDD/cat/Apdb schema. Filter
    dependent columns should be specified without the filter name prepended
    to them, e.g. ``psfFluxMean`` instead of ``u_psfFluxMean``.
    """

    needsFilter = True
    """This plugin requires a filter to be specified. Plugins using filter
    names usually deal with fluxes and only a sub-set of the DiaSource
    catalog. Plugins that do not use the filter name usually run over a value
    common across all observations/detections such as position.
    """

    def __init__(self, config, name, metadata):
        # ``metadata`` is accepted for interface compatibility but unused.
        BasePlugin.__init__(self, config, name)

    def calculate(self,
                  diaObject=None,
                  diaSources=None,
                  filterDiaFluxes=None,
                  band=None,
                  **kwargs):
        """Perform the calculation specified by this plugin.

        This method can either be used to operate on a single catalog record
        or a whole catalog, populating it with the output defined by this
        plugin.

        Note that results may be added to catalog records as new columns, or
        may result in changes to existing values.

        Subclasses must override this method. `DiaObjectCalculationTask`
        calls it with the keyword arguments ``diaObjects``, ``diaObjectId``
        (single plugins only), ``diaSources``, ``filterDiaSources`` and
        ``band``. ``diaObject`` and ``diaSources`` default to `None` so that
        such a call on an un-overridden plugin raises `NotImplementedError`
        rather than a `TypeError` about missing arguments.

        Parameters
        ----------
        diaObject : `dict`, optional
            Summary object to store values in.
        diaSources : `pandas.DataFrame`, optional
            DataFrame representing all diaSources associated with this
            diaObject.
        filterDiaFluxes : `pandas.DataFrame`, optional
            DataFrame representing diaSources associated with this
            diaObject that are observed in the band pass ``band``.
        band : `str`, optional
            Simple name of the filter for the flux being calculated.
        **kwargs
            Any additional keyword arguments that may be passed to the plugin.

        Raises
        ------
        NotImplementedError
            Always; this is an abstract method.
        """
        raise NotImplementedError()

    def fail(self, diaObject, columns, error=None):
        """Set the listed columns of ``diaObject`` to NaN.

        Parameters
        ----------
        diaObject : `dict`
            Summary object to store values in; modified in place.
        columns : `list` of `str`
            Names of the columns to set to the failed value (NaN).
        error : `BaseException` or `None`
            Error to pass. Kept for consistency with CatalogCalculationPlugin.
            Unused.
        """
        for colName in columns:
            diaObject[colName] = np.nan
156
157
class DiaObjectCalculationConfig(CatalogCalculationConfig):
    """Config class for the DIA object calculation driver task.

    Specifies which plugins will execute when the `DiaObjectCalculationTask`
    associated with this configuration is run.
    """

    plugins = DiaObjectCalculationPlugin.registry.makeField(
        multi=True,
        default=["ap_meanPosition",
                 "ap_meanFlux"],
        doc="Plugins to be run and their configuration")
170
171
class DiaObjectCalculationTask(CatalogCalculationTask):
    """Run plugins which operate on a catalog of DIA sources.

    This task facilitates running plugins which will operate on a source
    catalog. These plugins may do things such as classifying an object based
    on source record entries inserted during a measurement task.

    This task differs from CatalogCalculationTask in the following ways:

    - Input and output catalog types are assumed to be `pandas.DataFrame`
      with columns following those used in the Apdb.

    - No schema argument is passed to the plugins. Each plugin specifies
      output columns and required inputs.

    Parameters
    ----------
    plugMetadata : `lsst.daf.base.PropertyList` or `None`
        Metadata passed to every plugin's constructor. If `None`, an empty
        `~lsst.daf.base.PropertyList` will be created.
    **kwargs
        Additional arguments passed to the `lsst.pipe.base.Task`
        constructor.

    Notes
    -----
    Plugins with ``plugType == "single"`` are called once per updated
    DiaObject; plugins with ``plugType == "multi"`` are called once with the
    whole set of updated DiaObjects.
    """
    ConfigClass = DiaObjectCalculationConfig
    _DefaultName = "diaObjectCalculation"

    def __init__(self, plugMetadata=None, **kwargs):
        # Call lsst.pipe.base.Task.__init__ directly rather than the
        # CatalogCalculationTask constructor; plugin bookkeeping is set up
        # here instead.
        lsst.pipe.base.Task.__init__(self, **kwargs)
        if plugMetadata is None:
            # NOTE(review): ``lsst.daf.base`` is not imported explicitly in
            # this module; this relies on it already being imported
            # (e.g. transitively via ``lsst.pipe.base``) -- confirm.
            plugMetadata = lsst.daf.base.PropertyList()
        self.plugMetadata = plugMetadata
        self.plugins = PluginMap()
        self.outputCols = []

        self.initializePlugins()

    def initializePlugins(self):
        """Initialize the plugins according to the configuration.

        Each configured plugin is instantiated, validated with
        `_validatePluginCols`, stored in ``self.plugins`` and appended to
        ``self.executionDict[executionOrder]``, a
        ``pluginType(single=[...], multi=[...])`` named tuple.

        Raises
        ------
        ValueError
            Raised if a plugin's execution order is below
            ``BasePlugin.DEFAULT_CATALOGCALCULATION``, or if
            `_validatePluginCols` rejects its input/output columns.
        """

        pluginType = namedtuple('pluginType', 'single multi')
        self.executionDict = {}
        # Read the properties for each plugin. Allocate a dictionary entry for
        # each run level. Verify that the plugins are above the minimum run
        # level for an catalogCalculation plugin. For each run level, the
        # plugins are sorted into either single record, or multi record groups
        # to later be run appropriately
        for executionOrder, name, config, PluginClass in sorted(self.config.plugins.apply()):
            if executionOrder not in self.executionDict:
                self.executionDict[executionOrder] = pluginType(single=[], multi=[])
            if PluginClass.getExecutionOrder() >= BasePlugin.DEFAULT_CATALOGCALCULATION:
                plug = PluginClass(config, name, metadata=self.plugMetadata)

                self._validatePluginCols(plug)

                self.plugins[name] = plug
                # NOTE(review): a plugType other than 'single'/'multi' is
                # silently never scheduled.
                if plug.plugType == 'single':
                    self.executionDict[executionOrder].single.append(plug)
                elif plug.plugType == 'multi':
                    self.executionDict[executionOrder].multi.append(plug)
            else:
                errorTuple = (PluginClass, PluginClass.getExecutionOrder(),
                              BasePlugin.DEFAULT_CATALOGCALCULATION)
                raise ValueError("{} has an execution order less than the minimum for an catalogCalculation "
                                 "plugin. Value {} : Minimum {}".format(*errorTuple))

    def _validatePluginCols(self, plug):
        """Assert that output columns are not duplicated and input columns
        exist for dependent plugins.

        Inputs are checked only against the outputs of plugins validated
        before this one; the outputs of ``plug`` are then recorded in
        ``self.outputCols``.

        Parameters
        ----------
        plug : `lsst.ap.association.DiaCalculationPlugin`
            Plugin to test for output collisions and input needs.

        Raises
        ------
        ValueError
            Raised if an input column is not produced by an earlier plugin,
            or if an output column is already produced by another plugin.
        """
        for inputName in plug.inputCols:
            if inputName not in self.outputCols:
                errorTuple = (plug.name, plug.getExecutionOrder(),
                              inputName)
                raise ValueError(
                    "Plugin, {} with execution order {} requires DiaObject "
                    "column {} to exist. Check the execution order of the "
                    "plugin and make sure it runs after a plugin creating "
                    "the column is run.".format(*errorTuple))
        for outputName in plug.outputCols:
            if outputName in self.outputCols:
                errorTuple = (plug.name, plug.getExecutionOrder(),
                              outputName)
                raise ValueError(
                    "Plugin, {} with execution order {} is attempting to "
                    "output a column {}, however the column is already being "
                    "produced by another plugin. Check other plugins for "
                    "collisions with this one.".format(*errorTuple))
            else:
                self.outputCols.append(outputName)

    @timeMethod
    def run(self,
            diaObjectCat,
            diaSourceCat,
            updatedDiaObjectIds,
            filterNames):
        """The entry point for the DIA catalog calculation task.

        Run method both updates the values in the diaObjectCat and appends
        newly created DiaObjects to the catalog. For catalog column names
        see the SDM schema definitions for the DiaObject and DiaSource
        tables (http://github.com/lsst/sdm_schemas/).

        The indexes of ``diaObjectCat`` and ``diaSourceCat`` are reset in
        place if they do not match the expected layout.

        Parameters
        ----------
        diaObjectCat : `pandas.DataFrame`
            DiaObjects to update values of and append new objects to. DataFrame
            should be indexed on "diaObjectId".
        diaSourceCat : `pandas.DataFrame`
            DiaSources associated with the DiaObjects in diaObjectCat.
            DataFrame should be indexed on
            ["diaObjectId", "band", "diaSourceId"].
        updatedDiaObjectIds : `numpy.ndarray`
            Integer ids of the DiaObjects to update and create.
        filterNames : `list` of `str`
            List of string names of filters to be being processed.

        Returns
        -------
        returnStruct : `lsst.pipe.base.Struct`
            Struct containing:

            ``diaObjectCat``
                Full set of DiaObjects including both un-updated and
                updated/new DiaObjects (`pandas.DataFrame`).
            ``updatedDiaObjects``
                Catalog of DiaObjects that were updated or created by this
                task (`pandas.DataFrame`).
        """
        if diaObjectCat.index.name is None:
            diaObjectCat.set_index("diaObjectId", inplace=True, drop=False)
        elif diaObjectCat.index.name != "diaObjectId":
            self.log.warning(
                "Input diaObjectCat is indexed on column(s) incompatible with "
                "this task. Should be indexed on 'diaObjectId'. Trying to set "
                "index regardless")
            diaObjectCat.set_index("diaObjectId", inplace=True, drop=False)

        # ``names`` by default is FrozenList([None]) hence we access the first
        # element and test for None.
        if diaSourceCat.index.names[0] is None:
            diaSourceCat.set_index(
                ["diaObjectId", "band", "diaSourceId"],
                inplace=True,
                drop=False)
        elif (diaSourceCat.index.names
              != ["diaObjectId", "band", "diaSourceId"]):
            diaSourceCat.reset_index(inplace=True)
            diaSourceCat.set_index(
                ["diaObjectId", "band", "diaSourceId"],
                inplace=True,
                drop=False)

        return self.callCompute(diaObjectCat,
                                diaSourceCat,
                                updatedDiaObjectIds,
                                filterNames)

    @timeMethod
    def callCompute(self,
                    diaObjectCat,
                    diaSourceCat,
                    updatedDiaObjectIds,
                    filterNames):
        """Run each of the plugins on the catalog.

        Filter-agnostic plugins run first, then filter-dependent plugins run
        once per entry in ``filterNames``.

        For catalog column names see the SDM schema definitions for the
        DiaObject and DiaSource tables (http://github.com/lsst/sdm_schemas/).

        Parameters
        ----------
        diaObjectCat : `pandas.DataFrame`
            DiaObjects to update values of and append new objects to. DataFrame
            should be indexed on "diaObjectId".
        diaSourceCat : `pandas.DataFrame`
            DiaSources associated with the DiaObjects in diaObjectCat.
            DataFrame must be indexed on
            ["diaObjectId", "band", "diaSourceId"].
        updatedDiaObjectIds : `numpy.ndarray`
            Integer ids of the DiaObjects to update and create.
        filterNames : `list` of `str`
            List of string names of filters to be being processed.

        Returns
        -------
        returnStruct : `lsst.pipe.base.Struct`
            Struct containing:

            ``diaObjectCat``
                Full set of DiaObjects including both un-updated and
                updated/new DiaObjects (`pandas.DataFrame`).
            ``updatedDiaObjects``
                Catalog of DiaObjects that were updated or created by this
                task (`pandas.DataFrame`).

        Raises
        ------
        KeyError
            Raises if `pandas.DataFrame` indexing is not properly set.
        """
        # DiaObjects will be updated in place.
        diaObjectsToUpdate = diaObjectCat.loc[updatedDiaObjectIds, :]
        self.log.info("Calculating summary stats for %i DiaObjects",
                      len(diaObjectsToUpdate))

        updatingDiaSources = diaSourceCat.loc[updatedDiaObjectIds, :]
        diaSourcesGB = updatingDiaSources.groupby(level=0)
        self._run_all_plugins(updatedDiaObjectIds,
                              diaObjectsToUpdate,
                              updatingDiaSources,
                              diaSourcesGB,
                              None)
        for band in filterNames:
            self._run_all_plugins(updatedDiaObjectIds,
                                  diaObjectsToUpdate,
                                  updatingDiaSources,
                                  diaSourcesGB,
                                  band)
        # Need to store the newly updated diaObjects directly as the editing
        # a view into diaObjectsToUpdate does not update the values of
        # diaObjectCat.
        diaObjectCat.loc[updatedDiaObjectIds, :] = diaObjectsToUpdate
        return lsst.pipe.base.Struct(
            diaObjectCat=diaObjectCat,
            updatedDiaObjects=diaObjectsToUpdate)

    def _run_all_plugins(self,
                         updatedDiaObjectIds,
                         diaObjectsToUpdate,
                         updatingDiaSources,
                         diaSourcesGB,
                         band):
        """Run each of the plugins on specific data and band.

        All catalogs are modified in-place.

        Parameters
        ----------
        updatedDiaObjectIds : `numpy.ndarray`
            Integer ids of the DiaObjects to update and create.
        diaObjectsToUpdate : `pandas.DataFrame`
            A modifiable subset of DiaObjects to update values of and append
            new objects to. Should be indexed on "diaObjectId".
        updatingDiaSources : `pandas.DataFrame`
            A modifiable subset of DiaSources associated with the DiaObjects in
            diaObjectCat. Must be indexed on ["diaObjectId", "band", "diaSourceId"].
        diaSourcesGB : `pandas.core.groupby.DataFrameGroupBy`
            ``updatingDiaSources`` grouped by diaObjectId (index level 0).
        band : `str` or `None`
            The filter to process, or `None` to run filter-agnostic plugins.
        """
        if band:
            try:
                updatingFilterDiaSources = updatingDiaSources.loc[
                    (slice(None), band), :
                ]
            except KeyError:
                self.log.warning("No DiaSource data with filter=%s. "
                                 "Continuing...", band)
                return
            # Level=0 here groups by diaObjectId.
            filterDiaSourcesGB = updatingFilterDiaSources.groupby(level=0)
        else:
            filterDiaSourcesGB = None

        log_band = "band " + band if band else "no band"
        for runlevel in sorted(self.executionDict):
            for plug in self.executionDict[runlevel].single:
                # Run filter-dependent plugins only when a band is given, and
                # filter-agnostic plugins only when it is not.
                if plug.needsFilter ^ bool(band):
                    continue
                self.log.verbose("Running plugin %s on %s.", plug.name, log_band)

                for updatedDiaObjectId in updatedDiaObjectIds:
                    # Sub-select diaSources associated with this diaObject.
                    objDiaSources = updatingDiaSources.loc[updatedDiaObjectId]

                    # Sub-select on diaSources observed in the current filter.
                    if band:
                        try:
                            filterObjDiaSources = objDiaSources.loc[band]
                        except KeyError:
                            # Lazy %-style arguments so the id is actually
                            # interpolated into the message.
                            self.log.warning(
                                "DiaObjectId=%s has no "
                                "DiaSources for filter=%s. "
                                "Continuing...", updatedDiaObjectId, band)
                            continue
                    else:
                        filterObjDiaSources = None
                    with CCContext(plug, updatedDiaObjectId, self.log):
                        # We feed the catalog we need to update and the id
                        # so as to get a few into the catalog and not a copy.
                        # This updates the values in the catalog.
                        plug.calculate(diaObjects=diaObjectsToUpdate,
                                       diaObjectId=updatedDiaObjectId,
                                       diaSources=objDiaSources,
                                       filterDiaSources=filterObjDiaSources,
                                       band=band)
            for plug in self.executionDict[runlevel].multi:
                if plug.needsFilter ^ bool(band):
                    continue
                self.log.verbose("Running plugin %s on %s.", plug.name, log_band)
                with CCContext(plug, diaObjectsToUpdate, self.log):
                    plug.calculate(diaObjects=diaObjectsToUpdate,
                                   diaSources=diaSourcesGB,
                                   filterDiaSources=filterDiaSourcesGB,
                                   band=band)

    def _initialize_dia_object(self, objId):
        """Create a new DiaObject with values required to be initialized by the
        Apdb.

        Parameters
        ----------
        objId : `int`
            ``diaObjectId`` value for the new DiaObject.

        Returns
        -------
        diaObject : `dict`
            Newly created DiaObject with keys:

            ``diaObjectId``
                Unique DiaObjectId (`int`).
            ``pmParallaxNdata``
                Number of data points used for parallax calculation (`int`).
            ``nearbyObj1``
                Id of a nearby object in the Object table (`int`).
            ``nearbyObj2``
                Id of a nearby object in the Object table (`int`).
            ``nearbyObj3``
                Id of a nearby object in the Object table (`int`).
            ``?_psfFluxNdata``
                Number of data points used to calculate point source flux
                summary statistics in each of the bands u, g, r, i, z, y
                (`int`).
        """
        new_dia_object = {"diaObjectId": objId,
                          "pmParallaxNdata": 0,
                          "nearbyObj1": 0,
                          "nearbyObj2": 0,
                          "nearbyObj3": 0}
        for f in ["u", "g", "r", "i", "z", "y"]:
            new_dia_object[f"{f}_psfFluxNdata"] = 0
        return new_dia_object
Class for storing ordered metadata with comments.
calculate(self, diaObject, diaSources, filterDiaFluxes=None, band=None, **kwargs)
run(self, diaObjectCat, diaSourceCat, updatedDiaObjectIds, filterNames)
_run_all_plugins(self, updatedDiaObjectIds, diaObjectsToUpdate, updatingDiaSources, diaSourcesGB, band)
callCompute(self, diaObjectCat, diaSourceCat, updatedDiaObjectIds, filterNames)