LSST Applications g04e9c324dd+8c5ae1fdc5,g134cb467dc+b203dec576,g18429d2f64+358861cd2c,g199a45376c+0ba108daf9,g1fd858c14a+dd066899e3,g262e1987ae+ebfced1d55,g29ae962dfc+72fd90588e,g2cef7863aa+aef1011c0b,g35bb328faa+8c5ae1fdc5,g3fd5ace14f+b668f15bc5,g4595892280+3897dae354,g47891489e3+abcf9c3559,g4d44eb3520+fb4ddce128,g53246c7159+8c5ae1fdc5,g67b6fd64d1+abcf9c3559,g67fd3c3899+1f72b5a9f7,g74acd417e5+cb6b47f07b,g786e29fd12+668abc6043,g87389fa792+8856018cbb,g89139ef638+abcf9c3559,g8d7436a09f+bcf525d20c,g8ea07a8fe4+9f5ccc88ac,g90f42f885a+6054cc57f1,g97be763408+06f794da49,g9dd6db0277+1f72b5a9f7,ga681d05dcb+7e36ad54cd,gabf8522325+735880ea63,gac2eed3f23+abcf9c3559,gb89ab40317+abcf9c3559,gbf99507273+8c5ae1fdc5,gd8ff7fe66e+1f72b5a9f7,gdab6d2f7ff+cb6b47f07b,gdc713202bf+1f72b5a9f7,gdfd2d52018+8225f2b331,ge365c994fd+375fc21c71,ge410e46f29+abcf9c3559,geaed405ab2+562b3308c0,gf9a733ac38+8c5ae1fdc5,w.2025.35
LSST Data Management Base Package
Loading...
Searching...
No Matches
cassandra_utils.py
Go to the documentation of this file.
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
21
22from __future__ import annotations
23
24__all__ = [
25 "ApdbCassandraTableData",
26 "PreparedStatementCache",
27 "literal",
28 "pandas_dataframe_factory",
29 "quote_id",
30 "raw_data_factory",
31 "select_concurrent",
32]
33
34import logging
35from collections.abc import Collection, Iterable, Iterator, Sequence
36from datetime import datetime, timedelta
37from typing import Any
38from uuid import UUID
39
40import felis.datamodel
41import numpy as np
42import pandas
43
44# If cassandra-driver is not there the module can still be imported
45# but things will not work.
46try:
47 import cassandra.concurrent
48 from cassandra.cluster import EXEC_PROFILE_DEFAULT, Session
49 from cassandra.query import PreparedStatement
50
51 CASSANDRA_IMPORTED = True
52except ImportError:
53 CASSANDRA_IMPORTED = False
54 EXEC_PROFILE_DEFAULT = object()
55
56from ..apdbReplica import ApdbTableData
57
58_LOG = logging.getLogger(__name__)
59
60
62 """Implementation of ApdbTableData that wraps Cassandra raw data."""
63
64 def __init__(self, columns: list[str], rows: list[tuple]):
65 self._columns = columns
66 self._rows = rows
67 self._column_types: dict[str, felis.datamodel.DataType] = {}
68
69 def set_column_types(self, types: dict[str, felis.datamodel.DataType]) -> None:
70 """Update column types.
71
72 Parameters
73 ----------
74 types : `dict`[`str`, `felis.datamodel.DataType`]
75 Mapping of column name its type.
76
77 Notes
78 -----
79 Due to the way how instances of this class are constructed it is
80 impossible to pass types of columns to the constructor, instead we will
81 need to make a call to this method after construction.
82 """
83 self._column_types = types
84
85 def column_names(self) -> Sequence[str]:
86 # docstring inherited
87 return self._columns
88
89 def column_defs(self) -> Sequence[tuple[str, felis.datamodel.DataType]]:
90 return tuple((column, self._column_types[column]) for column in self._columns)
91
92 def rows(self) -> Collection[tuple]:
93 # docstring inherited
94 return self._rows
95
96 def append(self, other: ApdbCassandraTableData) -> None:
97 """Extend rows in this table with rows in other table"""
98 if self._columns != other._columns:
99 raise ValueError(f"Different columns returned by queries: {self._columns} and {other._columns}")
100 self._rows.extend(other._rows)
101
102 def project(self, *, drop: Iterable[str] = set()) -> None:
103 """Modify data in place by droppiing some columns."""
104 drop_set = set(drop)
105 if not drop_set:
106 return
107
108 drop_idx = []
109 for idx, col_name in enumerate(self._columns):
110 if col_name in drop_set:
111 drop_idx.append(idx)
112 # Have to reverse it so deletion does not change index.
113 drop_idx.reverse()
114
115 for row_idx in range(len(self._rows)):
116 row = list(self._rows[row_idx])
117 for idx in drop_idx:
118 del row[idx]
119 self._rows[row_idx] = tuple(row)
120
121 for idx in drop_idx:
122 del self._columns[idx]
123
124 def __iter__(self) -> Iterator[tuple]:
125 """Make it look like a row iterator, needed for some odd logic."""
126 return iter(self._rows)
127
128
130 """Cache for prepared Cassandra statements"""
131
132 def __init__(self, session: Session) -> None:
133 self._session = session
134 self._prepared_statements: dict[str, PreparedStatement] = {}
135
136 def prepare(self, query: str) -> PreparedStatement:
137 """Convert query string into prepared statement."""
138 stmt = self._prepared_statements.get(query)
139 if stmt is None:
140 stmt = self._session.prepare(query)
141 self._prepared_statements[query] = stmt
142 return stmt
143
144
def pandas_dataframe_factory(colnames: list[str], rows: list[tuple]) -> pandas.DataFrame:
    """Create pandas DataFrame from Cassandra result set.

    Parameters
    ----------
    colnames : `list` [ `str` ]
        Names of the columns.
    rows : `list` of `tuple`
        Result rows.

    Returns
    -------
    catalog : `pandas.DataFrame`
        DataFrame with the result set.

    Notes
    -----
    When using this method as row factory for Cassandra, the resulting
    DataFrame should be accessed in a non-standard way using
    `ResultSet._current_rows` attribute.
    """
    frame = pandas.DataFrame.from_records(rows, columns=colnames)
    return frame
