from __future__ import annotations
26 "pandas_dataframe_factory",

import logging
from datetime import datetime, timedelta
from typing import Any, Iterable, Iterator
from uuid import UUID

import numpy as np
import pandas

try:
    from cassandra.cluster import EXEC_PROFILE_DEFAULT, Session
    from cassandra.concurrent import execute_concurrent

    CASSANDRA_IMPORTED = True
except ImportError:
    CASSANDRA_IMPORTED = False

from .apdb import ApdbTableData

_LOG = logging.getLogger(__name__)
59 """Special wrapper class to workaround ``execute_concurrent()`` issue
60 which does not allow non-default execution profile.
62 Instance of this
class can be passed
to execute_concurrent() instead
63 of `Session` instance. This
class implements
a small
set of methods
64 that are needed by ``execute_concurrent()``. When
65 ``execute_concurrent()``
is fixed to accept exectution profiles, this
66 wrapper can be dropped.

        def __init__(self, session: Session, execution_profile: Any = EXEC_PROFILE_DEFAULT):
            self._session = session
            self._execution_profile = execution_profile

        def execute_async(
            self,
            *args: Any,
            execution_profile: Any = EXEC_PROFILE_DEFAULT,
            **kwargs: Any,
        ) -> Any:
            # An explicitly passed profile overrides the one stored in the
            # wrapper.
            if execution_profile is EXEC_PROFILE_DEFAULT:
                execution_profile = self._execution_profile
            return self._session.execute_async(*args, execution_profile=execution_profile, **kwargs)

        def submit(self, *args: Any, **kwargs: Any) -> Any:
            # Forward everything else that execute_concurrent() calls to the
            # wrapped session unchanged.
            return self._session.submit(*args, **kwargs)
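
# A sketch of how the wrapper is meant to be used; ``session``, the profile
# name "read_profile", and ``statements`` are assumed to come from the
# caller's own setup:
#
#     wrapped = SessionWrapper(session, "read_profile")
#     results = execute_concurrent(wrapped, statements, results_generator=True)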
90 """Implementation of ApdbTableData that wraps Cassandra raw data."""
92 def __init__(self, columns: list[str], rows: list[tuple]):
100 def rows(self) -> Iterable[tuple]:

    def append(self, other: ApdbCassandraTableData) -> None:
        """Extend rows in this table with rows from another table."""
        if self._columns != other._columns:
            raise ValueError(f"Different columns returned by queries: {self._columns} and {other._columns}")
        self._rows.extend(other._rows)
111 """Make it look like a row iterator, needed for some odd logic."""
112 return iter(self.
_rows)
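
# A minimal sketch of combining two chunks of raw data; ``append`` raises
# ``ValueError`` unless the column lists match:
#
#     data = ApdbCassandraTableData(["id", "flux"], [(1, 0.5)])
#     data.append(ApdbCassandraTableData(["id", "flux"], [(2, 0.7)]))
#     list(data)  # -> [(1, 0.5), (2, 0.7)]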


def pandas_dataframe_factory(colnames: list[str], rows: list[tuple]) -> pandas.DataFrame:
    """Special non-standard row factory that creates pandas DataFrame from
    Cassandra result set.

    Parameters
    ----------
    colnames : `list` [ `str` ]
        Names of the columns.
    rows : `list` of `tuple`
        Result rows.

    Returns
    -------
    catalog : `pandas.DataFrame`
        DataFrame with the result set.

    Notes
    -----
    When using this method as a row factory for Cassandra, the resulting
    DataFrame has to be accessed in a non-standard way, via the
    ``ResultSet._current_rows`` attribute.
    """
    return pandas.DataFrame.from_records(rows, columns=colnames)
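
# A sketch of wiring this factory into an execution profile; the profile
# name "pandas" and the query text are assumptions of this example:
#
#     from cassandra.cluster import Cluster, ExecutionProfile
#
#     profile = ExecutionProfile(row_factory=pandas_dataframe_factory)
#     cluster = Cluster(execution_profiles={"pandas": profile})
#     session = cluster.connect()
#     result = session.execute("SELECT * FROM table", execution_profile="pandas")
#     df = result._current_rows  # pandas.DataFrame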


def raw_data_factory(colnames: list[str], rows: list[tuple]) -> ApdbCassandraTableData:
    """Special non-standard row factory that wraps the unmodified data (list
    of column names and list of rows) into `ApdbCassandraTableData`.

    Parameters
    ----------
    colnames : `list` [ `str` ]
        Names of the columns.
    rows : `list` of `tuple`
        Result rows.

    Returns
    -------
    data : `ApdbCassandraTableData`
        Input data wrapped into ApdbCassandraTableData.

    Notes
    -----
    When using this method as a row factory for Cassandra, the resulting
    object has to be accessed in a non-standard way, via the
    ``ResultSet._current_rows`` attribute.
    """
    return ApdbCassandraTableData(colnames, rows)
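
# With this factory installed in a profile (same wiring as the sketch
# above), each result set carries an ``ApdbCassandraTableData``:
#
#     table = result._current_rows
#     table.column_names()  # -> list of column names
#     list(table.rows())    # -> list of row tuples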


def select_concurrent(
    session: Session, statements: list[tuple], execution_profile: str, concurrency: int
) -> pandas.DataFrame | ApdbCassandraTableData | list:
    """Execute a bunch of queries concurrently and merge their results into
    a single result.

    Parameters
    ----------
    session : `cassandra.cluster.Session`
        Session to use for queries.
    statements : `list` [ `tuple` ]
        List of statements and their parameters, passed directly to
        ``execute_concurrent()``.
    execution_profile : `str`
        Execution profile name.
    concurrency : `int`
        Maximum number of concurrent requests.

    Returns
    -------
    result : `pandas.DataFrame`, `ApdbCassandraTableData`, or `list`
        Combined result of multiple statements; the type of the result
        depends on the row factory defined in the execution profile. If the
        row factory is `pandas_dataframe_factory` then a pandas DataFrame is
        created from the combined result. If the row factory is
        `raw_data_factory` then `ApdbCassandraTableData` is built from all
        records. Otherwise a list of rows is returned, with the type of each
        row determined by the row factory.

    Notes
    -----
    This method can raise any exception that is raised by one of the provided
    statements.
    """
    session_wrap = SessionWrapper(session, execution_profile)
    results = execute_concurrent(
        session_wrap,
        statements,
        results_generator=True,
        raise_on_first_error=False,
        concurrency=concurrency,
    )

    ep = session.get_execution_profile(execution_profile)
    if ep.row_factory is raw_data_factory:
        # Merge the ApdbCassandraTableData from each result into one table.
        _LOG.debug("merging rows/columns into a single table")
        table_data: ApdbCassandraTableData | None = None
        for success, result in results:
            if success:
                data = result._current_rows
                assert isinstance(data, ApdbCassandraTableData)
                if table_data is None:
                    table_data = data
                else:
                    table_data.append(data)
            else:
                _LOG.error("error returned by query: %s", result)
                raise result
        if table_data is None:
            table_data = ApdbCassandraTableData([], [])
        return table_data

    elif ep.row_factory is pandas_dataframe_factory:
        # Concatenate the individual DataFrames into a single one.
        _LOG.debug("making pandas data frame out of set of data frames")
        dataframes = []
        for success, result in results:
            if success:
                dataframes.append(result._current_rows)
            else:
                _LOG.error("error returned by query: %s", result)
                raise result
        if len(dataframes) == 1:
            catalog = dataframes[0]
        else:
            catalog = pandas.concat(dataframes)
        _LOG.debug("pandas catalog shape: %s", catalog.shape)
        return catalog

    else:
        # Just concatenate all rows into a single list.
        rows = []
        for success, result in results:
            if success:
                rows.extend(result)
            else:
                _LOG.error("error returned by query: %s", result)
                raise result
        _LOG.debug("number of rows: %s", len(rows))
        return rows


def literal(v: Any) -> Any:
    """Transform object into a value for the query."""
    if v is None:
        pass
    elif isinstance(v, datetime):
        # Convert datetime to milliseconds since the Unix epoch.
        v = int((v - datetime(1970, 1, 1)) / timedelta(seconds=1)) * 1000
    elif isinstance(v, (bytes, str, UUID, int)):
        # These types are passed through unchanged.
        pass
    else:
        # Replace NaN and infinities with None (NULL); non-numeric types
        # that np.isfinite() cannot handle are passed through unchanged.
        try:
            if not np.isfinite(v):
                v = None
        except TypeError:
            pass
    return v
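
# A few illustrative conversions:
#
#     literal(datetime(2020, 1, 1))  # -> 1577836800000 (ms since epoch)
#     literal(float("nan"))          # -> None
#     literal("text")                # -> "text" (unchanged)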


def quote_id(columnName: str) -> str:
    """Smart quoting for column names. Lower-case names are not quoted."""
    if not columnName.islower():
        columnName = '"' + columnName + '"'
    return columnName