"""Implementation of thin wrappers to pyarrow.ParquetFile.
"""
import re
import json
from itertools import product

import pyarrow
import pyarrow.parquet
import numpy as np
import pandas as pd


35 """Thin wrapper to pyarrow's ParquetFile object
37 Call `toDataFrame` method to get a `pandas.DataFrame` object,
38 optionally passing specific columns.
40 The main purpose of having this wrapper rather than directly
41 using `pyarrow.ParquetFile` is to make it nicer to load
42 selected subsets of columns, especially from dataframes with multi-level
45 Instantiated with either a path to a parquet file or a dataFrame
49 filename : str, optional
51 dataFrame : dataFrame, optional
    def __init__(self, filename=None, dataFrame=None):
        self.filename = filename
        if filename is not None:
            self._pf = pyarrow.parquet.ParquetFile(filename)
            self._df = None
            self._pandasMd = None
        elif dataFrame is not None:
            self._df = dataFrame
            self._pf = None
        else:
            raise ValueError("Either filename or dataFrame must be passed.")

        self._columns = None
        self._columnIndex = None
70 """Write pandas dataframe to parquet
75 Path to which to write.
78 raise ValueError(
"df property must be defined to write.")
79 table = pyarrow.Table.from_pandas(self.
_df)
80 pyarrow.parquet.write_table(table, filename, compression=
"none")
    @property
    def pandasMd(self):
        if self._pf is None:
            raise AttributeError("This property is only accessible if ._pf is set.")
        if self._pandasMd is None:
            self._pandasMd = json.loads(self._pf.metadata.metadata[b"pandas"])
        return self._pandasMd
92 """Columns as a pandas Index
98 def _getColumnIndex(self):
99 if self.
_df is not None:
100 return self.
_df.columns
106 """List of column names (or column index if df is set)
108 This may either be a list of column names, or a
109 pandas.Index object describing the column index, depending
110 on whether the ParquetTable object is wrapping a ParquetFile
117 def _getColumns(self):
118 if self.
_df is not None:
121 return self.
_pf.metadata.schema.names
    def _sanitizeColumns(self, columns):
        return [c for c in columns if c in self.columnIndex]
127 """Get table (or specified columns) as a pandas DataFrame
131 columns : list, optional
132 Desired columns. If `None`, then all columns will be
139 return self.
_df[columns]
142 return self.
_pf.read().to_pandas()
144 df = self.
_pf.read(columns=columns, use_pandas_metadata=
True).to_pandas()
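
# Basic read sketch for ParquetTable ("table.parq" and the column names are
# illustrative, not assumed by the code):
#
#     parq = ParquetTable("table.parq")
#     df = parq.toDataFrame(columns=["coord_ra", "coord_dec"])
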
149 """Wrapper to access dataframe with multi-level column index from Parquet
151 This subclass of `ParquetTable` to handle the multi-level is necessary
152 because there is not a convenient way to request specific table subsets
153 by level via Parquet through pyarrow, as there is with a `pandas.DataFrame`.
155 Additionally, pyarrow stores multilevel index information in a very strange
156 way. Pandas stores it as a tuple, so that one can access a single column
157 from a pandas dataframe as `df[('ref', 'HSC-G', 'coord_ra')]`. However, for
158 some reason pyarrow saves these indices as "stringified" tuples, such that
159 in order to read thissame column from a table written to Parquet, you would
160 have to do the following:
162 pf = pyarrow.ParquetFile(filename)
163 df = pf.read(columns=["('ref', 'HSC-G', 'coord_ra')"])
165 See also https://github.com/apache/arrow/issues/1771, where we've raised
168 As multilevel-indexed dataframes can be very useful to store data like
169 multiple filters' worth of data in the same table, this case deserves a
170 wrapper to enable easier access;
171 that's what this object is for. For example,
173 parq = MultilevelParquetTable(filename)
174 columnDict = {'dataset':'meas',
176 'column':['coord_ra', 'coord_dec']}
177 df = parq.toDataFrame(columns=columnDict)
179 will return just the coordinate columns; the equivalent of calling
180 `df['meas']['HSC-G'][['coord_ra', 'coord_dec']]` on the total dataframe,
181 but without having to load the whole frame into memory---this reads just
182 those columns from disk. You can also request a sub-table; e.g.,
184 parq = MultilevelParquetTable(filename)
185 columnDict = {'dataset':'meas',
187 df = parq.toDataFrame(columns=columnDict)
189 and this will be the equivalent of `df['meas']['HSC-G']` on the total dataframe.
193 filename : str, optional
194 Path to Parquet file.
195 dataFrame : dataFrame, optional
199 super(MultilevelParquetTable, self).
__init__(*args, **kwargs)
    @property
    def columnLevelNames(self):
        """Dictionary mapping level names to their entries
        """
        if self._columnLevelNames is None:
            self._columnLevelNames = {
                level: list(np.unique(np.array(self.columns)[:, i]))
                for i, level in enumerate(self.columnLevels)
            }
        return self._columnLevelNames

    @property
    def columnLevels(self):
        """Names of levels in column index
        """
        return self.columnIndex.names
    def _getColumnIndex(self):
        if self._df is not None:
            return super()._getColumnIndex()
        else:
            levelNames = [f["name"] for f in self.pandasMd["column_indexes"]]
            return pd.MultiIndex.from_tuples(self.columns, names=levelNames)
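
    # For reference: the b"pandas" blob parsed by `pandasMd` is JSON written
    # by pyarrow; each entry of its "column_indexes" list carries the "name"
    # field used above. Roughly (exact keys vary with pyarrow/pandas version):
    #
    #     {"index_columns": [...],
    #      "column_indexes": [{"name": "dataset", ...},
    #                         {"name": "filter", ...},
    #                         {"name": "column", ...}],
    #      "columns": [...]}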
    def _getColumns(self):
        if self._df is not None:
            return super()._getColumns()
        else:
            columns = self._pf.metadata.schema.names
            n = len(self.pandasMd["column_indexes"])
            pattern = re.compile(", ".join(["'(.*)'"] * n))
            matches = [re.search(pattern, c) for c in columns]
            return [m.groups() for m in matches if m is not None]
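
    # Example of the parsing above: for a three-level index the pattern is
    # "'(.*)', '(.*)', '(.*)'", so a stringified column name such as
    #
    #     "('meas', 'HSC-G', 'coord_ra')"
    #
    # yields m.groups() == ('meas', 'HSC-G', 'coord_ra'); names that don't
    # match (e.g. a serialized row index) are skipped.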
236 """Get table (or specified columns) as a pandas DataFrame
238 To get specific columns in specified sub-levels:
240 parq = MultilevelParquetTable(filename)
241 columnDict = {'dataset':'meas',
243 'column':['coord_ra', 'coord_dec']}
244 df = parq.toDataFrame(columns=columnDict)
246 Or, to get an entire subtable, leave out one level name:
248 parq = MultilevelParquetTable(filename)
249 columnDict = {'dataset':'meas',
251 df = parq.toDataFrame(columns=columnDict)
255 columns : list or dict, optional
256 Desired columns. If `None`, then all columns will be
257 returned. If a list, then the names of the columns must
258 be *exactly* as stored by pyarrow; that is, stringified tuples.
259 If a dictionary, then the entries of the dictionary must
260 correspond to the level names of the column multi-index
261 (that is, the `columnLevels` attribute). Not every level
262 must be passed; if any level is left out, then all entries
263 in that level will be implicitly included.
265 If True drop levels of column index that have just one entry
        if columns is None:
            if self._pf is None:
                return self._df
            else:
                return self._pf.read().to_pandas()

        if isinstance(columns, dict):
            columns = self._colsFromDict(columns)
        if self._pf is None:
            try:
                df = self._df[columns]
            except (AttributeError, KeyError):
                newColumns = [c for c in columns if c in self.columnIndex]
                if not newColumns:
                    raise ValueError("None of the requested columns ({}) "
                                     "are available!".format(columns))
                df = self._df[newColumns]
        else:
            pfColumns = self._stringify(columns)
            try:
                df = self._pf.read(columns=pfColumns,
                                   use_pandas_metadata=True).to_pandas()
            except (AttributeError, KeyError):
                newColumns = [c for c in columns if c in self.columnIndex]
                if not newColumns:
                    raise ValueError("None of the requested columns ({}) "
                                     "are available!".format(columns))
                pfColumns = self._stringify(newColumns)
                df = self._pf.read(columns=pfColumns,
                                   use_pandas_metadata=True).to_pandas()
        if droplevels:
            # Drop levels of column index that have just one entry
            levelsToDrop = [n for l, n in zip(df.columns.levels, df.columns.names)
                            if len(l) == 1]

            # Prevent dropping *all* levels, which would leave no column index
            if len(levelsToDrop) == len(df.columns.names):
                levelsToDrop.remove(df.columns.names[-1])

            df.columns = df.columns.droplevel(levelsToDrop)

        return df
    def _colsFromDict(self, colDict):
        new_colDict = {}
        for i, l in enumerate(self.columnLevels):
            if l in colDict:
                if isinstance(colDict[l], str):
                    new_colDict[l] = [colDict[l]]
                else:
                    new_colDict[l] = colDict[l]
            else:
                new_colDict[l] = self.columnIndex.levels[i]

        levelCols = [new_colDict[l] for l in self.columnLevels]
        cols = product(*levelCols)
        return list(cols)
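
    # Example of the expansion above: with columnLevels
    # ('dataset', 'filter', 'column'), the dict
    #
    #     {'dataset': 'meas', 'column': ['coord_ra', 'coord_dec']}
    #
    # becomes the full cross product ('meas', <f>, 'coord_ra'),
    # ('meas', <f>, 'coord_dec') for every filter <f> in the column index.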
    def _stringify(self, cols):
        return [str(c) for c in cols]
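

if __name__ == "__main__":
    # Minimal in-memory smoke test; a sketch, not part of the module API.
    # Level and column names below are illustrative, not assumed by the code.
    demoColumns = pd.MultiIndex.from_product(
        [["meas"], ["HSC-G", "HSC-R"], ["coord_ra", "coord_dec"]],
        names=["dataset", "filter", "column"],
    )
    demoDf = pd.DataFrame(np.random.random((5, len(demoColumns))),
                          columns=demoColumns)

    parq = MultilevelParquetTable(dataFrame=demoDf)
    # Select one filter's worth of columns; droplevels=False keeps the full
    # three-level index on the result.
    sub = parq.toDataFrame(columns={"filter": "HSC-G"}, droplevels=False)
    print(sub.columns.tolist())  # the two ('meas', 'HSC-G', *) columns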