import re

import yaml
import numpy as np
import pandas as pd
import astropy.units as u

from lsst.daf.persistence import doImport
from .parquetTable import MultilevelParquetTable
def init_fromDict(initDict, basePath='lsst.pipe.tasks.functors', typeKey='functor'):
    """Initialize an object defined in a dictionary.

    The object needs to be importable as
    '{0}.{1}'.format(basePath, initDict[typeKey]).
    The positional and keyword arguments (if any) are contained in
    "args" and "kwargs" entries in the dictionary, respectively.
    This is used in `functors.CompositeFunctor.from_yaml` to initialize
    a composite functor from a specification in a YAML file.

    Parameters
    ----------
    initDict : dictionary
        Dictionary describing object's initialization.  Must contain
        an entry keyed by ``typeKey`` that is the name of the object,
        relative to ``basePath``.
    basePath : str
        Path relative to module in which ``initDict[typeKey]`` is defined.
    typeKey : str
        Key of ``initDict`` that is the name of the object
        (relative to ``basePath``).
    """
    initDict = initDict.copy()
    pythonType = doImport('{0}.{1}'.format(basePath, initDict.pop(typeKey)))
    args = []
    if 'args' in initDict:
        args = initDict.pop('args')
        if isinstance(args, str):
            args = [args]
    return pythonType(*args, **initDict)
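# Illustrative sketch (not part of the API): the kind of spec dictionary
# `init_fromDict` consumes.  The functor name and arguments below are
# hypothetical examples.
#
#     spec = {'functor': 'Mag', 'args': 'base_PsfFlux', 'filt': 'HSC-G'}
#     magFunctor = init_fromDict(spec)
#     # ...equivalent to Mag('base_PsfFlux', filt='HSC-G')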
47 """Define and execute a calculation on a ParquetTable 49 The `__call__` method accepts a `ParquetTable` object, and returns the 50 result of the calculation as a single column. Each functor defines what 51 columns are needed for the calculation, and only these columns are read 52 from the `ParquetTable`. 54 The action of `__call__` consists of two steps: first, loading the 55 necessary columns from disk into memory as a `pandas.DataFrame` object; 56 and second, performing the computation on this dataframe and returning the 60 To define a new `Functor`, a subclass must define a `_func` method, 61 that takes a `pandas.DataFrame` and returns result in a `pandas.Series`. 62 In addition, it must define the following attributes 64 * `_columns`: The columns necessary to perform the calculation 65 * `name`: A name appropriate for a figure axis label 66 * `shortname`: A name appropriate for use as a dictionary key 68 On initialization, a `Functor` should declare what filter (`filt` kwarg) 69 and dataset (e.g. `'ref'`, `'meas'`, `'forced_src'`) it is intended to be 70 applied to. This enables the `_get_cols` method to extract the proper 71 columns from the parquet file. If not specified, the dataset will fall back 72 on the `_defaultDataset`attribute. If filter is not specified and `dataset` 73 is anything other than `'ref'`, then an error will be raised when trying to 74 perform the calculation. 76 As currently implemented, `Functor` is only set up to expect a 77 `ParquetTable` of the format of the `deepCoadd_obj` dataset; that is, a 78 `MultilevelParquetTable` with the levels of the column index being `filter`, 79 `dataset`, and `column`. This is defined in the `_columnLevels` attribute, 80 as well as being implicit in the role of the `filt` and `dataset` attributes 81 defined at initialization. In addition, the `_get_cols` method that reads 82 the dataframe from the `ParquetTable` will return a dataframe with column 83 index levels defined by the `_dfLevels` attribute; by default, this is 86 The `_columnLevels` and `_dfLevels` attributes should generally not need to 87 be changed, unless `_func` needs columns from multiple filters or datasets 88 to do the calculation. 89 An example of this is the `lsst.pipe.tasks.functors.Color` functor, for 90 which `_dfLevels = ('filter', 'column')`, and `_func` expects the dataframe 91 it gets to have those levels in the column index. 96 Filter upon which to do the calculation 99 Dataset upon which to do the calculation 100 (e.g., 'ref', 'meas', 'forced_src'). 104 _defaultDataset =
'ref' 105 _columnLevels = (
'filter',
'dataset',
'column')
106 _dfLevels = (
'column',)
107 _defaultNoDup =
False 109 def __init__(self, filt=None, dataset=None, noDup=None):
116 if self.
_noDup is not None:
123 """Columns required to perform calculation 125 if not hasattr(self,
'_columns'):
126 raise NotImplementedError(
'Must define columns property or _columns attribute')
131 raise ValueError(
'ParquetTable does not have the expected column levels. ' +
134 columnDict = {
'column': self.
columns,
136 if self.
filt is None:
137 if 'filter' in parq.columnLevels:
139 columnDict[
'filter'] = parq.columnLevelNames[
'filter'][0]
141 raise ValueError(
"'filt' not set for functor {}".
format(self.
name) +
143 "and ParquetTable " +
144 "contains multiple filters in column index. " +
145 "Set 'filt' or set 'dataset' to 'ref'.")
147 columnDict[
'filter'] = self.
filt 149 return parq._colsFromDict(columnDict)
151 def _func(self, df, dropna=True):
152 raise NotImplementedError(
'Must define calculation on dataframe')
154 def _get_cols(self, parq):
155 """Retrieve dataframe necessary for calculation. 157 Returns dataframe upon which `self._func` can act. 159 if isinstance(parq, MultilevelParquetTable):
161 df = parq.toDataFrame(columns=columns, droplevels=
False)
165 df = parq.toDataFrame(columns=columns)
169 def _setLevels(self, df):
170 levelsToDrop = [n
for n
in df.columns.names
if n
not in self.
_dfLevels]
171 df.columns = df.columns.droplevel(levelsToDrop)
174 def _dropna(self, vals):
180 vals = self.
_func(df)
189 return pd.Series(np.full(len(df), np.nan), index=df.index)
193 """Full name of functor (suitable for figure labels) 195 return NotImplementedError
199 """Short name of functor (suitable for column name/dict key) 205 """Perform multiple calculations at once on a catalog 207 The role of a `CompositeFunctor` is to group together computations from 208 multiple functors. Instead of returning `pandas.Series` a 209 `CompositeFunctor` returns a `pandas.Dataframe`, with the column names 210 being the keys of `funcDict`. 212 The `columns` attribute of a `CompositeFunctor` is the union of all columns 213 in all the component functors. 215 A `CompositeFunctor` does not use a `_func` method itself; rather, 216 when a `CompositeFunctor` is called, all its columns are loaded 217 at once, and the resulting dataframe is passed to the `_func` method of each component 218 functor. This has the advantage of only doing I/O (reading from parquet file) once, 219 and works because each individual `_func` method of each component functor does not 220 care if there are *extra* columns in the dataframe being passed; only that it must contain 221 *at least* the `columns` it expects. 223 An important and useful class method is `from_yaml`, which takes as argument the path to a YAML 224 file specifying a collection of functors. 228 funcs : `dict` or `list` 229 Dictionary or list of functors. If a list, then it will be converted 230 into a dictonary according to the `.shortname` attribute of each functor. 237 if type(funcs) == dict:
240 self.
funcDict = {f.shortname: f
for f
in funcs}
    def update(self, new):
        """Update the functor dictionary with new functors."""
        if isinstance(new, dict):
            self.funcDict.update(new)
        elif isinstance(new, CompositeFunctor):
            self.funcDict.update(new.funcDict)
        else:
            raise TypeError('Can only update with dictionary or CompositeFunctor.')
    @property
    def filt(self):
        return self._filt

    @filt.setter
    def filt(self, filt):
        # Propagate the filter to all component functors.
        if filt is not None:
            for _, f in self.funcDict.items():
                f.filt = filt
        self._filt = filt
    @property
    def columns(self):
        return list(set([x for y in [f.columns for f in self.funcDict.values()]
                         for x in y]))
    def multilevelColumns(self, parq):
        return list(set([x for y in [f.multilevelColumns(parq)
                                     for f in self.funcDict.values()] for x in y]))
    def __call__(self, parq, **kwargs):
        if isinstance(parq, MultilevelParquetTable):
            columns = self.multilevelColumns(parq)
            df = parq.toDataFrame(columns=columns, droplevels=False)
            valDict = {}
            for k, f in self.funcDict.items():
                try:
                    subdf = f._setLevels(df[f.multilevelColumns(parq)])
                    valDict[k] = f._func(subdf)
                except Exception:
                    valDict[k] = f.fail(subdf)
        else:
            columns = self.columns
            df = parq.toDataFrame(columns=columns)
            valDict = {k: f._func(df) for k, f in self.funcDict.items()}

        try:
            valDf = pd.concat(valDict, axis=1)
        except TypeError:
            print([(k, type(v)) for k, v in valDict.items()])
            raise

        if kwargs.get('dropna', False):
            valDf = valDf.dropna(how='any')

        return valDf
    @classmethod
    def renameCol(cls, col, renameRules):
        if renameRules is None:
            return col
        for old, new in renameRules:
            if col.startswith(old):
                col = col.replace(old, new)
        return col
    @classmethod
    def from_file(cls, filename, **kwargs):
        with open(filename) as f:
            translationDefinition = yaml.safe_load(f)

        return cls.from_yaml(translationDefinition, **kwargs)
    @classmethod
    def from_yaml(cls, translationDefinition, **kwargs):
        funcs = {}
        for func, val in translationDefinition['funcs'].items():
            funcs[func] = init_fromDict(val)

        if 'flag_rename_rules' in translationDefinition:
            renameRules = translationDefinition['flag_rename_rules']
        else:
            renameRules = None

        if 'flags' in translationDefinition:
            for flag in translationDefinition['flags']:
                funcs[cls.renameCol(flag, renameRules)] = Column(flag, dataset='ref')

        return cls(funcs, **kwargs)
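# Illustrative sketch of the YAML structure `from_yaml`/`from_file` expect:
# keys under 'funcs' become output column names, and each entry is passed
# to `init_fromDict`.  Functor names, arguments, and the filename below are
# hypothetical.
#
#     funcs:
#         psfMag:
#             functor: Mag
#             args: base_PsfFlux
#         gmr:
#             functor: Color
#             args: [modelfit_CModel, HSC-G, HSC-R]
#     flags:
#         - base_PixelFlags_flag_interpolated
#
#     myFuncs = CompositeFunctor.from_file('myFunctors.yaml')
#     df = myFuncs(parq)   # DataFrame with columns 'psfMag', 'gmr', and the flag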
339 """Evaluate an expression on a DataFrame, knowing what the 'mag' function means 341 Builds on `pandas.DataFrame.eval`, which parses and executes math on dataframes. 345 df : pandas.DataFrame 346 Dataframe on which to evaluate expression. 352 expr_new = re.sub(
r'mag\((\w+)\)',
r'-2.5*log(\g<1>)/log(10)', expr)
353 val = df.eval(expr_new, truediv=
True)
355 expr_new = re.sub(
r'mag\((\w+)\)',
r'-2.5*log(\g<1>_instFlux)/log(10)', expr)
356 val = df.eval(expr_new, truediv=
True)
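# A short, self-contained illustration of the 'mag' translation (column
# names hypothetical): 'mag(x)' becomes '-2.5*log(x)/log(10)', i.e.
# -2.5*log10(x), before being handed to `pandas.DataFrame.eval`.
#
#     df = pd.DataFrame({'f1': [100.], 'f2': [10.]})
#     mag_aware_eval(df, 'mag(f1) - mag(f2)')   # -> -2.5*log10(100/10) = -2.5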
361 """Arbitrary computation on a catalog 363 Column names (and thus the columns to be loaded from catalog) are found 364 by finding all words and trying to ignore all "math-y" words. 369 Expression to evaluate, to be parsed and executed by `mag_aware_eval`. 371 _ignore_words = (
'mag',
'sin',
'cos',
'exp',
'log',
'sqrt')
383 flux_cols = re.findall(
r'mag\(\s*(\w+)\s*\)', self.
expr)
385 cols = [c
for c
in re.findall(
r'[a-zA-Z_]+', self.
expr)
if c
not in self.
_ignore_words]
388 if not re.search(
'_instFlux$', c):
389 cols.append(
'{}_instFlux'.
format(c))
394 return list(
set([c
for c
in cols
if c
not in not_a_col]))
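# Illustrative usage: column names are scraped from the expression, with
# 'mag(...)' arguments expanded to their '_instFlux' columns as needed
# (order not guaranteed, since a set is used).
#
#     func = CustomFunctor('mag(modelfit_CModel) - mag(base_PsfFlux)')
#     func.columns   # -> ['modelfit_CModel_instFlux', 'base_PsfFlux_instFlux']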
401 """Get column with specified name 421 """Return the value of the index for each object 424 columns = [
'coord_ra']
425 _defaultDataset =
'ref' 429 return pd.Series(df.index, index=df.index)
class IDColumn(Column):
    col = 'id'
    _allow_difference = False

    def _func(self, df):
        return pd.Series(df.index, index=df.index)


class FootprintNPix(Column):
    col = 'base_Footprint_nPix'


class CoordColumn(Column):
    """Base class for coordinate column, in degrees.
    """
    _allow_difference = False

    def __init__(self, col, calculate=False, **kwargs):
        self.calculate = calculate
        super().__init__(col, **kwargs)
464 """Right Ascension, in degrees 469 super().
__init__(
'coord_ra', **kwargs)
472 return super().
__call__(catalog, **kwargs)
476 """Declination, in degrees 481 super().
__init__(
'coord_dec', **kwargs)
484 return super().
__call__(catalog, **kwargs)
def fluxName(col):
    if not col.endswith('_instFlux'):
        col += '_instFlux'
    return col


def fluxErrName(col):
    if not col.endswith('_instFluxErr'):
        col += '_instFluxErr'
    return col


class Mag(Functor):
    """Compute calibrated magnitude.

    Takes a `calib` argument, which returns the flux at mag=0 as
    `calib.getFluxMag0()`.  If not provided, then the default `fluxMag0` is
    63095734448.0194, which is the default for HSC.  This default should be
    removed in DM-21955.

    This calculation hides warnings about invalid values and dividing by
    zero.

    As for all functors, a `dataset` and `filt` kwarg should be provided
    upon initialization.  Unlike the default `Functor`, however, the default
    dataset for a `Mag` is `'meas'`, rather than `'ref'`.

    Parameters
    ----------
    col : `str`
        Name of flux column from which to compute magnitude.  Can be
        parseable by the `lsst.pipe.tasks.functors.fluxName` function---that
        is, you can pass `'modelfit_CModel'` instead of
        `'modelfit_CModel_instFlux'`, and it will understand.
    calib : `lsst.afw.image.calib.Calib` (optional)
        Object that knows zero point.
    """
    _defaultDataset = 'meas'

    def __init__(self, col, calib=None, **kwargs):
        self.col = fluxName(col)
        self.calib = calib
        if calib is not None:
            self.fluxMag0 = calib.getFluxMag0()[0]
        else:
            # Default for HSC; to be removed in DM-21955.
            self.fluxMag0 = 63095734448.0194

        super().__init__(**kwargs)

    @property
    def columns(self):
        return [self.col]
    def _func(self, df):
        with np.warnings.catch_warnings():
            np.warnings.filterwarnings('ignore', r'invalid value encountered')
            np.warnings.filterwarnings('ignore', r'divide by zero')
            return -2.5*np.log10(df[self.col] / self.fluxMag0)
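# Worked example of the arithmetic above (values illustrative): the default
# HSC fluxMag0 of 63095734448.0194 corresponds to a zero point of
# 2.5*log10(fluxMag0) ~= 27.0 mag, so an instFlux of 63095.73 maps to
# -2.5*log10(63095.73/63095734448.0194) ~= 15.0 mag.
#
#     psfMag = Mag('base_PsfFlux', filt='HSC-G')
#     mags = psfMag(parq)   # pandas.Series of magnitudes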
552 """Compute calibrated magnitude uncertainty 554 Takes the same `calib` object as `lsst.pipe.tasks.functors.Mag`. 559 calib : `lsst.afw.image.calib.Calib` (optional) 560 Object that knows zero point. 565 if self.
calib is not None:
    @property
    def columns(self):
        return [self.col, self.col + 'Err']
    def _func(self, df):
        with np.warnings.catch_warnings():
            np.warnings.filterwarnings('ignore', r'invalid value encountered')
            np.warnings.filterwarnings('ignore', r'divide by zero')
            fluxCol, fluxErrCol = self.columns
            x = df[fluxErrCol] / df[fluxCol]
            y = self.fluxMag0Err / self.fluxMag0
            magErr = (2.5 / np.log(10.)) * np.sqrt(x*x + y*y)
            return magErr
    @property
    def name(self):
        return super().name + '_err'


class MagDiff(Functor):
    """Functor to calculate magnitude difference."""
    _defaultDataset = 'meas'

    def __init__(self, col1, col2, **kwargs):
        self.col1 = fluxName(col1)
        self.col2 = fluxName(col2)
        super().__init__(**kwargs)

    @property
    def columns(self):
        return [self.col1, self.col2]

    def _func(self, df):
        with np.warnings.catch_warnings():
            np.warnings.filterwarnings('ignore', r'invalid value encountered')
            np.warnings.filterwarnings('ignore', r'divide by zero')
            return -2.5*np.log10(df[self.col1]/df[self.col2])
627 """Compute the color between two filters 629 Computes color by initializing two different `Mag` 630 functors based on the `col` and filters provided, and 631 then returning the difference. 633 This is enabled by the `_func` expecting a dataframe with a 634 multilevel column index, with both `'filter'` and `'column'`, 635 instead of just `'column'`, which is the `Functor` default. 636 This is controlled by the `_dfLevels` attribute. 638 Also of note, the default dataset for `Color` is `forced_src'`, 639 whereas for `Mag` it is `'meas'`. 644 Name of flux column from which to compute; same as would be passed to 645 `lsst.pipe.tasks.functors.Mag`. 648 Filters from which to compute magnitude difference. 649 Color computed is `Mag(filt2) - Mag(filt1)`. 651 _defaultDataset =
'forced_src' 652 _dfLevels = (
'filter',
'column')
    def __init__(self, col, filt2, filt1, **kwargs):
        self.col = fluxName(col)
        if filt2 == filt1:
            raise RuntimeError("Cannot compute Color for %s: %s - %s " % (col, filt2, filt1))
        self.filt2 = filt2
        self.filt1 = filt1

        self.mag2 = Mag(col, filt=filt2, **kwargs)
        self.mag1 = Mag(col, filt=filt1, **kwargs)

        super().__init__(filt=filt2, **kwargs)

    def _func(self, df):
        mag2 = self.mag2._func(df[self.filt2])
        mag1 = self.mag1._func(df[self.filt1])
        return mag2 - mag1
    @property
    def columns(self):
        return [self.mag1.col, self.mag2.col]
    @property
    def name(self):
        return '{0}_{1}m{2}'.format(self.col, self.filt2.replace('-', ''),
                                    self.filt1.replace('-', ''))
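# Illustrative usage (column and filter names hypothetical): a g-r color
# from CModel fluxes.  Note the argument order: Color(col, filt2, filt1)
# computes Mag(filt2) - Mag(filt1).
#
#     gmr = Color('modelfit_CModel', 'HSC-G', 'HSC-R')
#     colors = gmr(parq)   # Series of g-minus-r colors
#     gmr.name             # -> 'modelfit_CModel_HSCGmHSCR'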
699 """Main function of this subclass is to override the dropna=True 702 _allow_difference =
False 707 return super().
__call__(parq, dropna=
False, **kwargs)
class StarGalaxyLabeller(Labeller):
    _columns = ["base_ClassificationExtendedness_value"]
    _column = "base_ClassificationExtendedness_value"

    def _func(self, df):
        x = df[self._columns][self._column]
        mask = x.isnull()
        test = (x < 0.5).astype(int)
        test = test.mask(mask, 2)

        categories = ['galaxy', 'star', self._null_label]
        label = pd.Series(pd.Categorical.from_codes(test, categories=categories),
                          index=x.index, name='label')
        if self._force_str:
            label = label.astype(str)
        return label
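# Example of the coding above: extendedness 0.9 -> code 0 -> 'galaxy';
# extendedness 0.1 -> code 1 -> 'star'; null extendedness -> code 2 -> the
# null label.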
class NumStarLabeller(Labeller):
    _columns = ['numStarFlags']
    labels = {"star": 0, "maybe": 1, "notStar": 2}

    def _func(self, df):
        x = df[self._columns][self._columns[0]]

        # Number of filters
        n = len(x.unique()) - 1

        labels = ['noStar', 'maybe', 'star']
        label = pd.Series(pd.cut(x, [-1, 0, n-1, n], labels=labels),
                          index=x.index, name='label')

        if self._force_str:
            label = label.astype(str)

        return label
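# Example of the binning above: with bins [-1, 0, n-1, n], an object
# flagged as a star in none of the n filters ('numStarFlags' == 0) is
# 'noStar', in all n filters is 'star', and anything in between is 'maybe'.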
class DeconvolvedMoments(Functor):
    name = 'Deconvolved Moments'
    shortname = 'deconvolvedMoments'
    _columns = ("ext_shapeHSM_HsmSourceMoments_xx",
                "ext_shapeHSM_HsmSourceMoments_yy",
                "base_SdssShape_xx", "base_SdssShape_yy",
                "ext_shapeHSM_HsmPsfMoments_xx",
                "ext_shapeHSM_HsmPsfMoments_yy")

    def _func(self, df):
        """Calculate deconvolved moments."""
        if "ext_shapeHSM_HsmSourceMoments_xx" in df.columns:
            hsm = df["ext_shapeHSM_HsmSourceMoments_xx"] + df["ext_shapeHSM_HsmSourceMoments_yy"]
        else:
            hsm = np.ones(len(df))*np.nan
        sdss = df["base_SdssShape_xx"] + df["base_SdssShape_yy"]
        if "ext_shapeHSM_HsmPsfMoments_xx" in df.columns:
            psf = df["ext_shapeHSM_HsmPsfMoments_xx"] + df["ext_shapeHSM_HsmPsfMoments_yy"]
        else:
            raise RuntimeError('No psf shape parameter found in catalog')

        return hsm.where(np.isfinite(hsm), sdss) - psf
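# In other words, this functor computes the difference of traces
# (Ixx + Iyy) between the source and the PSF model:
# (Ixx_src + Iyy_src) - (Ixx_psf + Iyy_psf), preferring the HSM source
# moments and falling back to the SDSS shape moments where HSM is NaN.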
778 """Functor to calculate SDSS trace radius size for sources""" 779 name =
"SDSS Trace Size" 780 shortname =
'sdssTrace' 781 _columns = (
"base_SdssShape_xx",
"base_SdssShape_yy")
784 srcSize = np.sqrt(0.5*(df[
"base_SdssShape_xx"] + df[
"base_SdssShape_yy"]))
789 """Functor to calculate SDSS trace radius size difference (%) between object and psf model""" 790 name =
"PSF - SDSS Trace Size" 791 shortname =
'psf_sdssTrace' 792 _columns = (
"base_SdssShape_xx",
"base_SdssShape_yy",
793 "base_SdssShape_psf_xx",
"base_SdssShape_psf_yy")
796 srcSize = np.sqrt(0.5*(df[
"base_SdssShape_xx"] + df[
"base_SdssShape_yy"]))
797 psfSize = np.sqrt(0.5*(df[
"base_SdssShape_psf_xx"] + df[
"base_SdssShape_psf_yy"]))
798 sizeDiff = 100*(srcSize - psfSize)/(0.5*(srcSize + psfSize))
803 """Functor to calculate HSM trace radius size for sources""" 804 name =
'HSM Trace Size' 805 shortname =
'hsmTrace' 806 _columns = (
"ext_shapeHSM_HsmSourceMoments_xx",
807 "ext_shapeHSM_HsmSourceMoments_yy")
810 srcSize = np.sqrt(0.5*(df[
"ext_shapeHSM_HsmSourceMoments_xx"] +
811 df[
"ext_shapeHSM_HsmSourceMoments_yy"]))
816 """Functor to calculate HSM trace radius size difference (%) between object and psf model""" 817 name =
'PSF - HSM Trace Size' 818 shortname =
'psf_HsmTrace' 819 _columns = (
"ext_shapeHSM_HsmSourceMoments_xx",
820 "ext_shapeHSM_HsmSourceMoments_yy",
821 "ext_shapeHSM_HsmPsfMoments_xx",
822 "ext_shapeHSM_HsmPsfMoments_yy")
825 srcSize = np.sqrt(0.5*(df[
"ext_shapeHSM_HsmSourceMoments_xx"] +
826 df[
"ext_shapeHSM_HsmSourceMoments_yy"]))
827 psfSize = np.sqrt(0.5*(df[
"ext_shapeHSM_HsmPsfMoments_xx"] +
828 df[
"ext_shapeHSM_HsmPsfMoments_yy"]))
829 sizeDiff = 100*(srcSize - psfSize)/(0.5*(srcSize + psfSize))
class HsmFwhm(Functor):
    name = 'HSM Psf FWHM'
    _columns = ('ext_shapeHSM_HsmPsfMoments_xx', 'ext_shapeHSM_HsmPsfMoments_yy')
    # TODO: DM-21403 pixel scale should be computed from the WCS, not hard coded
    pixelScale = 0.168
    SIGMA2FWHM = 2*np.sqrt(2*np.log(2))

    def _func(self, df):
        return self.pixelScale*self.SIGMA2FWHM*np.sqrt(
            0.5*(df['ext_shapeHSM_HsmPsfMoments_xx'] + df['ext_shapeHSM_HsmPsfMoments_yy']))
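# Worked example of the conversion above: for a Gaussian profile,
# FWHM = 2*sqrt(2*ln(2))*sigma ~= 2.355*sigma, and sigma =
# sqrt((Ixx + Iyy)/2) in pixels, so the result is
# pixelScale * 2.355 * sqrt(0.5*(Ixx + Iyy)) in arcseconds.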
class E1(Functor):
    name = "Distortion Ellipticity (e1)"
    shortname = "Distortion"

    def __init__(self, colXX, colXY, colYY, **kwargs):
        ...


class E2(Functor):
    name = "Ellipticity e2"

    def __init__(self, colXX, colXY, colYY, **kwargs):
        ...
'Reference Band' 899 shortname =
'refBand' 903 return [
"merge_measurement_i",
904 "merge_measurement_r",
905 "merge_measurement_z",
906 "merge_measurement_y",
907 "merge_measurement_g"]
910 def getFilterAliasName(row):
912 colName = row.idxmax()
913 return colName.replace(
'merge_measurement_',
'')
915 return df[self.
columns].apply(getFilterAliasName, axis=1)
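# Example of the row-wise reduction above: for a row with
# merge_measurement_i == True and the other merge_measurement_* columns
# False, `idxmax` returns 'merge_measurement_i', which maps to 'i'.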
class Photometry(Functor):
    AB_FLUX_SCALE = (0 * u.ABmag).to_value(u.nJy)
    LOG_AB_FLUX_SCALE = 12.56
    FIVE_OVER_2LOG10 = 1.085736204758129569
    # TO DO: DM-21955 Replace hard coded photometric calibration values
    COADD_ZP = 27

    def __init__(self, colFlux, colFluxErr=None, calib=None, **kwargs):
        self.vhypot = np.vectorize(self.hypot)
        self.col = colFlux
        self.colFluxErr = colFluxErr

        self.calib = calib
        if calib is not None:
            self.fluxMag0, self.fluxMag0Err = calib.getFluxMag0()
        else:
            self.fluxMag0 = 1./np.power(10, -0.4*self.COADD_ZP)
            self.fluxMag0Err = 0.

        super().__init__(**kwargs)
    @classmethod
    def hypot(cls, a, b):
        """Numerically stable hypotenuse: sqrt(a**2 + b**2) without overflow."""
        if np.abs(a) < np.abs(b):
            a, b = b, a
        if a == 0.:
            return 0.
        q = b/a
        return np.abs(a) * np.sqrt(1. + q*q)
    def dn2flux(self, dn, fluxMag0):
        return self.AB_FLUX_SCALE * dn / fluxMag0

    def dn2mag(self, dn, fluxMag0):
        with np.warnings.catch_warnings():
            np.warnings.filterwarnings('ignore', r'invalid value encountered')
            np.warnings.filterwarnings('ignore', r'divide by zero')
            return -2.5 * np.log10(dn/fluxMag0)
    def dn2fluxErr(self, dn, dnErr, fluxMag0, fluxMag0Err):
        retVal = self.vhypot(dn * fluxMag0Err, dnErr * fluxMag0)
        retVal *= self.AB_FLUX_SCALE / fluxMag0 / fluxMag0
        return retVal

    def dn2MagErr(self, dn, dnErr, fluxMag0, fluxMag0Err):
        retVal = self.dn2fluxErr(dn, dnErr, fluxMag0, fluxMag0Err) / self.dn2flux(dn, fluxMag0)
        return self.FIVE_OVER_2LOG10 * retVal
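# The error propagation above is the standard first-order result for
# flux = AB_FLUX_SCALE*dn/fluxMag0:
#
#     fluxErr = AB_FLUX_SCALE * hypot(dn*fluxMag0Err, dnErr*fluxMag0) / fluxMag0**2
#     magErr  = (2.5/ln(10)) * fluxErr/flux
#
# where 2.5/ln(10) = FIVE_OVER_2LOG10 ~= 1.0857.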
class NanoJanskyErr(Photometry):
    @property
    def columns(self):
        return [self.col, self.colFluxErr]

    def _func(self, df):
        retArr = self.dn2fluxErr(df[self.col], df[self.colFluxErr],
                                 self.fluxMag0, self.fluxMag0Err)
        return pd.Series(retArr, index=df.index)


class MagnitudeErr(Photometry):
    @property
    def columns(self):
        return [self.col, self.colFluxErr]

    def _func(self, df):
        retArr = self.dn2MagErr(df[self.col], df[self.colFluxErr],
                                self.fluxMag0, self.fluxMag0Err)
        return pd.Series(retArr, index=df.index)