LSST Applications g070148d5b3+33e5256705,g0d53e28543+25c8b88941,g0da5cf3356+2dd1178308,g1081da9e2a+62d12e78cb,g17e5ecfddb+7e422d6136,g1c76d35bf8+ede3a706f7,g295839609d+225697d880,g2e2c1a68ba+cc1f6f037e,g2ffcdf413f+853cd4dcde,g38293774b4+62d12e78cb,g3b44f30a73+d953f1ac34,g48ccf36440+885b902d19,g4b2f1765b6+7dedbde6d2,g5320a0a9f6+0c5d6105b6,g56b687f8c9+ede3a706f7,g5c4744a4d9+ef6ac23297,g5ffd174ac0+0c5d6105b6,g6075d09f38+66af417445,g667d525e37+2ced63db88,g670421136f+2ced63db88,g71f27ac40c+2ced63db88,g774830318a+463cbe8d1f,g7876bc68e5+1d137996f1,g7985c39107+62d12e78cb,g7fdac2220c+0fd8241c05,g96f01af41f+368e6903a7,g9ca82378b8+2ced63db88,g9d27549199+ef6ac23297,gabe93b2c52+e3573e3735,gb065e2a02a+3dfbe639da,gbc3249ced9+0c5d6105b6,gbec6a3398f+0c5d6105b6,gc9534b9d65+35b9f25267,gd01420fc67+0c5d6105b6,geee7ff78d7+a14128c129,gf63283c776+ede3a706f7,gfed783d017+0c5d6105b6,w.2022.47
LSST Data Management Base Package
Loading...
Searching...
No Matches
cassandra_utils.py
Go to the documentation of this file.
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
21
22from __future__ import annotations
23
24__all__ = [
25 "literal",
26 "pandas_dataframe_factory",
27 "quote_id",
28 "raw_data_factory",
29 "select_concurrent",
30]
31
32import logging
33import numpy as np
34import pandas
35from datetime import datetime, timedelta
36from typing import Any, List, Tuple, Union
37
38# If cassandra-driver is not there the module can still be imported
39# but things will not work.
40try:
41 from cassandra.cluster import EXEC_PROFILE_DEFAULT, Session
42 from cassandra.concurrent import execute_concurrent
43
44 CASSANDRA_IMPORTED = True
45except ImportError:
46 CASSANDRA_IMPORTED = False
47
48
49_LOG = logging.getLogger(__name__)
50
51
if CASSANDRA_IMPORTED:

    class SessionWrapper:
        """Special wrapper class to workaround ``execute_concurrent()`` issue
        which does not allow non-default execution profile.

        Instance of this class can be passed to execute_concurrent() instead
        of `Session` instance. This class implements a small set of methods
        that are needed by ``execute_concurrent()``. When
        ``execute_concurrent()`` is fixed to accept execution profiles, this
        wrapper can be dropped.
        """

        def __init__(
            self, session: Session, execution_profile: Any = EXEC_PROFILE_DEFAULT
        ):
            self._session = session
            self._execution_profile = execution_profile

        def execute_async(
            self,
            *args: Any,
            execution_profile: Any = EXEC_PROFILE_DEFAULT,
            **kwargs: Any,
        ) -> Any:
            """Forward to ``Session.execute_async``, substituting the wrapped
            execution profile when the caller did not pass one explicitly.
            """
            # explicit parameter can override our settings
            if execution_profile is EXEC_PROFILE_DEFAULT:
                execution_profile = self._execution_profile
            return self._session.execute_async(
                *args, execution_profile=execution_profile, **kwargs
            )

        def submit(self, *args: Any, **kwargs: Any) -> Any:
            # Internal method of Session used by execute_concurrent()
            # machinery; forwarded unchanged.
            return self._session.submit(*args, **kwargs)
88
def pandas_dataframe_factory(
    colnames: List[str], rows: List[Tuple]
) -> pandas.DataFrame:
    """Special non-standard row factory that creates pandas DataFrame from
    Cassandra result set.

    Parameters
    ----------
    colnames : `list` [ `str` ]
        Names of the columns.
    rows : `list` of `tuple`
        Result rows.

    Returns
    -------
    catalog : `pandas.DataFrame`
        DataFrame with the result set.

    Notes
    -----
    When using this method as row factory for Cassandra, the resulting
    DataFrame should be accessed in a non-standard way using
    `ResultSet._current_rows` attribute.
    """
    frame = pandas.DataFrame.from_records(data=rows, columns=colnames)
    return frame
