# Class-level defaults, intended to be overridden by subclasses.
# Extra flag columns from the "ref" dataset that are always requested;
# copied into ``self.refFlags`` in ``__init__``.
_defaultRefFlags = []
# Default (name, functor) pairs; turned into a dict by the
# ``defaultFuncs`` property.
_defaultFuncs = ()
def __init__(self, handles, functors, filt=None, flags=None, refFlags=None, forcedFlags=None):
    """Set up the analysis over one or more data handles.

    Parameters
    ----------
    handles
        A single handle or a list/tuple of handles to evaluate over.
    functors
        Functors (or a composite) producing the output columns.
    filt : optional
        Filter/band attached to the composite functor.
    flags, refFlags, forcedFlags : iterable of `str`, optional
        Extra flag columns drawn from the "meas", "ref" and "forced_src"
        datasets respectively.  ``refFlags`` is appended to the
        class-level defaults in ``_defaultRefFlags``.
    """
    self.handles = handles
    self.functors = functors
    self.filt = filt
    # Avoid mutable-default pitfalls: copy caller-supplied iterables.
    self.flags = [] if flags is None else list(flags)
    self.forcedFlags = [] if forcedFlags is None else list(forcedFlags)
    refs = list(self._defaultRefFlags)
    if refFlags is not None:
        refs.extend(refFlags)
    self.refFlags = refs
    # Result cache; populated lazily by ``compute``.
    self._df = None
@property
def defaultFuncs(self):
    """Return a fresh dict built from the class-default functor pairs."""
    return dict(self._defaultFuncs)
@property
def func(self):
    """Composite functor combining the configured functors with the
    requested flag columns from each dataset.

    NOTE: when ``self.functors`` is already a `CompositeFunctor`, it is
    updated in place.
    """
    extra = self.defaultFuncs
    # Later updates win on name collisions, so precedence is
    # meas > ref > forced_src > defaults (same order as before).
    for dataset, names in (("forced_src", self.forcedFlags),
                           ("ref", self.refFlags),
                           ("meas", self.flags)):
        extra.update({name: Column(name, dataset=dataset) for name in names})
    if isinstance(self.functors, CompositeFunctor):
        composite = self.functors
    else:
        composite = CompositeFunctor(self.functors)
    composite.funcDict.update(extra)
    composite.filt = self.filt
    return composite
@property
def noDupCols(self):
    """Names of functors that are flagged ``noDup`` or come from the
    "ref" dataset (columns that should not be duplicated per band)."""
    names = []
    for name, functor in self.func.funcDict.items():
        if functor.noDup or functor.dataset == "ref":
            names.append(name)
    return names
@property
def df(self):
    """Cached result table; built lazily by ``compute`` on first access."""
    if self._df is not None:
        return self._df
    self.compute()
    return self._df
def compute(self, dropna=False, pool=None):
    """Evaluate the functor over the handle(s) and cache the result.

    Parameters
    ----------
    dropna : `bool`, optional
        Passed through to the functor call.
    pool : optional
        A ``map``-providing pool; if given and ``handles`` is a
        list/tuple, the functor is mapped over the handles with it.

    Returns
    -------
    `pandas.DataFrame`
        The concatenated (or single) result, also cached as ``self._df``.
    """
    # isinstance (rather than ``type(...) in (list, tuple)``) so that
    # list/tuple subclasses also take the multi-handle path.
    if isinstance(self.handles, (list, tuple)):
        # Map over multiple handles and concatenate the per-handle frames.
        if pool is None:
            dflist = [self.func(handle, dropna=dropna) for handle in self.handles]
        else:
            # TODO: Figure out why this doesn't work (pyarrow pickling
            # issues?)
            dflist = pool.map(functools.partial(self.func, dropna=dropna), self.handles)
        self._df = pd.concat(dflist)
    else:
        self._df = self.func(self.handles, dropna=dropna)
    return self._df
class TransformCatalogBaseConnections(pipeBase.PipelineTaskConnections,
                                      dimensions=()):
    """Connections declaring one DataFrame input catalog and one DataFrame
    output catalog.

    The connection names are empty here; presumably concrete subclasses
    supply them — verify against the subclasses' connection classes.
    """

    # Input catalog to be transformed.
    inputCatalog = connectionTypes.Input(
        name="",
        storageClass="DataFrame",
    )
    # Transformed output catalog.
    outputCatalog = connectionTypes.Output(
        name="",
        storageClass="DataFrame",
    )
class TransformCatalogBaseConfig(pipeBase.PipelineTaskConfig,
                                 pipelineConnections=TransformCatalogBaseConnections):
    """Configuration for catalog-transformation tasks: the functor
    specification file, the output index column, and dataId columns to
    carry into the output.
    """

    functorFile = pexConfig.Field(
        dtype=str,
        doc="Path to YAML file specifying Science Data Model functors to use "
            "when copying columns and computing calibrated values.",
        default=None,
        optional=True
    )
    primaryKey = pexConfig.Field(
        dtype=str,
        # Fixed missing space at the literal boundary (was "indexwill").
        doc="Name of column to be set as the DataFrame index. If None, the index "
            "will be named `id`",
        default=None,
        optional=True
    )
    columnsFromDataId = pexConfig.ListField(
        dtype=str,
        default=None,
        optional=True,
        doc="Columns to extract from the dataId",
    )
class TransformCatalogBaseTask(pipeBase.PipelineTask):
Definition at line 590 of file postprocess.py.