LSST Applications g00274db5b6+edbf708997,g00d0e8bbd7+edbf708997,g199a45376c+5137f08352,g1fd858c14a+1d4b6db739,g262e1987ae+f4d9505c4f,g29ae962dfc+7156fb1a53,g2cef7863aa+73c82f25e4,g35bb328faa+edbf708997,g3e17d7035e+5b3adc59f5,g3fd5ace14f+852fa6fbcb,g47891489e3+6dc8069a4c,g53246c7159+edbf708997,g64539dfbff+9f17e571f4,g67b6fd64d1+6dc8069a4c,g74acd417e5+ae494d68d9,g786e29fd12+af89c03590,g7ae74a0b1c+a25e60b391,g7aefaa3e3d+536efcc10a,g7cc15d900a+d121454f8d,g87389fa792+a4172ec7da,g89139ef638+6dc8069a4c,g8d7436a09f+28c28d8d6d,g8ea07a8fe4+db21c37724,g92c671f44c+9f17e571f4,g98df359435+b2e6376b13,g99af87f6a8+b0f4ad7b8d,gac66b60396+966efe6077,gb88ae4c679+7dec8f19df,gbaa8f7a6c5+38b34f4976,gbf99507273+edbf708997,gc24b5d6ed1+9f17e571f4,gca7fc764a6+6dc8069a4c,gcc769fe2a4+97d0256649,gd7ef33dd92+6dc8069a4c,gdab6d2f7ff+ae494d68d9,gdbb4c4dda9+9f17e571f4,ge410e46f29+6dc8069a4c,geaed405ab2+e194be0d2b,w.2025.47
LSST Data Management Base Package
Loading...
Searching...
No Matches
lsst.pipe.tasks.functors.CompositeFunctor Class Reference
Inheritance diagram for lsst.pipe.tasks.functors.CompositeFunctor:
lsst.pipe.tasks.functors.Functor

Public Member Functions

 __init__ (self, funcs, **kwargs)
 
 filt (self)
 
 filt (self, filt)
 
 update (self, new)
 
 columns (self)
 
 multilevelColumns (self, data, **kwargs)
 
 __call__ (self, data, **kwargs)
 
 renameCol (cls, col, renameRules)
 
 from_file (cls, filename, **kwargs)
 
 from_yaml (cls, translationDefinition, **kwargs)
 
 name (self)
 
 noDup (self)
 
 difference (self, data1, data2, **kwargs)
 
 fail (self, df)
 
 shortname (self)
 

Public Attributes

dict funcDict = funcs
 
str dataset = dataset if dataset is not None else self._defaultDataset
 
 log = logging.getLogger(type(self).__name__)
 
 name
 

Protected Member Functions

 _get_data_columnLevels (self, data, columnIndex=None)
 
 _get_data_columnLevelNames (self, data, columnIndex=None)
 
 _colsFromDict (self, colDict, columnIndex=None)
 
 _func (self, df, dropna=True)
 
 _get_columnIndex (self, data)
 
 _get_data (self, data)
 
 _setLevels (self, df)
 
 _dropna (self, vals)
 

Protected Attributes

 _filt = None
 
 _noDup = noDup
 

Static Protected Attributes

str _defaultDataset = 'ref'
 
tuple _dfLevels = ('column',)
 
bool _defaultNoDup = False
 

Detailed Description

Perform multiple calculations at once on a catalog.

The role of a `CompositeFunctor` is to group together computations from
multiple functors.
Instead of returning `~pandas.Series`, a `CompositeFunctor` returns a
`~pandas.DataFrame`, with the column names being the keys of ``funcDict``.

The `columns` attribute of a `CompositeFunctor` is the union of all columns
in all the component functors.

A `CompositeFunctor` does not use a `_func` method itself; rather, when a
`CompositeFunctor` is called, all its columns are loaded at once, and the
resulting DataFrame is passed to the `_func` method of each component
functor.
This has the advantage of only doing I/O (reading from parquet file) once,
and works because each individual `_func` method of each component functor
does not care if there are *extra* columns in the DataFrame being passed;
only that it must contain *at least* the `columns` it expects.

