Loading [MathJax]/extensions/tex2jax.js
LSST Applications g0fba68d861+83433b07ee,g16d25e1f1b+23bc9e47ac,g1ec0fe41b4+3ea9d11450,g1fd858c14a+9be2b0f3b9,g2440f9efcc+8c5ae1fdc5,g35bb328faa+8c5ae1fdc5,g4a4af6cd76+d25431c27e,g4d2262a081+c74e83464e,g53246c7159+8c5ae1fdc5,g55585698de+1e04e59700,g56a49b3a55+92a7603e7a,g60b5630c4e+1e04e59700,g67b6fd64d1+3fc8cb0b9e,g78460c75b0+7e33a9eb6d,g786e29fd12+668abc6043,g8352419a5c+8c5ae1fdc5,g8852436030+60e38ee5ff,g89139ef638+3fc8cb0b9e,g94187f82dc+1e04e59700,g989de1cb63+3fc8cb0b9e,g9d31334357+1e04e59700,g9f33ca652e+0a83e03614,gabe3b4be73+8856018cbb,gabf8522325+977d9fabaf,gb1101e3267+8b4b9c8ed7,gb89ab40317+3fc8cb0b9e,gc0af124501+57ccba3ad1,gcf25f946ba+60e38ee5ff,gd6cbbdb0b4+1cc2750d2e,gd794735e4e+7be992507c,gdb1c4ca869+be65c9c1d7,gde0f65d7ad+c7f52e58fe,ge278dab8ac+6b863515ed,ge410e46f29+3fc8cb0b9e,gf35d7ec915+97dd712d81,gf5e32f922b+8c5ae1fdc5,gf618743f1b+747388abfa,gf67bdafdda+3fc8cb0b9e,w.2025.18
LSST Data Management Base Package
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
cassandra_utils.py
Go to the documentation of this file.
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
21
22from __future__ import annotations
23
# Public names of this module; kept sorted.
__all__ = [
    "ApdbCassandraTableData",
    "PreparedStatementCache",
    "execute_concurrent",
    "literal",
    "pandas_dataframe_factory",
    "quote_id",
    "raw_data_factory",
    "select_concurrent",
]
33
34import logging
35from collections.abc import Collection, Iterable, Iterator, Sequence
36from datetime import datetime, timedelta
37from typing import Any
38from uuid import UUID
39
40import numpy as np
41import pandas
42
43# If cassandra-driver is not there the module can still be imported
44# but things will not work.
45try:
46 import cassandra.concurrent
47 from cassandra.cluster import EXEC_PROFILE_DEFAULT, Session
48 from cassandra.query import PreparedStatement
49
50 CASSANDRA_IMPORTED = True
51except ImportError:
52 CASSANDRA_IMPORTED = False
53 EXEC_PROFILE_DEFAULT = object()
54
55from ..apdbReplica import ApdbTableData
56
# Module-level logger, named after this module.
_LOG = logging.getLogger(name=__name__)
58
59
class ApdbCassandraTableData(ApdbTableData):
    """Implementation of ApdbTableData that wraps Cassandra raw data.

    Parameters
    ----------
    columns : `list` [ `str` ]
        Names of the columns.
    rows : `list` [ `tuple` ]
        Data rows, presumably one value per column in each tuple.

    Notes
    -----
    Both lists are stored by reference, not copied; `append` and `project`
    modify them in place.
    """

    def __init__(self, columns: list[str], rows: list[tuple]):
        self._columns = columns
        self._rows = rows

    def column_names(self) -> Sequence[str]:
        # docstring inherited
        return self._columns

    def rows(self) -> Collection[tuple]:
        # docstring inherited
        return self._rows

    def append(self, other: ApdbCassandraTableData) -> None:
        """Extend rows in this table with rows in other table.

        Parameters
        ----------
        other : `ApdbCassandraTableData`
            Table whose rows are appended to this table.

        Raises
        ------
        ValueError
            Raised if column names of the two tables differ.
        """
        if self._columns != other._columns:
            raise ValueError(f"Different columns returned by queries: {self._columns} and {other._columns}")
        self._rows.extend(other._rows)

    def project(self, *, drop: Iterable[str] = ()) -> None:
        """Modify data in place by dropping some columns.

        Parameters
        ----------
        drop : `~collections.abc.Iterable` [ `str` ]
            Names of the columns to remove. Names that are not present in
            the table are ignored.
        """
        drop_set = set(drop)
        if not drop_set:
            return

        drop_idx = {idx for idx, col_name in enumerate(self._columns) if col_name in drop_set}

        # Slice assignment replaces the contents of the existing lists rather
        # than rebinding the attributes, so any other holder of these list
        # objects sees the projected data (same as element-wise mutation).
        self._rows[:] = [
            tuple(value for idx, value in enumerate(row) if idx not in drop_idx) for row in self._rows
        ]
        if drop_idx:
            self._columns[:] = [
                col_name for idx, col_name in enumerate(self._columns) if idx not in drop_idx
            ]

    def __iter__(self) -> Iterator[tuple]:
        """Make it look like a row iterator, needed for some odd logic."""
        return iter(self._rows)