114
115
def raw_data_factory(
    colnames: List[str], rows: List[Tuple]
) -> Tuple[List[str], List[Tuple]]:
    """Special non-standard row factory that makes 2-element tuple containing
    unmodified data: list of column names and list of rows.

    Parameters
    ----------
    colnames : `list` [ `str` ]
        Names of the columns.
    rows : `list` of `tuple`
        Result rows.

    Returns
    -------
    colnames : `list` [ `str` ]
        Names of the columns.
    rows : `list` of `tuple`
        Result rows

    Notes
    -----
    When using this method as row factory for Cassandra, the resulting
    2-element tuple should be accessed in a non-standard way using
    `ResultSet._current_rows` attribute. This factory is used to build
    pandas DataFrames in `select_concurrent` method.
    """
    # Pass data through untouched; packaging into a tuple is the only job.
    return colnames, rows
144
145
def select_concurrent(
    session: Session, statements: List[Tuple], execution_profile: str, concurrency: int
) -> Union[pandas.DataFrame, List]:
    """Execute bunch of queries concurrently and merge their results into
    a single result.

    Parameters
    ----------
    session : `cassandra.cluster.Session`
        Session used to execute the statements (wrapped in `SessionWrapper`
        so that the named execution profile is applied).
    statements : `list` [ `tuple` ]
        List of statements and their parameters, passed directly to
        ``execute_concurrent()``.
    execution_profile : `str`
        Execution profile name.
    concurrency : `int`
        Concurrency level passed to ``execute_concurrent()``.

    Returns
    -------
    result
        Combined result of multiple statements, type of the result depends on
        specific row factory defined in execution profile. If row factory is
        one of `pandas_dataframe_factory` or `raw_data_factory` then pandas
        DataFrame is created from a combined result. Otherwise a list of
        rows is returned, type of each row is determined by the row factory.

    Notes
    -----
    This method can raise any exception that is raised by one of the provided
    statements.
    """
    session_wrap = SessionWrapper(session, execution_profile)
    # raise_on_first_error=False so that we can log the failing result
    # before re-raising it below.
    results = execute_concurrent(
        session_wrap,
        statements,
        results_generator=True,
        raise_on_first_error=False,
        concurrency=concurrency,
    )

    # The merge strategy depends on which row factory the profile uses.
    ep = session.get_execution_profile(execution_profile)
    if ep.row_factory is raw_data_factory:

        # Collect rows into a single list and build Dataframe out of that
        _LOG.debug("making pandas data frame out of rows/columns")
        columns: Any = None
        rows = []
        for success, result in results:
            if success:
                # raw_data_factory stores (colnames, rows) in _current_rows.
                result = result._current_rows
                if columns is None:
                    columns = result[0]
                elif columns != result[0]:
                    # All queries must return an identical column list.
                    _LOG.error(
                        "different columns returned by queries: %s and %s",
                        columns,
                        result[0],
                    )
                    raise ValueError(
                        f"different columns returned by queries: {columns} and {result[0]}"
                    )
                rows += result[1]
            else:
                # `result` is the exception raised by the failed statement.
                _LOG.error("error returned by query: %s", result)
                raise result
        catalog = pandas_dataframe_factory(columns, rows)
        _LOG.debug("pandas catalog shape: %s", catalog.shape)
        return catalog

    elif ep.row_factory is pandas_dataframe_factory:

        # Merge multiple DataFrames into one
        _LOG.debug("making pandas data frame out of set of data frames")
        dataframes = []
        for success, result in results:
            if success:
                dataframes.append(result._current_rows)
            else:
                _LOG.error("error returned by query: %s", result)
                raise result
        # concatenate all frames
        if len(dataframes) == 1:
            # Avoid a needless copy when there is a single frame.
            catalog = dataframes[0]
        else:
            catalog = pandas.concat(dataframes)
        _LOG.debug("pandas catalog shape: %s", catalog.shape)
        return catalog

    else:

        # Just concatenate all rows into a single collection.
        rows = []
        for success, result in results:
            if success:
                rows.extend(result)
            else:
                _LOG.error("error returned by query: %s", result)
                raise result
        _LOG.debug("number of rows: %s", len(rows))
        return rows
243
244
245def literal(v: Any) -> Any:
246 """Transform object into a value for the query."""
247 if v is None:
248 pass
249 elif isinstance(v, datetime):
250 v = int((v - datetime(1970, 1, 1)) / timedelta(seconds=1)) * 1000
251 elif isinstance(v, (bytes, str)):
252 pass
253 else:
254 try:
255 if not np.isfinite(v):
256 v = None
257 except TypeError:
258 pass
259 return v
260
261
def quote_id(columnName: str) -> str:
    """Smart quoting for column names. Lower-case names are not quoted."""
    # Names that are not entirely lower-case need double quotes in CQL.
    return columnName if columnName.islower() else '"' + columnName + '"'
table::Key< int > to
table::Key< int > a
Any submit(self, *Any args, **Any kwargs)
Any execute_async(self, *Any args, Any execution_profile=EXEC_PROFILE_DEFAULT, **Any kwargs)
def __init__(self, Session session, Any execution_profile=EXEC_PROFILE_DEFAULT)
daf::base::PropertySet * set
Definition: fits.cc:927