An important and useful class method is `from_file`, which takes as an
argument the path to a YAML file specifying a collection of functors;
`from_yaml` takes the already-loaded definition instead.

Parameters
----------
funcs : `dict` or `list`
    Dictionary or list of functors.
    If a list, then it will be converted into a dictionary according to the
    `.shortname` attribute of each functor.

Definition at line 380 of file functors.py.

Constructor & Destructor Documentation

◆ __init__()

lsst.pipe.tasks.functors.CompositeFunctor.__init__ ( self,
funcs,
** kwargs )

Definition at line 413 of file functors.py.

413 def __init__(self, funcs, **kwargs):
414
415 if type(funcs) is dict:
416 self.funcDict = funcs
417 else:
418 self.funcDict = {f.shortname: f for f in funcs}
419
420 self._filt = None
421
422 super().__init__(**kwargs)
423

Member Function Documentation

◆ __call__()

lsst.pipe.tasks.functors.CompositeFunctor.__call__ ( self,
data,
** kwargs )
Apply the functor to the data table.

Parameters
----------
data : various
    The data represented as `~lsst.daf.butler.DeferredDatasetHandle`,
    `~lsst.pipe.base.InMemoryDatasetHandle`, or `~pandas.DataFrame`.
    The table or a pointer to a table on disk from which columns can
    be accessed.

Definition at line 467 of file functors.py.

467 def __call__(self, data, **kwargs):
468 """Apply the functor to the data table.
469
470 Parameters
471 ----------
472 data : various
473 The data represented as `~lsst.daf.butler.DeferredDatasetHandle`,
474 `~lsst.pipe.base.InMemoryDatasetHandle`, or `~pandas.DataFrame`.
475 The table or a pointer to a table on disk from which columns can
476 be accessed.
477 """
478 if isinstance(data, pd.DataFrame):
479 _data = InMemoryDatasetHandle(data, storageClass="DataFrame")
480 elif isinstance(data, (DeferredDatasetHandle, InMemoryDatasetHandle)):
481 _data = data
482 else:
483 raise RuntimeError(f"Unexpected type provided for data. Got {get_full_type_name(data)}.")
484
485 columnIndex = self._get_columnIndex(_data)
486
487 if isinstance(columnIndex, pd.MultiIndex):
488 columns = self.multilevelColumns(_data, columnIndex=columnIndex)
489 df = _data.get(parameters={"columns": columns})
490
491 valDict = {}
492 for k, f in self.funcDict.items():
493 try:
494 subdf = f._setLevels(
495 df[f.multilevelColumns(_data, returnTuple=True, columnIndex=columnIndex)]
496 )
497 valDict[k] = f._func(subdf)
498 except Exception as e:
499 self.log.exception(
500 "Exception in %s (funcs: %s) call: %s",
501 self.name,
502 str(list(self.funcDict.keys())),
503 type(e).__name__,
504 )
505 try:
506 valDict[k] = f.fail(subdf)
507 except NameError:
508 raise e
509
510 else:
511 df = _data.get(parameters={"columns": self.columns})
512
513 valDict = {k: f._func(df) for k, f in self.funcDict.items()}
514
515 # Check that output columns are actually columns.
516 for name, colVal in valDict.items():
517 if len(colVal.shape) != 1:
518 raise RuntimeError("Transformed column '%s' is not the shape of a column. "
519 "It is shaped %s and type %s." % (name, colVal.shape, type(colVal)))
520
521 try:
522 valDf = pd.concat(valDict, axis=1)
523 except TypeError:
524 print([(k, type(v)) for k, v in valDict.items()])
525 raise
526
527 if kwargs.get('dropna', False):
528 valDf = valDf.dropna(how='any')
529
530 return valDf
531

◆ _colsFromDict()

lsst.pipe.tasks.functors.Functor._colsFromDict ( self,
colDict,
columnIndex = None )
protectedinherited
Converts dictionary column specification to a list of columns.

Definition at line 218 of file functors.py.