106
107
class PreparedStatementCache:
    """Cache for prepared Cassandra statements.

    Each distinct query string is passed to ``session.prepare`` only once;
    later requests for the same string return the stored statement.

    Parameters
    ----------
    session : `cassandra.cluster.Session`
        Session used to prepare statements.
    """

    def __init__(self, session: Session) -> None:
        self._session = session
        self._prepared_statements: dict[str, PreparedStatement] = {}

    def prepare(self, query: str) -> PreparedStatement:
        """Convert query string into prepared statement, preparing it on
        first use only.

        Parameters
        ----------
        query : `str`
            Query text.

        Returns
        -------
        statement : `cassandra.query.PreparedStatement`
            Prepared statement for this query.
        """
        cache = self._prepared_statements
        if (cached := cache.get(query)) is not None:
            return cached
        fresh = self._session.prepare(query)
        cache[query] = fresh
        return fresh
122
123
def pandas_dataframe_factory(colnames: list[str], rows: list[tuple]) -> pandas.DataFrame:
    """Build a pandas DataFrame out of Cassandra result rows.

    Parameters
    ----------
    colnames : `list` [ `str` ]
        Names of the columns, used as DataFrame column labels.
    rows : `list` of `tuple`
        Result rows, converted with `pandas.DataFrame.from_records`.

    Returns
    -------
    catalog : `pandas.DataFrame`
        DataFrame with the result set.

    Notes
    -----
    When this function is installed as a Cassandra row factory, the
    DataFrame it builds has to be retrieved in a non-standard way, through
    the `ResultSet._current_rows` attribute.
    """
    frame = pandas.DataFrame.from_records(data=rows, columns=colnames)
    return frame
146
147
def raw_data_factory(colnames: list[str], rows: list[tuple]) -> ApdbCassandraTableData:
    """Wrap unmodified column names and rows into `ApdbCassandraTableData`.

    Parameters
    ----------
    colnames : `list` [ `str` ]
        Names of the columns.
    rows : `list` of `tuple`
        Result rows.

    Returns
    -------
    data : `ApdbCassandraTableData`
        Input data wrapped into ApdbCassandraTableData; neither list is
        copied.

    Notes
    -----
    When using this method as row factory for Cassandra, the resulting
    object should be accessed in a non-standard way using
    `ResultSet._current_rows` attribute.
    """
    return ApdbCassandraTableData(colnames, rows)
171
172
def execute_concurrent(
    session: Session,
    statements: list[tuple],
    *,
    execution_profile: object = EXEC_PROFILE_DEFAULT,
    concurrency: int = 100,
) -> None:
    """Wrap call to `cassandra.concurrent.execute_concurrent` to avoid
    importing cassandra in other modules.

    Parameters
    ----------
    session : `cassandra.cluster.Session`
        Session used to execute the statements.
    statements : `list` [ `tuple` ]
        List of statements and their parameters, passed directly to
        ``execute_concurrent()``.
    execution_profile : `object`, optional
        Execution profile passed to ``execute_concurrent()``,
        ``EXEC_PROFILE_DEFAULT`` by default.
    concurrency : `int`, optional
        Concurrency level passed to ``execute_concurrent()``.

    Notes
    -----
    Results of the statements are discarded. ``raise_on_first_error`` is not
    passed, so error handling follows the driver's default for
    ``execute_concurrent()``.
    """
    cassandra.concurrent.execute_concurrent(
        session,
        statements,
        concurrency=concurrency,
        execution_profile=execution_profile,
    )
189
190
def select_concurrent(
    session: Session, statements: list[tuple], execution_profile: str, concurrency: int
) -> pandas.DataFrame | ApdbCassandraTableData | list:
    """Execute bunch of queries concurrently and merge their results into
    a single result.

    Parameters
    ----------
    session : `cassandra.cluster.Session`
        Session used to execute the queries.
    statements : `list` [ `tuple` ]
        List of statements and their parameters, passed directly to
        ``execute_concurrent()``.
    execution_profile : `str`
        Execution profile name.
    concurrency : `int`
        Concurrency level passed to ``execute_concurrent()``.

    Returns
    -------
    result
        Combined result of multiple statements, type of the result depends on
        specific row factory defined in execution profile. If row factory is
        `pandas_dataframe_factory` then pandas DataFrame is created from a
        combined result. If row factory is `raw_data_factory` then
        `ApdbCassandraTableData` is built from all records. Otherwise a list of
        rows is returned, type of each row is determined by the row factory.
        When no results are returned at all, an empty object of the
        corresponding type is returned.

    Notes
    -----
    This method can raise any exception that is raised by one of the provided
    statements.
    """
    results = cassandra.concurrent.execute_concurrent(
        session,
        statements,
        results_generator=True,
        raise_on_first_error=False,
        concurrency=concurrency,
        execution_profile=execution_profile,
    )

    # The row factory of the profile determines what each result holds, and
    # hence how the results have to be merged.
    ep = session.get_execution_profile(execution_profile)
    if ep.row_factory is raw_data_factory:
        return _merge_raw_data(results)
    elif ep.row_factory is pandas_dataframe_factory:
        return _merge_dataframes(results)
    else:
        return _merge_rows(results)


