185) -> pandas.DataFrame | ApdbCassandraTableData | list:
186 """Execute bunch of queries concurrently and merge their results into
187 a single result.
188
189 Parameters
190 ----------
191 statements : `list` [ `tuple` ]
192 List of statements and their parameters, passed directly to
193 ``execute_concurrent()``.
194 execution_profile : `str`
195 Execution profile name.
196
197 Returns
198 -------
199 result
200 Combined result of multiple statements, type of the result depends on
201 specific row factory defined in execution profile. If row factory is
202 `pandas_dataframe_factory` then pandas DataFrame is created from a
203 combined result. If row factory is `raw_data_factory` then
204 `ApdbCassandraTableData` is built from all records. Otherwise a list of
205 rows is returned, type of each row is determined by the row factory.
206
207 Notes
208 -----
209 This method can raise any exception that is raised by one of the provided
210 statements.
211 """
212 session_wrap = SessionWrapper(session, execution_profile)
213 results = execute_concurrent(
214 session_wrap,
215 statements,
216 results_generator=True,
217 raise_on_first_error=False,
218 concurrency=concurrency,
219 )
220
221 ep = session.get_execution_profile(execution_profile)
222 if ep.row_factory is raw_data_factory:
223
224 _LOG.debug("making pandas data frame out of rows/columns")
225 table_data: ApdbCassandraTableData | None = None
226 for success, result in results:
227 if success:
228 data = result._current_rows
229 assert isinstance(data, ApdbCassandraTableData)
230 if table_data is None:
231 table_data = data
232 else:
233 table_data.append(data)
234 else:
235 _LOG.error("error returned by query: %s", result)
236 raise result
237 if table_data is None:
238 table_data = ApdbCassandraTableData([], [])
239 return table_data
240
241 elif ep.row_factory is pandas_dataframe_factory:
242
243 _LOG.debug("making pandas data frame out of set of data frames")
244 dataframes = []
245 for success, result in results:
246 if success:
247 dataframes.append(result._current_rows)
248 else:
249 _LOG.error("error returned by query: %s", result)
250 raise result
251
252 non_empty = [df for df in dataframes if not df.empty]
253 if not non_empty:
254
255 catalog = dataframes[0]
256 elif len(non_empty) == 1:
257 catalog = non_empty[0]
258 else:
259 catalog = pandas.concat(non_empty)
260 _LOG.debug("pandas catalog shape: %s", catalog.shape)
261 return catalog
262
263 else:
264
265 rows = []
266 for success, result in results:
267 if success:
268 rows.extend(result)
269 else:
270 _LOG.error("error returned by query: %s", result)
271 raise result
272 _LOG.debug("number of rows: %s", len(rows))
273 return rows
274
275