218 def _colsFromDict(self, colDict, columnIndex=None):
219 """Converts dictionary column specficiation to a list of columns."""
220 new_colDict = {}
221 columnLevels = self._get_data_columnLevels(None, columnIndex=columnIndex)
222
223 for i, lev in enumerate(columnLevels):
224 if lev in colDict:
225 if isinstance(colDict[lev], str):
226 new_colDict[lev] = [colDict[lev]]
227 else:
228 new_colDict[lev] = colDict[lev]
229 else:
230 new_colDict[lev] = columnIndex.levels[i]
231
232 levelCols = [new_colDict[lev] for lev in columnLevels]
233 cols = list(product(*levelCols))
234 colsAvailable = [col for col in cols if col in columnIndex]
235 return colsAvailable
236

◆ _dropna()

lsst.pipe.tasks.functors.Functor._dropna ( self,
vals )
protectedinherited

Definition at line 345 of file functors.py.

345 def _dropna(self, vals):
346 return vals.dropna()
347

◆ _func()

lsst.pipe.tasks.functors.Functor._func ( self,
df,
dropna = True )
protectedinherited

Reimplemented in lsst.pipe.tasks.functors.Color, lsst.pipe.tasks.functors.Column, lsst.pipe.tasks.functors.ComputePixelScale, lsst.pipe.tasks.functors.ConvertDetectorAngleToPositionAngle, lsst.pipe.tasks.functors.ConvertPixelSqToArcsecondsSq, lsst.pipe.tasks.functors.ConvertPixelToArcseconds, lsst.pipe.tasks.functors.CoordColumn, lsst.pipe.tasks.functors.CustomFunctor, lsst.pipe.tasks.functors.DeconvolvedMoments, lsst.pipe.tasks.functors.E1, lsst.pipe.tasks.functors.E2, lsst.pipe.tasks.functors.Ebv, lsst.pipe.tasks.functors.HsmFwhm, lsst.pipe.tasks.functors.HsmTraceSize, lsst.pipe.tasks.functors.HtmIndex20, lsst.pipe.tasks.functors.Index, lsst.pipe.tasks.functors.LocalDipoleDiffFlux, lsst.pipe.tasks.functors.LocalDipoleDiffFluxErr, lsst.pipe.tasks.functors.LocalDipoleMeanFlux, lsst.pipe.tasks.functors.LocalDipoleMeanFluxErr, lsst.pipe.tasks.functors.LocalNanojansky, lsst.pipe.tasks.functors.LocalNanojanskyErr, lsst.pipe.tasks.functors.Mag, lsst.pipe.tasks.functors.MagDiff, lsst.pipe.tasks.functors.MagErr, lsst.pipe.tasks.functors.MomentsIuuSky, lsst.pipe.tasks.functors.MomentsIuvSky, lsst.pipe.tasks.functors.MomentsIvvSky, lsst.pipe.tasks.functors.MultibandSinglePrecisionFloatColumn, lsst.pipe.tasks.functors.NanoJansky, lsst.pipe.tasks.functors.NanoJanskyErr, lsst.pipe.tasks.functors.PositionAngleFromMoments, lsst.pipe.tasks.functors.PsfHsmTraceSizeDiff, lsst.pipe.tasks.functors.PsfSdssTraceSizeDiff, lsst.pipe.tasks.functors.RADecCovColumn, lsst.pipe.tasks.functors.RadiusFromQuadrupole, lsst.pipe.tasks.functors.ReferenceBand, lsst.pipe.tasks.functors.SdssTraceSize, lsst.pipe.tasks.functors.SemimajorAxisFromMoments, lsst.pipe.tasks.functors.SemiminorAxisFromMoments, and lsst.pipe.tasks.functors.SinglePrecisionFloatColumn.

Definition at line 291 of file functors.py.

291 def _func(self, df, dropna=True):
292 raise NotImplementedError('Must define calculation on DataFrame')
293

◆ _get_columnIndex()

lsst.pipe.tasks.functors.Functor._get_columnIndex ( self,
data )
protectedinherited
Return columnIndex.

Definition at line 294 of file functors.py.

294 def _get_columnIndex(self, data):
295 """Return columnIndex."""
296
297 if isinstance(data, (DeferredDatasetHandle, InMemoryDatasetHandle)):
298 return data.get(component="columns")
299 else:
300 return None
301

◆ _get_data()

lsst.pipe.tasks.functors.Functor._get_data ( self,
data )
protectedinherited
Retrieve DataFrame necessary for calculation.

