LSST Applications 26.0.0,g0265f82a02+6660c170cc,g07994bdeae+30b05a742e,g0a0026dc87+17526d298f,g0a60f58ba1+17526d298f,g0e4bf8285c+96dd2c2ea9,g0ecae5effc+c266a536c8,g1e7d6db67d+6f7cb1f4bb,g26482f50c6+6346c0633c,g2bbee38e9b+6660c170cc,g2cc88a2952+0a4e78cd49,g3273194fdb+f6908454ef,g337abbeb29+6660c170cc,g337c41fc51+9a8f8f0815,g37c6e7c3d5+7bbafe9d37,g44018dc512+6660c170cc,g4a941329ef+4f7594a38e,g4c90b7bd52+5145c320d2,g58be5f913a+bea990ba40,g635b316a6c+8d6b3a3e56,g67924a670a+bfead8c487,g6ae5381d9b+81bc2a20b4,g93c4d6e787+26b17396bd,g98cecbdb62+ed2cb6d659,g98ffbb4407+81bc2a20b4,g9ddcbc5298+7f7571301f,ga1e77700b3+99e9273977,gae46bcf261+6660c170cc,gb2715bf1a1+17526d298f,gc86a011abf+17526d298f,gcf0d15dbbd+96dd2c2ea9,gdaeeff99f8+0d8dbea60f,gdb4ec4c597+6660c170cc,ge23793e450+96dd2c2ea9,gf041782ebf+171108ac67
LSST Data Management Base Package
Loading...
Searching...
No Matches
parquetTable.py
Go to the documentation of this file.
1# This file is part of pipe_tasks.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21
22__all__ = ["ParquetTable", "MultilevelParquetTable"]
23
24"""
25Implementation of thin wrappers to pyarrow.ParquetFile.
26"""
27
28import re
29import json
30from itertools import product
31import pyarrow
32import pyarrow.parquet
33import numpy as np
34import pandas as pd
35
36from deprecated.sphinx import deprecated
37
38
39@deprecated(reason="The ParquetTable interface is from Gen2 i/o and will be removed after v26.",
40 version="v25", category=FutureWarning)
42 """Thin wrapper to pyarrow's ParquetFile object
43
44 Call `toDataFrame` method to get a `pandas.DataFrame` object,
45 optionally passing specific columns.
46
47 The main purpose of having this wrapper rather than directly
48 using `pyarrow.ParquetFile` is to make it nicer to load
49 selected subsets of columns, especially from dataframes with multi-level
50 column indices.
51
52 Instantiated with either a path to a parquet file or a dataFrame
53
54 Parameters
55 ----------
56 filename : str, optional
57 Path to Parquet file.
58 dataFrame : dataFrame, optional
59 """
60
61 def __init__(self, filename=None, dataFrame=None):
62 self.filename = filename
63 if filename is not None:
64 self._pf = pyarrow.parquet.ParquetFile(filename)
65 self._df = None
66 self._pandasMd = None
67 elif dataFrame is not None:
68 self._df = dataFrame
69 self._pf = None
70 else:
71 raise ValueError("Either filename or dataFrame must be passed.")
72
73 self._columns = None
74 self._columnIndex = None
75
76 def write(self, filename):
77 """Write pandas dataframe to parquet
78
79 Parameters
80 ----------
81 filename : str
82 Path to which to write.
83 """
84 if self._df is None:
85 raise ValueError("df property must be defined to write.")
86 table = pyarrow.Table.from_pandas(self._df)
87 pyarrow.parquet.write_table(table, filename)
88
89 @property
90 def pandasMd(self):
91 if self._pf is None:
92 raise AttributeError("This property is only accessible if ._pf is set.")
93 if self._pandasMd is None:
94 self._pandasMd = json.loads(self._pf.metadata.metadata[b"pandas"])
95 return self._pandasMd
96
97 @property
98 def columnIndex(self):
99 """Columns as a pandas Index
100 """
101 if self._columnIndex is None:
102 self._columnIndex = self._getColumnIndex()
103 return self._columnIndex
104
106 if self._df is not None:
107 return self._df.columns
108 else:
109 return pd.Index(self.columnscolumns)
110
111 @property
112 def columns(self):
113 """List of column names (or column index if df is set)
114
115 This may either be a list of column names, or a
116 pandas.Index object describing the column index, depending
117 on whether the ParquetTable object is wrapping a ParquetFile
118 or a DataFrame.
119 """
120 if self._columns is None:
121 self._columns = self._getColumns()
122 return self._columns
123
124 def _getColumns(self):
125 if self._df is not None:
126 return self._sanitizeColumns(self._df.columns)
127 else:
128 return self._pf.metadata.schema.names
129
130 def _sanitizeColumns(self, columns):
131 return [c for c in columns if c in self.columnIndex]
132
133 def toDataFrame(self, columns=None):
134 """Get table (or specified columns) as a pandas DataFrame
135
136 Parameters
137 ----------
138 columns : list, optional
139 Desired columns. If `None`, then all columns will be
140 returned.
141 """
142 if self._pf is None:
143 if columns is None:
144 return self._df
145 else:
146 return self._df[columns]
147
148 if columns is None:
149 return self._pf.read().to_pandas()
150
151 df = self._pf.read(columns=columns, use_pandas_metadata=True).to_pandas()
152 return df
153
154
155@deprecated(reason="The MultilevelParquetTable interface is from Gen2 i/o and will be removed after v26.",
156 version="v25", category=FutureWarning)
158 """Wrapper to access dataframe with multi-level column index from Parquet
159
160 This subclass of `ParquetTable` to handle the multi-level is necessary
161 because there is not a convenient way to request specific table subsets
162 by level via Parquet through pyarrow, as there is with a `pandas.DataFrame`.
163
164 Additionally, pyarrow stores multilevel index information in a very strange
165 way. Pandas stores it as a tuple, so that one can access a single column
166 from a pandas dataframe as `df[('ref', 'HSC-G', 'coord_ra')]`. However, for
167 some reason pyarrow saves these indices as "stringified" tuples, such that
168 in order to read thissame column from a table written to Parquet, you would
169 have to do the following:
170
171 pf = pyarrow.ParquetFile(filename)
172 df = pf.read(columns=["('ref', 'HSC-G', 'coord_ra')"])
173
174 See also https://github.com/apache/arrow/issues/1771, where we've raised
175 this issue.
176
177 As multilevel-indexed dataframes can be very useful to store data like
178 multiple filters' worth of data in the same table, this case deserves a
179 wrapper to enable easier access;
180 that's what this object is for. For example,
181
182 parq = MultilevelParquetTable(filename)
183 columnDict = {'dataset':'meas',
184 'filter':'HSC-G',
185 'column':['coord_ra', 'coord_dec']}
186 df = parq.toDataFrame(columns=columnDict)
187
188 will return just the coordinate columns; the equivalent of calling
189 `df['meas']['HSC-G'][['coord_ra', 'coord_dec']]` on the total dataframe,
190 but without having to load the whole frame into memory---this reads just
191 those columns from disk. You can also request a sub-table; e.g.,
192
193 parq = MultilevelParquetTable(filename)
194 columnDict = {'dataset':'meas',
195 'filter':'HSC-G'}
196 df = parq.toDataFrame(columns=columnDict)
197
198 and this will be the equivalent of `df['meas']['HSC-G']` on the total dataframe.
199
200 Parameters
201 ----------
202 filename : str, optional
203 Path to Parquet file.
204 dataFrame : dataFrame, optional
205 """
206
207 def __init__(self, *args, **kwargs):
208 super(MultilevelParquetTable, self).__init__(*args, **kwargs)
209
211
212 @property
214 if self._columnLevelNames is None:
215 self._columnLevelNames = {
216 level: list(np.unique(np.array(self.columnscolumnscolumns)[:, i]))
217 for i, level in enumerate(self.columnLevelscolumnLevels)
218 }
219 return self._columnLevelNames
220
221 @property
222 def columnLevels(self):
223 """Names of levels in column index
224 """
225 return self.columnIndex.names
226
228 if self._df is not None:
229 return super()._getColumnIndex()
230 else:
231 levelNames = [f["name"] for f in self.pandasMd["column_indexes"]]
232 return pd.MultiIndex.from_tuples(self.columnscolumnscolumns, names=levelNames)
233
234 def _getColumns(self):
235 if self._df is not None:
236 return super()._getColumns()
237 else:
238 columns = self._pf.metadata.schema.names
239 n = len(self.pandasMd["column_indexes"])
240 pattern = re.compile(", ".join(["'(.*)'"] * n))
241 matches = [re.search(pattern, c) for c in columns]
242 return [m.groups() for m in matches if m is not None]
243
244 def toDataFrame(self, columns=None, droplevels=True):
245 """Get table (or specified columns) as a pandas DataFrame
246
247 To get specific columns in specified sub-levels:
248
249 parq = MultilevelParquetTable(filename)
250 columnDict = {'dataset':'meas',
251 'filter':'HSC-G',
252 'column':['coord_ra', 'coord_dec']}
253 df = parq.toDataFrame(columns=columnDict)
254
255 Or, to get an entire subtable, leave out one level name:
256
257 parq = MultilevelParquetTable(filename)
258 columnDict = {'dataset':'meas',
259 'filter':'HSC-G'}
260 df = parq.toDataFrame(columns=columnDict)
261
262 Parameters
263 ----------
264 columns : list or dict, optional
265 Desired columns. If `None`, then all columns will be
266 returned. If a list, then the names of the columns must
267 be *exactly* as stored by pyarrow; that is, stringified tuples.
268 If a dictionary, then the entries of the dictionary must
269 correspond to the level names of the column multi-index
270 (that is, the `columnLevels` attribute). Not every level
271 must be passed; if any level is left out, then all entries
272 in that level will be implicitly included.
273 droplevels : bool
274 If True drop levels of column index that have just one entry
275
276 """
277 if columns is None:
278 if self._pf is None:
279 return self._df
280 else:
281 return self._pf.read().to_pandas()
282
283 if isinstance(columns, dict):
284 columns = self._colsFromDict(columns)
285
286 if self._pf is None:
287 try:
288 df = self._df[columns]
289 except (AttributeError, KeyError):
290 newColumns = [c for c in columns if c in self.columnIndex]
291 if not newColumns:
292 raise ValueError("None of the requested columns ({}) are available!".format(columns))
293 df = self._df[newColumns]
294 else:
295 pfColumns = self._stringify(columns)
296 try:
297 df = self._pf.read(columns=pfColumns, use_pandas_metadata=True).to_pandas()
298 except (AttributeError, KeyError):
299 newColumns = [c for c in columns if c in self.columnIndex]
300 if not newColumns:
301 raise ValueError("None of the requested columns ({}) are available!".format(columns))
302 pfColumns = self._stringify(newColumns)
303 df = self._pf.read(columns=pfColumns, use_pandas_metadata=True).to_pandas()
304
305 if droplevels:
306 # Drop levels of column index that have just one entry
307 levelsToDrop = [n for lev, n in zip(df.columns.levels, df.columns.names) if len(lev) == 1]
308
309 # Prevent error when trying to drop *all* columns
310 if len(levelsToDrop) == len(df.columns.names):
311 levelsToDrop.remove(df.columns.names[-1])
312
313 df.columns = df.columns.droplevel(levelsToDrop)
314
315 return df
316
317 def _colsFromDict(self, colDict):
318 new_colDict = {}
319 for i, lev in enumerate(self.columnLevelscolumnLevels):
320 if lev in colDict:
321 if isinstance(colDict[lev], str):
322 new_colDict[lev] = [colDict[lev]]
323 else:
324 new_colDict[lev] = colDict[lev]
325 else:
326 new_colDict[lev] = self.columnIndex.levels[i]
327
328 levelCols = [new_colDict[lev] for lev in self.columnLevelscolumnLevels]
329 cols = product(*levelCols)
330 return list(cols)
331
332 def _stringify(self, cols):
333 return [str(c) for c in cols]
table::Key< std::string > object
Definition VisitInfo.cc:232
toDataFrame(self, columns=None, droplevels=True)
__init__(self, filename=None, dataFrame=None)
daf::base::PropertyList * list
Definition fits.cc:928
std::shared_ptr< table::io::Persistable > read(table::io::InputArchive const &archive, table::io::CatalogVector const &catalogs) const override