from collections import namedtuple

import numpy as np
import pandas as pd

import lsst.daf.base
import lsst.pipe.base
from lsst.utils.timer import timeMethod

from .catalogCalculation import (CatalogCalculationPluginConfig,
                                 CatalogCalculationPlugin,
                                 CatalogCalculationConfig,
                                 CatalogCalculationTask,
                                 CCContext)
from .pluginsBase import BasePlugin
from .pluginRegistry import (PluginRegistry, PluginMap)
# Raise (rather than silently warn) on pandas chained assignment so that an
# accidental write to a copy of a catalog DataFrame fails loudly.
pd.options.mode.chained_assignment = 'raise'
# Public API of this module.
__all__ = ("DiaObjectCalculationPlugin",
           "DiaObjectCalculationPluginConfig",
           "DiaObjectCalculationTask",
           "DiaObjectCalculationConfig")
44 """Default configuration class for DIA catalog calculation plugins.
50 """Base class for DIA catalog calculation plugins.
52 Task follows CatalogCalculationPlugin with modifications for use in AP.
56 config : `DiaObjectCalculationPlugin.ConfigClass`
59 The string the plugin was registered with.
60 metadata : `lsst.daf.base.PropertySet`
61 Plugin metadata that will be attached to the output catalog
64 ConfigClass = DiaObjectCalculationPluginConfig
67 """List of available plugins (`lsst.meas.base.PluginRegistry`).
70 FLUX_MOMENTS_CALCULATED = 5.0
71 """Add order after flux means and stds are calculated.
75 """Does the plugin operate on a single source or the whole catalog (`str`)?
76 If the plugin operates on a single source at a time, this should be set to
77 ``"single"``; if it expects the whoe catalog, to ``"multi"``. If the
78 plugin is of type ``"multi"``, the `fail` method must be implemented to
79 accept the whole catalog. If the plugin is of type ``"single"``, `fail`
80 should accept a single source record.
84 """DiaObject column names required by the plugin in order to run and
85 complete its calculation. DiaCalculationTask should raise an error is a
86 plugin is instantiated without the needed column available. Input columns
87 should be defined in the DPDD/cat/Apdb schema. Filter dependent columns
88 should be specified without the filter name perpended to them. eg
89 ``PSFluxMean`` instead of ``uPSFluxMean``.
92 """DiaObject column names output by the plugin. DiaCalculationTask should
93 raise an error if another pluging is run output to the same column.
94 Output columns should be defined in the DPDD/cat/Apdb schema. Filter
95 dependent columns should be specified without the filter name perpended to
96 them. eg ``PSFluxMean`` instead of ``uPSFluxMean``.
100 """This plugin requires a filter to be specified. Plugin's using filter
101 names usually deal with fluxes and only a sub-set of the DiaSource
102 catalog. Plugins that to not use the filter name usually run over a value
103 common across all observations/detections such as position.
107 BasePlugin.__init__(self, config, name)
112 filterDiaFluxes=None,
115 """Perform the calculation specified by this plugin.
117 This method can either be used to operate on a single catalog record
118 or a whole catalog, populating it with the output defined by this
121 Note that results may be added to catalog records as new columns, or
122 may result in changes to existing values.
127 Summary object to store values in.
128 diaSources : `pandas.DataFrame`
129 DataFrame representing all diaSources associated with this
131 filterDiaFluxes : `pandas.DataFrame`
132 DataFrame representing diaSources associated with this
133 diaObject that are observed in the band pass ``filterName``.
135 Simple name of the filter for the flux being calculated.
137 Any additional keyword arguments that may be passed to the plugin.
139 raise NotImplementedError()
141 def fail(self, diaObject, columns, error=None):
142 """Set diaObject position values to nan.
147 Summary object to store values in.
148 columns : `list` of `str`
149 List of string names of columns to write a the failed value.
150 error : `BaseException` or `None`
151 Error to pass. Kept for consistency with CatologCalculationPlugin.
154 for colName
in columns:
155 diaObject[colName] = np.nan
159 """Config class for the catalog calculation driver task.
161 Specifies which plugins will execute when the `CatalogCalculationTask`
162 associated with this configuration is run.
165 plugins = DiaObjectCalculationPlugin.registry.makeField(
167 default=[
"ap_meanPosition",
169 doc=
"Plugins to be run and their configuration")
173 """Run plugins which operate on a catalog of DIA sources.
175 This task facilitates running plugins which will operate on a source
176 catalog. These plugins may do things such as classifying an object based
177 on source record entries inserted during a measurement task.
179 This task differs from CatalogCaculationTask in the following ways:
181 -No multi mode is available for plugins. All plugins are assumed to run
184 -Input and output catalog types are assumed to be `pandas.DataFrames` with
185 columns following those used in the Apdb.
187 -No schema argument is passed to the plugins. Each plugin specifies
188 output columns and required inputs.
192 plugMetaData : `lsst.daf.base.PropertyList` or `None`
193 Will be modified in-place to contain metadata about the plugins being
194 run. If `None`, an empty `~lsst.daf.base.PropertyList` will be
197 Additional arguments passed to the superclass constructor.
201 Plugins may either take an entire catalog to work on at a time, or work on
204 ConfigClass = DiaObjectCalculationConfig
205 _DefaultName =
"diaObjectCalculation"
208 lsst.pipe.base.Task.__init__(self, **kwargs)
209 if plugMetadata
is None:
218 """Initialize the plugins according to the configuration.
221 pluginType = namedtuple(
'pluginType',
'single multi')
228 for executionOrder, name, config, PluginClass
in sorted(self.config.plugins.apply()):
231 if PluginClass.getExecutionOrder() >= BasePlugin.DEFAULT_CATALOGCALCULATION:
237 if plug.plugType ==
'single':
239 elif plug.plugType ==
'multi':
242 errorTuple = (PluginClass, PluginClass.getExecutionOrder(),
243 BasePlugin.DEFAULT_CATALOGCALCULATION)
244 raise ValueError(
"{} has an execution order less than the minimum for an catalogCalculation "
245 "plugin. Value {} : Minimum {}".
format(*errorTuple))
247 def _validatePluginCols(self, plug):
248 """Assert that output columns are not duplicated and input columns
249 exist for dependent plugins.
253 plug : `lsst.ap.association.DiaCalculationPlugin`
254 Plugin to test for output collisions and input needs.
256 for inputName
in plug.inputCols:
257 if inputName
not in self.
outputColsoutputCols:
258 errorTuple = (plug.name, plug.getExecutionOrder(),
261 "Plugin, {} with execution order {} requires DiaObject "
262 "column {} to exist. Check the execution order of the "
263 "plugin and make sure it runs after a plugin creating "
264 "the column is run.".
format(*errorTuple))
265 for outputName
in plug.outputCols:
267 errorTuple = (plug.name, plug.getExecutionOrder(),
270 "Plugin, {} with execution order {} is attempting to "
271 "output a column {}, however the column is already being "
272 "produced by another plugin. Check other plugins for "
273 "collisions with this one.".
format(*errorTuple))
283 """The entry point for the DIA catalog calculation task.
285 Run method both updates the values in the diaObjectCat and appends
286 newly created DiaObjects to the catalog. For catalog column names
287 see the lsst.cat schema definitions for the DiaObject and DiaSource
288 tables (http://github.com/lsst/cat).
292 diaObjectCat : `pandas.DataFrame`
293 DiaObjects to update values of and append new objects to. DataFrame
294 should be indexed on "diaObjectId"
295 diaSourceCat : `pandas.DataFrame`
296 DiaSources associated with the DiaObjects in diaObjectCat.
297 DataFrame should be indexed on
298 `["diaObjectId", "filterName", "diaSourceId"]`
299 updatedDiaObjectIds : `numpy.ndarray`
300 Integer ids of the DiaObjects to update and create.
301 filterNames : `list` of `str`
302 List of string names of filters to be being processed.
306 returnStruct : `lsst.pipe.base.Struct`
310 Full set of DiaObjects including both un-updated and
311 updated/new DiaObjects (`pandas.DataFrame`).
312 ``updatedDiaObjects``
313 Catalog of DiaObjects that were updated or created by this
314 task (`pandas.DataFrame`).
316 if diaObjectCat.index.name
is None:
317 diaObjectCat.set_index(
"diaObjectId", inplace=
True, drop=
False)
318 elif diaObjectCat.index.name !=
"diaObjectId":
320 "Input diaObjectCat is indexed on column(s) incompatible with "
321 "this task. Should be indexed on 'diaObjectId'. Trying to set "
323 diaObjectCat.set_index(
"diaObjectId", inplace=
True, drop=
False)
327 if diaSourceCat.index.names[0]
is None:
328 diaSourceCat.set_index(
329 [
"diaObjectId",
"filterName",
"diaSourceId"],
332 elif (diaSourceCat.index.names
333 != [
"diaObjectId",
"filterName",
"diaSourceId"]):
334 diaSourceCat.reset_index(inplace=
True)
335 diaSourceCat.set_index(
336 [
"diaObjectId",
"filterName",
"diaSourceId"],
351 """Run each of the plugins on the catalog.
353 For catalog column names see the lsst.cat schema definitions for the
354 DiaObject and DiaSource tables (http://github.com/lsst/cat).
358 diaObjectCat : `pandas.DataFrame`
359 DiaObjects to update values of and append new objects to. DataFrame
360 should be indexed on "diaObjectId"
361 diaSourceCat : `pandas.DataFrame`
362 DiaSources associated with the DiaObjects in diaObjectCat.
363 DataFrame must be indexed on
364 ["diaObjectId", "filterName", "diaSourceId"]`
365 updatedDiaObjectIds : `numpy.ndarray`
366 Integer ids of the DiaObjects to update and create.
367 filterNames : `list` of `str`
368 List of string names of filters to be being processed.
372 returnStruct : `lsst.pipe.base.Struct`
376 Full set of DiaObjects including both un-updated and
377 updated/new DiaObjects (`pandas.DataFrame`).
378 ``updatedDiaObjects``
379 Catalog of DiaObjects that were updated or created by this
380 task (`pandas.DataFrame`).
385 Raises if `pandas.DataFrame` indexing is not properly set.
388 diaObjectsToUpdate = diaObjectCat.loc[updatedDiaObjectIds, :]
389 self.log.
info(
"Calculating summary stats for %i DiaObjects",
390 len(diaObjectsToUpdate))
392 updatingDiaSources = diaSourceCat.loc[updatedDiaObjectIds, :]
393 diaSourcesGB = updatingDiaSources.groupby(level=0)
398 for updatedDiaObjectId
in updatedDiaObjectIds:
401 objDiaSources = updatingDiaSources.loc[updatedDiaObjectId]
404 with CCContext(plug, updatedDiaObjectId, self.log):
408 plug.calculate(diaObjects=diaObjectsToUpdate,
409 diaObjectId=updatedDiaObjectId,
410 diaSources=objDiaSources,
411 filterDiaSources=
None,
416 with CCContext(plug, diaObjectsToUpdate, self.log):
417 plug.calculate(diaObjects=diaObjectsToUpdate,
418 diaSources=diaSourcesGB,
419 filterDiaSources=
None,
422 for filterName
in filterNames:
424 updatingFilterDiaSources = updatingDiaSources.loc[
425 (slice(
None), filterName), :
428 self.log.
warning(
"No DiaSource data with fitler=%s. "
429 "Continuing...", filterName)
432 filterDiaSourcesGB = updatingFilterDiaSources.groupby(level=0)
436 if not plug.needsFilter:
438 for updatedDiaObjectId
in updatedDiaObjectIds:
441 objDiaSources = updatingDiaSources.loc[updatedDiaObjectId]
445 filterObjDiaSources = objDiaSources.loc[filterName]
448 "DiaObjectId={updatedDiaObjectId} has no "
449 "DiaSources for filter=%s. "
450 "Continuing...", filterName)
451 with CCContext(plug, updatedDiaObjectId, self.log):
455 plug.calculate(diaObjects=diaObjectsToUpdate,
456 diaObjectId=updatedDiaObjectId,
457 diaSources=objDiaSources,
458 filterDiaSources=filterObjDiaSources,
459 filterName=filterName)
461 if not plug.needsFilter:
463 with CCContext(plug, diaObjectsToUpdate, self.log):
464 plug.calculate(diaObjects=diaObjectsToUpdate,
465 diaSources=diaSourcesGB,
466 filterDiaSources=filterDiaSourcesGB,
467 filterName=filterName)
471 diaObjectCat.loc[updatedDiaObjectIds, :] = diaObjectsToUpdate
472 return lsst.pipe.base.Struct(
473 diaObjectCat=diaObjectCat,
474 updatedDiaObjects=diaObjectsToUpdate)
476 def _initialize_dia_object(self, objId):
477 """Create a new DiaObject with values required to be initialized by the
483 ``diaObjectId`` value for the of the new DiaObject.
488 Newly created DiaObject with keys:
491 Unique DiaObjectId (`int`).
493 Number of data points used for parallax calculation (`int`).
495 Id of the a nearbyObject in the Object table (`int`).
497 Id of the a nearbyObject in the Object table (`int`).
499 Id of the a nearbyObject in the Object table (`int`).
501 Number of data points used to calculate point source flux
502 summary statistics in each bandpass (`int`).
504 new_dia_object = {
"diaObjectId": objId,
505 "pmParallaxNdata": 0,
509 for f
in [
"u",
"g",
"r",
"i",
"z",
"y"]:
510 new_dia_object[
"%sPSFluxNdata" % f] = 0
511 return new_dia_object
Class for storing ordered metadata with comments.
def callCompute(self, catalog)
def initializePlugins(self)
def __init__(self, config, name, metadata)
def calculate(self, diaObject, diaSources, filterDiaFluxes=None, filterName=None, **kwargs)
def fail(self, diaObject, columns, error=None)
def __init__(self, plugMetadata=None, **kwargs)
def _validatePluginCols(self, plug)
def run(self, diaObjectCat, diaSourceCat, updatedDiaObjectIds, filterNames)
def initializePlugins(self)
def callCompute(self, diaObjectCat, diaSourceCat, updatedDiaObjectIds, filterNames)
std::shared_ptr< FrameSet > append(FrameSet const &first, FrameSet const &second)
Construct a FrameSet that performs two transformations in series.
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)