167
168
def raw_data_factory(colnames: list[str], rows: list[tuple]) -> ApdbCassandraTableData:
    """Wrap unmodified result data (column names and rows) into
    `ApdbCassandraTableData`.

    Parameters
    ----------
    colnames : `list` [ `str` ]
        Names of the columns.
    rows : `list` of `tuple`
        Result rows.

    Returns
    -------
    data : `ApdbCassandraTableData`
        Input data wrapped into ApdbCassandraTableData.

    Notes
    -----
    When using this method as row factory for Cassandra, the resulting
    object should be accessed in a non-standard way using
    `ResultSet._current_rows` attribute.
    """
    table = ApdbCassandraTableData(colnames, rows)
    return table
192
193
def execute_concurrent(
    session: Session,
    statements: list[tuple],
    *,
    execution_profile: object = EXEC_PROFILE_DEFAULT,
    concurrency: int = 100,
) -> None:
    """Wrap call to `cassandra.concurrent.execute_concurrent` to avoid
    importing cassandra in other modules.

    Parameters
    ----------
    session : `cassandra.cluster.Session`
        Session to use for queries.
    statements : `list` [ `tuple` ]
        List of statements and their parameters, passed directly to
        ``execute_concurrent()``.
    execution_profile
        Execution profile to use for the queries; by default the driver's
        default profile is used.
    concurrency : `int`
        Maximum number of concurrently executing queries.
    """
    cassandra.concurrent.execute_concurrent(
        session,
        statements,
        concurrency=concurrency,
        execution_profile=execution_profile,
    )
