parquetTable.py
# This file is part of pipe_tasks.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

__all__ = ["ParquetTable", "MultilevelParquetTable"]

"""
Implementation of thin wrappers to pyarrow.ParquetFile.
"""

import re
import json
from itertools import product

import pyarrow
import pyarrow.parquet
import numpy as np
import pandas as pd


class ParquetTable:
    """Thin wrapper around pyarrow's ParquetFile object.

    Call the `toDataFrame` method to get a `pandas.DataFrame` object,
    optionally passing specific columns.

    The main purpose of having this wrapper rather than directly
    using `pyarrow.ParquetFile` is to make it nicer to load
    selected subsets of columns, especially from dataframes with multi-level
    column indices.

    Instantiate with either a path to a Parquet file or a DataFrame.

    Parameters
    ----------
    filename : str, optional
        Path to Parquet file.
    dataFrame : pandas.DataFrame, optional
        DataFrame to wrap.
    """

    def __init__(self, filename=None, dataFrame=None):
        self.filename = filename
        if filename is not None:
            self._pf = pyarrow.parquet.ParquetFile(filename)
            self._df = None
            self._pandasMd = None
        elif dataFrame is not None:
            self._df = dataFrame
            self._pf = None
        else:
            raise ValueError("Either filename or dataFrame must be passed.")

        self._columns = None
        self._columnIndex = None

    def write(self, filename):
        """Write pandas dataframe to parquet

        Parameters
        ----------
        filename : str
            Path to which to write.
        """
        if self._df is None:
            raise ValueError("df property must be defined to write.")
        table = pyarrow.Table.from_pandas(self._df)
        pyarrow.parquet.write_table(table, filename)
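
    # Example usage (a minimal sketch; ``df`` and the output path are
    # hypothetical):
    #
    #     parq = ParquetTable(dataFrame=df)
    #     parq.write("/path/to/table.parq")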

    @property
    def pandasMd(self):
        """Pandas metadata dictionary stored in the Parquet file."""
        if self._pf is None:
            raise AttributeError("This property is only accessible if ._pf is set.")
        if self._pandasMd is None:
            self._pandasMd = json.loads(self._pf.metadata.metadata[b"pandas"])
        return self._pandasMd

    @property
    def columnIndex(self):
        """Columns as a pandas Index."""
        if self._columnIndex is None:
            self._columnIndex = self._getColumnIndex()
        return self._columnIndex

    def _getColumnIndex(self):
        if self._df is not None:
            return self._df.columns
        else:
            return pd.Index(self.columns)

    @property
    def columns(self):
        """List of column names (or column index if df is set)

        This may either be a list of column names, or a
        pandas.Index object describing the column index, depending
        on whether the ParquetTable object is wrapping a ParquetFile
        or a DataFrame.
        """
        if self._columns is None:
            self._columns = self._getColumns()
        return self._columns

    def _getColumns(self):
        if self._df is not None:
            return self._sanitizeColumns(self._df.columns)
        else:
            return self._pf.metadata.schema.names

    def _sanitizeColumns(self, columns):
        # Keep only the requested columns that actually exist in the index.
        return [c for c in columns if c in self.columnIndex]

    def toDataFrame(self, columns=None):
        """Get table (or specified columns) as a pandas DataFrame

        Parameters
        ----------
        columns : list, optional
            Desired columns. If `None`, then all columns will be
            returned.
        """
        if self._pf is None:
            if columns is None:
                return self._df
            else:
                return self._df[columns]

        if columns is None:
            return self._pf.read().to_pandas()

        df = self._pf.read(columns=columns, use_pandas_metadata=True).to_pandas()
        return df
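
    # Example usage (a minimal sketch; the file path and column names are
    # hypothetical):
    #
    #     parq = ParquetTable("/path/to/table.parq")
    #     coords = parq.toDataFrame(columns=["coord_ra", "coord_dec"])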


class MultilevelParquetTable(ParquetTable):
    """Wrapper to access dataframe with multi-level column index from Parquet

    This subclass of `ParquetTable` is necessary because there is no
    convenient way to request specific table subsets by level via Parquet
    through pyarrow, as there is with a `pandas.DataFrame`.

    Additionally, pyarrow stores multilevel index information in an unusual
    way. Pandas stores it as a tuple, so that one can access a single column
    from a pandas dataframe as `df[('ref', 'HSC-G', 'coord_ra')]`. However,
    pyarrow saves these indices as "stringified" tuples, such that in order
    to read this same column from a table written to Parquet, you would have
    to do the following:

        pf = pyarrow.ParquetFile(filename)
        df = pf.read(columns=["('ref', 'HSC-G', 'coord_ra')"])

    See also https://github.com/apache/arrow/issues/1771, where we've raised
    this issue.

    Because multilevel-indexed dataframes are very useful for storing, e.g.,
    multiple filters' worth of data in the same table, this object provides a
    wrapper to enable easier access. For example,

        parq = MultilevelParquetTable(filename)
        columnDict = {'dataset': 'meas',
                      'filter': 'HSC-G',
                      'column': ['coord_ra', 'coord_dec']}
        df = parq.toDataFrame(columns=columnDict)

    will return just the coordinate columns; the equivalent of calling
    `df['meas']['HSC-G'][['coord_ra', 'coord_dec']]` on the total dataframe,
    but without having to load the whole frame into memory; this reads just
    those columns from disk. You can also request a sub-table; e.g.,

        parq = MultilevelParquetTable(filename)
        columnDict = {'dataset': 'meas',
                      'filter': 'HSC-G'}
        df = parq.toDataFrame(columns=columnDict)

    and this will be the equivalent of `df['meas']['HSC-G']` on the total
    dataframe.

    Parameters
    ----------
    filename : str, optional
        Path to Parquet file.
    dataFrame : pandas.DataFrame, optional
        DataFrame to wrap.
    """

    def __init__(self, *args, **kwargs):
        super(MultilevelParquetTable, self).__init__(*args, **kwargs)

        self._columnLevelNames = None

    @property
    def columnLevelNames(self):
        """Dictionary mapping each level name to its unique entries."""
        if self._columnLevelNames is None:
            self._columnLevelNames = {
                level: list(np.unique(np.array(self.columns)[:, i]))
                for i, level in enumerate(self.columnLevels)
            }
        return self._columnLevelNames
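
    # Example (a sketch; the level and entry names are hypothetical): for a
    # table with levels ('dataset', 'filter', 'column'), columnLevelNames
    # might look like
    # {'dataset': ['meas', 'ref'], 'filter': ['HSC-G'], 'column': [...]}.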

    @property
    def columnLevels(self):
        """Names of levels in column index."""
        return self.columnIndex.names

    def _getColumnIndex(self):
        if self._df is not None:
            return super()._getColumnIndex()
        else:
            levelNames = [f["name"] for f in self.pandasMd["column_indexes"]]
            return pd.MultiIndex.from_tuples(self.columns, names=levelNames)

    def _getColumns(self):
        if self._df is not None:
            return super()._getColumns()
        else:
            columns = self._pf.metadata.schema.names
            n = len(self.pandasMd["column_indexes"])
            # Columns are stored as stringified tuples; build a regex with
            # one capture group per index level to recover the tuple entries.
            pattern = re.compile(", ".join(["'(.*)'"] * n))
            matches = [re.search(pattern, c) for c in columns]
            return [m.groups() for m in matches if m is not None]
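
    # For example (a sketch): with n = 3 index levels, the pattern matches
    # the stringified column name "('ref', 'HSC-G', 'coord_ra')" and
    # m.groups() recovers ('ref', 'HSC-G', 'coord_ra').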

    def toDataFrame(self, columns=None, droplevels=True):
        """Get table (or specified columns) as a pandas DataFrame

        To get specific columns in specified sub-levels:

            parq = MultilevelParquetTable(filename)
            columnDict = {'dataset': 'meas',
                          'filter': 'HSC-G',
                          'column': ['coord_ra', 'coord_dec']}
            df = parq.toDataFrame(columns=columnDict)

        Or, to get an entire subtable, leave out one level name:

            parq = MultilevelParquetTable(filename)
            columnDict = {'dataset': 'meas',
                          'filter': 'HSC-G'}
            df = parq.toDataFrame(columns=columnDict)

        Parameters
        ----------
        columns : list or dict, optional
            Desired columns. If `None`, then all columns will be
            returned. If a list, then the names of the columns must
            be *exactly* as stored by pyarrow; that is, stringified tuples.
            If a dictionary, then the entries of the dictionary must
            correspond to the level names of the column multi-index
            (that is, the `columnLevels` attribute). Not every level
            must be passed; if any level is left out, then all entries
            in that level will be implicitly included.
        droplevels : bool
            If `True`, drop levels of the column index that have just
            one entry.
        """
        if columns is None:
            if self._pf is None:
                return self._df
            else:
                return self._pf.read().to_pandas()

        if isinstance(columns, dict):
            columns = self._colsFromDict(columns)

        if self._pf is None:
            try:
                df = self._df[columns]
            except (AttributeError, KeyError):
                newColumns = [c for c in columns if c in self.columnIndex]
                if not newColumns:
                    raise ValueError("None of the requested columns ({}) are available!".format(columns))
                df = self._df[newColumns]
        else:
            pfColumns = self._stringify(columns)
            try:
                df = self._pf.read(columns=pfColumns, use_pandas_metadata=True).to_pandas()
            except (AttributeError, KeyError):
                newColumns = [c for c in columns if c in self.columnIndex]
                if not newColumns:
                    raise ValueError("None of the requested columns ({}) are available!".format(columns))
                pfColumns = self._stringify(newColumns)
                df = self._pf.read(columns=pfColumns, use_pandas_metadata=True).to_pandas()

        if droplevels:
            # Drop levels of column index that have just one entry
            levelsToDrop = [n for lev, n in zip(df.columns.levels, df.columns.names) if len(lev) == 1]

            # Prevent error when trying to drop *all* columns
            if len(levelsToDrop) == len(df.columns.names):
                levelsToDrop.remove(df.columns.names[-1])

            df.columns = df.columns.droplevel(levelsToDrop)

        return df
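
    # Example of the droplevels behavior (a sketch; the level names are
    # hypothetical): if the selection leaves only one entry each in the
    # 'dataset' and 'filter' levels, those levels are dropped, so the result
    # is indexed as df['coord_ra'] rather than
    # df[('meas', 'HSC-G', 'coord_ra')].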

    def _colsFromDict(self, colDict):
        new_colDict = {}
        for i, lev in enumerate(self.columnLevels):
            if lev in colDict:
                if isinstance(colDict[lev], str):
                    new_colDict[lev] = [colDict[lev]]
                else:
                    new_colDict[lev] = colDict[lev]
            else:
                # Level not specified: include every entry in that level.
                new_colDict[lev] = self.columnIndex.levels[i]

        levelCols = [new_colDict[lev] for lev in self.columnLevels]
        cols = product(*levelCols)
        return list(cols)
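
    # Example (a sketch; level names are hypothetical): with columnLevels
    # ('dataset', 'filter', 'column'),
    # _colsFromDict({'dataset': 'meas', 'column': ['coord_ra', 'coord_dec']})
    # expands to the cross product ('meas', f, 'coord_ra'),
    # ('meas', f, 'coord_dec') over every filter f in the column index.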

    def _stringify(self, cols):
        return [str(c) for c in cols]
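

# Example end-to-end usage (a minimal sketch; the file path, level names, and
# column names are hypothetical and assume a three-level column index):
#
#     parq = MultilevelParquetTable("/path/to/object_table.parq")
#     parq.columnLevels  # e.g. ('dataset', 'filter', 'column')
#     df = parq.toDataFrame(columns={'dataset': 'meas',
#                                    'filter': 'HSC-G',
#                                    'column': ['coord_ra', 'coord_dec']})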