22__all__ = [
"ParquetTable", 
"MultilevelParquetTable"]
 
   25Implementation of thin wrappers to pyarrow.ParquetFile. 
   30from itertools 
import product
 
   38    """Thin wrapper to pyarrow's ParquetFile object 
   40    Call `toDataFrame` method to get a `pandas.DataFrame` object, 
   41    optionally passing specific columns. 
   43    The main purpose of having this wrapper rather than directly 
   44    using `pyarrow.ParquetFile` is to make it nicer to load
 
   45    selected subsets of columns, especially 
from dataframes 
with multi-level
 
   48    Instantiated 
with either a path to a parquet file 
or a dataFrame
 
   52    filename : str, optional
 
   54    dataFrame : dataFrame, optional
 
   57    def __init__(self, filename=None, dataFrame=None):
 
   59        if filename 
is not None:
 
   60            self.
_pf = pyarrow.parquet.ParquetFile(filename)
 
   63        elif dataFrame 
is not None:
 
   67            raise ValueError(
"Either filename or dataFrame must be passed.")
 
   73        """Write pandas dataframe to parquet 
   78            Path to which to write. 
   81            raise ValueError(
"df property must be defined to write.")
 
   82        table = pyarrow.Table.from_pandas(self.
_df)
 
   83        pyarrow.parquet.write_table(table, filename)
 
   88            raise AttributeError(
"This property is only accessible if ._pf is set.")
 
   90            self.
_pandasMd = json.loads(self.
_pf.metadata.metadata[b
"pandas"])
 
   95        """Columns as a pandas Index 
  101    def _getColumnIndex(self):
 
  102        if self.
_df is not None:
 
  103            return self.
_df.columns
 
  109        """List of column names (or column index if df is set) 
  111        This may either be a list of column names, or a
 
  112        pandas.Index object describing the column index, depending
 
  113        on whether the ParquetTable object 
is wrapping a ParquetFile
 
  120    def _getColumns(self):
 
  121        if self.
_df is not None:
 
  124            return self.
_pf.metadata.schema.names
 
  126    def _sanitizeColumns(self, columns):
 
  127        return [c 
for c 
in columns 
if c 
in self.
columnIndex]
 
  130        """Get table (or specified columns) as a pandas DataFrame 
  134        columns : list, optional 
  135            Desired columns.  If `None`, then all columns will be
 
  142                return self.
_df[columns]
 
  145            return self.
_pf.
read().to_pandas()
 
  147        df = self.
_pf.
read(columns=columns, use_pandas_metadata=
True).to_pandas()
 
  152    """Wrapper to access dataframe with multi-level column index from Parquet 
  154    This subclass of `ParquetTable` to handle the multi-level is necessary
 
  155    because there 
is not a convenient way to request specific table subsets
 
  156    by level via Parquet through pyarrow, 
as there 
is with a `pandas.DataFrame`.
 
  158    Additionally, pyarrow stores multilevel index information 
in a very strange
 
  159    way. Pandas stores it 
as a tuple, so that one can access a single column
 
  160    from a pandas dataframe 
as `df[(
'ref', 
'HSC-G', 
'coord_ra')]`.  However, 
for 
  161    some reason pyarrow saves these indices 
as "stringified" tuples, such that
 
  162    in order to read thissame column 
from a table written to Parquet, you would
 
  163    have to do the following:
 
  165        pf = pyarrow.ParquetFile(filename)
 
  166        df = pf.read(columns=[
"('ref', 'HSC-G', 'coord_ra')"])
 
  168    See also https://github.com/apache/arrow/issues/1771, where we
've raised 
  171    As multilevel-indexed dataframes can be very useful to store data like 
  172    multiple filters' worth of data in the same table, this case deserves a 
  173    wrapper to enable easier access; 
  174    that's what this object is for.  For example, 
  177        columnDict = {'dataset':
'meas',
 
  179                      'column':[
'coord_ra', 
'coord_dec']}
 
  180        df = parq.toDataFrame(columns=columnDict)
 
  182    will 
return just the coordinate columns; the equivalent of calling
 
  183    `df[
'meas'][
'HSC-G'][[
'coord_ra', 
'coord_dec']]` on the total dataframe,
 
  184    but without having to load the whole frame into memory---this reads just
 
  185    those columns 