def _successful_results(results: Iterable[tuple[bool, Any]]) -> Iterator[Any]:
    """Yield results of successful queries, raise the error of the first
    failed query encountered.
    """
    for success, result in results:
        if not success:
            _LOG.error("error returned by query: %s", result)
            raise result
        yield result


def _merge_raw_data(results: Iterable[tuple[bool, Any]]) -> ApdbCassandraTableData:
    """Merge results produced with `raw_data_factory` into a single table."""
    _LOG.debug("making raw data out of rows/columns")
    table_data: ApdbCassandraTableData | None = None
    for result in _successful_results(results):
        data = result._current_rows
        assert isinstance(data, ApdbCassandraTableData)
        if table_data is None:
            # Re-use the first table, rows of the following ones are added to it.
            table_data = data
        else:
            table_data.append(data)
    if table_data is None:
        table_data = ApdbCassandraTableData([], [])
    return table_data


def _merge_dataframes(results: Iterable[tuple[bool, Any]]) -> pandas.DataFrame:
    """Concatenate DataFrames produced with `pandas_dataframe_factory`."""
    _LOG.debug("making pandas data frame out of set of data frames")
    dataframes = [result._current_rows for result in _successful_results(results)]
    # Concatenate all frames, but skip empty ones.
    non_empty = [df for df in dataframes if not df.empty]
    if not non_empty:
        # All frames are empty: return the first one, which still carries the
        # column names, or a new empty frame if there were no results at all.
        catalog = dataframes[0] if dataframes else pandas.DataFrame()
    elif len(non_empty) == 1:
        catalog = non_empty[0]
    else:
        catalog = pandas.concat(non_empty)
    _LOG.debug("pandas catalog shape: %s", catalog.shape)
    return catalog


def _merge_rows(results: Iterable[tuple[bool, Any]]) -> list:
    """Concatenate rows of all results into a single list."""
    rows: list = []
    for result in _successful_results(results):
        rows.extend(result)
    _LOG.debug("number of rows: %s", len(rows))
    return rows
282
283
def literal(v: Any) -> Any:
    """Transform object into a value for the query.

    Parameters
    ----------
    v : `~typing.Any`
        Value to transform.

    Returns
    -------
    value : `~typing.Any`
        - ``None`` is returned unchanged.
        - `datetime` instances are converted to an integer number of
          milliseconds since 1970-01-01T00:00:00, truncated toward zero.
          Naive instances are measured against a naive epoch (i.e. assumed to
          be UTC); timezone-aware instances are converted to UTC first.
        - `bytes`, `str`, `UUID` and `int` are returned unchanged.
        - Other values accepted by ``numpy.isfinite`` are replaced with
          ``None`` when not finite (NaN, infinity).
        - Anything else is returned unchanged.
    """
    if v is None:
        pass
    elif isinstance(v, datetime):
        offset = v.utcoffset()
        if offset is not None:
            # Aware datetime: shift to naive UTC so it can be subtracted from
            # the naive epoch below (mixing the two raises TypeError).
            v = v.replace(tzinfo=None) - offset
        # Exact integer microseconds; going through float seconds loses
        # precision for current dates and can be off by one millisecond.
        micros = (v - datetime(1970, 1, 1)) // timedelta(microseconds=1)
        # Truncate toward zero, same rounding as int() applied to a float.
        v = micros // 1000 if micros >= 0 else -(-micros // 1000)
    elif isinstance(v, (bytes, str, UUID, int)):
        pass
    else:
        try:
            if not np.isfinite(v):
                v = None
        except TypeError:
            # Not a numeric type numpy understands: pass it through as is.
            pass
    return v
299
300
def quote_id(columnName: str) -> str:
    """Smart quoting for column names.

    Names for which `str.islower` is true are returned unchanged; every
    other name is wrapped in double quotes.
    """
    if columnName.islower():
        return columnName
    return f'"{columnName}"'
__init__(self, list[str] columns, list[tuple] rows)
pandas.DataFrame pandas_dataframe_factory(list[str] colnames, list[tuple] rows)
pandas.DataFrame|ApdbCassandraTableData|list select_concurrent(Session session, list[tuple] statements, str execution_profile, int concurrency)
None execute_concurrent(Session session, list[tuple] statements, *, object execution_profile=EXEC_PROFILE_DEFAULT, int concurrency=100)
ApdbCassandraTableData raw_data_factory(list[str] colnames, list[tuple] rows)