diaCalculation.py
# This file is part of ap_association.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

from collections import namedtuple
import numpy as np
import pandas as pd

from .catalogCalculation import (CatalogCalculationPluginConfig,
                                 CatalogCalculationPlugin,
                                 CatalogCalculationConfig,
                                 CatalogCalculationTask,
                                 CCContext)
from .pluginsBase import BasePlugin
from .pluginRegistry import (PluginRegistry, PluginMap)
import lsst.daf.base  # needed for lsst.daf.base.PropertyList below
import lsst.pipe.base

# Enforce an error for unsafe column/array value setting in pandas.
pd.options.mode.chained_assignment = 'raise'

__all__ = ("DiaObjectCalculationPlugin", "DiaObjectCalculationPluginConfig",
           "DiaObjectCalculationTask", "DiaObjectCalculationConfig")


class DiaObjectCalculationPluginConfig(CatalogCalculationPluginConfig):
    """Default configuration class for DIA catalog calculation plugins.
    """
    pass


class DiaObjectCalculationPlugin(CatalogCalculationPlugin):
    """Base class for DIA catalog calculation plugins.

    Task follows CatalogCalculationPlugin with modifications for use in AP.

    Parameters
    ----------
    config : `DiaObjectCalculationPlugin.ConfigClass`
        Plugin configuration.
    name : `str`
        The string the plugin was registered with.
    metadata : `lsst.daf.base.PropertySet`
        Plugin metadata that will be attached to the output catalog.
    """

    ConfigClass = DiaObjectCalculationPluginConfig

    registry = PluginRegistry(DiaObjectCalculationPluginConfig)
    """List of available plugins (`lsst.meas.base.PluginRegistry`).
    """

    FLUX_MOMENTS_CALCULATED = 5.0
    """Add order after flux means and stds are calculated.
    """

    plugType = 'single'
    """Does the plugin operate on a single source or the whole catalog (`str`)?
    If the plugin operates on a single source at a time, this should be set to
    ``"single"``; if it expects the whole catalog, to ``"multi"``. If the
    plugin is of type ``"multi"``, the `fail` method must be implemented to
    accept the whole catalog. If the plugin is of type ``"single"``, `fail`
    should accept a single source record.
    """

    inputCols = []
    """DiaObject column names required by the plugin in order to run and
    complete its calculation. DiaCalculationTask should raise an error if a
    plugin is instantiated without the needed columns available. Input columns
    should be defined in the DPDD/cat/Apdb schema. Filter-dependent columns
    should be specified without the filter name prepended to them, e.g.
    ``PSFluxMean`` instead of ``uPSFluxMean``.
    """
    outputCols = []
    """DiaObject column names output by the plugin. DiaCalculationTask should
    raise an error if another plugin is configured to output to the same
    column. Output columns should be defined in the DPDD/cat/Apdb schema.
    Filter-dependent columns should be specified without the filter name
    prepended to them, e.g. ``PSFluxMean`` instead of ``uPSFluxMean``.
    """

    needsFilter = True
    """This plugin requires a filter to be specified. Plugins using filter
    names usually deal with fluxes and only a subset of the DiaSource
    catalog. Plugins that do not use the filter name usually run over a value
    common across all observations/detections, such as position.
    """

    def __init__(self, config, name, metadata):
        BasePlugin.__init__(self, config, name)

    def calculate(self,
                  diaObject,
                  diaSources,
                  filterDiaFluxes=None,
                  filterName=None,
                  **kwargs):
        """Perform the calculation specified by this plugin.

        This method can either be used to operate on a single catalog record
        or a whole catalog, populating it with the output defined by this
        plugin.

        Note that results may be added to catalog records as new columns, or
        may result in changes to existing values.

        Parameters
        ----------
        diaObject : `dict`
            Summary object to store values in.
        diaSources : `pandas.DataFrame`
            DataFrame representing all diaSources associated with this
            diaObject.
        filterDiaFluxes : `pandas.DataFrame`
            DataFrame representing diaSources associated with this
            diaObject that are observed in the band pass ``filterName``.
        filterName : `str`
            Simple name of the filter for the flux being calculated.
        **kwargs
            Any additional keyword arguments that may be passed to the plugin.
        """
        raise NotImplementedError()

    def fail(self, diaObject, columns, error=None):
        """Set diaObject position values to nan.

        Parameters
        ----------
        diaObject : `dict`
            Summary object to store values in.
        columns : `list` of `str`
            List of string names of columns to write the failed value to.
        error : `BaseException` or `None`
            Error to pass. Kept for consistency with CatalogCalculationPlugin.
            Unused.
        """
        for colName in columns:
            diaObject[colName] = np.nan

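# Illustrative sketch (kept commented out so nothing is registered at import
# time) of how a DiaObjectCalculationPlugin subclass might declare its
# execution order, ``outputCols``, ``plugType`` and ``needsFilter``, and fill
# values into the shared DiaObject DataFrame. The plugin name
# "ap_exampleMaxFlux", the class name, and the use of the ``register``
# decorator from ``.pluginRegistry`` are assumptions for illustration only.
#
#   from .pluginRegistry import register
#
#   @register("ap_exampleMaxFlux")
#   class ExampleMaxFluxDiaPlugin(DiaObjectCalculationPlugin):
#       """Record the maximum psFlux of the DiaSources in each filter."""
#
#       ConfigClass = DiaObjectCalculationPluginConfig
#       outputCols = ["PSFluxMax"]  # stored per filter, e.g. "gPSFluxMax"
#       plugType = "multi"          # operate on the grouped catalog
#       needsFilter = True
#
#       @classmethod
#       def getExecutionOrder(cls):
#           return cls.DEFAULT_CATALOGCALCULATION
#
#       def calculate(self, diaObjects, filterDiaSources, filterName, **kwargs):
#           # ``filterDiaSources`` is a DataFrameGroupBy keyed on diaObjectId,
#           # so the max() below aligns with ``diaObjects`` by index.
#           diaObjects.loc[:, "%sPSFluxMax" % filterName] = \
#               filterDiaSources.psFlux.max()
#
#       def fail(self, diaObjects, columns, error=None):
#           for colName in columns:
#               diaObjects.loc[:, colName] = np.nan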

class DiaObjectCalculationConfig(CatalogCalculationConfig):
    """Config class for the catalog calculation driver task.

    Specifies which plugins will execute when the `CatalogCalculationTask`
    associated with this configuration is run.
    """

    plugins = DiaObjectCalculationPlugin.registry.makeField(
        multi=True,
        default=["ap_meanPosition",
                 "ap_meanFlux"],
        doc="Plugins to be run and their configuration")

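# A hypothetical configuration sketch (commented out): the ``plugins``
# registry field selects which registered plugins run. The default set is
# ["ap_meanPosition", "ap_meanFlux"]; any other plugin name would have to
# correspond to a plugin registered against this task's registry.
#
#   config = DiaObjectCalculationConfig()
#   config.plugins.names = ["ap_meanPosition", "ap_meanFlux"]
#   # Per-plugin configuration is reached through the registry field, e.g.
#   # config.plugins["ap_meanFlux"].<option> = <value>, if that plugin
#   # defines configuration options.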

class DiaObjectCalculationTask(CatalogCalculationTask):
    """Run plugins which operate on a catalog of DIA sources.

    This task facilitates running plugins which will operate on a source
    catalog. These plugins may do things such as classifying an object based
    on source record entries inserted during a measurement task.

    This task differs from CatalogCalculationTask in the following ways:

    - No multi mode is available for plugins. All plugins are assumed to run
      in single mode.

    - Input and output catalog types are assumed to be `pandas.DataFrame`
      objects with columns following those used in the Apdb.

    - No schema argument is passed to the plugins. Each plugin specifies
      output columns and required inputs.

    Parameters
    ----------
    plugMetadata : `lsst.daf.base.PropertyList` or `None`
        Will be modified in-place to contain metadata about the plugins being
        run. If `None`, an empty `~lsst.daf.base.PropertyList` will be
        created.
    **kwargs
        Additional arguments passed to the superclass constructor.

    Notes
    -----
    Plugins may either take an entire catalog to work on at a time, or work on
    individual records.
    """
    ConfigClass = DiaObjectCalculationConfig
    _DefaultName = "diaObjectCalculation"

    def __init__(self, plugMetadata=None, **kwargs):
        lsst.pipe.base.Task.__init__(self, **kwargs)
        if plugMetadata is None:
            plugMetadata = lsst.daf.base.PropertyList()
        self.plugMetadata = plugMetadata
        self.plugins = PluginMap()
        self.outputCols = []

        self.initializePlugins()

    def initializePlugins(self):
        """Initialize the plugins according to the configuration.
        """

        pluginType = namedtuple('pluginType', 'single multi')
        self.executionDict = {}
        # Read the properties for each plugin. Allocate a dictionary entry for
        # each run level. Verify that the plugins are above the minimum run
        # level for a catalogCalculation plugin. For each run level, the
        # plugins are sorted into either single record or multi record groups
        # to later be run appropriately.
        for executionOrder, name, config, PluginClass in sorted(self.config.plugins.apply()):
            if executionOrder not in self.executionDict:
                self.executionDict[executionOrder] = pluginType(single=[], multi=[])
            if PluginClass.getExecutionOrder() >= BasePlugin.DEFAULT_CATALOGCALCULATION:
                plug = PluginClass(config, name, metadata=self.plugMetadata)

                self._validatePluginCols(plug)

                self.plugins[name] = plug
                if plug.plugType == 'single':
                    self.executionDict[executionOrder].single.append(plug)
                elif plug.plugType == 'multi':
                    self.executionDict[executionOrder].multi.append(plug)
            else:
                errorTuple = (PluginClass, PluginClass.getExecutionOrder(),
                              BasePlugin.DEFAULT_CATALOGCALCULATION)
                raise ValueError("{} has an execution order less than the minimum for a "
                                 "catalogCalculation plugin. Value {} : Minimum {}".format(*errorTuple))

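    # For orientation, a commented-out sketch of what ``initializePlugins``
    # builds: ``self.executionDict`` maps each execution order to a
    # ``pluginType`` namedtuple holding the single-record and whole-catalog
    # plugins for that run level, and ``callCompute`` walks the orders in
    # ascending order. The order values and plugin placements shown are
    # placeholders, not the actual registered values.
    #
    #   self.executionDict == {
    #       <order A>: pluginType(single=[], multi=[<ap_meanPosition instance>]),
    #       <order B>: pluginType(single=[], multi=[<ap_meanFlux instance>]),
    #   }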
    def _validatePluginCols(self, plug):
        """Assert that output columns are not duplicated and input columns
        exist for dependent plugins.

        Parameters
        ----------
        plug : `lsst.ap.association.DiaObjectCalculationPlugin`
            Plugin to test for output collisions and input needs.
        """
        for inputName in plug.inputCols:
            if inputName not in self.outputCols:
                errorTuple = (plug.name, plug.getExecutionOrder(),
                              inputName)
                raise ValueError(
                    "Plugin {} with execution order {} requires DiaObject "
                    "column {} to exist. Check the execution order of the "
                    "plugin and make sure it runs after a plugin creating "
                    "the column is run.".format(*errorTuple))
        for outputName in plug.outputCols:
            if outputName in self.outputCols:
                errorTuple = (plug.name, plug.getExecutionOrder(),
                              outputName)
                raise ValueError(
                    "Plugin {} with execution order {} is attempting to "
                    "output a column {}, however the column is already being "
                    "produced by another plugin. Check other plugins for "
                    "collisions with this one.".format(*errorTuple))
            else:
                self.outputCols.append(outputName)

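    # Commented-out illustration of the check above: a plugin that consumes a
    # column must be configured to run after the plugin that produces it,
    # otherwise task construction fails. Both class sketches below are
    # hypothetical and only show the ``inputCols``/``outputCols`` contract.
    #
    #   class ProducerPlugin(DiaObjectCalculationPlugin):
    #       outputCols = ["PSFluxMean"]
    #
    #   class ConsumerPlugin(DiaObjectCalculationPlugin):
    #       inputCols = ["PSFluxMean"]   # valid only if ProducerPlugin has a
    #       outputCols = ["PSFluxSkew"]  # lower execution order; otherwise
    #                                    # _validatePluginCols raises ValueError.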
    @lsst.pipe.base.timeMethod
    def run(self,
            diaObjectCat,
            diaSourceCat,
            updatedDiaObjectIds,
            filterNames):
        """The entry point for the DIA catalog calculation task.

        Run method both updates the values in the diaObjectCat and appends
        newly created DiaObjects to the catalog. For catalog column names
        see the lsst.cat schema definitions for the DiaObject and DiaSource
        tables (http://github.com/lsst/cat).

        Parameters
        ----------
        diaObjectCat : `pandas.DataFrame`
            DiaObjects to update values of and append new objects to.
            DataFrame should be indexed on "diaObjectId".
        diaSourceCat : `pandas.DataFrame`
            DiaSources associated with the DiaObjects in diaObjectCat.
            DataFrame should be indexed on
            ``["diaObjectId", "filterName", "diaSourceId"]``.
        updatedDiaObjectIds : `numpy.ndarray`
            Integer ids of the DiaObjects to update and create.
        filterNames : `list` of `str`
            List of string names of filters being processed.

        Returns
        -------
        returnStruct : `lsst.pipe.base.Struct`
            Struct containing:

            ``diaObjectCat``
                Full set of DiaObjects including both un-updated and
                updated/new DiaObjects (`pandas.DataFrame`).
            ``updatedDiaObjects``
                Catalog of DiaObjects that were updated or created by this
                task (`pandas.DataFrame`).
        """
        if diaObjectCat.index.name is None:
            diaObjectCat.set_index("diaObjectId", inplace=True, drop=False)
        elif diaObjectCat.index.name != "diaObjectId":
            self.log.warning(
                "Input diaObjectCat is indexed on column(s) incompatible with "
                "this task. Should be indexed on 'diaObjectId'. Trying to set "
                "index regardless")
            diaObjectCat.set_index("diaObjectId", inplace=True, drop=False)

        # ``names`` by default is FrozenList([None]) hence we access the first
        # element and test for None.
        if diaSourceCat.index.names[0] is None:
            diaSourceCat.set_index(
                ["diaObjectId", "filterName", "diaSourceId"],
                inplace=True,
                drop=False)
        elif (diaSourceCat.index.names
              != ["diaObjectId", "filterName", "diaSourceId"]):
            diaSourceCat.reset_index(inplace=True)
            diaSourceCat.set_index(
                ["diaObjectId", "filterName", "diaSourceId"],
                inplace=True,
                drop=False)

        return self.callCompute(diaObjectCat,
                                diaSourceCat,
                                updatedDiaObjectIds,
                                filterNames)

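    # Commented-out usage sketch for ``run``: the toy DataFrames below only
    # demonstrate the call signature and the indexing contract documented
    # above. They are not complete Apdb catalogs; the columns the default
    # plugins actually need (e.g. positions and flux errors) are only hinted
    # at, so treat this as a shape-of-the-call illustration.
    #
    #   task = DiaObjectCalculationTask()
    #   objects = pd.DataFrame({"diaObjectId": [1, 2]})
    #   sources = pd.DataFrame({"diaObjectId": [1, 1, 2],
    #                           "filterName": ["g", "g", "r"],
    #                           "diaSourceId": [10, 11, 12],
    #                           "psFlux": [1., 2., 3.],
    #                           "psFluxErr": [.1, .1, .1]})
    #   # run() sets the required indexes itself if they are missing.
    #   result = task.run(objects, sources,
    #                     updatedDiaObjectIds=np.array([1, 2]),
    #                     filterNames=["g", "r"])
    #   updated = result.updatedDiaObjects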
    @lsst.pipe.base.timeMethod
    def callCompute(self,
                    diaObjectCat,
                    diaSourceCat,
                    updatedDiaObjectIds,
                    filterNames):
        """Run each of the plugins on the catalog.

        For catalog column names see the lsst.cat schema definitions for the
        DiaObject and DiaSource tables (http://github.com/lsst/cat).

        Parameters
        ----------
        diaObjectCat : `pandas.DataFrame`
            DiaObjects to update values of and append new objects to.
            DataFrame should be indexed on "diaObjectId".
        diaSourceCat : `pandas.DataFrame`
            DiaSources associated with the DiaObjects in diaObjectCat.
            DataFrame must be indexed on
            ``["diaObjectId", "filterName", "diaSourceId"]``.
        updatedDiaObjectIds : `numpy.ndarray`
            Integer ids of the DiaObjects to update and create.
        filterNames : `list` of `str`
            List of string names of filters being processed.

        Returns
        -------
        returnStruct : `lsst.pipe.base.Struct`
            Struct containing:

            ``diaObjectCat``
                Full set of DiaObjects including both un-updated and
                updated/new DiaObjects (`pandas.DataFrame`).
            ``updatedDiaObjects``
                Catalog of DiaObjects that were updated or created by this
                task (`pandas.DataFrame`).

        Raises
        ------
        KeyError
            Raised if the `pandas.DataFrame` indexing is not properly set.
        """
        # DiaObjects will be updated in place.
        diaObjectsToUpdate = diaObjectCat.loc[updatedDiaObjectIds, :]
        self.log.info("Calculating summary stats for %i DiaObjects",
                      len(diaObjectsToUpdate))

        updatingDiaSources = diaSourceCat.loc[updatedDiaObjectIds, :]
        diaSourcesGB = updatingDiaSources.groupby(level=0)
        for runlevel in sorted(self.executionDict):
            for plug in self.executionDict[runlevel].single:
                if plug.needsFilter:
                    continue
                for updatedDiaObjectId in updatedDiaObjectIds:

                    # Sub-select diaSources associated with this diaObject.
                    objDiaSources = updatingDiaSources.loc[updatedDiaObjectId]

                    with CCContext(plug, updatedDiaObjectId, self.log):
                        # We feed the catalog we need to update and the id
                        # so as to get a view into the catalog and not a copy.
                        # This updates the values in the catalog.
                        plug.calculate(diaObjects=diaObjectsToUpdate,
                                       diaObjectId=updatedDiaObjectId,
                                       diaSources=objDiaSources,
                                       filterDiaSources=None,
                                       filterName=None)
            for plug in self.executionDict[runlevel].multi:
                if plug.needsFilter:
                    continue
                with CCContext(plug, diaObjectsToUpdate, self.log):
                    plug.calculate(diaObjects=diaObjectsToUpdate,
                                   diaSources=diaSourcesGB,
                                   filterDiaSources=None,
                                   filterName=None)

        for filterName in filterNames:
            try:
                updatingFilterDiaSources = updatingDiaSources.loc[
                    (slice(None), filterName), :
                ]
            except KeyError:
                self.log.warning("No DiaSource data with filter=%s. "
                                 "Continuing...", filterName)
                continue
            # Level=0 here groups by diaObjectId.
            filterDiaSourcesGB = updatingFilterDiaSources.groupby(level=0)

            for runlevel in sorted(self.executionDict):
                for plug in self.executionDict[runlevel].single:
                    if not plug.needsFilter:
                        continue
                    for updatedDiaObjectId in updatedDiaObjectIds:

                        # Sub-select diaSources associated with this diaObject.
                        objDiaSources = updatingDiaSources.loc[updatedDiaObjectId]

                        # Sub-select on diaSources observed in the current filter.
                        try:
                            filterObjDiaSources = objDiaSources.loc[filterName]
                        except KeyError:
                            self.log.warning(
                                "DiaObjectId=%i has no DiaSources for "
                                "filter=%s. Continuing...",
                                updatedDiaObjectId, filterName)
                            continue
                        with CCContext(plug, updatedDiaObjectId, self.log):
                            # We feed the catalog we need to update and the id
                            # so as to get a view into the catalog and not a
                            # copy. This updates the values in the catalog.
                            plug.calculate(diaObjects=diaObjectsToUpdate,
                                           diaObjectId=updatedDiaObjectId,
                                           diaSources=objDiaSources,
                                           filterDiaSources=filterObjDiaSources,
                                           filterName=filterName)
                for plug in self.executionDict[runlevel].multi:
                    if not plug.needsFilter:
                        continue
                    with CCContext(plug, diaObjectsToUpdate, self.log):
                        plug.calculate(diaObjects=diaObjectsToUpdate,
                                       diaSources=diaSourcesGB,
                                       filterDiaSources=filterDiaSourcesGB,
                                       filterName=filterName)
        # Need to store the newly updated diaObjects directly as editing
        # a view into diaObjectsToUpdate does not update the values of
        # diaObjectCat.
        diaObjectCat.loc[updatedDiaObjectIds, :] = diaObjectsToUpdate
        return lsst.pipe.base.Struct(
            diaObjectCat=diaObjectCat,
            updatedDiaObjects=diaObjectsToUpdate)

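    # Commented-out illustration of the pandas selection pattern used in
    # ``callCompute``: with a ["diaObjectId", "filterName", "diaSourceId"]
    # MultiIndex, ``groupby(level=0)`` yields one group per DiaObject and
    # ``.loc[(slice(None), band), :]`` keeps a single filter. The column
    # values below are placeholders.
    #
    #   idx = pd.MultiIndex.from_tuples(
    #       [(1, "g", 10), (1, "r", 11), (2, "g", 12)],
    #       names=["diaObjectId", "filterName", "diaSourceId"])
    #   srcs = pd.DataFrame({"psFlux": [1., 2., 3.]}, index=idx)
    #   perObject = srcs.groupby(level=0)          # one group per DiaObject
    #   gBand = srcs.loc[(slice(None), "g"), :]    # only filter "g" rows
    #   perObjectInG = gBand.groupby(level=0)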
    def _initialize_dia_object(self, objId):
        """Create a new DiaObject with values required to be initialized by
        the Apdb.

        Parameters
        ----------
        objId : `int`
            ``diaObjectId`` value for the new DiaObject.

        Returns
        -------
        diaObject : `dict`
            Newly created DiaObject with keys:

            ``diaObjectId``
                Unique DiaObjectId (`int`).
            ``pmParallaxNdata``
                Number of data points used for parallax calculation (`int`).
            ``nearbyObj1``
                Id of a nearby object in the Object table (`int`).
            ``nearbyObj2``
                Id of a nearby object in the Object table (`int`).
            ``nearbyObj3``
                Id of a nearby object in the Object table (`int`).
            ``?PSFluxNdata``
                Number of data points used to calculate point source flux
                summary statistics in each bandpass (`int`).
        """
        new_dia_object = {"diaObjectId": objId,
                          "pmParallaxNdata": 0,
                          "nearbyObj1": 0,
                          "nearbyObj2": 0,
                          "nearbyObj3": 0}
        for f in ["u", "g", "r", "i", "z", "y"]:
            new_dia_object["%sPSFluxNdata" % f] = 0
        return new_dia_object
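
    # For reference, the dictionary returned above for ``objId=42`` (directly
    # derivable from the code; shown commented out):
    #
    #   {"diaObjectId": 42, "pmParallaxNdata": 0,
    #    "nearbyObj1": 0, "nearbyObj2": 0, "nearbyObj3": 0,
    #    "uPSFluxNdata": 0, "gPSFluxNdata": 0, "rPSFluxNdata": 0,
    #    "iPSFluxNdata": 0, "zPSFluxNdata": 0, "yPSFluxNdata": 0}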