The data argument can be a `~pandas.DataFrame`, a
`~lsst.daf.butler.DeferredDatasetHandle`, or
an `~lsst.pipe.base.InMemoryDatasetHandle`.

Returns a DataFrame upon which `self._func` can act.

Definition at line 302 of file functors.py.

302 def _get_data(self, data):
303 """Retrieve DataFrame necessary for calculation.
304
305 The data argument can be a `~pandas.DataFrame`, a
306 `~lsst.daf.butler.DeferredDatasetHandle`, or
307 an `~lsst.pipe.base.InMemoryDatasetHandle`.
308
309 Returns a DataFrame upon which `self._func` can act.
310 """
311 # We wrap a DataFrame in a handle here to take advantage of the
312 # DataFrame delegate DataFrame column wrangling abilities.
313 if isinstance(data, pd.DataFrame):
314 _data = InMemoryDatasetHandle(data, storageClass="DataFrame")
315 elif isinstance(data, (DeferredDatasetHandle, InMemoryDatasetHandle)):
316 _data = data
317 else:
318 raise RuntimeError(f"Unexpected type provided for data. Got {get_full_type_name(data)}.")
319
320 # First thing to do: check to see if the data source has a multilevel
321 # column index or not.
322 columnIndex = self._get_columnIndex(_data)
323 is_multiLevel = isinstance(columnIndex, pd.MultiIndex)
324
325 # Get proper columns specification for this functor.
326 if is_multiLevel:
327 columns = self.multilevelColumns(_data, columnIndex=columnIndex)
328 else:
329 columns = self.columns
330
331 # Load in-memory DataFrame with appropriate columns the gen3 way.
332 df = _data.get(parameters={"columns": columns})
333
334 # Drop unnecessary column levels.
335 if is_multiLevel:
336 df = self._setLevels(df)
337
338 return df
339

◆ _get_data_columnLevelNames()

lsst.pipe.tasks.functors.Functor._get_data_columnLevelNames ( self,
data,
columnIndex = None )
protectedinherited
Gets the content of each of the column levels for a multilevel
table.

Definition at line 204 of file functors.py.

204 def _get_data_columnLevelNames(self, data, columnIndex=None):
205 """Gets the content of each of the column levels for a multilevel
206 table.
207 """
208 if columnIndex is None:
209 columnIndex = data.get(component="columns")
210
211 columnLevels = columnIndex.names
212 columnLevelNames = {
213 level: list(np.unique(np.array([c for c in columnIndex])[:, i]))
214 for i, level in enumerate(columnLevels)
215 }
216 return columnLevelNames
217

◆ _get_data_columnLevels()

lsst.pipe.tasks.functors.Functor._get_data_columnLevels ( self,
data,
columnIndex = None )
protectedinherited
Gets the names of the column index levels.

This should only be called in the context of a multilevel table.

Parameters
----------
data : various
    The data to be read, can be a
    `~lsst.daf.butler.DeferredDatasetHandle` or
    `~lsst.pipe.base.InMemoryDatasetHandle`.
columnIndex (optional): pandas `~pandas.Index` object
    If not passed, then it is read from the
    `~lsst.daf.butler.DeferredDatasetHandle`
    or `~lsst.pipe.base.InMemoryDatasetHandle`.

Definition at line 184 of file functors.py.

184 def _get_data_columnLevels(self, data, columnIndex=None):
185 """Gets the names of the column index levels.
186
187 This should only be called in the context of a multilevel table.
188
189 Parameters
190 ----------
191 data : various
192 The data to be read, can be a
193 `~lsst.daf.butler.DeferredDatasetHandle` or
194 `~lsst.pipe.base.InMemoryDatasetHandle`.
195 columnIndex (optional): pandas `~pandas.Index` object
196 If not passed, then it is read from the
197 `~lsst.daf.butler.DeferredDatasetHandle`
198 for `~lsst.pipe.base.InMemoryDatasetHandle`.
199 """
200 if columnIndex is None:
201 columnIndex = data.get(component="columns")
202 return columnIndex.names
203

◆ _setLevels()

lsst.pipe.tasks.functors.Functor._setLevels ( self,
df )
protectedinherited

Definition at line 340 of file functors.py.

