"""Implementation of thin wrappers to pyarrow.ParquetFile.
"""

import re
import json
from itertools import product

import pyarrow
import pyarrow.parquet
import numpy as np
import pandas as pd

35 """Thin wrapper to pyarrow's ParquetFile object
37 Call `toDataFrame` method to get a `pandas.DataFrame` object,
38 optionally passing specific columns.
40 The main purpose of having this wrapper rather than directly
41 using `pyarrow.ParquetFile` is to make it nicer to load
42 selected subsets of columns, especially
from dataframes
with multi-level
45 Instantiated
with either a path to a parquet file
or a dataFrame
49 filename : str, optional
51 dataFrame : dataFrame, optional
    def __init__(self, filename=None, dataFrame=None):
        self.filename = filename
        if filename is not None:
            self._pf = pyarrow.parquet.ParquetFile(filename)
            self._df = None
        elif dataFrame is not None:
            self._df = dataFrame
            self._pf = None
        else:
            raise ValueError("Either filename or dataFrame must be passed.")

        # Lazily-computed caches for the properties below.
        self._pandasMd = None
        self._columns = None
        self._columnIndex = None

70 """Write pandas dataframe to parquet
75 Path to which to write.
77 if self.
_df_df
is None:
78 raise ValueError(
"df property must be defined to write.")
79 table = pyarrow.Table.from_pandas(self.
_df_df)
80 pyarrow.parquet.write_table(table, filename)
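    # Example round trip (a minimal sketch; "table.parq" is an illustrative
    # path, not a fixture of this package):
    #
    #     parq = ParquetTable(dataFrame=pd.DataFrame({"a": [1, 2, 3]}))
    #     parq.write("table.parq")
    #     df = ParquetTable(filename="table.parq").toDataFrame()
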
    @property
    def pandasMd(self):
        """Pandas metadata stored by pyarrow, decoded from JSON."""
        if self._pf is None:
            raise AttributeError("This property is only accessible if ._pf is set.")
        if self._pandasMd is None:
            self._pandasMd = json.loads(self._pf.metadata.metadata[b"pandas"])
        return self._pandasMd

92 """Columns as a pandas Index
98 def _getColumnIndex(self):
99 if self.
_df_df
is not None:
100 return self.
_df_df.columns
102 return pd.Index(self.
columnscolumns)
106 """List of column names (or column index if df is set)
108 This may either be a list of column names, or a
109 pandas.Index object describing the column index, depending
110 on whether the ParquetTable object
is wrapping a ParquetFile
117 def _getColumns(self):
118 if self.
_df_df
is not None:
121 return self.
_pf_pf.metadata.schema.names
    def _sanitizeColumns(self, columns):
        return [c for c in columns if c in self.columnIndex]

127 """Get table (or specified columns) as a pandas DataFrame
131 columns : list, optional
132 Desired columns. If `None`, then all columns will be
135 if self.
_pf_pf
is None:
139 return self.
_df_df[columns]
142 return self.
_pf_pf.
read().to_pandas()
144 df = self.
_pf_pf.
read(columns=columns, use_pandas_metadata=
True).to_pandas()
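    # For a file-backed table, only the requested columns are read from disk;
    # e.g. (a sketch; "measurements.parq" is an illustrative path):
    #
    #     parq = ParquetTable(filename="measurements.parq")
    #     ra_dec = parq.toDataFrame(columns=["coord_ra", "coord_dec"])
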
149 """Wrapper to access dataframe with multi-level column index from Parquet
151 This subclass of `ParquetTable` to handle the multi-level is necessary
152 because there
is not a convenient way to request specific table subsets
153 by level via Parquet through pyarrow,
as there
is with a `pandas.DataFrame`.
155 Additionally, pyarrow stores multilevel index information
in a very strange
156 way. Pandas stores it
as a tuple, so that one can access a single column
157 from a pandas dataframe
as `df[(
'ref',
'HSC-G',
'coord_ra')]`. However,
for
158 some reason pyarrow saves these indices
as "stringified" tuples, such that
159 in order to read thissame column
from a table written to Parquet, you would
160 have to do the following:
162 pf = pyarrow.ParquetFile(filename)
163 df = pf.read(columns=[
"('ref', 'HSC-G', 'coord_ra')"])
165 See also https://github.com/apache/arrow/issues/1771, where we
've raised
168 As multilevel-indexed dataframes can be very useful to store data like
169 multiple filters' worth of data in the same table, this case deserves a
170 wrapper to enable easier access;
171 that's what this object is for. For example,
174 columnDict = {'dataset':
'meas',
176 'column':[
'coord_ra',
'coord_dec']}
177 df = parq.toDataFrame(columns=columnDict)
179 will
return just the coordinate columns; the equivalent of calling
180 `df[
'meas'][
'HSC-G'][[
'coord_ra',
'coord_dec']]` on the total dataframe,
181 but without having to load the whole frame into memory---this reads just
182 those columns
from disk. You can also request a sub-table; e.g.,
185 columnDict = {
'dataset':
'meas',
187 df = parq.toDataFrame(columns=columnDict)
189 and this will be the equivalent of `df[
'meas'][
'HSC-G']` on the total dataframe.
193 filename : str, optional
194 Path to Parquet file.
195 dataFrame : dataFrame, optional
199 super(MultilevelParquetTable, self).
__init__(*args, **kwargs)
    @property
    def columnLevelNames(self):
        """Dictionary of the unique entries in each level of the column index."""
        if self._columnLevelNames is None:
            self._columnLevelNames = {
                level: list(np.unique(np.array(self.columns)[:, i]))
                for i, level in enumerate(self.columnLevels)
            }
        return self._columnLevelNames

    @property
    def columnLevels(self):
        """Names of levels in column index
        """
        return self.columnIndex.names

    def _getColumnIndex(self):
        if self._df is not None:
            return super()._getColumnIndex()
        else:
            levelNames = [f["name"] for f in self.pandasMd["column_indexes"]]
            return pd.MultiIndex.from_tuples(self.columns, names=levelNames)

    def _getColumns(self):
        if self._df is not None:
            return super()._getColumns()
        else:
            columns = self._pf.metadata.schema.names
            n = len(self.pandasMd["column_indexes"])
            pattern = re.compile(", ".join(["'(.*)'"] * n))
            matches = [re.search(pattern, c) for c in columns]
            return [m.groups() for m in matches if m is not None]

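    # For a three-level index, the pattern above is "'(.*)', '(.*)', '(.*)'",
    # which recovers the tuple from pyarrow's stringified column name, e.g.
    # (illustrative value):
    #
    #     "('meas', 'HSC-G', 'coord_ra')"  ->  ('meas', 'HSC-G', 'coord_ra')
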
236 """Get table (or specified columns) as a pandas DataFrame
238 To get specific columns in specified sub-levels:
241 columnDict = {
'dataset':
'meas',
243 'column':[
'coord_ra',
'coord_dec']}
244 df = parq.toDataFrame(columns=columnDict)
246 Or, to get an entire subtable, leave out one level name:
249 columnDict = {
'dataset':
'meas',
251 df = parq.toDataFrame(columns=columnDict)
255 columns : list
or dict, optional
256 Desired columns. If `
None`, then all columns will be
257 returned. If a list, then the names of the columns must
258 be *exactly*
as stored by pyarrow; that
is, stringified tuples.
259 If a dictionary, then the entries of the dictionary must
260 correspond to the level names of the column multi-index
261 (that
is, the `columnLevels` attribute). Not every level
262 must be passed;
if any level
is left out, then all entries
263 in that level will be implicitly included.
265 If
True drop levels of column index that have just one entry
269 if self.
_pf_pf
is None:
272 return self.
_pf_pf.
read().to_pandas()
274 if isinstance(columns, dict):
277 if self.
_pf_pf
is None:
279 df = self.
_df_df[columns]
280 except (AttributeError, KeyError):
281 newColumns = [c
for c
in columns
if c
in self.
columnIndexcolumnIndex]
283 raise ValueError(
"None of the requested columns ({}) are available!".
format(columns))
284 df = self.
_df_df[newColumns]
286 pfColumns = self.
_stringify_stringify(columns)
288 df = self.
_pf_pf.
read(columns=pfColumns, use_pandas_metadata=
True).to_pandas()
289 except (AttributeError, KeyError):
290 newColumns = [c
for c
in columns
if c
in self.
columnIndexcolumnIndex]
292 raise ValueError(
"None of the requested columns ({}) are available!".
format(columns))
293 pfColumns = self.
_stringify_stringify(newColumns)
294 df = self.
_pf_pf.
read(columns=pfColumns, use_pandas_metadata=
True).to_pandas()
298 levelsToDrop = [n
for lev, n
in zip(df.columns.levels, df.columns.names)
if len(lev) == 1]
301 if len(levelsToDrop) == len(df.columns.names):
302 levelsToDrop.remove(df.columns.names[-1])
304 df.columns = df.columns.droplevel(levelsToDrop)
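    # For example, requesting {'dataset': 'meas', 'filter': 'HSC-G'} leaves
    # the 'dataset' and 'filter' levels with one entry each, so with
    # droplevels=True the returned frame is indexed by the 'column' level
    # alone (level names here are illustrative).
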
    def _colsFromDict(self, colDict):
        new_colDict = {}
        for i, lev in enumerate(self.columnLevels):
            if lev in colDict:
                if isinstance(colDict[lev], str):
                    new_colDict[lev] = [colDict[lev]]
                else:
                    new_colDict[lev] = colDict[lev]
            else:
                new_colDict[lev] = self.columnIndex.levels[i]

        levelCols = [new_colDict[lev] for lev in self.columnLevels]
        cols = product(*levelCols)
        return list(cols)

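    # A dict that omits a level expands over every entry in that level via
    # `product`; e.g., with columnLevels ['dataset', 'filter', 'column']
    # (illustrative values):
    #
    #     {'dataset': 'meas', 'column': 'coord_ra'}
    #         -> [('meas', 'HSC-G', 'coord_ra'), ('meas', 'HSC-R', 'coord_ra'), ...]
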
    def _stringify(self, cols):
        return [str(c) for c in cols]
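

# Minimal end-to-end sketch (not part of the library API; the column values
# and temporary filename below are illustrative assumptions):
if __name__ == "__main__":
    import os
    import tempfile

    # A small frame with a (dataset, filter, column) multi-level column
    # index, mirroring the layout described in the class docstrings.
    mi = pd.MultiIndex.from_tuples(
        [("meas", "HSC-G", "coord_ra"), ("meas", "HSC-G", "coord_dec"),
         ("ref", "HSC-G", "coord_ra")],
        names=["dataset", "filter", "column"])
    df = pd.DataFrame(np.arange(6.0).reshape(2, 3), columns=mi)

    with tempfile.TemporaryDirectory() as tmpdir:
        filename = os.path.join(tmpdir, "multilevel.parq")
        MultilevelParquetTable(dataFrame=df).write(filename)

        parq = MultilevelParquetTable(filename=filename)
        # Reads only the requested columns from disk; levels with a single
        # entry ('dataset' and 'filter') are dropped from the result.
        sub = parq.toDataFrame(columns={"dataset": "meas",
                                        "column": ["coord_ra", "coord_dec"]})
        print(sub)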