LSST Applications g0f08755f38+c89d42e150,g1635faa6d4+b6cf076a36,g1653933729+a8ce1bb630,g1a0ca8cf93+4c08b13bf7,g28da252d5a+f33f8200ef,g29321ee8c0+0187be18b1,g2bbee38e9b+9634bc57db,g2bc492864f+9634bc57db,g2cdde0e794+c2c89b37c4,g3156d2b45e+41e33cbcdc,g347aa1857d+9634bc57db,g35bb328faa+a8ce1bb630,g3a166c0a6a+9634bc57db,g3e281a1b8c+9f2c4e2fc3,g414038480c+077ccc18e7,g41af890bb2+e740673f1a,g5fbc88fb19+17cd334064,g7642f7d749+c89d42e150,g781aacb6e4+a8ce1bb630,g80478fca09+f8b2ab54e1,g82479be7b0+e2bd23ab8b,g858d7b2824+c89d42e150,g9125e01d80+a8ce1bb630,g9726552aa6+10f999ec6a,ga5288a1d22+065360aec4,gacf8899fa4+9553554aa7,gae0086650b+a8ce1bb630,gb58c049af0+d64f4d3760,gbd46683f8f+ac57cbb13d,gc28159a63d+9634bc57db,gcf0d15dbbd+e37acf7834,gda3e153d99+c89d42e150,gda6a2b7d83+e37acf7834,gdaeeff99f8+1711a396fd,ge2409df99d+cb1e6652d6,ge79ae78c31+9634bc57db,gf0baf85859+147a0692ba,gf3967379c6+02b11634a5,w.2024.45
LSST Data Management Base Package
Loading...
Searching...
No Matches
cassandra_utils.py
Go to the documentation of this file.
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
21
22from __future__ import annotations
23
24__all__ = [
25 "ApdbCassandraTableData",
26 "PreparedStatementCache",
27 "literal",
28 "pandas_dataframe_factory",
29 "quote_id",
30 "raw_data_factory",
31 "select_concurrent",
32]
33
34import logging
35from collections.abc import Collection, Iterator, Sequence
36from datetime import datetime, timedelta
37from typing import Any
38from uuid import UUID
39
40import numpy as np
41import pandas
42
43# If cassandra-driver is not there the module can still be imported
44# but things will not work.
45try:
46 from cassandra.cluster import Session
47 from cassandra.concurrent import execute_concurrent
48 from cassandra.query import PreparedStatement
49
50 CASSANDRA_IMPORTED = True
51except ImportError:
52 CASSANDRA_IMPORTED = False
53
54from ..apdbReplica import ApdbTableData
55
56_LOG = logging.getLogger(__name__)
57
58
class ApdbCassandraTableData(ApdbTableData):
    """Implementation of ApdbTableData that wraps Cassandra raw data.

    Parameters
    ----------
    columns : `list` [ `str` ]
        Names of the columns.
    rows : `list` of `tuple`
        Result rows, one tuple per row, in the same order as ``columns``.
    """

    def __init__(self, columns: list[str], rows: list[tuple]):
        self._columns = columns
        self._rows = rows

    def column_names(self) -> Sequence[str]:
        # docstring inherited
        return self._columns

    def rows(self) -> Collection[tuple]:
        # docstring inherited
        return self._rows

    def append(self, other: ApdbCassandraTableData) -> None:
        """Extend rows in this table with rows in other table.

        Parameters
        ----------
        other : `ApdbCassandraTableData`
            Table whose rows are appended to this one; its columns must be
            identical to this table's columns.

        Raises
        ------
        ValueError
            Raised if the column lists of the two tables differ.
        """
        if self._columns != other._columns:
            raise ValueError(f"Different columns returned by queries: {self._columns} and {other._columns}")
        self._rows.extend(other._rows)

    def __iter__(self) -> Iterator[tuple]:
        """Make it look like a row iterator, needed for some odd logic."""
        return iter(self._rows)
83
84
class PreparedStatementCache:
    """Cache for prepared Cassandra statements.

    Parameters
    ----------
    session : `cassandra.cluster.Session`
        Cassandra session used to prepare the statements.
    """

    def __init__(self, session: Session) -> None:
        self._session = session
        # Maps query string to the statement prepared for it.
        self._prepared_statements: dict[str, PreparedStatement] = {}

    def prepare(self, query: str) -> PreparedStatement:
        """Convert query string into prepared statement.

        Each distinct query string is prepared at most once; subsequent
        calls with the same string return the cached statement.
        """
        stmt = self._prepared_statements.get(query)
        if stmt is None:
            stmt = self._session.prepare(query)
            self._prepared_statements[query] = stmt
        return stmt
99
100
def pandas_dataframe_factory(colnames: list[str], rows: list[tuple]) -> pandas.DataFrame:
    """Build a pandas DataFrame out of a Cassandra result set.

    Parameters
    ----------
    colnames : `list` [ `str` ]
        Names of the columns.
    rows : `list` of `tuple`
        Result rows.

    Returns
    -------
    catalog : `pandas.DataFrame`
        DataFrame with the result set.

    Notes
    -----
    When using this method as row factory for Cassandra, the resulting
    DataFrame should be accessed in a non-standard way using
    `ResultSet._current_rows` attribute.
    """
    frame = pandas.DataFrame.from_records(data=rows, columns=colnames)
    return frame