340 def _setLevels(self, df):
341 levelsToDrop = [n for n in df.columns.names if n not in self._dfLevels]
342 df.columns = df.columns.droplevel(levelsToDrop)
343 return df
344

◆ columns()

lsst.pipe.tasks.functors.CompositeFunctor.columns ( self)
Columns required to perform calculation.

Reimplemented from lsst.pipe.tasks.functors.Functor.

Definition at line 449 of file functors.py.

449 def columns(self):
450 return list(set([x for y in [f.columns for f in self.funcDict.values()] for x in y]))
451

◆ difference()

lsst.pipe.tasks.functors.Functor.difference ( self,
data1,
data2,
** kwargs )
inherited
Computes difference between functor called on two different
DataFrame/Handle objects.

Definition at line 360 of file functors.py.

360 def difference(self, data1, data2, **kwargs):
361 """Computes difference between functor called on two different
362 DataFrame/Handle objects.
363 """
364 return self(data1, **kwargs) - self(data2, **kwargs)
365

◆ fail()

lsst.pipe.tasks.functors.Functor.fail ( self,
df )
inherited

Definition at line 366 of file functors.py.

366 def fail(self, df):
367 return pd.Series(np.full(len(df), np.nan), index=df.index)
368

◆ filt() [1/2]

lsst.pipe.tasks.functors.CompositeFunctor.filt ( self)

Reimplemented from lsst.pipe.tasks.functors.Functor.

Definition at line 425 of file functors.py.

425 def filt(self):
426 return self._filt
427

◆ filt() [2/2]

lsst.pipe.tasks.functors.CompositeFunctor.filt ( self,
filt )

Reimplemented from lsst.pipe.tasks.functors.Functor.

Definition at line 429 of file functors.py.

429 def filt(self, filt):
430 if filt is not None:
431 for _, f in self.funcDict.items():
432 f.filt = filt
433 self._filt = filt
434

◆ from_file()

lsst.pipe.tasks.functors.CompositeFunctor.from_file ( cls,
filename,
** kwargs )

Definition at line 542 of file functors.py.

542 def from_file(cls, filename, **kwargs):
543 # Allow environment variables in the filename.
544 filename = os.path.expandvars(filename)
545 with open(filename) as f:
546 translationDefinition = yaml.safe_load(f)
547
548 return cls.from_yaml(translationDefinition, **kwargs)
549

◆ from_yaml()

lsst.pipe.tasks.functors.CompositeFunctor.from_yaml ( cls,
translationDefinition,
** kwargs )

Definition at line 551 of file functors.py.

551 def from_yaml(cls, translationDefinition, **kwargs):
552 funcs = {}
553 for func, val in translationDefinition['funcs'].items():
554 funcs[func] = init_fromDict(val, name=func)
555
556 if 'flag_rename_rules' in translationDefinition:
557 renameRules = translationDefinition['flag_rename_rules']
558 else:
559 renameRules = None
560
561 if 'calexpFlags' in translationDefinition:
562 for flag in translationDefinition['calexpFlags']:
563 funcs[cls.renameCol(flag, renameRules)] = Column(flag, dataset='calexp')
564
565 if 'refFlags' in translationDefinition:
566 for flag in translationDefinition['refFlags']:
567 funcs[cls.renameCol(flag, renameRules)] = Column(flag, dataset='ref')
568
569 if 'forcedFlags' in translationDefinition:
570 for flag in translationDefinition['forcedFlags']:
571 funcs[cls.renameCol(flag, renameRules)] = Column(flag, dataset='forced_src')
572
573 if 'flags' in translationDefinition:
574 for flag in translationDefinition['flags']:
575 funcs[cls.renameCol(flag, renameRules)] = Column(flag, dataset='meas')
576
577 return cls(funcs, **kwargs)
578
579

◆ multilevelColumns()

lsst.pipe.tasks.functors.CompositeFunctor.multilevelColumns ( self,
data,
** kwargs )
Returns columns needed by functor from multilevel dataset.

To access tables with multilevel column structure, the
`~lsst.daf.butler.DeferredDatasetHandle` or
`~lsst.pipe.base.InMemoryDatasetHandle` needs to be passed
either a list of tuples or a dictionary.

