LSST Data Management Base Package
lsst.dax.apdb.cassandra.cassandra_utils Namespace Reference

Classes

class  ApdbCassandraTableData
 
class  PreparedStatementCache
 

Functions

pandas.DataFrame pandas_dataframe_factory (list[str] colnames, list[tuple] rows)
 
ApdbCassandraTableData raw_data_factory (list[str] colnames, list[tuple] rows)
 
None execute_concurrent (Session session, list[tuple] statements, *, object execution_profile=EXEC_PROFILE_DEFAULT, int concurrency=100)
 
pandas.DataFrame|ApdbCassandraTableData|list select_concurrent (Session session, list[tuple] statements, str execution_profile, int concurrency)
 
Any literal (Any v)
 
str quote_id (str columnName)
 

Variables

bool CASSANDRA_IMPORTED = True
 
 EXEC_PROFILE_DEFAULT = object()
 
 _LOG = logging.getLogger(__name__)
 

Function Documentation

◆ execute_concurrent()

None lsst.dax.apdb.cassandra.cassandra_utils.execute_concurrent ( Session session,
list[tuple] statements,
* ,
object execution_profile = EXEC_PROFILE_DEFAULT,
int concurrency = 100 )
Wrap call to `cassandra.concurrent.execute_concurrent` to avoid
importing cassandra in other modules.

Definition at line 194 of file cassandra_utils.py.

def execute_concurrent(
    session: Session,
    statements: list[tuple],
    *,
    execution_profile: object = EXEC_PROFILE_DEFAULT,
    concurrency: int = 100,
) -> None:
    """Wrap call to `cassandra.concurrent.execute_concurrent` to avoid
    importing cassandra in other modules.
    """
    cassandra.concurrent.execute_concurrent(
        session,
        statements,
        concurrency=concurrency,
        execution_profile=execution_profile,
    )
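
A minimal usage sketch (the contact points, keyspace, table, and column names below are hypothetical): ``statements`` is a list of ``(statement, parameters)`` pairs, exactly what `cassandra.concurrent.execute_concurrent` expects.

from cassandra.cluster import Cluster

from lsst.dax.apdb.cassandra.cassandra_utils import execute_concurrent

# Hypothetical cluster, keyspace, and table; adjust for a real deployment.
cluster = Cluster(["127.0.0.1"])
session = cluster.connect("apdb")

insert = session.prepare("INSERT INTO visits (visit_id, mjd) VALUES (?, ?)")
statements = [(insert, (visit, mjd)) for visit, mjd in [(1, 60000.1), (2, 60000.2)]]

# Run all inserts concurrently with the module's default execution profile.
execute_concurrent(session, statements, concurrency=50)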

◆ literal()

Any lsst.dax.apdb.cassandra.cassandra_utils.literal ( Any v)
Transform object into a value for the query.

Definition at line 305 of file cassandra_utils.py.

def literal(v: Any) -> Any:
    """Transform object into a value for the query."""
    if v is None or v is pandas.NA:
        v = None
    elif isinstance(v, datetime):
        v = int((v - datetime(1970, 1, 1)) / timedelta(seconds=1) * 1000)
    elif isinstance(v, bytes | str | UUID | int):
        pass
    else:
        try:
            if not np.isfinite(v):
                v = None
        except TypeError:
            pass
    return v
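
A few illustrative calls (a sketch, not part of the original page) showing the conversions the body performs: None and non-finite floats map to None, naive datetimes become integer milliseconds since the Unix epoch, and strings and integers pass through unchanged.

from datetime import datetime

import numpy as np

from lsst.dax.apdb.cassandra.cassandra_utils import literal

assert literal(None) is None
assert literal("DiaSource") == "DiaSource"
assert literal(42) == 42
# Naive datetimes are converted to integer milliseconds since 1970-01-01.
assert literal(datetime(1970, 1, 2)) == 86_400_000
# Non-finite floats are mapped to None.
assert literal(np.nan) is None
assert literal(float("inf")) is None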

◆ pandas_dataframe_factory()

pandas.DataFrame lsst.dax.apdb.cassandra.cassandra_utils.pandas_dataframe_factory ( list[str] colnames,
list[tuple] rows )
Create pandas DataFrame from Cassandra result set.

Parameters
----------
colnames : `list` [ `str` ]
    Names of the columns.
rows : `list` of `tuple`
    Result rows.

Returns
-------
catalog : `pandas.DataFrame`
    DataFrame with the result set.

Notes
-----
When using this method as row factory for Cassandra, the resulting
DataFrame should be accessed in a non-standard way using
`ResultSet._current_rows` attribute.

Definition at line 145 of file cassandra_utils.py.

def pandas_dataframe_factory(colnames: list[str], rows: list[tuple]) -> pandas.DataFrame:
    """Create pandas DataFrame from Cassandra result set.

    Parameters
    ----------
    colnames : `list` [ `str` ]
        Names of the columns.
    rows : `list` of `tuple`
        Result rows.

    Returns
    -------
    catalog : `pandas.DataFrame`
        DataFrame with the result set.

    Notes
    -----
    When using this method as row factory for Cassandra, the resulting
    DataFrame should be accessed in a non-standard way using
    `ResultSet._current_rows` attribute.
    """
    return pandas.DataFrame.from_records(rows, columns=colnames)
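
A sketch of wiring the factory into an execution profile (the profile name, contact points, keyspace, and query are hypothetical); per the note above, the resulting frame is reached through the non-standard `ResultSet._current_rows` attribute.

from cassandra.cluster import Cluster, ExecutionProfile

from lsst.dax.apdb.cassandra.cassandra_utils import pandas_dataframe_factory

# Hypothetical profile that materializes query results as pandas DataFrames.
profile = ExecutionProfile(row_factory=pandas_dataframe_factory)
cluster = Cluster(["127.0.0.1"], execution_profiles={"pandas": profile})
session = cluster.connect("apdb")

result = session.execute("SELECT * FROM DiaObject", execution_profile="pandas")
# The row factory stores the whole frame; it is accessed via _current_rows.
df = result._current_rows
print(df.shape)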

◆ quote_id()

str lsst.dax.apdb.cassandra.cassandra_utils.quote_id ( str columnName)
Smart quoting for column names. Lower-case names are not quoted.

Definition at line 322 of file cassandra_utils.py.

def quote_id(columnName: str) -> str:
    """Smart quoting for column names. Lower-case names are not quoted."""
    if not columnName.islower():
        columnName = '"' + columnName + '"'
    return columnName
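
For example (a small illustrative sketch), mixed-case identifiers gain double quotes while all-lower-case names pass through unchanged:

from lsst.dax.apdb.cassandra.cassandra_utils import quote_id

assert quote_id("visit") == "visit"
assert quote_id("diaObjectId") == '"diaObjectId"'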

◆ raw_data_factory()

ApdbCassandraTableData lsst.dax.apdb.cassandra.cassandra_utils.raw_data_factory ( list[str] colnames,
list[tuple] rows )
Make 2-element tuple containing unmodified data: list of column names
and list of rows.

Parameters
----------
colnames : `list` [ `str` ]
    Names of the columns.
rows : `list` of `tuple`
    Result rows.

Returns
-------
data : `ApdbCassandraTableData`
    Input data wrapped into ApdbCassandraTableData.

Notes
-----
When using this method as row factory for Cassandra, the resulting
object should be accessed in a non-standard way using
`ResultSet._current_rows` attribute.

Definition at line 169 of file cassandra_utils.py.

def raw_data_factory(colnames: list[str], rows: list[tuple]) -> ApdbCassandraTableData:
    """Make 2-element tuple containing unmodified data: list of column names
    and list of rows.

    Parameters
    ----------
    colnames : `list` [ `str` ]
        Names of the columns.
    rows : `list` of `tuple`
        Result rows.

    Returns
    -------
    data : `ApdbCassandraTableData`
        Input data wrapped into ApdbCassandraTableData.

    Notes
    -----
    When using this method as row factory for Cassandra, the resulting
    object should be accessed in a non-standard way using
    `ResultSet._current_rows` attribute.
    """
    return ApdbCassandraTableData(colnames, rows)
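
A small illustrative call (the column names and rows are made up) showing the factory invoked the way the driver invokes a row factory, with column names and result rows:

from lsst.dax.apdb.cassandra.cassandra_utils import raw_data_factory

# Hypothetical column names and rows, in the shape the driver supplies them.
data = raw_data_factory(["diaObjectId", "ra", "dec"], [(1, 10.5, -5.0), (2, 11.0, -5.1)])
print(type(data).__name__)  # ApdbCassandraTableData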

◆ select_concurrent()

pandas.DataFrame | ApdbCassandraTableData | list lsst.dax.apdb.cassandra.cassandra_utils.select_concurrent ( Session session,
list[tuple] statements,
str execution_profile,
int concurrency )
Execute a bunch of queries concurrently and merge their results into
a single result.

Parameters
----------
statements : `list` [ `tuple` ]
    List of statements and their parameters, passed directly to
    ``execute_concurrent()``.
execution_profile : `str`
    Execution profile name.

Returns
-------
result
    Combined result of multiple statements; the type of the result depends on
    the row factory defined in the execution profile. If the row factory is
    `pandas_dataframe_factory` then a pandas DataFrame is created from the
    combined result. If the row factory is `raw_data_factory` then
    `ApdbCassandraTableData` is built from all records. Otherwise a list of
    rows is returned; the type of each row is determined by the row factory.

Notes
-----
This method can raise any exception that is raised by one of the provided
statements.

Definition at line 212 of file cassandra_utils.py.

def select_concurrent(
    session: Session,
    statements: list[tuple],
    execution_profile: str,
    concurrency: int,
) -> pandas.DataFrame | ApdbCassandraTableData | list:
    """Execute a bunch of queries concurrently and merge their results into
    a single result.

    Parameters
    ----------
    statements : `list` [ `tuple` ]
        List of statements and their parameters, passed directly to
        ``execute_concurrent()``.
    execution_profile : `str`
        Execution profile name.

    Returns
    -------
    result
        Combined result of multiple statements; the type of the result depends
        on the row factory defined in the execution profile. If the row factory
        is `pandas_dataframe_factory` then a pandas DataFrame is created from
        the combined result. If the row factory is `raw_data_factory` then
        `ApdbCassandraTableData` is built from all records. Otherwise a list of
        rows is returned; the type of each row is determined by the row factory.

    Notes
    -----
    This method can raise any exception that is raised by one of the provided
    statements.
    """
    results = cassandra.concurrent.execute_concurrent(
        session,
        statements,
        results_generator=True,
        raise_on_first_error=False,
        concurrency=concurrency,
        execution_profile=execution_profile,
    )

    ep = session.get_execution_profile(execution_profile)
    if ep.row_factory is raw_data_factory:
        # Collect all rows and columns into a single ApdbCassandraTableData.
        _LOG.debug("making raw data out of rows/columns")
        table_data: ApdbCassandraTableData | None = None
        for success, result in results:
            if success:
                data = result._current_rows
                assert isinstance(data, ApdbCassandraTableData)
                if table_data is None:
                    table_data = data
                else:
                    table_data.append(data)
            else:
                _LOG.error("error returned by query: %s", result)
                raise result
        if table_data is None:
            table_data = ApdbCassandraTableData([], [])
        return table_data

    elif ep.row_factory is pandas_dataframe_factory:
        # Merge multiple DataFrames into one
        _LOG.debug("making pandas data frame out of set of data frames")
        dataframes = []
        for success, result in results:
            if success:
                dataframes.append(result._current_rows)
            else:
                _LOG.error("error returned by query: %s", result)
                raise result
        # Concatenate all frames, but skip empty ones.
        non_empty = [df for df in dataframes if not df.empty]
        if not non_empty:
            # If all frames are empty, return the first one.
            catalog = dataframes[0]
        elif len(non_empty) == 1:
            catalog = non_empty[0]
        else:
            catalog = pandas.concat(non_empty)
        _LOG.debug("pandas catalog shape: %s", catalog.shape)
        return catalog

    else:
        # Just concatenate all rows into a single collection.
        rows = []
        for success, result in results:
            if success:
                rows.extend(result)
            else:
                _LOG.error("error returned by query: %s", result)
                raise result
        _LOG.debug("number of rows: %s", len(rows))
        return rows
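
To tie the pieces together, a hedged end-to-end sketch (contact points, keyspace, table, partition values, and the profile name are hypothetical): the profile's row factory is `pandas_dataframe_factory`, so the merged result comes back as a single DataFrame.

from cassandra.cluster import Cluster, ExecutionProfile

from lsst.dax.apdb.cassandra.cassandra_utils import (
    pandas_dataframe_factory,
    select_concurrent,
)

# Hypothetical profile returning pandas DataFrames, registered under a name
# so that select_concurrent can look it up with get_execution_profile().
profile = ExecutionProfile(row_factory=pandas_dataframe_factory)
cluster = Cluster(["127.0.0.1"], execution_profiles={"read_pandas": profile})
session = cluster.connect("apdb")

query = session.prepare("SELECT * FROM DiaSource WHERE apdb_part = ?")
# One (statement, parameters) tuple per spatial partition to read.
statements = [(query, (part,)) for part in (100, 101, 102)]

catalog = select_concurrent(session, statements, "read_pandas", concurrency=50)
print(catalog.shape)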

Variable Documentation

◆ _LOG

lsst.dax.apdb.cassandra.cassandra_utils._LOG = logging.getLogger(__name__)
protected

Definition at line 58 of file cassandra_utils.py.

◆ CASSANDRA_IMPORTED

bool lsst.dax.apdb.cassandra.cassandra_utils.CASSANDRA_IMPORTED = True

Definition at line 51 of file cassandra_utils.py.

◆ EXEC_PROFILE_DEFAULT

lsst.dax.apdb.cassandra.cassandra_utils.EXEC_PROFILE_DEFAULT = object()

Definition at line 54 of file cassandra_utils.py.