__all__ = ["ParquetTable", "MultilevelParquetTable"]

"""
Implementation of thin wrappers to pyarrow.ParquetFile.
"""

import re
import json
from itertools import product

import pyarrow
import pyarrow.parquet
import pandas as pd

from deprecated.sphinx import deprecated
@deprecated(reason="The ParquetTable interface is from Gen2 i/o and will be removed after v26.",
            version="v25", category=FutureWarning)
class ParquetTable:
42 """Thin wrapper to pyarrow's ParquetFile object
44 Call `toDataFrame` method to get a `pandas.DataFrame` object,
45 optionally passing specific columns.
47 The main purpose of having this wrapper rather than directly
48 using `pyarrow.ParquetFile` is to make it nicer to load
49 selected subsets of columns, especially
from dataframes
with multi-level
52 Instantiated
with either a path to a parquet file
or a dataFrame
56 filename : str, optional
58 dataFrame : dataFrame, optional
    def __init__(self, filename=None, dataFrame=None):
        self.filename = filename
        if filename is not None:
            self._pf = pyarrow.parquet.ParquetFile(filename)
            self._df = None
            self._pandasMd = None
        elif dataFrame is not None:
            self._df = dataFrame
            self._pf = None
        else:
            raise ValueError("Either filename or dataFrame must be passed.")
77 """Write pandas dataframe to parquet
82 Path to which to write.
85 raise ValueError(
"df property must be defined to write.")
86 table = pyarrow.Table.from_pandas(self.
_df)
87 pyarrow.parquet.write_table(table, filename)
    @property
    def pandasMd(self):
        """Pandas metadata of the underlying ParquetFile, as a dict."""
        if self._pf is None:
            raise AttributeError("This property is only accessible if ._pf is set.")
        if self._pandasMd is None:
            # pyarrow stores the pandas metadata as JSON under the b"pandas"
            # key of the file-level Parquet metadata.
            self._pandasMd = json.loads(self._pf.metadata.metadata[b"pandas"])
        return self._pandasMd
99 """Columns as a pandas Index
106 if self.
_df is not None:
107 return self.
_df.columns
113 """List of column names (or column index if df is set)
115 This may either be a list of column names, or a
116 pandas.Index object describing the column index, depending
117 on whether the ParquetTable object
is wrapping a ParquetFile
125 if self.
_df is not None:
128 return self.
_pf.metadata.schema.names
    def _sanitizeColumns(self, columns):
        # Keep only the requested columns that actually exist in the index.
        return [c for c in columns if c in self.columnIndex]
134 """Get table (or specified columns) as a pandas DataFrame
138 columns : list, optional
139 Desired columns. If `None`, then all columns will be
146 return self.
_df[columns]
149 return self.
_pf.
read().to_pandas()
151 df = self.
_pf.
read(columns=columns, use_pandas_metadata=
True).to_pandas()
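
# Illustrative usage sketch, not part of the original module: round-trip a
# small throwaway dataframe through ParquetTable, assuming a hypothetical
# output path "/tmp/example.parq".
def _exampleParquetTableUsage():
    df = pd.DataFrame({"coord_ra": [1.0, 2.0], "coord_dec": [3.0, 4.0]})

    # Wrap the in-memory dataframe and write it to disk.
    ParquetTable(dataFrame=df).write("/tmp/example.parq")

    # Re-open the file and read back only the requested column.
    parq = ParquetTable(filename="/tmp/example.parq")
    return parq.toDataFrame(columns=["coord_ra"])
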
@deprecated(reason="The MultilevelParquetTable interface is from Gen2 i/o and will be removed after v26.",
            version="v25", category=FutureWarning)
class MultilevelParquetTable(ParquetTable):
158 """Wrapper to access dataframe with multi-level column index from Parquet
160 This subclass of `ParquetTable` to handle the multi-level is necessary
161 because there
is not a convenient way to request specific table subsets
162 by level via Parquet through pyarrow,
as there
is with a `pandas.DataFrame`.
164 Additionally, pyarrow stores multilevel index information
in a very strange
165 way. Pandas stores it
as a tuple, so that one can access a single column
166 from a pandas dataframe
as `df[(
'ref',
'HSC-G',
'coord_ra')]`. However,
for
167 some reason pyarrow saves these indices
as "stringified" tuples, such that
168 in order to read thissame column
from a table written to Parquet, you would
169 have to do the following:
171 pf = pyarrow.ParquetFile(filename)
172 df = pf.read(columns=[
"('ref', 'HSC-G', 'coord_ra')"])
174 See also https://github.com/apache/arrow/issues/1771, where we
've raised
177 As multilevel-indexed dataframes can be very useful to store data like
178 multiple filters' worth of data in the same table, this case deserves a
179 wrapper to enable easier access;
180 that's what this object is for. For example,
183 columnDict = {'dataset':
'meas',
185 'column':[
'coord_ra',
'coord_dec']}
186 df = parq.toDataFrame(columns=columnDict)
188 will
return just the coordinate columns; the equivalent of calling
189 `df[
'meas'][
'HSC-G'][[
'coord_ra',
'coord_dec']]` on the total dataframe,
190 but without having to load the whole frame into memory---this reads just
191 those columns
from disk. You can also request a sub-table; e.g.,
194 columnDict = {
'dataset':
'meas',
196 df = parq.toDataFrame(columns=columnDict)
198 and this will be the equivalent of `df[
'meas'][
'HSC-G']` on the total dataframe.
202 filename : str, optional
203 Path to Parquet file.
204 dataFrame : dataFrame, optional
    def __init__(self, *args, **kwargs):
        super(MultilevelParquetTable, self).__init__(*args, **kwargs)
223 """Names of levels in column index
228 if self.
_df is not None:
231 levelNames = [f[
"name"]
for f
in self.
pandasMd[
"column_indexes"]]
    @property
    def columns(self):
        if self._df is not None:
            return self._sanitizeColumns(self._df.columns)
        else:
            columns = self._pf.metadata.schema.names
            n = len(self.pandasMd["column_indexes"])
            # Parse the "stringified" tuples that pyarrow uses for
            # multi-level column names back into tuples of level values.
            pattern = re.compile(", ".join(["'(.*)'"] * n))
            matches = [re.search(pattern, c) for c in columns]
            return [m.groups() for m in matches if m is not None]
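
    # For example, with a three-level index the stored column name
    # "('ref', 'HSC-G', 'coord_ra')" matches the pattern
    # "'(.*)', '(.*)', '(.*)'", and m.groups() recovers the tuple
    # ('ref', 'HSC-G', 'coord_ra').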
245 """Get table (or specified columns) as a pandas DataFrame
247 To get specific columns in specified sub-levels:
250 columnDict = {
'dataset':
'meas',
252 'column':[
'coord_ra',
'coord_dec']}
253 df = parq.toDataFrame(columns=columnDict)
255 Or, to get an entire subtable, leave out one level name:
258 columnDict = {
'dataset':
'meas',
260 df = parq.toDataFrame(columns=columnDict)
264 columns : list
or dict, optional
265 Desired columns. If `
None`, then all columns will be
266 returned. If a list, then the names of the columns must
267 be *exactly*
as stored by pyarrow; that
is, stringified tuples.
268 If a dictionary, then the entries of the dictionary must
269 correspond to the level names of the column multi-index
270 (that
is, the `columnLevels` attribute). Not every level
271 must be passed;
if any level
is left out, then all entries
272 in that level will be implicitly included.
274 If
True drop levels of column index that have just one entry
281 return self.
_pf.
read().to_pandas()
283 if isinstance(columns, dict):
288 df = self.
_df[columns]
289 except (AttributeError, KeyError):
290 newColumns = [c
for c
in columns
if c
in self.
columnIndex]
292 raise ValueError(
"None of the requested columns ({}) are available!".format(columns))
293 df = self.
_df[newColumns]
297 df = self.
_pf.
read(columns=pfColumns, use_pandas_metadata=
True).to_pandas()
298 except (AttributeError, KeyError):
299 newColumns = [c
for c
in columns
if c
in self.
columnIndex]
301 raise ValueError(
"None of the requested columns ({}) are available!".format(columns))
303 df = self.
_pf.
read(columns=pfColumns, use_pandas_metadata=
True).to_pandas()
307 levelsToDrop = [n
for lev, n
in zip(df.columns.levels, df.columns.names)
if len(lev) == 1]
310 if len(levelsToDrop) == len(df.columns.names):
311 levelsToDrop.remove(df.columns.names[-1])
313 df.columns = df.columns.droplevel(levelsToDrop)
    def _colsFromDict(self, colDict):
        # Expand a dict of per-level selections into the full list of
        # column tuples via the cartesian product over levels.
        new_colDict = {}
        for i, lev in enumerate(self.columnLevels):
            if lev in colDict:
                if isinstance(colDict[lev], str):
                    # Allow a single name to stand in for a one-element list.
                    new_colDict[lev] = [colDict[lev]]
                else:
                    new_colDict[lev] = colDict[lev]
            else:
                # A level that is left out implicitly includes all entries.
                new_colDict[lev] = self.columnIndex.levels[i]

        levelCols = [new_colDict[lev] for lev in self.columnLevels]
        cols = product(*levelCols)
        return list(cols)
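
    # For example, with columnLevels ['dataset', 'filter', 'column'] and
    # filters HSC-G and HSC-R present, the hypothetical dict
    # {'dataset': 'meas', 'column': ['coord_ra', 'coord_dec']} expands to
    # [('meas', 'HSC-G', 'coord_ra'), ('meas', 'HSC-G', 'coord_dec'),
    #  ('meas', 'HSC-R', 'coord_ra'), ('meas', 'HSC-R', 'coord_dec')].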
    def _stringify(self, cols):
        # Convert column tuples to the "stringified" form that pyarrow
        # uses for on-disk column names.
        return [str(c) for c in cols]
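
# Illustrative usage sketch, not part of the original module: write a small
# multi-level dataframe to Parquet and read back a per-filter subset. The
# path, filters, and column names here are hypothetical.
def _exampleMultilevelUsage():
    columns = pd.MultiIndex.from_tuples(
        [("meas", "HSC-G", "coord_ra"), ("meas", "HSC-G", "coord_dec"),
         ("meas", "HSC-R", "coord_ra"), ("meas", "HSC-R", "coord_dec")],
        names=["dataset", "filter", "column"],
    )
    df = pd.DataFrame([[1.0, 2.0, 3.0, 4.0]], columns=columns)

    MultilevelParquetTable(dataFrame=df).write("/tmp/example_multi.parq")

    parq = MultilevelParquetTable(filename="/tmp/example_multi.parq")
    # Equivalent to df['meas']['HSC-G'][['coord_ra', 'coord_dec']] on the
    # full dataframe, but reads only those columns from disk.
    return parq.toDataFrame(columns={"dataset": "meas",
                                     "filter": "HSC-G",
                                     "column": ["coord_ra", "coord_dec"]})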