214) -> pandas.DataFrame | ApdbCassandraTableData | list:
215 """Execute bunch of queries concurrently and merge their results into
216 a single result.
217
218 Parameters
219 ----------
220 statements : `list` [ `tuple` ]
221 List of statements and their parameters, passed directly to
222 ``execute_concurrent()``.
223 execution_profile : `str`
224 Execution profile name.
225
226 Returns
227 -------
228 result
229 Combined result of multiple statements, type of the result depends on
230 specific row factory defined in execution profile. If row factory is
231 `pandas_dataframe_factory` then pandas DataFrame is created from a
232 combined result. If row factory is `raw_data_factory` then
233 `ApdbCassandraTableData` is built from all records. Otherwise a list of
234 rows is returned, type of each row is determined by the row factory.
235
236 Notes
237 -----
238 This method can raise any exception that is raised by one of the provided
239 statements.
240 """
241 results = cassandra.concurrent.execute_concurrent(
242 session,
243 statements,
244 results_generator=True,
245 raise_on_first_error=False,
246 concurrency=concurrency,
247 execution_profile=execution_profile,
248 )
249
250 ep = session.get_execution_profile(execution_profile)
251 if ep.row_factory is raw_data_factory:
252
253 _LOG.debug("making raw data out of rows/columns")
254 table_data: ApdbCassandraTableData | None = None
255 for success, result in results:
256 if success:
257 data = result._current_rows
258 assert isinstance(data, ApdbCassandraTableData)
259 if table_data is None:
260 table_data = data
261 else:
262 table_data.append(data)
263 else:
264 _LOG.error("error returned by query: %s", result)
265 raise result
266 if table_data is None:
267 table_data = ApdbCassandraTableData([], [])
268 return table_data
269
270 elif ep.row_factory is pandas_dataframe_factory:
271
272 _LOG.debug("making pandas data frame out of set of data frames")
273 dataframes = []
274 for success, result in results:
275 if success:
276 dataframes.append(result._current_rows)
277 else:
278 _LOG.error("error returned by query: %s", result)
279 raise result
280
281 non_empty = [df for df in dataframes if not df.empty]
282 if not non_empty:
283
284 catalog = dataframes[0]
285 elif len(non_empty) == 1:
286 catalog = non_empty[0]
287 else:
288 catalog = pandas.concat(non_empty)
289 _LOG.debug("pandas catalog shape: %s", catalog.shape)
290 return catalog
291
292 else:
293
294 rows = []
295 for success, result in results:
296 if success:
297 rows.extend(result)
298 else:
299 _LOG.error("error returned by query: %s", result)
300 raise result
301 _LOG.debug("number of rows: %s", len(rows))
302 return rows
303
304