152) -> pandas.DataFrame | ApdbCassandraTableData | list:
153 """Execute a bunch of queries concurrently and merge their results into
154 a single result.
155
156 Parameters
157 ----------
158 statements : `list` [ `tuple` ]
159 List of statements and their parameters, passed directly to
160 ``execute_concurrent()``.
161 execution_profile : `str`
162 Execution profile name.
163
164 Returns
165 -------
166 result
167 Combined result of multiple statements, type of the result depends on
168 specific row factory defined in execution profile. If row factory is
169 `pandas_dataframe_factory` then pandas DataFrame is created from a
170 combined result. If row factory is `raw_data_factory` then
171 `ApdbCassandraTableData` is built from all records. Otherwise a list of
172 rows is returned, type of each row is determined by the row factory.
173
174 Notes
175 -----
176 This method can raise any exception that is raised by one of the provided
177 statements.
178 """
179 results = execute_concurrent(
180 session,
181 statements,
182 results_generator=True,
183 raise_on_first_error=False,
184 concurrency=concurrency,
185 execution_profile=execution_profile,
186 )
187
188 ep = session.get_execution_profile(execution_profile)
189 if ep.row_factory is raw_data_factory:
190
191 _LOG.debug("making raw data out of rows/columns")
192 table_data: ApdbCassandraTableData | None = None
193 for success, result in results:
194 if success:
195 data = result._current_rows
196 assert isinstance(data, ApdbCassandraTableData)
197 if table_data is None:
198 table_data = data
199 else:
200 table_data.append(data)
201 else:
202 _LOG.error("error returned by query: %s", result)
203 raise result
204 if table_data is None:
205 table_data = ApdbCassandraTableData([], [])
206 return table_data
207
208 elif ep.row_factory is pandas_dataframe_factory:
209
210 _LOG.debug("making pandas data frame out of set of data frames")
211 dataframes = []
212 for success, result in results:
213 if success:
214 dataframes.append(result._current_rows)
215 else:
216 _LOG.error("error returned by query: %s", result)
217 raise result
218
219 non_empty = [df for df in dataframes if not df.empty]
220 if not non_empty:
221
222 catalog = dataframes[0]
223 elif len(non_empty) == 1:
224 catalog = non_empty[0]
225 else:
226 catalog = pandas.concat(non_empty)
227 _LOG.debug("pandas catalog shape: %s", catalog.shape)
228 return catalog
229
230 else:
231
232 rows = []
233 for success, result in results:
234 if success:
235 rows.extend(result)
236 else:
237 _LOG.error("error returned by query: %s", result)
238 raise result
239 _LOG.debug("number of rows: %s", len(rows))
240 return rows
241
242