22from __future__
import annotations
26 "pandas_dataframe_factory",
35from datetime
import datetime, timedelta
36from typing
import Any, List, Tuple, Union
41 from cassandra.cluster
import EXEC_PROFILE_DEFAULT, Session
42 from cassandra.concurrent
import execute_concurrent
44 CASSANDRA_IMPORTED =
True
46 CASSANDRA_IMPORTED =
False
49_LOG = logging.getLogger(__name__)
55 """Special wrapper class to work around an ``execute_concurrent()`` issue
56 which does not allow non-default execution profile.
58 Instance of this
class can be passed
to execute_concurrent() instead
59 of `Session` instance. This
class implements
a small
set of methods
60 that are needed by ``execute_concurrent()``. When
61 ``execute_concurrent()``
is fixed to accept execution profiles, this
62 wrapper can be dropped.
66 self, session: Session, execution_profile: Any = EXEC_PROFILE_DEFAULT
74 execution_profile: Any = EXEC_PROFILE_DEFAULT,
78 if execution_profile
is EXEC_PROFILE_DEFAULT:
81 *args, execution_profile=execution_profile, **kwargs
84 def submit(self, *args: Any, **kwargs: Any) -> Any:
90 colnames: List[str], rows: List[Tuple]
92 """Special non-standard row factory that creates pandas DataFrame from
97 colnames : `list` [ `str` ]
99 rows : `list` of `tuple`
104 catalog : `pandas.DataFrame`
105 DataFrame with the result set.
109 When using this method
as row factory
for Cassandra, the resulting
110 DataFrame should be accessed
in a non-standard way using
111 `ResultSet._current_rows` attribute.
113 return pandas.DataFrame.from_records(rows, columns=colnames)
117 colnames: List[str], rows: List[Tuple]
118) -> Tuple[List[str], List[Tuple]]:
119 """Special non-standard row factory that makes 2-element tuple containing
120 unmodified data: list of column names and list of rows.
124 colnames : `list` [ `str` ]
125 Names of the columns.
126 rows : `list` of `tuple`
131 colnames : `list` [ `str` ]
132 Names of the columns.
133 rows : `list` of `tuple`
138 When using this method
as row factory
for Cassandra, the resulting
139 2-element tuple should be accessed
in a non-standard way using
140 `ResultSet._current_rows` attribute. This factory
is used to build
141 pandas DataFrames
in `select_concurrent` method.
143 return (colnames, rows)
147 session: Session, statements: List[Tuple], execution_profile: str, concurrency: int
148) -> Union[pandas.DataFrame, List]:
149 """Execute a bunch of queries concurrently and merge their results into
154 statements : `list` [ `tuple` ]
155 List of statements and their parameters, passed directly to
156 ``execute_concurrent()``.
157 execution_profile : `str`
158 Execution profile name.
163 Combined result of multiple statements, type of the result depends on
164 specific row factory defined
in execution profile. If row factory
is
165 one of `pandas_dataframe_factory`
or `raw_data_factory` then pandas
166 DataFrame
is created
from a combined result. Otherwise a list of
167 rows
is returned, type of each row
is determined by the row factory.
171 This method can
raise any exception that
is raised by one of the provided
175 results = execute_concurrent(
178 results_generator=True,
179 raise_on_first_error=
False,
180 concurrency=concurrency,
183 ep = session.get_execution_profile(execution_profile)
184 if ep.row_factory
is raw_data_factory:
187 _LOG.debug(
"making pandas data frame out of rows/columns")
190 for success, result
in results:
192 result = result._current_rows
195 elif columns != result[0]:
197 "different columns returned by queries: %s and %s",
202 f
"different columns returned by queries: {columns} and {result[0]}"
206 _LOG.error(
"error returned by query: %s", result)
209 _LOG.debug(
"pandas catalog shape: %s", catalog.shape)
212 elif ep.row_factory
is pandas_dataframe_factory:
215 _LOG.debug(
"making pandas data frame out of set of data frames")
217 for success, result
in results:
219 dataframes.append(result._current_rows)
221 _LOG.error(
"error returned by query: %s", result)
224 if len(dataframes) == 1:
225 catalog = dataframes[0]
227 catalog = pandas.concat(dataframes)
228 _LOG.debug(
"pandas catalog shape: %s", catalog.shape)
235 for success, result
in results:
239 _LOG.error(
"error returned by query: %s", result)
241 _LOG.debug(
"number of rows: %s", len(rows))
246 """Transform object into a value for the query."""
249 elif isinstance(v, datetime):
250 v =
int((v - datetime(1970, 1, 1)) / timedelta(seconds=1)) * 1000
251 elif isinstance(v, (bytes, str)):
255 if not np.isfinite(v):
263 """Smart quoting for column names. Lower-case names are not quoted."""
264 if not columnName.islower():
265 columnName =
'"' + columnName +
'"'
Any submit(self, *Any args, **Any kwargs)
Any execute_async(self, *Any args, Any execution_profile=EXEC_PROFILE_DEFAULT, **Any kwargs)
def __init__(self, Session session, Any execution_profile=EXEC_PROFILE_DEFAULT)
daf::base::PropertySet * set
str quote_id(str columnName)
Union[pandas.DataFrame, List] select_concurrent(Session session, List[Tuple] statements, str execution_profile, int concurrency)
Tuple[List[str], List[Tuple]] raw_data_factory(List[str] colnames, List[Tuple] rows)
pandas.DataFrame pandas_dataframe_factory(List[str] colnames, List[Tuple] rows)