193) -> pandas.DataFrame | ApdbCassandraTableData | list:
194 """Execute bunch of queries concurrently and merge their results into
195 a single result.
196
197 Parameters
198 ----------
199 statements : `list` [ `tuple` ]
200 List of statements and their parameters, passed directly to
201 ``execute_concurrent()``.
202 execution_profile : `str`
203 Execution profile name.
204
205 Returns
206 -------
207 result
208 Combined result of multiple statements, type of the result depends on
209 specific row factory defined in execution profile. If row factory is
210 `pandas_dataframe_factory` then pandas DataFrame is created from a
211 combined result. If row factory is `raw_data_factory` then
212 `ApdbCassandraTableData` is built from all records. Otherwise a list of
213 rows is returned, type of each row is determined by the row factory.
214
215 Notes
216 -----
217 This method can raise any exception that is raised by one of the provided
218 statements.
219 """
220 results = cassandra.concurrent.execute_concurrent(
221 session,
222 statements,
223 results_generator=True,
224 raise_on_first_error=False,
225 concurrency=concurrency,
226 execution_profile=execution_profile,
227 )
228
229 ep = session.get_execution_profile(execution_profile)
230 if ep.row_factory is raw_data_factory:
231
232 _LOG.debug("making raw data out of rows/columns")
233 table_data: ApdbCassandraTableData | None = None
234 for success, result in results:
235 if success:
236 data = result._current_rows
237 assert isinstance(data, ApdbCassandraTableData)
238 if table_data is None:
239 table_data = data
240 else:
241 table_data.append(data)
242 else:
243 _LOG.error("error returned by query: %s", result)
244 raise result
245 if table_data is None:
246 table_data = ApdbCassandraTableData([], [])
247 return table_data
248
249 elif ep.row_factory is pandas_dataframe_factory:
250
251 _LOG.debug("making pandas data frame out of set of data frames")
252 dataframes = []
253 for success, result in results:
254 if success:
255 dataframes.append(result._current_rows)
256 else:
257 _LOG.error("error returned by query: %s", result)
258 raise result
259
260 non_empty = [df for df in dataframes if not df.empty]
261 if not non_empty:
262
263 catalog = dataframes[0]
264 elif len(non_empty) == 1:
265 catalog = non_empty[0]
266 else:
267 catalog = pandas.concat(non_empty)
268 _LOG.debug("pandas catalog shape: %s", catalog.shape)
269 return catalog
270
271 else:
272
273 rows = []
274 for success, result in results:
275 if success:
276 rows.extend(result)
277 else:
278 _LOG.error("error returned by query: %s", result)
279 raise result
280 _LOG.debug("number of rows: %s", len(rows))
281 return rows
282
283