LSST Data Management Base Package
diaCalculation.py
# This file is part of ap_association.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

from collections import namedtuple
import numpy as np
import pandas as pd

from .catalogCalculation import (CatalogCalculationPluginConfig,
                                 CatalogCalculationPlugin,
                                 CatalogCalculationConfig,
                                 CatalogCalculationTask,
                                 CCContext)
from .pluginsBase import BasePlugin
from .pluginRegistry import (PluginRegistry, PluginMap)
# Imported explicitly for the PropertyList used in DiaObjectCalculationTask.
import lsst.daf.base
import lsst.pipe.base
from lsst.utils.timer import timeMethod

# Enforce an error for unsafe column/array value setting in pandas.
pd.options.mode.chained_assignment = 'raise'

__all__ = ("DiaObjectCalculationPlugin", "DiaObjectCalculationPluginConfig",
           "DiaObjectCalculationTask", "DiaObjectCalculationConfig")


class DiaObjectCalculationPluginConfig(CatalogCalculationPluginConfig):
    """Default configuration class for DIA catalog calculation plugins.
    """
    pass


class DiaObjectCalculationPlugin(CatalogCalculationPlugin):
    """Base class for DIA catalog calculation plugins.

    Task follows CatalogCalculationPlugin with modifications for use in AP.

    Parameters
    ----------
    config : `DiaObjectCalculationPlugin.ConfigClass`
        Plugin configuration.
    name : `str`
        The string the plugin was registered with.
    metadata : `lsst.daf.base.PropertySet`
        Plugin metadata that will be attached to the output catalog.
    """

    ConfigClass = DiaObjectCalculationPluginConfig

    registry = PluginRegistry(DiaObjectCalculationPluginConfig)
    """List of available plugins (`lsst.meas.base.PluginRegistry`).
    """

    FLUX_MOMENTS_CALCULATED = 5.0
    """Add order after flux means and stds are calculated.
    """

    plugType = 'single'
    """Does the plugin operate on a single source or the whole catalog (`str`)?
    If the plugin operates on a single source at a time, this should be set to
    ``"single"``; if it expects the whole catalog, to ``"multi"``. If the
    plugin is of type ``"multi"``, the `fail` method must be implemented to
    accept the whole catalog. If the plugin is of type ``"single"``, `fail`
    should accept a single source record.
    """

    inputCols = []
    """DiaObject column names required by the plugin in order to run and
    complete its calculation. DiaCalculationTask should raise an error if a
    plugin is instantiated without the needed column available. Input columns
    should be defined in the DPDD/cat/Apdb schema. Filter-dependent columns
    should be specified without the filter name prepended to them, e.g.
    ``PSFluxMean`` instead of ``uPSFluxMean``.
    """
    outputCols = []
    """DiaObject column names output by the plugin. DiaCalculationTask should
    raise an error if another plugin outputs to the same column.
    Output columns should be defined in the DPDD/cat/Apdb schema. Filter-
    dependent columns should be specified without the filter name prepended to
    them, e.g. ``PSFluxMean`` instead of ``uPSFluxMean``.
    """

    needsFilter = True
    """This plugin requires a filter to be specified. Plugins using filter
    names usually deal with fluxes and only a sub-set of the DiaSource
    catalog. Plugins that do not use the filter name usually run over a value
    common across all observations/detections, such as position.
    """

    def __init__(self, config, name, metadata):
        BasePlugin.__init__(self, config, name)

    def calculate(self,
                  diaObject,
                  diaSources,
                  filterDiaFluxes=None,
                  filterName=None,
                  **kwargs):
        """Perform the calculation specified by this plugin.

        This method can either be used to operate on a single catalog record
        or a whole catalog, populating it with the output defined by this
        plugin.

        Note that results may be added to catalog records as new columns, or
        may result in changes to existing values.

        Parameters
        ----------
        diaObject : `dict`
            Summary object to store values in.
        diaSources : `pandas.DataFrame`
            DataFrame representing all diaSources associated with this
            diaObject.
        filterDiaFluxes : `pandas.DataFrame`
            DataFrame representing diaSources associated with this
            diaObject that are observed in the band pass ``filterName``.
        filterName : `str`
            Simple name of the filter for the flux being calculated.
        **kwargs
            Any additional keyword arguments that may be passed to the plugin.
        """
        raise NotImplementedError()

    def fail(self, diaObject, columns, error=None):
        """Set diaObject values to NaN.

        Parameters
        ----------
        diaObject : `dict`
            Summary object to store values in.
        columns : `list` of `str`
            List of string names of columns to write the failed value to.
        error : `BaseException` or `None`
            Error to pass. Kept for consistency with CatalogCalculationPlugin.
            Unused.
        """
        for colName in columns:
            diaObject[colName] = np.nan

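# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the original module): a minimal "single"
# plugin following the contract documented above. It needs no filter, declares
# one output column, and writes its summary value straight into the shared
# DiaObject DataFrame, mirroring the keyword arguments that callCompute passes
# below. The class name and output column are hypothetical, and registration
# with DiaObjectCalculationPlugin.registry (required before the plugin can be
# selected in a config) is omitted here.
class _ExampleNumSourcesDiaPlugin(DiaObjectCalculationPlugin):
    """Count the DiaSources associated with each DiaObject (example only)."""

    ConfigClass = DiaObjectCalculationPluginConfig
    outputCols = ["nDiaSources"]
    plugType = "single"
    needsFilter = False

    @classmethod
    def getExecutionOrder(cls):
        # Must be at or above the minimum enforced by initializePlugins below.
        return cls.DEFAULT_CATALOGCALCULATION

    def calculate(self, diaObjects, diaObjectId, diaSources, **kwargs):
        # ``diaSources`` holds every DiaSource associated with ``diaObjectId``;
        # assigning via .loc updates the caller's catalog in place.
        diaObjects.loc[diaObjectId, "nDiaSources"] = len(diaSources)

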
class DiaObjectCalculationConfig(CatalogCalculationConfig):
    """Config class for the catalog calculation driver task.

    Specifies which plugins will execute when the `DiaObjectCalculationTask`
    associated with this configuration is run.
    """

    plugins = DiaObjectCalculationPlugin.registry.makeField(
        multi=True,
        default=["ap_meanPosition",
                 "ap_meanFlux"],
        doc="Plugins to be run and their configuration")

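# Illustrative sketch (not part of the original module): selecting plugins via
# the ``plugins`` registry field, assuming the standard pex_config idiom for
# multi-registry fields. Only names already registered with
# DiaObjectCalculationPlugin.registry are valid choices; the helper name is
# hypothetical.
def _exampleConfigurePlugins():
    """Hypothetical helper restricting the task to the mean-position plugin."""
    config = DiaObjectCalculationConfig()
    config.plugins.names = ["ap_meanPosition"]
    return config

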
class DiaObjectCalculationTask(CatalogCalculationTask):
    """Run plugins which operate on a catalog of DIA sources.

    This task facilitates running plugins which will operate on a source
    catalog. These plugins may do things such as classifying an object based
    on source record entries inserted during a measurement task.

    This task differs from CatalogCalculationTask in the following ways:

    - No multi mode is available for plugins. All plugins are assumed to run
      in single mode.

    - Input and output catalog types are assumed to be `pandas.DataFrame` with
      columns following those used in the Apdb.

    - No schema argument is passed to the plugins. Each plugin specifies
      output columns and required inputs.

    Parameters
    ----------
    plugMetadata : `lsst.daf.base.PropertyList` or `None`
        Will be modified in-place to contain metadata about the plugins being
        run. If `None`, an empty `~lsst.daf.base.PropertyList` will be
        created.
    **kwargs
        Additional arguments passed to the superclass constructor.

    Notes
    -----
    Plugins may either take an entire catalog to work on at a time, or work on
    individual records.
    """
    ConfigClass = DiaObjectCalculationConfig
    _DefaultName = "diaObjectCalculation"

    def __init__(self, plugMetadata=None, **kwargs):
        lsst.pipe.base.Task.__init__(self, **kwargs)
        if plugMetadata is None:
            plugMetadata = lsst.daf.base.PropertyList()
        self.plugMetadata = plugMetadata
        self.plugins = PluginMap()
        self.outputCols = []

        self.initializePlugins()

    def initializePlugins(self):
        """Initialize the plugins according to the configuration.
        """

        pluginType = namedtuple('pluginType', 'single multi')
        self.executionDict = {}
        # Read the properties for each plugin. Allocate a dictionary entry for
        # each run level. Verify that the plugins are above the minimum run
        # level for a catalogCalculation plugin. For each run level, the
        # plugins are sorted into either single record, or multi record groups
        # to later be run appropriately.
        for executionOrder, name, config, PluginClass in sorted(self.config.plugins.apply()):
            if executionOrder not in self.executionDict:
                self.executionDict[executionOrder] = pluginType(single=[], multi=[])
            if PluginClass.getExecutionOrder() >= BasePlugin.DEFAULT_CATALOGCALCULATION:
                plug = PluginClass(config, name, metadata=self.plugMetadata)

                self._validatePluginCols(plug)

                self.plugins[name] = plug
                if plug.plugType == 'single':
                    self.executionDict[executionOrder].single.append(plug)
                elif plug.plugType == 'multi':
                    self.executionDict[executionOrder].multi.append(plug)
            else:
                errorTuple = (PluginClass, PluginClass.getExecutionOrder(),
                              BasePlugin.DEFAULT_CATALOGCALCULATION)
                raise ValueError("{} has an execution order less than the minimum for a "
                                 "catalogCalculation plugin. Value {} : Minimum {}".format(*errorTuple))

    def _validatePluginCols(self, plug):
        """Assert that output columns are not duplicated and input columns
        exist for dependent plugins.

        Parameters
        ----------
        plug : `lsst.ap.association.DiaCalculationPlugin`
            Plugin to test for output collisions and input needs.
        """
        for inputName in plug.inputCols:
            if inputName not in self.outputCols:
                errorTuple = (plug.name, plug.getExecutionOrder(),
                              inputName)
                raise ValueError(
                    "Plugin, {} with execution order {} requires DiaObject "
                    "column {} to exist. Check the execution order of the "
                    "plugin and make sure it runs after a plugin creating "
                    "the column is run.".format(*errorTuple))
        for outputName in plug.outputCols:
            if outputName in self.outputCols:
                errorTuple = (plug.name, plug.getExecutionOrder(),
                              outputName)
                raise ValueError(
                    "Plugin, {} with execution order {} is attempting to "
                    "output a column {}, however the column is already being "
                    "produced by another plugin. Check other plugins for "
                    "collisions with this one.".format(*errorTuple))
            else:
                self.outputCols.append(outputName)

    @timeMethod
    def run(self,
            diaObjectCat,
            diaSourceCat,
            updatedDiaObjectIds,
            filterNames):
        """The entry point for the DIA catalog calculation task.

        Run method both updates the values in the diaObjectCat and appends
        newly created DiaObjects to the catalog. For catalog column names
        see the lsst.cat schema definitions for the DiaObject and DiaSource
        tables (http://github.com/lsst/cat).

        Parameters
        ----------
        diaObjectCat : `pandas.DataFrame`
            DiaObjects to update values of and append new objects to. DataFrame
            should be indexed on "diaObjectId".
        diaSourceCat : `pandas.DataFrame`
            DiaSources associated with the DiaObjects in diaObjectCat.
            DataFrame should be indexed on
            ``["diaObjectId", "filterName", "diaSourceId"]``.
        updatedDiaObjectIds : `numpy.ndarray`
            Integer ids of the DiaObjects to update and create.
        filterNames : `list` of `str`
            List of string names of filters to be processed.

        Returns
        -------
        returnStruct : `lsst.pipe.base.Struct`
            Struct containing:

            ``diaObjectCat``
                Full set of DiaObjects including both un-updated and
                updated/new DiaObjects (`pandas.DataFrame`).
            ``updatedDiaObjects``
                Catalog of DiaObjects that were updated or created by this
                task (`pandas.DataFrame`).
        """
        if diaObjectCat.index.name is None:
            diaObjectCat.set_index("diaObjectId", inplace=True, drop=False)
        elif diaObjectCat.index.name != "diaObjectId":
            self.log.warning(
                "Input diaObjectCat is indexed on column(s) incompatible with "
                "this task. Should be indexed on 'diaObjectId'. Trying to set "
                "index regardless")
            diaObjectCat.set_index("diaObjectId", inplace=True, drop=False)

        # ``names`` by default is FrozenList([None]) hence we access the first
        # element and test for None.
        if diaSourceCat.index.names[0] is None:
            diaSourceCat.set_index(
                ["diaObjectId", "filterName", "diaSourceId"],
                inplace=True,
                drop=False)
        elif (diaSourceCat.index.names
              != ["diaObjectId", "filterName", "diaSourceId"]):
            diaSourceCat.reset_index(inplace=True)
            diaSourceCat.set_index(
                ["diaObjectId", "filterName", "diaSourceId"],
                inplace=True,
                drop=False)

        return self.callCompute(diaObjectCat,
                                diaSourceCat,
                                updatedDiaObjectIds,
                                filterNames)

    @timeMethod
    def callCompute(self,
                    diaObjectCat,
                    diaSourceCat,
                    updatedDiaObjectIds,
                    filterNames):
        """Run each of the plugins on the catalog.

        For catalog column names see the lsst.cat schema definitions for the
        DiaObject and DiaSource tables (http://github.com/lsst/cat).

        Parameters
        ----------
        diaObjectCat : `pandas.DataFrame`
            DiaObjects to update values of and append new objects to. DataFrame
            should be indexed on "diaObjectId".
        diaSourceCat : `pandas.DataFrame`
            DiaSources associated with the DiaObjects in diaObjectCat.
            DataFrame must be indexed on
            ``["diaObjectId", "filterName", "diaSourceId"]``.
        updatedDiaObjectIds : `numpy.ndarray`
            Integer ids of the DiaObjects to update and create.
        filterNames : `list` of `str`
            List of string names of filters to be processed.

        Returns
        -------
        returnStruct : `lsst.pipe.base.Struct`
            Struct containing:

            ``diaObjectCat``
                Full set of DiaObjects including both un-updated and
                updated/new DiaObjects (`pandas.DataFrame`).
            ``updatedDiaObjects``
                Catalog of DiaObjects that were updated or created by this
                task (`pandas.DataFrame`).

        Raises
        ------
        KeyError
            Raised if the `pandas.DataFrame` indexing is not properly set.
        """
        # DiaObjects will be updated in place.
        diaObjectsToUpdate = diaObjectCat.loc[updatedDiaObjectIds, :]
        self.log.info("Calculating summary stats for %i DiaObjects",
                      len(diaObjectsToUpdate))

        updatingDiaSources = diaSourceCat.loc[updatedDiaObjectIds, :]
        diaSourcesGB = updatingDiaSources.groupby(level=0)
        for runlevel in sorted(self.executionDict):
            for plug in self.executionDict[runlevel].single:
                if plug.needsFilter:
                    continue
                for updatedDiaObjectId in updatedDiaObjectIds:

                    # Sub-select diaSources associated with this diaObject.
                    objDiaSources = updatingDiaSources.loc[updatedDiaObjectId]

                    with CCContext(plug, updatedDiaObjectId, self.log):
                        # We feed the catalog we need to update and the id
                        # so as to get a view into the catalog and not a copy.
                        # This updates the values in the catalog.
                        plug.calculate(diaObjects=diaObjectsToUpdate,
                                       diaObjectId=updatedDiaObjectId,
                                       diaSources=objDiaSources,
                                       filterDiaSources=None,
                                       filterName=None)
            for plug in self.executionDict[runlevel].multi:
                if plug.needsFilter:
                    continue
                with CCContext(plug, diaObjectsToUpdate, self.log):
                    plug.calculate(diaObjects=diaObjectsToUpdate,
                                   diaSources=diaSourcesGB,
                                   filterDiaSources=None,
                                   filterName=None)

        for filterName in filterNames:
            try:
                updatingFilterDiaSources = updatingDiaSources.loc[
                    (slice(None), filterName), :
                ]
            except KeyError:
                self.log.warning("No DiaSource data with filter=%s. "
                                 "Continuing...", filterName)
                continue
            # Level=0 here groups by diaObjectId.
            filterDiaSourcesGB = updatingFilterDiaSources.groupby(level=0)

            for runlevel in sorted(self.executionDict):
                for plug in self.executionDict[runlevel].single:
                    if not plug.needsFilter:
                        continue
                    for updatedDiaObjectId in updatedDiaObjectIds:

                        # Sub-select diaSources associated with this diaObject.
                        objDiaSources = updatingDiaSources.loc[updatedDiaObjectId]

                        # Sub-select on diaSources observed in the current filter.
                        try:
                            filterObjDiaSources = objDiaSources.loc[filterName]
                        except KeyError:
                            self.log.warning(
                                "DiaObjectId=%i has no DiaSources for "
                                "filter=%s. Continuing...",
                                updatedDiaObjectId, filterName)
                            # Nothing to compute for this object in this
                            # filter; skip it.
                            continue
                        with CCContext(plug, updatedDiaObjectId, self.log):
                            # We feed the catalog we need to update and the id
                            # so as to get a view into the catalog and not a
                            # copy. This updates the values in the catalog.
                            plug.calculate(diaObjects=diaObjectsToUpdate,
                                           diaObjectId=updatedDiaObjectId,
                                           diaSources=objDiaSources,
                                           filterDiaSources=filterObjDiaSources,
                                           filterName=filterName)
                for plug in self.executionDict[runlevel].multi:
                    if not plug.needsFilter:
                        continue
                    with CCContext(plug, diaObjectsToUpdate, self.log):
                        plug.calculate(diaObjects=diaObjectsToUpdate,
                                       diaSources=diaSourcesGB,
                                       filterDiaSources=filterDiaSourcesGB,
                                       filterName=filterName)
        # Need to store the newly updated diaObjects directly, as editing
        # a view into diaObjectsToUpdate does not update the values of
        # diaObjectCat.
        diaObjectCat.loc[updatedDiaObjectIds, :] = diaObjectsToUpdate
        return lsst.pipe.base.Struct(
            diaObjectCat=diaObjectCat,
            updatedDiaObjects=diaObjectsToUpdate)

    def _initialize_dia_object(self, objId):
        """Create a new DiaObject with values required to be initialized by the
        Apdb.

        Parameters
        ----------
        objId : `int`
            ``diaObjectId`` value for the new DiaObject.

        Returns
        -------
        diaObject : `dict`
            Newly created DiaObject with keys:

            ``diaObjectId``
                Unique DiaObjectId (`int`).
            ``pmParallaxNdata``
                Number of data points used for parallax calculation (`int`).
            ``nearbyObj1``
                Id of a nearby Object in the Object table (`int`).
            ``nearbyObj2``
                Id of a nearby Object in the Object table (`int`).
            ``nearbyObj3``
                Id of a nearby Object in the Object table (`int`).
            ``?PSFluxNdata``
                Number of data points used to calculate point source flux
                summary statistics in each bandpass (`int`).
        """
        new_dia_object = {"diaObjectId": objId,
                          "pmParallaxNdata": 0,
                          "nearbyObj1": 0,
                          "nearbyObj2": 0,
                          "nearbyObj3": 0}
        for f in ["u", "g", "r", "i", "z", "y"]:
            new_dia_object["%sPSFluxNdata" % f] = 0
        return new_dia_object
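

# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the original module): the call/return
# pattern of ``DiaObjectCalculationTask.run``. The helper name is hypothetical;
# the catalogs passed in must follow the Apdb column conventions and contain
# whatever columns the configured plugins consume.
def _exampleRunCall(diaObjectCat, diaSourceCat, updatedDiaObjectIds, filterNames):
    """Hypothetical helper showing how ``run`` is invoked and what it returns."""
    task = DiaObjectCalculationTask()
    # ``run`` sets the "diaObjectId" index on diaObjectCat and the
    # ["diaObjectId", "filterName", "diaSourceId"] index on diaSourceCat if
    # they are not already set, then dispatches to ``callCompute``.
    result = task.run(diaObjectCat,
                      diaSourceCat,
                      updatedDiaObjectIds,
                      filterNames)
    # The returned `lsst.pipe.base.Struct` carries the full catalog and the
    # subset of DiaObjects that were updated or created.
    return result.diaObjectCat, result.updatedDiaObjects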