cassandra_utils.py
# This file is part of dax_apdb.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

__all__ = [
    "literal",
    "pandas_dataframe_factory",
    "quote_id",
    "raw_data_factory",
    "select_concurrent",
]

import logging
import numpy as np
import pandas
from datetime import datetime, timedelta
from typing import Any, List, Tuple, Union

# If cassandra-driver is not installed the module can still be imported,
# but nothing in it will work.
try:
    from cassandra.cluster import EXEC_PROFILE_DEFAULT, Session
    from cassandra.concurrent import execute_concurrent

    CASSANDRA_IMPORTED = True
except ImportError:
    CASSANDRA_IMPORTED = False


_LOG = logging.getLogger(__name__)


if CASSANDRA_IMPORTED:

    class SessionWrapper:
        """Special wrapper class to work around an ``execute_concurrent()``
        issue which does not allow a non-default execution profile.

        An instance of this class can be passed to ``execute_concurrent()``
        instead of a `Session` instance. This class implements a small set
        of methods that are needed by ``execute_concurrent()``. When
        ``execute_concurrent()`` is fixed to accept execution profiles, this
        wrapper can be dropped.
        """

        def __init__(
            self, session: Session, execution_profile: Any = EXEC_PROFILE_DEFAULT
        ):
            self._session = session
            self._execution_profile = execution_profile

        def execute_async(
            self,
            *args: Any,
            execution_profile: Any = EXEC_PROFILE_DEFAULT,
            **kwargs: Any,
        ) -> Any:
            # An explicit parameter can override our stored profile.
            if execution_profile is EXEC_PROFILE_DEFAULT:
                execution_profile = self._execution_profile
            return self._session.execute_async(
                *args, execution_profile=execution_profile, **kwargs
            )

        def submit(self, *args: Any, **kwargs: Any) -> Any:
            # Internal method needed by execute_concurrent(); forwarded to
            # the wrapped session.
            return self._session.submit(*args, **kwargs)

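    # A minimal usage sketch; the profile name "read" is an assumption, not
    # something defined in this module. The wrapper stands in for the Session
    # so that execute_concurrent() runs every statement with that profile:
    #
    #     wrapped = SessionWrapper(session, execution_profile="read")
    #     results = execute_concurrent(wrapped, statements)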

def pandas_dataframe_factory(
    colnames: List[str], rows: List[Tuple]
) -> pandas.DataFrame:
    """Special non-standard row factory that creates a pandas DataFrame from
    a Cassandra result set.

    Parameters
    ----------
    colnames : `list` [ `str` ]
        Names of the columns.
    rows : `list` of `tuple`
        Result rows.

    Returns
    -------
    catalog : `pandas.DataFrame`
        DataFrame with the result set.

    Notes
    -----
    When using this method as a row factory for Cassandra, the resulting
    DataFrame should be accessed in a non-standard way via the
    ``ResultSet._current_rows`` attribute.
    """
    return pandas.DataFrame.from_records(rows, columns=colnames)

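# A hedged sketch of wiring this factory into the driver; the profile name
# "pandas" is an assumption, not something defined in this module:
#
#     from cassandra.cluster import Cluster, ExecutionProfile
#
#     profile = ExecutionProfile(row_factory=pandas_dataframe_factory)
#     cluster = Cluster(execution_profiles={"pandas": profile})
#     session = cluster.connect()
#     rs = session.execute("SELECT ...", execution_profile="pandas")
#     catalog = rs._current_rows  # the non-standard access described above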

def raw_data_factory(
    colnames: List[str], rows: List[Tuple]
) -> Tuple[List[str], List[Tuple]]:
    """Special non-standard row factory that makes a 2-element tuple
    containing unmodified data: a list of column names and a list of rows.

    Parameters
    ----------
    colnames : `list` [ `str` ]
        Names of the columns.
    rows : `list` of `tuple`
        Result rows.

    Returns
    -------
    colnames : `list` [ `str` ]
        Names of the columns.
    rows : `list` of `tuple`
        Result rows.

    Notes
    -----
    When using this method as a row factory for Cassandra, the resulting
    2-element tuple should be accessed in a non-standard way via the
    ``ResultSet._current_rows`` attribute. This factory is used to build
    pandas DataFrames in the `select_concurrent` function.
    """
    return (colnames, rows)

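# For illustration only: with this factory ``ResultSet._current_rows`` holds
# the 2-tuple itself, e.g. a query returning integer columns "a" and "b"
# could yield (["a", "b"], [(1, 2), (3, 4)]).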

def select_concurrent(
    session: Session, statements: List[Tuple], execution_profile: str, concurrency: int
) -> Union[pandas.DataFrame, List]:
    """Execute a bunch of queries concurrently and merge their results into
    a single result.

    Parameters
    ----------
    session : `cassandra.cluster.Session`
        Session used to execute the statements.
    statements : `list` [ `tuple` ]
        List of statements and their parameters, passed directly to
        ``execute_concurrent()``.
    execution_profile : `str`
        Execution profile name.
    concurrency : `int`
        Maximum number of statements to execute concurrently, passed
        directly to ``execute_concurrent()``.

    Returns
    -------
    result
        Combined result of the multiple statements; its type depends on the
        row factory defined in the execution profile. If the row factory is
        one of `pandas_dataframe_factory` or `raw_data_factory` then a
        pandas DataFrame is created from the combined result. Otherwise a
        list of rows is returned, with the type of each row determined by
        the row factory.

    Notes
    -----
    This method can raise any exception that is raised by one of the
    provided statements.
    """
    session_wrap = SessionWrapper(session, execution_profile)
    results = execute_concurrent(
        session_wrap,
        statements,
        results_generator=True,
        raise_on_first_error=False,
        concurrency=concurrency,
    )

    ep = session.get_execution_profile(execution_profile)
    if ep.row_factory is raw_data_factory:

        # Collect rows into a single list and build a DataFrame out of that.
        _LOG.debug("making pandas data frame out of rows/columns")
        columns: Any = None
        rows = []
        for success, result in results:
            if success:
                result = result._current_rows
                if columns is None:
                    columns = result[0]
                elif columns != result[0]:
                    _LOG.error(
                        "different columns returned by queries: %s and %s",
                        columns,
                        result[0],
                    )
                    raise ValueError(
                        f"different columns returned by queries: {columns} and {result[0]}"
                    )
                rows += result[1]
            else:
                _LOG.error("error returned by query: %s", result)
                raise result
        catalog = pandas_dataframe_factory(columns, rows)
        _LOG.debug("pandas catalog shape: %s", catalog.shape)
        return catalog

    elif ep.row_factory is pandas_dataframe_factory:

        # Merge multiple DataFrames into one.
        _LOG.debug("making pandas data frame out of set of data frames")
        dataframes = []
        for success, result in results:
            if success:
                dataframes.append(result._current_rows)
            else:
                _LOG.error("error returned by query: %s", result)
                raise result
        # Concatenate all frames.
        if len(dataframes) == 1:
            catalog = dataframes[0]
        else:
            catalog = pandas.concat(dataframes)
        _LOG.debug("pandas catalog shape: %s", catalog.shape)
        return catalog

    else:

        # Just concatenate all rows into a single collection.
        rows = []
        for success, result in results:
            if success:
                rows.extend(result)
            else:
                _LOG.error("error returned by query: %s", result)
                raise result
        _LOG.debug("number of rows: %s", len(rows))
        return rows

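# A hedged usage sketch; the statements and the "pandas" profile name are
# invented for illustration (the profile must exist on the session and use
# one of the row factories above to get a DataFrame back):
#
#     statements = [(f'SELECT * FROM "table_{i}"', ()) for i in range(4)]
#     catalog = select_concurrent(session, statements, "pandas", concurrency=100)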

def literal(v: Any) -> Any:
    """Transform an object into a literal value for a query."""
    if v is None:
        pass
    elif isinstance(v, datetime):
        # Convert datetime to whole seconds since the Unix epoch, expressed
        # in milliseconds.
        v = int((v - datetime(1970, 1, 1)) / timedelta(seconds=1)) * 1000
    elif isinstance(v, (bytes, str)):
        pass
    else:
        # Map non-finite floats (NaN, +/-inf) to None (NULL); values that
        # np.isfinite() cannot handle pass through unchanged.
        try:
            if not np.isfinite(v):
                v = None
        except TypeError:
            pass
    return v

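# Examples derived from the code above, not from external documentation:
#
#     >>> literal(datetime(2022, 1, 1))  # whole seconds since epoch, in ms
#     1640995200000
#     >>> literal(float("nan")) is None  # non-finite floats become NULL
#     True
#     >>> literal("text")  # strings and bytes pass through unchanged
#     'text'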

def quote_id(columnName: str) -> str:
    """Smart quoting for column names. Lower-case names are not quoted."""
    if not columnName.islower():
        columnName = '"' + columnName + '"'
    return columnName
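# Examples derived from the code above:
#
#     >>> quote_id("visit")  # all lower case, left unquoted
#     'visit'
#     >>> quote_id("ccdVisitId")  # mixed case must be double-quoted
#     '"ccdVisitId"'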