LSST Applications  21.0.0-147-g0e635eb1+1acddb5be5,22.0.0+052faf71bd,22.0.0+1ea9a8b2b2,22.0.0+6312710a6c,22.0.0+729191ecac,22.0.0+7589c3a021,22.0.0+9f079a9461,22.0.1-1-g7d6de66+b8044ec9de,22.0.1-1-g87000a6+536b1ee016,22.0.1-1-g8e32f31+6312710a6c,22.0.1-10-gd060f87+016f7cdc03,22.0.1-12-g9c3108e+df145f6f68,22.0.1-16-g314fa6d+c825727ab8,22.0.1-19-g93a5c75+d23f2fb6d8,22.0.1-19-gb93eaa13+aab3ef7709,22.0.1-2-g8ef0a89+b8044ec9de,22.0.1-2-g92698f7+9f079a9461,22.0.1-2-ga9b0f51+052faf71bd,22.0.1-2-gac51dbf+052faf71bd,22.0.1-2-gb66926d+6312710a6c,22.0.1-2-gcb770ba+09e3807989,22.0.1-20-g32debb5+b8044ec9de,22.0.1-23-gc2439a9a+fb0756638e,22.0.1-3-g496fd5d+09117f784f,22.0.1-3-g59f966b+1e6ba2c031,22.0.1-3-g849a1b8+f8b568069f,22.0.1-3-gaaec9c0+c5c846a8b1,22.0.1-32-g5ddfab5d3+60ce4897b0,22.0.1-4-g037fbe1+64e601228d,22.0.1-4-g8623105+b8044ec9de,22.0.1-5-g096abc9+d18c45d440,22.0.1-5-g15c806e+57f5c03693,22.0.1-7-gba73697+57f5c03693,master-g6e05de7fdc+c1283a92b8,master-g72cdda8301+729191ecac,w.2021.39
LSST Data Management Base Package
parquetTable.py
Go to the documentation of this file.
1 # This file is part of pipe_tasks.
2 #
3 # Developed for the LSST Data Management System.
4 # This product includes software developed by the LSST Project
5 # (http://www.lsst.org).
6 # See the COPYRIGHT file at the top-level directory of this distribution
7 # for details of code ownership.
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation, either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the GNU General Public License
20 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 """
22 Implementation of thin wrappers to pyarrow.ParquetFile.
23 """
24 
25 import re
26 import json
27 from itertools import product
28 import pyarrow
29 import pyarrow.parquet
30 import numpy as np
31 import pandas as pd
32 
33 
class ParquetTable:
    """Thin wrapper to pyarrow's ParquetFile object

    Call `toDataFrame` method to get a `pandas.DataFrame` object,
    optionally passing specific columns.

    The main purpose of having this wrapper rather than directly
    using `pyarrow.ParquetFile` is to make it nicer to load
    selected subsets of columns, especially from dataframes with multi-level
    column indices.

    Instantiated with either a path to a parquet file or a dataFrame

    Parameters
    ----------
    filename : str, optional
        Path to Parquet file.
    dataFrame : dataFrame, optional
        In-memory table to wrap. Only used when ``filename`` is `None`.

    Raises
    ------
    ValueError
        Raised if neither ``filename`` nor ``dataFrame`` is provided.
    """

    def __init__(self, filename=None, dataFrame=None):
        self.filename = filename
        # Lazily-populated caches; filled in by the corresponding properties.
        self._pandasMd = None
        self._columns = None
        self._columnIndex = None

        # ``filename`` takes precedence when both arguments are given.
        if filename is not None:
            self._pf = pyarrow.parquet.ParquetFile(filename)
            self._df = None
        elif dataFrame is not None:
            self._df = dataFrame
            self._pf = None
        else:
            raise ValueError("Either filename or dataFrame must be passed.")

    def write(self, filename):
        """Write pandas dataframe to parquet

        Parameters
        ----------
        filename : str
            Path to which to write.

        Raises
        ------
        ValueError
            Raised if this object does not wrap a dataframe.
        """
        if self._df is None:
            raise ValueError("df property must be defined to write.")
        table = pyarrow.Table.from_pandas(self._df)
        pyarrow.parquet.write_table(table, filename, compression="none")

    @property
    def pandasMd(self):
        """Decoded ``b"pandas"`` entry of the Parquet file metadata

        Raises
        ------
        AttributeError
            Raised if this object does not wrap a Parquet file.
        """
        if self._pf is None:
            raise AttributeError("This property is only accessible if ._pf is set.")
        if self._pandasMd is None:
            self._pandasMd = json.loads(self._pf.metadata.metadata[b"pandas"])
        return self._pandasMd

    @property
    def columnIndex(self):
        """Columns as a pandas Index
        """
        if self._columnIndex is None:
            self._columnIndex = self._getColumnIndex()
        return self._columnIndex

    def _getColumnIndex(self):
        """Build the column index (uncached); see `columnIndex`."""
        if self._df is not None:
            return self._df.columns
        else:
            return pd.Index(self.columns)

    @property
    def columns(self):
        """List of column names (or column index if df is set)

        This may either be a list of column names, or a
        pandas.Index object describing the column index, depending
        on whether the ParquetTable object is wrapping a ParquetFile
        or a DataFrame.
        """
        if self._columns is None:
            self._columns = self._getColumns()
        return self._columns

    def _getColumns(self):
        """Build the column list (uncached); see `columns`."""
        if self._df is not None:
            return self._sanitizeColumns(self._df.columns)
        else:
            return self._pf.metadata.schema.names

    def _sanitizeColumns(self, columns):
        """Return the entries of ``columns`` that are present in `columnIndex`."""
        return [c for c in columns if c in self.columnIndex]

    def toDataFrame(self, columns=None):
        """Get table (or specified columns) as a pandas DataFrame

        Parameters
        ----------
        columns : list, optional
            Desired columns.  If `None`, then all columns will be
            returned.

        Returns
        -------
        df : `pandas.DataFrame`
            The wrapped dataframe itself (not a copy) when one is wrapped and
            ``columns`` is `None`; otherwise the requested selection.
        """
        if self._pf is None:
            if columns is None:
                return self._df
            else:
                return self._df[columns]

        if columns is None:
            return self._pf.read().to_pandas()

        df = self._pf.read(columns=columns, use_pandas_metadata=True).to_pandas()
        return df