Parameters
----------
data : various
    The data as either `~lsst.daf.butler.DeferredDatasetHandle`, or
    `~lsst.pipe.base.InMemoryDatasetHandle`.
columnIndex (optional): pandas `~pandas.Index` object
    Either passed or read in from
    `~lsst.daf.butler.DeferredDatasetHandle`.
`returnTuple` : `bool`
    If true, then return a list of tuples rather than the column
    dictionary specification.
    This is set to `True` by `CompositeFunctor` in order to be able to
    combine columns from the various component functors.

Reimplemented from lsst.pipe.tasks.functors.Functor.

Definition at line 452 of file functors.py.

452 def multilevelColumns(self, data, **kwargs):
453 # Get the union of columns for all component functors.
454 # Note the need to have `returnTuple=True` here.
455 return list(
456 set(
457 [
458 x
459 for y in [
460 f.multilevelColumns(data, returnTuple=True, **kwargs) for f in self.funcDict.values()
461 ]
462 for x in y
463 ]
464 )
465 )
466

◆ name()

◆ noDup()

lsst.pipe.tasks.functors.Functor.noDup ( self)
inherited
Do not explode by band if used on object table.

Definition at line 170 of file functors.py.

170 def noDup(self):
171 """Do not explode by band if used on object table."""
172 if self._noDup is not None:
173 return self._noDup
174 else:
175 return self._defaultNoDup
176

◆ renameCol()

lsst.pipe.tasks.functors.CompositeFunctor.renameCol ( cls,
col,
renameRules )

Definition at line 533 of file functors.py.

533 def renameCol(cls, col, renameRules):
534 if renameRules is None:
535 return col
536 for old, new in renameRules:
537 if col.startswith(old):
538 col = col.replace(old, new)
539 return col
540

◆ shortname()

lsst.pipe.tasks.functors.Functor.shortname ( self)
inherited
Short name of functor (suitable for column name/dict key).

Reimplemented in lsst.pipe.tasks.functors.Color, and lsst.pipe.tasks.functors.MagDiff.

Definition at line 375 of file functors.py.

375 def shortname(self):
376 """Short name of functor (suitable for column name/dict key)."""
377 return self.name
378
379

◆ update()

lsst.pipe.tasks.functors.CompositeFunctor.update ( self,
new )
Update the functor with new functors.

Definition at line 435 of file functors.py.

435 def update(self, new):
436 """Update the functor with new functors."""
437 if isinstance(new, dict):
438 self.funcDict.update(new)
439 elif isinstance(new, CompositeFunctor):
440 self.funcDict.update(new.funcDict)
441 else:
442 raise TypeError('Can only update with dictionary or CompositeFunctor.')
443
444 # Make sure new functors have the same 'filt' set.
445 if self.filt is not None:
446 self.filt = self.filt
447

Member Data Documentation

◆ _defaultDataset

str lsst.pipe.tasks.functors.Functor._defaultDataset = 'ref'
staticprotectedinherited

Definition at line 159 of file functors.py.

◆ _defaultNoDup

bool lsst.pipe.tasks.functors.Functor._defaultNoDup = False
staticprotectedinherited

Definition at line 161 of file functors.py.

◆ _dfLevels

tuple lsst.pipe.tasks.functors.Functor._dfLevels = ('column',)
staticprotectedinherited

Definition at line 160 of file functors.py.

◆ _filt

lsst.pipe.tasks.functors.CompositeFunctor._filt = None
protected

Definition at line 420 of file functors.py.

◆ _noDup

lsst.pipe.tasks.functors.Functor._noDup = noDup
protectedinherited

Definition at line 166 of file functors.py.

◆ dataset

str lsst.pipe.tasks.functors.Functor.dataset = dataset if dataset is not None else self._defaultDataset
inherited

Definition at line 165 of file functors.py.

◆ funcDict

dict lsst.pipe.tasks.functors.CompositeFunctor.funcDict = funcs

Definition at line 416 of file functors.py.

◆ log

lsst.pipe.tasks.functors.Functor.log = logging.getLogger(type(self).__name__)
inherited

Definition at line 167 of file functors.py.

◆ name


The documentation for this class was generated from the following file: