__all__ = ["ParquetTable", "MultilevelParquetTable"]

"""
Implementation of thin wrappers to pyarrow.ParquetFile.
"""

import re
import json
from itertools import product

import pyarrow
import pyarrow.parquet
import numpy as np
import pandas as pd
38 """Thin wrapper to pyarrow's ParquetFile object
40 Call `toDataFrame` method to get a `pandas.DataFrame` object,
41 optionally passing specific columns.
43 The main purpose of having this wrapper rather than directly
44 using `pyarrow.ParquetFile` is to make it nicer to load
45 selected subsets of columns, especially
from dataframes
with multi-level
48 Instantiated
with either a path to a parquet file
or a dataFrame
52 filename : str, optional
54 dataFrame : dataFrame, optional

    def __init__(self, filename=None, dataFrame=None):
        self.filename = filename
        if filename is not None:
            self._pf = pyarrow.parquet.ParquetFile(filename)
            self._df = None
            self._pandasMd = None
        elif dataFrame is not None:
            self._df = dataFrame
            self._pf = None
        else:
            raise ValueError("Either filename or dataFrame must be passed.")

        self._columns = None
        self._columnIndex = None
73 """Write pandas dataframe to parquet
78 Path to which to write.
81 raise ValueError(
"df property must be defined to write.")
82 table = pyarrow.Table.from_pandas(self.
_df)
83 pyarrow.parquet.write_table(table, filename)
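
    # Illustrative round trip (a sketch; "table.parq" is a hypothetical path,
    # not something defined elsewhere in this module):
    #
    #     df = pd.DataFrame({"coord_ra": [1.0], "coord_dec": [2.0]})
    #     parq = ParquetTable(dataFrame=df)
    #     parq.write("table.parq")
    #     df2 = ParquetTable(filename="table.parq").toDataFrame()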

    @property
    def pandasMd(self):
        if self._pf is None:
            raise AttributeError("This property is only accessible if ._pf is set.")
        if self._pandasMd is None:
            self._pandasMd = json.loads(self._pf.metadata.metadata[b"pandas"])
        return self._pandasMd
95 """Columns as a pandas Index
101 def _getColumnIndex(self):
102 if self.
_df is not None:
103 return self.
_df.columns
109 """List of column names (or column index if df is set)
111 This may either be a list of column names, or a
112 pandas.Index object describing the column index, depending
113 on whether the ParquetTable object
is wrapping a ParquetFile
120 def _getColumns(self):
121 if self.
_df is not None:
124 return self.
_pf.metadata.schema.names

    def _sanitizeColumns(self, columns):
        return [c for c in columns if c in self.columnIndex]
130 """Get table (or specified columns) as a pandas DataFrame
134 columns : list, optional
135 Desired columns. If `None`, then all columns will be
142 return self.
_df[columns]
145 return self.
_pf.
read().to_pandas()
147 df = self.
_pf.
read(columns=columns, use_pandas_metadata=
True).to_pandas()
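
    # Reading only selected columns from disk (a sketch; "table.parq" is a
    # hypothetical path):
    #
    #     parq = ParquetTable(filename="table.parq")
    #     df = parq.toDataFrame(columns=["coord_ra", "coord_dec"])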
152 """Wrapper to access dataframe with multi-level column index from Parquet
154 This subclass of `ParquetTable` to handle the multi-level is necessary
155 because there
is not a convenient way to request specific table subsets
156 by level via Parquet through pyarrow,
as there
is with a `pandas.DataFrame`.
158 Additionally, pyarrow stores multilevel index information
in a very strange
159 way. Pandas stores it
as a tuple, so that one can access a single column
160 from a pandas dataframe
as `df[(
'ref',
'HSC-G',
'coord_ra')]`. However,
for
161 some reason pyarrow saves these indices
as "stringified" tuples, such that
162 in order to read thissame column
from a table written to Parquet, you would
163 have to do the following:
165 pf = pyarrow.ParquetFile(filename)
166 df = pf.read(columns=[
"('ref', 'HSC-G', 'coord_ra')"])
168 See also https://github.com/apache/arrow/issues/1771, where we
've raised
171 As multilevel-indexed dataframes can be very useful to store data like
172 multiple filters' worth of data in the same table, this case deserves a
173 wrapper to enable easier access;
174 that's what this object is for. For example,
177 columnDict = {'dataset':
'meas',
179 'column':[
'coord_ra',
'coord_dec']}
180 df = parq.toDataFrame(columns=columnDict)
182 will
return just the coordinate columns; the equivalent of calling
183 `df[
'meas'][
'HSC-G'][[
'coord_ra',
'coord_dec']]` on the total dataframe,
184 but without having to load the whole frame into memory---this reads just
185 those columns
from disk. You can also request a sub-table; e.g.,
188 columnDict = {
'dataset':
'meas',
190 df = parq.toDataFrame(columns=columnDict)
192 and this will be the equivalent of `df[
'meas'][
'HSC-G']` on the total dataframe.
196 filename : str, optional
197 Path to Parquet file.
198 dataFrame : dataFrame, optional
202 super(MultilevelParquetTable, self).
__init__(*args, **kwargs)

    @property
    def columnLevelNames(self):
        if self._columnLevelNames is None:
            self._columnLevelNames = {
                level: list(np.unique(np.array(self.columns)[:, i]))
                for i, level in enumerate(self.columnLevels)
            }
        return self._columnLevelNames

    @property
    def columnLevels(self):
        """Names of levels in column index
        """
        return self.columnIndex.names

    def _getColumnIndex(self):
        if self._df is not None:
            return super()._getColumnIndex()
        else:
            levelNames = [f["name"] for f in self.pandasMd["column_indexes"]]
            return pd.MultiIndex.from_tuples(self.columns, names=levelNames)

    def _getColumns(self):
        if self._df is not None:
            return super()._getColumns()
        else:
            columns = self._pf.metadata.schema.names
            n = len(self.pandasMd["column_indexes"])
            pattern = re.compile(", ".join(["'(.*)'"] * n))
            matches = [re.search(pattern, c) for c in columns]
            return [m.groups() for m in matches if m is not None]
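
    # For a three-level index, the pattern above is "'(.*)', '(.*)', '(.*)'",
    # so a stored column name like "('ref', 'HSC-G', 'coord_ra')" is parsed
    # back into the tuple ('ref', 'HSC-G', 'coord_ra').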
239 """Get table (or specified columns) as a pandas DataFrame
241 To get specific columns in specified sub-levels:
244 columnDict = {
'dataset':
'meas',
246 'column':[
'coord_ra',
'coord_dec']}
247 df = parq.toDataFrame(columns=columnDict)
249 Or, to get an entire subtable, leave out one level name:
252 columnDict = {
'dataset':
'meas',
254 df = parq.toDataFrame(columns=columnDict)
258 columns : list
or dict, optional
259 Desired columns. If `
None`, then all columns will be
260 returned. If a list, then the names of the columns must
261 be *exactly*
as stored by pyarrow; that
is, stringified tuples.
262 If a dictionary, then the entries of the dictionary must
263 correspond to the level names of the column multi-index
264 (that
is, the `columnLevels` attribute). Not every level
265 must be passed;
if any level
is left out, then all entries
266 in that level will be implicitly included.
268 If
True drop levels of column index that have just one entry
275 return self.
_pf.
read().to_pandas()
277 if isinstance(columns, dict):
282 df = self.
_df[columns]
283 except (AttributeError, KeyError):
284 newColumns = [c
for c
in columns
if c
in self.
columnIndex]
286 raise ValueError(
"None of the requested columns ({}) are available!".format(columns))
287 df = self.
_df[newColumns]
291 df = self.
_pf.
read(columns=pfColumns, use_pandas_metadata=
True).to_pandas()
292 except (AttributeError, KeyError):
293 newColumns = [c
for c
in columns
if c
in self.
columnIndex]
295 raise ValueError(
"None of the requested columns ({}) are available!".format(columns))
297 df = self.
_pf.
read(columns=pfColumns, use_pandas_metadata=
True).to_pandas()
301 levelsToDrop = [n
for lev, n
in zip(df.columns.levels, df.columns.names)
if len(lev) == 1]
304 if len(levelsToDrop) == len(df.columns.names):
305 levelsToDrop.remove(df.columns.names[-1])
307 df.columns = df.columns.droplevel(levelsToDrop)
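
    # With droplevels=True, the sub-table example in the docstring above comes
    # back with the degenerate 'dataset' and 'filter' levels removed (each has
    # a single entry), leaving plain single-level column names such as
    # 'coord_ra' rather than three-level tuples.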

    def _colsFromDict(self, colDict):
        new_colDict = {}
        for i, lev in enumerate(self.columnLevels):
            if lev in colDict:
                if isinstance(colDict[lev], str):
                    new_colDict[lev] = [colDict[lev]]
                else:
                    new_colDict[lev] = colDict[lev]
            else:
                new_colDict[lev] = self.columnIndex.levels[i]

        levelCols = [new_colDict[lev] for lev in self.columnLevels]
        cols = product(*levelCols)
        return list(cols)
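
    # Sketch of the expansion, assuming a table whose 'filter' level contains
    # 'HSC-G' and 'HSC-R': {'dataset': 'meas', 'column': ['coord_ra']} becomes
    # [('meas', 'HSC-G', 'coord_ra'), ('meas', 'HSC-R', 'coord_ra')].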

    def _stringify(self, cols):
        return [str(c) for c in cols]
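
    # str(('ref', 'HSC-G', 'coord_ra')) yields "('ref', 'HSC-G', 'coord_ra')",
    # which matches the stringified-tuple column names that pyarrow stores
    # (see the class docstring).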