"""
Implementation of thin wrappers to pyarrow.ParquetFile.
"""
import re
import json
from itertools import product

import pyarrow
import pyarrow.parquet
import numpy as np
import pandas as pd

 
class ParquetTable(object):
    """Thin wrapper to pyarrow's ParquetFile object

    Call `toDataFrame` method to get a `pandas.DataFrame` object,
    optionally passing specific columns.

    The main purpose of having this wrapper rather than directly
    using `pyarrow.ParquetFile` is to make it nicer to load
    selected subsets of columns, especially from dataframes with
    multi-level column indices.

    Instantiated with either a path to a parquet file or a DataFrame.

    Parameters
    ----------
    filename : str, optional
        Path to Parquet file.
    dataFrame : pandas.DataFrame, optional
        DataFrame to wrap.
    """
    def __init__(self, filename=None, dataFrame=None):
        self.filename = filename
        if filename is not None:
            self._pf = pyarrow.parquet.ParquetFile(filename)
            self._df = None
            self._pandasMd = None
        elif dataFrame is not None:
            self._df = dataFrame
            self._pf = None
        else:
            raise ValueError("Either filename or dataFrame must be passed.")

        self._columns = None
        self._columnIndex = None
 
    def write(self, filename):
        """Write pandas dataframe to parquet

        Parameters
        ----------
        filename : str
            Path to which to write.
        """
        if self._df is None:
            raise ValueError("df property must be defined to write.")
        table = pyarrow.Table.from_pandas(self._df)
        pyarrow.parquet.write_table(table, filename, compression="none")
 
    @property
    def pandasMd(self):
        """Pandas metadata dictionary stored with the Parquet file."""
        if self._pf is None:
            raise AttributeError("This property is only accessible if ._pf is set.")
        if self._pandasMd is None:
            self._pandasMd = json.loads(self._pf.metadata.metadata[b"pandas"])
        return self._pandasMd
 
    @property
    def columnIndex(self):
        """Columns as a pandas Index
        """
        if self._columnIndex is None:
            self._columnIndex = self._getColumnIndex()
        return self._columnIndex
    def _getColumnIndex(self):
        if self._df is not None:
            return self._df.columns
        else:
            return pd.Index(self.columns)
 
    @property
    def columns(self):
        """List of column names (or column index if df is set)

        This may either be a list of column names, or a
        pandas.Index object describing the column index, depending
        on whether the ParquetTable object is wrapping a ParquetFile
        or a DataFrame.
        """
        if self._columns is None:
            self._columns = self._getColumns()
        return self._columns
    def _getColumns(self):
        if self._df is not None:
            return self._sanitizeColumns(self._df.columns)
        else:
            return self._pf.metadata.schema.names
 
    def _sanitizeColumns(self, columns):
        return [c for c in columns if c in self.columnIndex]
 
    def toDataFrame(self, columns=None):
        """Get table (or specified columns) as a pandas DataFrame

        Parameters
        ----------
        columns : list, optional
            Desired columns.  If `None`, then all columns will be
            returned.
        """
        if self._pf is None:
            if columns is None:
                return self._df
            else:
                return self._df[columns]

        if columns is None:
            return self._pf.read().to_pandas()

        df = self._pf.read(columns=columns, use_pandas_metadata=True).to_pandas()
        return df
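
# A minimal usage sketch for `ParquetTable` (hedged; "demo.parquet" and `df`
# are hypothetical stand-ins):
#
#     # Wrap an existing file and read back just the columns you need:
#     parq = ParquetTable(filename="demo.parquet")
#     coords = parq.toDataFrame(columns=["coord_ra", "coord_dec"])
#
#     # Or wrap an in-memory DataFrame and write it to disk:
#     parq = ParquetTable(dataFrame=df)
#     parq.write("demo.parquet")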
 
class MultilevelParquetTable(ParquetTable):
    """Wrapper to access dataframe with multi-level column index from Parquet

    This subclass of `ParquetTable` is necessary to handle the multi-level
    column index, because there is not a convenient way to request specific
    table subsets by level via Parquet through pyarrow, as there is with a
    `pandas.DataFrame`.

    Additionally, pyarrow stores multilevel index information in a very strange
    way. Pandas stores it as a tuple, so that one can access a single column
    from a pandas dataframe as `df[('ref', 'HSC-G', 'coord_ra')]`.  However, for
    some reason pyarrow saves these indices as "stringified" tuples, such that
    in order to read this same column from a table written to Parquet, you would
    have to do the following:

        pf = pyarrow.ParquetFile(filename)
        df = pf.read(columns=["('ref', 'HSC-G', 'coord_ra')"])

    See also https://github.com/apache/arrow/issues/1771, where we've raised
    this issue.

    As multilevel-indexed dataframes can be very useful to store data like
    multiple filters' worth of data in the same table, this case deserves a
    wrapper to enable easier access; that's what this object is for.
    For example,

        parq = MultilevelParquetTable(filename)
        columnDict = {'dataset': 'meas',
                      'filter': 'HSC-G',
                      'column': ['coord_ra', 'coord_dec']}
        df = parq.toDataFrame(columns=columnDict)

    will return just the coordinate columns; the equivalent of calling
    `df['meas']['HSC-G'][['coord_ra', 'coord_dec']]` on the total dataframe,
    but without having to load the whole frame into memory---this reads just
    those columns from disk.  You can also request a sub-table; e.g.,

        parq = MultilevelParquetTable(filename)
        columnDict = {'dataset': 'meas',
                      'filter': 'HSC-G'}
        df = parq.toDataFrame(columns=columnDict)

    and this will be the equivalent of `df['meas']['HSC-G']` on the total
    dataframe.

    Parameters
    ----------
    filename : str, optional
        Path to Parquet file.
    dataFrame : pandas.DataFrame, optional
        DataFrame to wrap.
    """
    def __init__(self, *args, **kwargs):
        super(MultilevelParquetTable, self).__init__(*args, **kwargs)

        self._columnLevelNames = None
 
    @property
    def columnLevelNames(self):
        """Dictionary mapping each level name to its available values."""
        if self._columnLevelNames is None:
            self._columnLevelNames = {
                level: list(np.unique(np.array(self.columns)[:, i]))
                for i, level in enumerate(self.columnLevels)
            }
        return self._columnLevelNames
 
    @property
    def columnLevels(self):
        """Names of levels in column index
        """
        return self.columnIndex.names
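
    # For instance, for a table written from a pandas MultiIndex with levels
    # ('dataset', 'filter', 'column'), one might see (hypothetical values):
    #
    #     parq.columnLevels      # ['dataset', 'filter', 'column']
    #     parq.columnLevelNames  # {'dataset': ['meas', 'ref'],
    #                            #  'filter': ['HSC-G', 'HSC-R'],
    #                            #  'column': ['coord_ra', 'coord_dec']}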
    def _getColumnIndex(self):
        if self._df is not None:
            return super()._getColumnIndex()
        else:
            levelNames = [f["name"] for f in self.pandasMd["column_indexes"]]
            return pd.MultiIndex.from_tuples(self.columns, names=levelNames)
 
    def _getColumns(self):
        if self._df is not None:
            return super()._getColumns()
        else:
            # Columns are stored as stringified tuples, e.g.
            # "('meas', 'HSC-G', 'coord_ra')".  Parse them back into tuples
            # of names, with one regex group per level of the column index.
            columns = self._pf.metadata.schema.names
            n = len(self.pandasMd["column_indexes"])
            pattern = re.compile(", ".join(["'(.*)'"] * n))
            matches = [re.search(pattern, c) for c in columns]
            return [m.groups() for m in matches if m is not None]
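
    # Illustration of the "stringified tuple" round trip that `_getColumns`
    # undoes (hypothetical values):
    #
    #     str(('meas', 'HSC-G', 'coord_ra'))
    #     # -> "('meas', 'HSC-G', 'coord_ra')"
    #
    #     pattern = re.compile(", ".join(["'(.*)'"] * 3))
    #     re.search(pattern, "('meas', 'HSC-G', 'coord_ra')").groups()
    #     # -> ('meas', 'HSC-G', 'coord_ra')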
 
    def toDataFrame(self, columns=None, droplevels=True):
        """Get table (or specified columns) as a pandas DataFrame

        To get specific columns in specified sub-levels:

            parq = MultilevelParquetTable(filename)
            columnDict = {'dataset': 'meas',
                          'filter': 'HSC-G',
                          'column': ['coord_ra', 'coord_dec']}
            df = parq.toDataFrame(columns=columnDict)

        Or, to get an entire subtable, leave out one level name:

            parq = MultilevelParquetTable(filename)
            columnDict = {'dataset': 'meas',
                          'filter': 'HSC-G'}
            df = parq.toDataFrame(columns=columnDict)

        Parameters
        ----------
        columns : list or dict, optional
            Desired columns.  If `None`, then all columns will be
            returned.  If a list, then the names of the columns must
            be *exactly* as stored by pyarrow; that is, stringified tuples.
            If a dictionary, then the entries of the dictionary must
            correspond to the level names of the column multi-index
            (that is, the `columnLevels` attribute).  Not every level
            must be passed; if any level is left out, then all entries
            in that level will be implicitly included.
        droplevels : bool, optional
            If True, drop levels of the column index that have just one entry.
        """
        if columns is None:
            if self._pf is None:
                return self._df
            else:
                return self._pf.read().to_pandas()

        if isinstance(columns, dict):
            columns = self._colsFromDict(columns)

        if self._pf is None:
            try:
                df = self._df[columns]
            except (AttributeError, KeyError):
                newColumns = [c for c in columns if c in self.columnIndex]
                if not newColumns:
                    raise ValueError(
                        "None of the requested columns ({}) are available!".format(columns))
                df = self._df[newColumns]
        else:
            pfColumns = self._stringify(columns)
            try:
                df = self._pf.read(columns=pfColumns, use_pandas_metadata=True).to_pandas()
            except (AttributeError, KeyError):
                newColumns = [c for c in columns if c in self.columnIndex]
                if not newColumns:
                    raise ValueError(
                        "None of the requested columns ({}) are available!".format(columns))
                pfColumns = self._stringify(newColumns)
                df = self._pf.read(columns=pfColumns, use_pandas_metadata=True).to_pandas()

        if droplevels:
            # Drop levels of the column index that have only one entry.
            levelsToDrop = [n for l, n in zip(df.columns.levels, df.columns.names)
                            if len(l) == 1]

            # Prevent dropping every level, which would leave no column index.
            if len(levelsToDrop) == len(df.columns.names):
                levelsToDrop.remove(df.columns.names[-1])

            df.columns = df.columns.droplevel(levelsToDrop)

        return df
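
    # With droplevels=True (the default), selecting a single dataset and
    # filter collapses those now-redundant index levels (hypothetical values):
    #
    #     df = parq.toDataFrame(columns={'dataset': 'meas',
    #                                    'filter': 'HSC-G',
    #                                    'column': ['coord_ra', 'coord_dec']})
    #     df.columns
    #     # -> Index(['coord_ra', 'coord_dec'], ...) rather than a
    #     #    three-level MultiIndex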
 
    def _colsFromDict(self, colDict):
        new_colDict = {}
        for i, l in enumerate(self.columnLevels):
            if l in colDict:
                # Normalize a bare string to a single-element list.
                if isinstance(colDict[l], str):
                    new_colDict[l] = [colDict[l]]
                else:
                    new_colDict[l] = colDict[l]
            else:
                # Any level not specified includes all of its entries.
                new_colDict[l] = self.columnIndex.levels[i]

        levelCols = [new_colDict[l] for l in self.columnLevels]
        cols = product(*levelCols)
        return list(cols)
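
    # A sketch of how `_colsFromDict` expands a column dictionary (values
    # hypothetical): with columnLevels == ['dataset', 'filter', 'column'],
    #
    #     parq._colsFromDict({'dataset': 'meas',
    #                         'column': ['coord_ra', 'coord_dec']})
    #
    # fills in every available 'filter' entry and returns the Cartesian
    # product via itertools.product, e.g.
    #
    #     [('meas', 'HSC-G', 'coord_ra'), ('meas', 'HSC-G', 'coord_dec'),
    #      ('meas', 'HSC-R', 'coord_ra'), ('meas', 'HSC-R', 'coord_dec')]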
 
    def _stringify(self, cols):
        # Convert column tuples to the stringified form stored by pyarrow.
        return [str(c) for c in cols]