146 
147 
class MultilevelParquetTable(ParquetTable):
    """Wrapper to access dataframe with multi-level column index from Parquet

    This subclass of `ParquetTable` to handle the multi-level is necessary
    because there is not a convenient way to request specific table subsets
    by level via Parquet through pyarrow, as there is with a `pandas.DataFrame`.

    Additionally, pyarrow stores multilevel index information in a very strange
    way. Pandas stores it as a tuple, so that one can access a single column
    from a pandas dataframe as `df[('ref', 'HSC-G', 'coord_ra')]`.  However, for
    some reason pyarrow saves these indices as "stringified" tuples, such that
    in order to read this same column from a table written to Parquet, you would
    have to do the following:

        pf = pyarrow.ParquetFile(filename)
        df = pf.read(columns=["('ref', 'HSC-G', 'coord_ra')"])

    See also https://github.com/apache/arrow/issues/1771, where we've raised
    this issue.

    As multilevel-indexed dataframes can be very useful to store data like
    multiple filters' worth of data in the same table, this case deserves a
    wrapper to enable easier access;
    that's what this object is for.  For example,

        parq = MultilevelParquetTable(filename)
        columnDict = {'dataset':'meas',
                      'filter':'HSC-G',
                      'column':['coord_ra', 'coord_dec']}
        df = parq.toDataFrame(columns=columnDict)

    will return just the coordinate columns; the equivalent of calling
    `df['meas']['HSC-G'][['coord_ra', 'coord_dec']]` on the total dataframe,
    but without having to load the whole frame into memory---this reads just
    those columns from disk.  You can also request a sub-table; e.g.,

        parq = MultilevelParquetTable(filename)
        columnDict = {'dataset':'meas',
                      'filter':'HSC-G'}
        df = parq.toDataFrame(columns=columnDict)

    and this will be the equivalent of `df['meas']['HSC-G']` on the total dataframe.

    Parameters
    ----------
    filename : str, optional
        Path to Parquet file.
    dataFrame : dataFrame, optional
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Lazily-populated cache for the `columnLevelNames` property.
        self._columnLevelNames = None

    @property
    def columnLevelNames(self):
        """Dict mapping each column level name to its unique entries
        """
        if self._columnLevelNames is None:
            # ``self.columns`` is a list of tuples, so column ``i`` of the
            # resulting 2-d array holds the entries of level ``i``.
            self._columnLevelNames = {
                level: list(np.unique(np.array(self.columns)[:, i]))
                for i, level in enumerate(self.columnLevels)
            }
        return self._columnLevelNames

    @property
    def columnLevels(self):
        """Names of levels in column index
        """
        return self.columnIndex.names

    def _getColumnIndex(self):
        """Build the column index (uncached); see `columnIndex`."""
        if self._df is not None:
            return super()._getColumnIndex()
        else:
            levelNames = [f["name"] for f in self.pandasMd["column_indexes"]]
            return pd.MultiIndex.from_tuples(self.columns, names=levelNames)

    def _getColumns(self):
        """Build the list of column tuples (uncached); see `columns`."""
        if self._df is not None:
            return super()._getColumns()
        else:
            columns = self._pf.metadata.schema.names
            n = len(self.pandasMd["column_indexes"])
            # Schema names are stringified tuples; capture the n quoted
            # entries.  Names that do not match the pattern are skipped.
            pattern = re.compile(", ".join(["'(.*)'"] * n))
            matches = [re.search(pattern, c) for c in columns]
            return [m.groups() for m in matches if m is not None]

    def toDataFrame(self, columns=None, droplevels=True):
        """Get table (or specified columns) as a pandas DataFrame

        To get specific columns in specified sub-levels:

            parq = MultilevelParquetTable(filename)
            columnDict = {'dataset':'meas',
                          'filter':'HSC-G',
                          'column':['coord_ra', 'coord_dec']}
            df = parq.toDataFrame(columns=columnDict)

        Or, to get an entire subtable, leave out one level name:

            parq = MultilevelParquetTable(filename)
            columnDict = {'dataset':'meas',
                          'filter':'HSC-G'}
            df = parq.toDataFrame(columns=columnDict)

        Parameters
        ----------
        columns : list or dict, optional
            Desired columns.  If `None`, then all columns will be
            returned.  If a list, then the names of the columns must
            be *exactly* as stored by pyarrow; that is, stringified tuples.
            If a dictionary, then the entries of the dictionary must
            correspond to the level names of the column multi-index
            (that is, the `columnLevels` attribute).  Not every level
            must be passed; if any level is left out, then all entries
            in that level will be implicitly included.
        droplevels : bool
            If True drop levels of column index that have just one entry

        Raises
        ------
        ValueError
            Raised if the initial read fails and none of the requested
            columns are present in `columnIndex`.
        """
        if columns is None:
            # ``droplevels`` is not applied when the full table is requested.
            if self._pf is None:
                return self._df
            else:
                return self._pf.read().to_pandas()

        if isinstance(columns, dict):
            columns = self._colsFromDict(columns)

        if self._pf is None:
            try:
                df = self._df[columns]
            except (AttributeError, KeyError):
                # Retry with only the requested columns that actually exist.
                df = self._df[self._availableColumns(columns)]
        else:
            pfColumns = self._stringify(columns)
            try:
                df = self._pf.read(columns=pfColumns, use_pandas_metadata=True).to_pandas()
            except (AttributeError, KeyError):
                # Retry with only the requested columns that actually exist.
                pfColumns = self._stringify(self._availableColumns(columns))
                df = self._pf.read(columns=pfColumns, use_pandas_metadata=True).to_pandas()

        if droplevels:
            # Drop levels of column index that have just one entry
            # NOTE(review): this relies on ``df.columns.levels`` listing only
            # entries present in the selection -- confirm for dataframe-backed
            # tables.
            levelsToDrop = [n for lev, n in zip(df.columns.levels, df.columns.names) if len(lev) == 1]

            # Prevent error when trying to drop *all* columns
            if len(levelsToDrop) == len(df.columns.names):
                levelsToDrop.remove(df.columns.names[-1])

            df.columns = df.columns.droplevel(levelsToDrop)

        return df

    def _availableColumns(self, columns):
        """Return the requested columns found in `columnIndex`; raise if none."""
        newColumns = [c for c in columns if c in self.columnIndex]
        if not newColumns:
            raise ValueError("None of the requested columns ({}) are available!".format(columns))
        return newColumns

    def _colsFromDict(self, colDict):
        """Expand a ``{level name: entry or entries}`` dict to column tuples."""
        new_colDict = {}
        for i, lev in enumerate(self.columnLevels):
            if lev in colDict:
                # A bare string stands for a single entry of that level.
                if isinstance(colDict[lev], str):
                    new_colDict[lev] = [colDict[lev]]
                else:
                    new_colDict[lev] = colDict[lev]
            else:
                # Level not constrained: include every entry of that level.
                new_colDict[lev] = self.columnIndex.levels[i]

        levelCols = [new_colDict[lev] for lev in self.columnLevels]
        cols = product(*levelCols)
        return list(cols)

    def _stringify(self, cols):
        """Convert column tuples to the stringified form pyarrow stores."""
        return [str(c) for c in cols]
def toDataFrame(self, columns=None, droplevels=True)
def __init__(self, filename=None, dataFrame=None)
Definition: parquetTable.py:54
daf::base::PropertyList * list
Definition: fits.cc:913
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)
Definition: history.py:174
std::shared_ptr< table::io::Persistable > read(table::io::InputArchive const &archive, table::io::CatalogVector const &catalogs) const override
Definition: warpExposure.cc:0