from disk.  You can also request a sub-table; e.g.,
 
  188        columnDict = {
'dataset':
'meas',
 
  190        df = parq.toDataFrame(columns=columnDict)
 
  192    and this will be the equivalent of `df[
'meas'][
'HSC-G']` on the total dataframe.
 
  196    filename : str, optional
 
  197        Path to Parquet file.
 
  198    dataFrame : dataFrame, optional
 
  202        super(MultilevelParquetTable, self).
__init__(*args, **kwargs)
 
  210                level: 
list(np.unique(np.array(self.
columns)[:, i]))
 
  217        """Names of levels in column index 
  221    def _getColumnIndex(self):
 
  222        if self.
_df is not None:
 
  223            return super()._getColumnIndex()
 
  225            levelNames = [f[
"name"] 
for f 
in self.
pandasMd[
"column_indexes"]]
 
  226            return pd.MultiIndex.from_tuples(self.
columns, names=levelNames)
 
  228    def _getColumns(self):
 
  229        if self.
_df is not None:
 
  230            return super()._getColumns()
 
  232            columns = self.
_pf.metadata.schema.names
 
  233            n = len(self.
pandasMd[
"column_indexes"])
 
  234            pattern = re.compile(
", ".join([
"'(.*)'"] * n))
 
  235            matches = [re.search(pattern, c) 
for c 
in columns]
 
  236            return [m.groups() 
for m 
in matches 
if m 
is not None]
 
  239        """Get table (or specified columns) as a pandas DataFrame 
  241        To get specific columns in specified sub-levels:
 
  244            columnDict = {
'dataset':
'meas',
 
  246                      'column':[
'coord_ra', 
'coord_dec']}
 
  247            df = parq.toDataFrame(columns=columnDict)
 
  249        Or, to get an entire subtable, leave out one level name:
 
  252            columnDict = {
'dataset':
'meas',
 
  254            df = parq.toDataFrame(columns=columnDict)
 
  258        columns : list 
or dict, optional
 
  259            Desired columns.  If `
None`, then all columns will be
 
  260            returned.  If a list, then the names of the columns must
 
  261            be *exactly* 
as stored by pyarrow; that 
is, stringified tuples.
 
  262            If a dictionary, then the entries of the dictionary must
 
  263            correspond to the level names of the column multi-index
 
  264            (that 
is, the `columnLevels` attribute).  Not every level
 
  265            must be passed; 
if any level 
is left out, then all entries
 
  266            in that level will be implicitly included.
 
  268            If 
True drop levels of column index that have just one entry
 
  275                return self.
_pf.
read().to_pandas()
 
  277        if isinstance(columns, dict):
 
  282                df = self.
_df[columns]
 
  283            except (AttributeError, KeyError):
 
  284                newColumns = [c 
for c 
in columns 
if c 
in self.
columnIndex]
 
  286                    raise ValueError(
"None of the requested columns ({}) are available!".format(columns))
 
  287                df = self.
_df[newColumns]
 
  291                df = self.
_pf.
read(columns=pfColumns, use_pandas_metadata=
True).to_pandas()
 
  292            except (AttributeError, KeyError):
 
  293                newColumns = [c 
for c 
in columns 
if c 
in self.
columnIndex]
 
  295                    raise ValueError(
"None of the requested columns ({}) are available!".format(columns))
 
  297                df = self.
_pf.
read(columns=pfColumns, use_pandas_metadata=
True).to_pandas()
 
  301            levelsToDrop = [n 
for lev, n 
in zip(df.columns.levels, df.columns.names) 
if len(lev) == 1]
 
  304            if len(levelsToDrop) == len(df.columns.names):
 
  305                levelsToDrop.remove(df.columns.names[-1])
 
  307            df.columns = df.columns.droplevel(levelsToDrop)
 
  311    def _colsFromDict(self, colDict):
 
  315                if isinstance(colDict[lev], str):
 
  316                    new_colDict[lev] = [colDict[lev]]
 
  318                    new_colDict[lev] = colDict[lev]
 
  322        levelCols = [new_colDict[lev] 
for lev 
in self.
columnLevels]
 
  323        cols = product(*levelCols)
 
  326    def _stringify(self, cols):
 
  327        return [
str(c) 
for c 
in cols]
 
def __init__(self, *args, **kwargs)
def toDataFrame(self, columns=None, droplevels=True)
def _stringify(self, cols)
def _colsFromDict(self, colDict)
def columnLevelNames(self)
def __init__(self, filename=None, dataFrame=None)
def _getColumnIndex(self)
def _sanitizeColumns(self, columns)
def write(self, filename)
def toDataFrame(self, columns=None)
daf::base::PropertyList * list
std::shared_ptr< table::io::Persistable > read(table::io::InputArchive const &archive, table::io::CatalogVector const &catalogs) const override