LSST Applications g0b6bd0c080+a72a5dd7e6,g1182afd7b4+2a019aa3bb,g17e5ecfddb+2b8207f7de,g1d67935e3f+06cf436103,g38293774b4+ac198e9f13,g396055baef+6a2097e274,g3b44f30a73+6611e0205b,g480783c3b1+98f8679e14,g48ccf36440+89c08d0516,g4b93dc025c+98f8679e14,g5c4744a4d9+a302e8c7f0,g613e996a0d+e1c447f2e0,g6c8d09e9e7+25247a063c,g7271f0639c+98f8679e14,g7a9cd813b8+124095ede6,g9d27549199+a302e8c7f0,ga1cf026fa3+ac198e9f13,ga32aa97882+7403ac30ac,ga786bb30fb+7a139211af,gaa63f70f4e+9994eb9896,gabf319e997+ade567573c,gba47b54d5d+94dc90c3ea,gbec6a3398f+06cf436103,gc6308e37c7+07dd123edb,gc655b1545f+ade567573c,gcc9029db3c+ab229f5caf,gd01420fc67+06cf436103,gd877ba84e5+06cf436103,gdb4cecd868+6f279b5b48,ge2d134c3d5+cc4dbb2e3f,ge448b5faa6+86d1ceac1d,gecc7e12556+98f8679e14,gf3ee170dca+25247a063c,gf4ac96e456+ade567573c,gf9f5ea5b4d+ac198e9f13,gff490e6085+8c2580be5c,w.2022.27
LSST Data Management Base Package
diaCalculation.py
Go to the documentation of this file.
1# This file is part of ap_association.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21
22from collections import namedtuple
23import numpy as np
24import pandas as pd
25
26from .catalogCalculation import (CatalogCalculationPluginConfig,
27 CatalogCalculationPlugin,
28 CatalogCalculationConfig,
29 CatalogCalculationTask,
30 CCContext)
31from .pluginsBase import BasePlugin
32from .pluginRegistry import (PluginRegistry, PluginMap)
33import lsst.pipe.base
34from lsst.utils.timer import timeMethod
35
36# Enforce an error for unsafe column/array value setting in pandas.
37pd.options.mode.chained_assignment = 'raise'
38
39__all__ = ("DiaObjectCalculationPlugin", "DiaObjectCalculationPluginConfig",
40 "DiaObjectCalculationTask", "DiaObjectCalculationConfig")
41
42
44 """Default configuration class for DIA catalog calculation plugins.
45 """
46 pass
47
48
50 """Base class for DIA catalog calculation plugins.
51
52 Task follows CatalogCalculationPlugin with modifications for use in AP.
53
54 Parameters
55 ----------
56 config : `DiaObjectCalculationPlugin.ConfigClass`
57 Plugin configuration.
58 name : `str`
59 The string the plugin was registered with.
60 metadata : `lsst.daf.base.PropertySet`
61 Plugin metadata that will be attached to the output catalog
62 """
63
64 ConfigClass = DiaObjectCalculationPluginConfig
65
66 registry = PluginRegistry(DiaObjectCalculationPluginConfig)
67 """List of available plugins (`lsst.meas.base.PluginRegistry`).
68 """
69
70 FLUX_MOMENTS_CALCULATED = 5.0
71 """Add order after flux means and stds are calculated.
72 """
73
74 plugType = 'single'
75 """Does the plugin operate on a single source or the whole catalog (`str`)?
76 If the plugin operates on a single source at a time, this should be set to
77 ``"single"``; if it expects the whoe catalog, to ``"multi"``. If the
78 plugin is of type ``"multi"``, the `fail` method must be implemented to
79 accept the whole catalog. If the plugin is of type ``"single"``, `fail`
80 should accept a single source record.
81 """
82
83 inputCols = []
84 """DiaObject column names required by the plugin in order to run and
85 complete its calculation. DiaCalculationTask should raise an error is a
86 plugin is instantiated without the needed column available. Input columns
87 should be defined in the DPDD/cat/Apdb schema. Filter dependent columns
88 should be specified without the filter name perpended to them. eg
89 ``PSFluxMean`` instead of ``uPSFluxMean``.
90 """
91 outputCols = []
92 """DiaObject column names output by the plugin. DiaCalculationTask should
93 raise an error if another pluging is run output to the same column.
94 Output columns should be defined in the DPDD/cat/Apdb schema. Filter
95 dependent columns should be specified without the filter name perpended to
96 them. eg ``PSFluxMean`` instead of ``uPSFluxMean``.
97 """
98
99 needsFilter = True
100 """This plugin requires a filter to be specified. Plugin's using filter
101 names usually deal with fluxes and only a sub-set of the DiaSource
102 catalog. Plugins that to not use the filter name usually run over a value
103 common across all observations/detections such as position.
104 """
105
106 def __init__(self, config, name, metadata):
107 BasePlugin.__init__(self, config, name)
108
109 def calculate(self,
110 diaObject,
111 diaSources,
112 filterDiaFluxes=None,
113 filterName=None,
114 **kwargs):
115 """Perform the calculation specified by this plugin.
116
117 This method can either be used to operate on a single catalog record
118 or a whole catalog, populating it with the output defined by this
119 plugin.
120
121 Note that results may be added to catalog records as new columns, or
122 may result in changes to existing values.
123
124 Parameters
125 ----------
126 diaObject : `dict`
127 Summary object to store values in.
128 diaSources : `pandas.DataFrame`
129 DataFrame representing all diaSources associated with this
130 diaObject.
131 filterDiaFluxes : `pandas.DataFrame`
132 DataFrame representing diaSources associated with this
133 diaObject that are observed in the band pass ``filterName``.
134 filterName : `str`
135 Simple name of the filter for the flux being calculated.
136 **kwargs
137 Any additional keyword arguments that may be passed to the plugin.
138 """
139 raise NotImplementedError()
140
141 def fail(self, diaObject, columns, error=None):
142 """Set diaObject position values to nan.
143
144 Parameters
145 ----------
146 diaObject : `dict`
147 Summary object to store values in.
148 columns : `list` of `str`
149 List of string names of columns to write a the failed value.
150 error : `BaseException` or `None`
151 Error to pass. Kept for consistency with CatologCalculationPlugin.
152 Unused.
153 """
154 for colName in columns:
155 diaObject[colName] = np.nan
156
157
159 """Config class for the catalog calculation driver task.
160
161 Specifies which plugins will execute when the `CatalogCalculationTask`
162 associated with this configuration is run.
163 """
164
165 plugins = DiaObjectCalculationPlugin.registry.makeField(
166 multi=True,
167 default=["ap_meanPosition",
168 "ap_meanFlux"],
169 doc="Plugins to be run and their configuration")
170
171
173 """Run plugins which operate on a catalog of DIA sources.
174
175 This task facilitates running plugins which will operate on a source
176 catalog. These plugins may do things such as classifying an object based
177 on source record entries inserted during a measurement task.
178
179 This task differs from CatalogCaculationTask in the following ways:
180
181 -No multi mode is available for plugins. All plugins are assumed to run
182 in single mode.
183
184 -Input and output catalog types are assumed to be `pandas.DataFrames` with
185 columns following those used in the Apdb.
186
187 -No schema argument is passed to the plugins. Each plugin specifies
188 output columns and required inputs.
189
190 Parameters
191 ----------
192 plugMetaData : `lsst.daf.base.PropertyList` or `None`
193 Will be modified in-place to contain metadata about the plugins being
194 run. If `None`, an empty `~lsst.daf.base.PropertyList` will be
195 created.
196 **kwargs
197 Additional arguments passed to the superclass constructor.
198
199 Notes
200 -----
201 Plugins may either take an entire catalog to work on at a time, or work on
202 individual records.
203 """
204 ConfigClass = DiaObjectCalculationConfig
205 _DefaultName = "diaObjectCalculation"
206
207 def __init__(self, plugMetadata=None, **kwargs):
208 lsst.pipe.base.Task.__init__(self, **kwargs)
209 if plugMetadata is None:
210 plugMetadata = lsst.daf.base.PropertyList()
211 self.plugMetadataplugMetadataplugMetadata = plugMetadata
212 self.pluginspluginsplugins = PluginMap()
213 self.outputColsoutputCols = []
214
215 self.initializePluginsinitializePluginsinitializePlugins()
216
218 """Initialize the plugins according to the configuration.
219 """
220
221 pluginType = namedtuple('pluginType', 'single multi')
222 self.executionDictexecutionDictexecutionDict = {}
223 # Read the properties for each plugin. Allocate a dictionary entry for
224 # each run level. Verify that the plugins are above the minimum run
225 # level for an catalogCalculation plugin. For each run level, the
226 # plugins are sorted into either single record, or multi record groups
227 # to later be run appropriately
228 for executionOrder, name, config, PluginClass in sorted(self.config.plugins.apply()):
229 if executionOrder not in self.executionDictexecutionDictexecutionDict:
230 self.executionDictexecutionDictexecutionDict[executionOrder] = pluginType(single=[], multi=[])
231 if PluginClass.getExecutionOrder() >= BasePlugin.DEFAULT_CATALOGCALCULATION:
232 plug = PluginClass(config, name, metadata=self.plugMetadataplugMetadataplugMetadata)
233
234 self._validatePluginCols_validatePluginCols(plug)
235
236 self.pluginspluginsplugins[name] = plug
237 if plug.plugType == 'single':
238 self.executionDictexecutionDictexecutionDict[executionOrder].single.append(plug)
239 elif plug.plugType == 'multi':
240 self.executionDictexecutionDictexecutionDict[executionOrder].multi.append(plug)
241 else:
242 errorTuple = (PluginClass, PluginClass.getExecutionOrder(),
243 BasePlugin.DEFAULT_CATALOGCALCULATION)
244 raise ValueError("{} has an execution order less than the minimum for an catalogCalculation "
245 "plugin. Value {} : Minimum {}".format(*errorTuple))
246
247 def _validatePluginCols(self, plug):
248 """Assert that output columns are not duplicated and input columns
249 exist for dependent plugins.
250
251 Parameters
252 ----------
253 plug : `lsst.ap.association.DiaCalculationPlugin`
254 Plugin to test for output collisions and input needs.
255 """
256 for inputName in plug.inputCols:
257 if inputName not in self.outputColsoutputCols:
258 errorTuple = (plug.name, plug.getExecutionOrder(),
259 inputName)
260 raise ValueError(
261 "Plugin, {} with execution order {} requires DiaObject "
262 "column {} to exist. Check the execution order of the "
263 "plugin and make sure it runs after a plugin creating "
264 "the column is run.".format(*errorTuple))
265 for outputName in plug.outputCols:
266 if outputName in self.outputColsoutputCols:
267 errorTuple = (plug.name, plug.getExecutionOrder(),
268 outputName)
269 raise ValueError(
270 "Plugin, {} with execution order {} is attempting to "
271 "output a column {}, however the column is already being "
272 "produced by another plugin. Check other plugins for "
273 "collisions with this one.".format(*errorTuple))
274 else:
275 self.outputColsoutputCols.append(outputName)
276
277 @timeMethod
278 def run(self,
279 diaObjectCat,
280 diaSourceCat,
281 updatedDiaObjectIds,
282 filterNames):
283 """The entry point for the DIA catalog calculation task.
284
285 Run method both updates the values in the diaObjectCat and appends
286 newly created DiaObjects to the catalog. For catalog column names
287 see the lsst.cat schema definitions for the DiaObject and DiaSource
288 tables (http://github.com/lsst/cat).
289
290 Parameters
291 ----------
292 diaObjectCat : `pandas.DataFrame`
293 DiaObjects to update values of and append new objects to. DataFrame
294 should be indexed on "diaObjectId"
295 diaSourceCat : `pandas.DataFrame`
296 DiaSources associated with the DiaObjects in diaObjectCat.
297 DataFrame should be indexed on
298 `["diaObjectId", "filterName", "diaSourceId"]`
299 updatedDiaObjectIds : `numpy.ndarray`
300 Integer ids of the DiaObjects to update and create.
301 filterNames : `list` of `str`
302 List of string names of filters to be being processed.
303
304 Returns
305 -------
306 returnStruct : `lsst.pipe.base.Struct`
307 Struct containing:
308
309 ``diaObjectCat``
310 Full set of DiaObjects including both un-updated and
311 updated/new DiaObjects (`pandas.DataFrame`).
312 ``updatedDiaObjects``
313 Catalog of DiaObjects that were updated or created by this
314 task (`pandas.DataFrame`).
315 """
316 if diaObjectCat.index.name is None:
317 diaObjectCat.set_index("diaObjectId", inplace=True, drop=False)
318 elif diaObjectCat.index.name != "diaObjectId":
319 self.log.warning(
320 "Input diaObjectCat is indexed on column(s) incompatible with "
321 "this task. Should be indexed on 'diaObjectId'. Trying to set "
322 "index regardless")
323 diaObjectCat.set_index("diaObjectId", inplace=True, drop=False)
324
325 # ``names`` by default is FrozenList([None]) hence we access the first
326 # element and test for None.
327 if diaSourceCat.index.names[0] is None:
328 diaSourceCat.set_index(
329 ["diaObjectId", "filterName", "diaSourceId"],
330 inplace=True,
331 drop=False)
332 elif (diaSourceCat.index.names
333 != ["diaObjectId", "filterName", "diaSourceId"]):
334 diaSourceCat.reset_index(inplace=True)
335 diaSourceCat.set_index(
336 ["diaObjectId", "filterName", "diaSourceId"],
337 inplace=True,
338 drop=False)
339
340 return self.callComputecallComputecallCompute(diaObjectCat,
341 diaSourceCat,
342 updatedDiaObjectIds,
343 filterNames)
344
345 @timeMethod
346 def callCompute(self,
347 diaObjectCat,
348 diaSourceCat,
349 updatedDiaObjectIds,
350 filterNames):
351 """Run each of the plugins on the catalog.
352
353 For catalog column names see the lsst.cat schema definitions for the
354 DiaObject and DiaSource tables (http://github.com/lsst/cat).
355
356 Parameters
357 ----------
358 diaObjectCat : `pandas.DataFrame`
359 DiaObjects to update values of and append new objects to. DataFrame
360 should be indexed on "diaObjectId"
361 diaSourceCat : `pandas.DataFrame`
362 DiaSources associated with the DiaObjects in diaObjectCat.
363 DataFrame must be indexed on
364 ["diaObjectId", "filterName", "diaSourceId"]`
365 updatedDiaObjectIds : `numpy.ndarray`
366 Integer ids of the DiaObjects to update and create.
367 filterNames : `list` of `str`
368 List of string names of filters to be being processed.
369
370 Returns
371 -------
372 returnStruct : `lsst.pipe.base.Struct`
373 Struct containing:
374
375 ``diaObjectCat``
376 Full set of DiaObjects including both un-updated and
377 updated/new DiaObjects (`pandas.DataFrame`).
378 ``updatedDiaObjects``
379 Catalog of DiaObjects that were updated or created by this
380 task (`pandas.DataFrame`).
381
382 Raises
383 ------
384 KeyError
385 Raises if `pandas.DataFrame` indexing is not properly set.
386 """
387 # DiaObjects will be updated in place.
388 diaObjectsToUpdate = diaObjectCat.loc[updatedDiaObjectIds, :]
389 self.log.info("Calculating summary stats for %i DiaObjects",
390 len(diaObjectsToUpdate))
391
392 updatingDiaSources = diaSourceCat.loc[updatedDiaObjectIds, :]
393 diaSourcesGB = updatingDiaSources.groupby(level=0)
394 for runlevel in sorted(self.executionDictexecutionDictexecutionDict):
395 for plug in self.executionDictexecutionDictexecutionDict[runlevel].single:
396 if plug.needsFilter:
397 continue
398 for updatedDiaObjectId in updatedDiaObjectIds:
399
400 # Sub-select diaSources associated with this diaObject.
401 objDiaSources = updatingDiaSources.loc[updatedDiaObjectId]
402
403 # Sub-select on diaSources observed in the current filter.
404 with CCContext(plug, updatedDiaObjectId, self.log):
405 # We feed the catalog we need to update and the id
406 # so as to get a few into the catalog and not a copy.
407 # This updates the values in the catalog.
408 plug.calculate(diaObjects=diaObjectsToUpdate,
409 diaObjectId=updatedDiaObjectId,
410 diaSources=objDiaSources,
411 filterDiaSources=None,
412 filterName=None)
413 for plug in self.executionDictexecutionDictexecutionDict[runlevel].multi:
414 if plug.needsFilter:
415 continue
416 with CCContext(plug, diaObjectsToUpdate, self.log):
417 plug.calculate(diaObjects=diaObjectsToUpdate,
418 diaSources=diaSourcesGB,
419 filterDiaSources=None,
420 filterName=None)
421
422 for filterName in filterNames:
423 try:
424 updatingFilterDiaSources = updatingDiaSources.loc[
425 (slice(None), filterName), :
426 ]
427 except KeyError:
428 self.log.warning("No DiaSource data with fitler=%s. "
429 "Continuing...", filterName)
430 continue
431 # Level=0 here groups by diaObjectId.
432 filterDiaSourcesGB = updatingFilterDiaSources.groupby(level=0)
433
434 for runlevel in sorted(self.executionDictexecutionDictexecutionDict):
435 for plug in self.executionDictexecutionDictexecutionDict[runlevel].single:
436 if not plug.needsFilter:
437 continue
438 for updatedDiaObjectId in updatedDiaObjectIds:
439
440 # Sub-select diaSources associated with this diaObject.
441 objDiaSources = updatingDiaSources.loc[updatedDiaObjectId]
442
443 # Sub-select on diaSources observed in the current filter.
444 try:
445 filterObjDiaSources = objDiaSources.loc[filterName]
446 except KeyError:
447 self.log.warning(
448 "DiaObjectId={updatedDiaObjectId} has no "
449 "DiaSources for filter=%s. "
450 "Continuing...", filterName)
451 with CCContext(plug, updatedDiaObjectId, self.log):
452 # We feed the catalog we need to update and the id
453 # so as to get a few into the catalog and not a copy.
454 # This updates the values in the catalog.
455 plug.calculate(diaObjects=diaObjectsToUpdate,
456 diaObjectId=updatedDiaObjectId,
457 diaSources=objDiaSources,
458 filterDiaSources=filterObjDiaSources,
459 filterName=filterName)
460 for plug in self.executionDictexecutionDictexecutionDict[runlevel].multi:
461 if not plug.needsFilter:
462 continue
463 with CCContext(plug, diaObjectsToUpdate, self.log):
464 plug.calculate(diaObjects=diaObjectsToUpdate,
465 diaSources=diaSourcesGB,
466 filterDiaSources=filterDiaSourcesGB,
467 filterName=filterName)
468 # Need to store the newly updated diaObjects directly as the editing
469 # a view into diaObjectsToUpdate does not update the values of
470 # diaObjectCat.
471 diaObjectCat.loc[updatedDiaObjectIds, :] = diaObjectsToUpdate
472 return lsst.pipe.base.Struct(
473 diaObjectCat=diaObjectCat,
474 updatedDiaObjects=diaObjectsToUpdate)
475
476 def _initialize_dia_object(self, objId):
477 """Create a new DiaObject with values required to be initialized by the
478 Apdb.
479
480 Parameters
481 ----------
482 objid : `int`
483 ``diaObjectId`` value for the of the new DiaObject.
484
485 Returns
486 -------
487 diaObject : `dict`
488 Newly created DiaObject with keys:
489
490 ``diaObjectId``
491 Unique DiaObjectId (`int`).
492 ``pmParallaxNdata``
493 Number of data points used for parallax calculation (`int`).
494 ``nearbyObj1``
495 Id of the a nearbyObject in the Object table (`int`).
496 ``nearbyObj2``
497 Id of the a nearbyObject in the Object table (`int`).
498 ``nearbyObj3``
499 Id of the a nearbyObject in the Object table (`int`).
500 ``?PSFluxData``
501 Number of data points used to calculate point source flux
502 summary statistics in each bandpass (`int`).
503 """
504 new_dia_object = {"diaObjectId": objId,
505 "pmParallaxNdata": 0,
506 "nearbyObj1": 0,
507 "nearbyObj2": 0,
508 "nearbyObj3": 0}
509 for f in ["u", "g", "r", "i", "z", "y"]:
510 new_dia_object["%sPSFluxNdata" % f] = 0
511 return new_dia_object
Class for storing ordered metadata with comments.
Definition: PropertyList.h:68
Class for storing generic metadata.
Definition: PropertySet.h:66
def calculate(self, diaObject, diaSources, filterDiaFluxes=None, filterName=None, **kwargs)
def fail(self, diaObject, columns, error=None)
def __init__(self, plugMetadata=None, **kwargs)
def run(self, diaObjectCat, diaSourceCat, updatedDiaObjectIds, filterNames)
def callCompute(self, diaObjectCat, diaSourceCat, updatedDiaObjectIds, filterNames)
std::shared_ptr< FrameSet > append(FrameSet const &first, FrameSet const &second)
Construct a FrameSet that performs two transformations in series.
Definition: functional.cc:33
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)
Definition: history.py:174