123
124
def raw_data_factory(colnames: list[str], rows: list[tuple]) -> ApdbCassandraTableData:
    """Wrap unmodified result data (column names plus rows) into a table
    object.

    Parameters
    ----------
    colnames : `list` [ `str` ]
        Names of the columns.
    rows : `list` of `tuple`
        Result rows.

    Returns
    -------
    data : `ApdbCassandraTableData`
        Input data wrapped into ApdbCassandraTableData.

    Notes
    -----
    When using this method as row factory for Cassandra, the resulting
    object should be accessed in a non-standard way using
    `ResultSet._current_rows` attribute.
    """
    table = ApdbCassandraTableData(colnames, rows)
    return table
148
149
def select_concurrent(
    session: Session, statements: list[tuple], execution_profile: str, concurrency: int
) -> pandas.DataFrame | ApdbCassandraTableData | list:
    """Execute bunch of queries concurrently and merge their results into
    a single result.

    Parameters
    ----------
    session : `cassandra.cluster.Session`
        Cassandra session used to execute the statements.
    statements : `list` [ `tuple` ]
        List of statements and their parameters, passed directly to
        ``execute_concurrent()``.
    execution_profile : `str`
        Execution profile name.
    concurrency : `int`
        Maximum number of statements executing concurrently.

    Returns
    -------
    result
        Combined result of multiple statements, type of the result depends on
        specific row factory defined in execution profile. If row factory is
        `pandas_dataframe_factory` then pandas DataFrame is created from a
        combined result. If row factory is `raw_data_factory` then
        `ApdbCassandraTableData` is built from all records. Otherwise a list of
        rows is returned, type of each row is determined by the row factory.

    Notes
    -----
    This method can raise any exception that is raised by one of the provided
    statements.
    """
    # results_generator=True avoids holding all result sets in memory at
    # once; raise_on_first_error=False lets us log the failing query
    # ourselves before re-raising.
    results = execute_concurrent(
        session,
        statements,
        results_generator=True,
        raise_on_first_error=False,
        concurrency=concurrency,
        execution_profile=execution_profile,
    )

    ep = session.get_execution_profile(execution_profile)
    if ep.row_factory is raw_data_factory:
        # Merge all per-query tables into a single ApdbCassandraTableData.
        _LOG.debug("making raw data out of rows/columns")
        table_data: ApdbCassandraTableData | None = None
        for success, result in results:
            if success:
                data = result._current_rows
                assert isinstance(data, ApdbCassandraTableData)
                if table_data is None:
                    table_data = data
                else:
                    table_data.append(data)
            else:
                _LOG.error("error returned by query: %s", result)
                raise result
        if table_data is None:
            # No statements were executed at all; return an empty table.
            table_data = ApdbCassandraTableData([], [])
        return table_data

    elif ep.row_factory is pandas_dataframe_factory:
        # Merge multiple DataFrames into one
        _LOG.debug("making pandas data frame out of set of data frames")
        dataframes = []
        for success, result in results:
            if success:
                dataframes.append(result._current_rows)
            else:
                _LOG.error("error returned by query: %s", result)
                raise result
        # Concatenate all frames, but skip empty ones.
        non_empty = [df for df in dataframes if not df.empty]
        if not dataframes:
            # No statements were executed at all; return an empty frame.
            catalog = pandas.DataFrame()
        elif not non_empty:
            # If all frames are empty, return the first one.
            catalog = dataframes[0]
        elif len(non_empty) == 1:
            catalog = non_empty[0]
        else:
            catalog = pandas.concat(non_empty)
        _LOG.debug("pandas catalog shape: %s", catalog.shape)
        return catalog

    else:
        # Just concatenate all rows into a single collection.
        rows = []
        for success, result in results:
            if success:
                rows.extend(result)
            else:
                _LOG.error("error returned by query: %s", result)
                raise result
        _LOG.debug("number of rows: %s", len(rows))
        return rows
241
242
def literal(v: Any) -> Any:
    """Transform object into a value for the query.

    `None`, `bytes`, `str`, `UUID`, and `int` values pass through
    unchanged. A `datetime` is converted to milliseconds since the Unix
    epoch. Anything else is checked for finiteness where that makes
    sense; non-finite values become `None`.
    """
    if v is None:
        return None
    if isinstance(v, datetime):
        # Milliseconds since 1970-01-01 (naive datetime assumed).
        return int((v - datetime(1970, 1, 1)) / timedelta(seconds=1) * 1000)
    if isinstance(v, (bytes, str, UUID, int)):
        return v
    try:
        if not np.isfinite(v):
            return None
    except TypeError:
        # Not a numeric type, pass it through as-is.
        pass
    return v
258
259
def quote_id(columnName: str) -> str:
    """Smart quoting for column names. Lower-case names are not quoted."""
    if columnName.islower():
        return columnName
    return '"' + columnName + '"'
__init__(self, list[str] columns, list[tuple] rows)
pandas.DataFrame pandas_dataframe_factory(list[str] colnames, list[tuple] rows)
pandas.DataFrame|ApdbCassandraTableData|list select_concurrent(Session session, list[tuple] statements, str execution_profile, int concurrency)
ApdbCassandraTableData raw_data_factory(list[str] colnames, list[tuple] rows)