LSST Data Management Base Package
parquetTable.py
# This file is part of pipe_tasks.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Implementation of thin wrappers to pyarrow.ParquetFile.
"""

import re
import json
from itertools import product
import pyarrow
import pyarrow.parquet
import numpy as np
import pandas as pd


class ParquetTable(object):
    """Thin wrapper to pyarrow's ParquetFile object

    Call `toDataFrame` method to get a `pandas.DataFrame` object,
    optionally passing specific columns.

    The main purpose of having this wrapper rather than directly
    using `pyarrow.ParquetFile` is to make it nicer to load
    selected subsets of columns, especially from dataframes with multi-level
    column indices.

    Instantiated with either a path to a parquet file or a dataFrame.

    Parameters
    ----------
    filename : str, optional
        Path to Parquet file.
    dataFrame : dataFrame, optional
        Existing `pandas.DataFrame` to wrap instead of reading from disk.
    """

    def __init__(self, filename=None, dataFrame=None):
        self.filename = filename
        if filename is not None:
            self._pf = pyarrow.parquet.ParquetFile(filename)
            self._df = None
            self._pandasMd = None
        elif dataFrame is not None:
            self._df = dataFrame
            self._pf = None
        else:
            raise ValueError("Either filename or dataFrame must be passed.")

        self._columns = None
        self._columnIndex = None

    def write(self, filename):
        """Write pandas dataframe to parquet

        Parameters
        ----------
        filename : str
            Path to which to write.
        """
        if self._df is None:
            raise ValueError("df property must be defined to write.")
        table = pyarrow.Table.from_pandas(self._df)
        pyarrow.parquet.write_table(table, filename)

    @property
    def pandasMd(self):
        if self._pf is None:
            raise AttributeError("This property is only accessible if ._pf is set.")
        if self._pandasMd is None:
            self._pandasMd = json.loads(self._pf.metadata.metadata[b"pandas"])
        return self._pandasMd

    @property
    def columnIndex(self):
        """Columns as a pandas Index
        """
        if self._columnIndex is None:
            self._columnIndex = self._getColumnIndex()
        return self._columnIndex

    def _getColumnIndex(self):
        if self._df is not None:
            return self._df.columns
        else:
            return pd.Index(self.columns)

    @property
    def columns(self):
        """List of column names (or column index if df is set)

        This may either be a list of column names, or a
        pandas.Index object describing the column index, depending
        on whether the ParquetTable object is wrapping a ParquetFile
        or a DataFrame.
        """
        if self._columns is None:
            self._columns = self._getColumns()
        return self._columns

    def _getColumns(self):
        if self._df is not None:
            return self._sanitizeColumns(self._df.columns)
        else:
            return self._pf.metadata.schema.names

    def _sanitizeColumns(self, columns):
        return [c for c in columns if c in self.columnIndex]

    def toDataFrame(self, columns=None):
        """Get table (or specified columns) as a pandas DataFrame

        Parameters
        ----------
        columns : list, optional
            Desired columns. If `None`, then all columns will be
            returned.
        """
        if self._pf is None:
            if columns is None:
                return self._df
            else:
                return self._df[columns]

        if columns is None:
            return self._pf.read().to_pandas()

        df = self._pf.read(columns=columns, use_pandas_metadata=True).to_pandas()
        return df

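
# Illustrative sketch of the usage the ParquetTable docstring describes; this
# helper is hypothetical (not part of the module's API), and the output path
# and column names are assumptions about the wrapped DataFrame.
def _exampleParquetTableUsage(dataFrame, path="example.parq"):
    # Wrap an in-memory DataFrame and persist it to Parquet.
    ParquetTable(dataFrame=dataFrame).write(path)
    # Re-open the file and read back only a subset of columns from disk.
    parq = ParquetTable(filename=path)
    return parq.toDataFrame(columns=["coord_ra", "coord_dec"])
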

class MultilevelParquetTable(ParquetTable):
    """Wrapper to access dataframe with multi-level column index from Parquet

    This subclass of `ParquetTable` is necessary to handle the multi-level
    column index, because there is not a convenient way to request specific
    table subsets by level via Parquet through pyarrow, as there is with a
    `pandas.DataFrame`.

    Additionally, pyarrow stores multilevel index information in a very strange
    way. Pandas stores it as a tuple, so that one can access a single column
    from a pandas dataframe as `df[('ref', 'HSC-G', 'coord_ra')]`. However, for
    some reason pyarrow saves these indices as "stringified" tuples, such that
    in order to read this same column from a table written to Parquet, you
    would have to do the following:

        pf = pyarrow.ParquetFile(filename)
        df = pf.read(columns=["('ref', 'HSC-G', 'coord_ra')"])

    See also https://github.com/apache/arrow/issues/1771, where we've raised
    this issue.

    As multilevel-indexed dataframes can be very useful to store data like
    multiple filters' worth of data in the same table, this case deserves a
    wrapper to enable easier access; that's what this object is for. For
    example,

        parq = MultilevelParquetTable(filename)
        columnDict = {'dataset': 'meas',
                      'filter': 'HSC-G',
                      'column': ['coord_ra', 'coord_dec']}
        df = parq.toDataFrame(columns=columnDict)

    will return just the coordinate columns; the equivalent of calling
    `df['meas']['HSC-G'][['coord_ra', 'coord_dec']]` on the total dataframe,
    but without having to load the whole frame into memory---this reads just
    those columns from disk. You can also request a sub-table; e.g.,

        parq = MultilevelParquetTable(filename)
        columnDict = {'dataset': 'meas',
                      'filter': 'HSC-G'}
        df = parq.toDataFrame(columns=columnDict)

    and this will be the equivalent of `df['meas']['HSC-G']` on the total
    dataframe.

    Parameters
    ----------
    filename : str, optional
        Path to Parquet file.
    dataFrame : dataFrame, optional
        Existing `pandas.DataFrame` to wrap instead of reading from disk.
    """

    def __init__(self, *args, **kwargs):
        super(MultilevelParquetTable, self).__init__(*args, **kwargs)

        self._columnLevelNames = None

    @property
    def columnLevelNames(self):
        if self._columnLevelNames is None:
            self._columnLevelNames = {
                level: list(np.unique(np.array(self.columns)[:, i]))
                for i, level in enumerate(self.columnLevels)
            }
        return self._columnLevelNames

    @property
    def columnLevels(self):
        """Names of levels in column index
        """
        return self.columnIndex.names

    def _getColumnIndex(self):
        if self._df is not None:
            return super()._getColumnIndex()
        else:
            levelNames = [f["name"] for f in self.pandasMd["column_indexes"]]
            return pd.MultiIndex.from_tuples(self.columns, names=levelNames)

    def _getColumns(self):
        if self._df is not None:
            return super()._getColumns()
        else:
            # Parse the "stringified" tuple column names stored by pyarrow
            # back into tuples of per-level values.
            columns = self._pf.metadata.schema.names
            n = len(self.pandasMd["column_indexes"])
            pattern = re.compile(", ".join(["'(.*)'"] * n))
            matches = [re.search(pattern, c) for c in columns]
            return [m.groups() for m in matches if m is not None]

    def toDataFrame(self, columns=None, droplevels=True):
        """Get table (or specified columns) as a pandas DataFrame

        To get specific columns in specified sub-levels:

            parq = MultilevelParquetTable(filename)
            columnDict = {'dataset': 'meas',
                          'filter': 'HSC-G',
                          'column': ['coord_ra', 'coord_dec']}
            df = parq.toDataFrame(columns=columnDict)

        Or, to get an entire subtable, leave out one level name:

            parq = MultilevelParquetTable(filename)
            columnDict = {'dataset': 'meas',
                          'filter': 'HSC-G'}
            df = parq.toDataFrame(columns=columnDict)

        Parameters
        ----------
        columns : list or dict, optional
            Desired columns. If `None`, then all columns will be
            returned. If a list, then the names of the columns must
            be *exactly* as stored by pyarrow; that is, stringified tuples.
            If a dictionary, then the entries of the dictionary must
            correspond to the level names of the column multi-index
            (that is, the `columnLevels` attribute). Not every level
            must be passed; if any level is left out, then all entries
            in that level will be implicitly included.
        droplevels : bool
            If `True`, drop levels of the column index that have just one
            entry.

        """
        if columns is None:
            if self._pf is None:
                return self._df
            else:
                return self._pf.read().to_pandas()

        if isinstance(columns, dict):
            columns = self._colsFromDict(columns)

        if self._pf is None:
            try:
                df = self._df[columns]
            except (AttributeError, KeyError):
                newColumns = [c for c in columns if c in self.columnIndex]
                if not newColumns:
                    raise ValueError("None of the requested columns ({}) are available!".format(columns))
                df = self._df[newColumns]
        else:
            pfColumns = self._stringify(columns)
            try:
                df = self._pf.read(columns=pfColumns, use_pandas_metadata=True).to_pandas()
            except (AttributeError, KeyError):
                newColumns = [c for c in columns if c in self.columnIndex]
                if not newColumns:
                    raise ValueError("None of the requested columns ({}) are available!".format(columns))
                pfColumns = self._stringify(newColumns)
                df = self._pf.read(columns=pfColumns, use_pandas_metadata=True).to_pandas()

        if droplevels:
            # Drop levels of column index that have just one entry
            levelsToDrop = [n for lev, n in zip(df.columns.levels, df.columns.names) if len(lev) == 1]

            # Prevent error when trying to drop *all* columns
            if len(levelsToDrop) == len(df.columns.names):
                levelsToDrop.remove(df.columns.names[-1])

            df.columns = df.columns.droplevel(levelsToDrop)

        return df

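    # The two helpers below support the dict-style column requests used above:
    # _colsFromDict expands a {levelName: value(s)} dict into the Cartesian
    # product of per-level entries (levels omitted from the dict contribute
    # all of their entries), and _stringify converts those column tuples into
    # the "stringified" form under which pyarrow stores multi-level column
    # names on disk.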
    def _colsFromDict(self, colDict):
        new_colDict = {}
        for i, lev in enumerate(self.columnLevels):
            if lev in colDict:
                if isinstance(colDict[lev], str):
                    new_colDict[lev] = [colDict[lev]]
                else:
                    new_colDict[lev] = colDict[lev]
            else:
                new_colDict[lev] = self.columnIndex.levels[i]

        levelCols = [new_colDict[lev] for lev in self.columnLevels]
        cols = product(*levelCols)
        return list(cols)

    def _stringify(self, cols):
        return [str(c) for c in cols]
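
# Illustrative sketch of the dict-based access pattern described in the
# MultilevelParquetTable docstring. This helper is hypothetical (the filename
# argument and the 'meas'/'HSC-G' labels are assumptions about the stored
# table), not an API provided by this module.
def _exampleMultilevelUsage(filename):
    parq = MultilevelParquetTable(filename=filename)
    # Request two columns from one dataset/filter combination; only the
    # corresponding "stringified" tuple columns are read from disk.
    columnDict = {"dataset": "meas",
                  "filter": "HSC-G",
                  "column": ["coord_ra", "coord_dec"]}
    # With droplevels=True (the default), the single-entry 'dataset' and
    # 'filter' levels are dropped, leaving plain 'coord_ra'/'coord_dec'
    # columns, as if df['meas']['HSC-G'][['coord_ra', 'coord_dec']] had been
    # taken from the full table.
    return parq.toDataFrame(columns=columnDict)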