210
211
def select_concurrent(
    session: Session, statements: list[tuple], execution_profile: str, concurrency: int
) -> pandas.DataFrame | ApdbCassandraTableData | list:
    """Execute bunch of queries concurrently and merge their results into
    a single result.

    Parameters
    ----------
    session : `cassandra.cluster.Session`
        Session to use for queries.
    statements : `list` [ `tuple` ]
        List of statements and their parameters, passed directly to
        ``execute_concurrent()``.
    execution_profile : `str`
        Execution profile name.
    concurrency : `int`
        Maximum number of concurrently executing queries.

    Returns
    -------
    result
        Combined result of multiple statements, type of the result depends on
        specific row factory defined in execution profile. If row factory is
        `pandas_dataframe_factory` then pandas DataFrame is created from a
        combined result. If row factory is `raw_data_factory` then
        `ApdbCassandraTableData` is built from all records. Otherwise a list of
        rows is returned, type of each row is determined by the row factory.

    Notes
    -----
    This method can raise any exception that is raised by one of the provided
    statements.
    """
    # Results come back lazily as (success, result-or-exception) pairs;
    # with raise_on_first_error=False we decide below when to raise.
    results = cassandra.concurrent.execute_concurrent(
        session,
        statements,
        results_generator=True,
        raise_on_first_error=False,
        concurrency=concurrency,
        execution_profile=execution_profile,
    )

    # The row factory of the profile determines how partial results merge.
    ep = session.get_execution_profile(execution_profile)
    if ep.row_factory is raw_data_factory:
        # Merge all ApdbCassandraTableData pieces into the first one.
        _LOG.debug("making raw data out of rows/columns")
        table_data: ApdbCassandraTableData | None = None
        for success, result in results:
            if not success:
                _LOG.error("error returned by query: %s", result)
                raise result
            data = result._current_rows
            assert isinstance(data, ApdbCassandraTableData)
            if table_data is None:
                table_data = data
            else:
                table_data.append(data)
        if table_data is None:
            # No statements were executed at all.
            table_data = ApdbCassandraTableData([], [])
        return table_data

    elif ep.row_factory is pandas_dataframe_factory:
        # Merge multiple DataFrames into one.
        _LOG.debug("making pandas data frame out of set of data frames")
        dataframes = []
        for success, result in results:
            if not success:
                _LOG.error("error returned by query: %s", result)
                raise result
            dataframes.append(result._current_rows)
        # Concatenate all frames, but skip empty ones.
        non_empty = [df for df in dataframes if not df.empty]
        if non_empty:
            catalog = non_empty[0] if len(non_empty) == 1 else pandas.concat(non_empty)
        elif dataframes:
            # If all frames are empty, return the first one to keep its
            # column structure.
            catalog = dataframes[0]
        else:
            # No statements were executed at all; previously this indexed
            # dataframes[0] and raised IndexError — return an empty frame.
            catalog = pandas.DataFrame()
        _LOG.debug("pandas catalog shape: %s", catalog.shape)
        return catalog

    else:
        # Just concatenate all rows into a single collection.
        rows = []
        for success, result in results:
            if not success:
                _LOG.error("error returned by query: %s", result)
                raise result
            rows.extend(result)
        _LOG.debug("number of rows: %s", len(rows))
        return rows
303
304
305def literal(v: Any) -> Any:
306 """Transform object into a value for the query."""
307 if v is None or v is pandas.NA:
308 v = None
309 elif isinstance(v, datetime):
310 v = int((v - datetime(1970, 1, 1)) / timedelta(seconds=1) * 1000)
311 elif isinstance(v, bytes | str | UUID | int):
312 pass
313 else:
314 try:
315 if not np.isfinite(v):
316 v = None
317 except TypeError:
318 pass
319 return v
320
321
def quote_id(columnName: str) -> str:
    """Smart quoting for column names. Lower-case names are not quoted."""
    # Names that are not entirely lower-case must be double-quoted to keep
    # their case in Cassandra.
    return columnName if columnName.islower() else f'"{columnName}"'
Sequence[tuple[str, felis.datamodel.DataType]] column_defs(self)
None set_column_types(self, dict[str, felis.datamodel.DataType] types)
__init__(self, list[str] columns, list[tuple] rows)
pandas.DataFrame pandas_dataframe_factory(list[str] colnames, list[tuple] rows)
pandas.DataFrame|ApdbCassandraTableData|list select_concurrent(Session session, list[tuple] statements, str execution_profile, int concurrency)
None execute_concurrent(Session session, list[tuple] statements, *, object execution_profile=EXEC_PROFILE_DEFAULT, int concurrency=100)
ApdbCassandraTableData raw_data_factory(list[str] colnames, list[tuple] rows)