LSST Applications g04e9c324dd+8c5ae1fdc5,g134cb467dc+b203dec576,g18429d2f64+358861cd2c,g199a45376c+0ba108daf9,g1fd858c14a+dd066899e3,g262e1987ae+ebfced1d55,g29ae962dfc+72fd90588e,g2cef7863aa+aef1011c0b,g35bb328faa+8c5ae1fdc5,g3fd5ace14f+b668f15bc5,g4595892280+3897dae354,g47891489e3+abcf9c3559,g4d44eb3520+fb4ddce128,g53246c7159+8c5ae1fdc5,g67b6fd64d1+abcf9c3559,g67fd3c3899+1f72b5a9f7,g74acd417e5+cb6b47f07b,g786e29fd12+668abc6043,g87389fa792+8856018cbb,g89139ef638+abcf9c3559,g8d7436a09f+bcf525d20c,g8ea07a8fe4+9f5ccc88ac,g90f42f885a+6054cc57f1,g97be763408+06f794da49,g9dd6db0277+1f72b5a9f7,ga681d05dcb+7e36ad54cd,gabf8522325+735880ea63,gac2eed3f23+abcf9c3559,gb89ab40317+abcf9c3559,gbf99507273+8c5ae1fdc5,gd8ff7fe66e+1f72b5a9f7,gdab6d2f7ff+cb6b47f07b,gdc713202bf+1f72b5a9f7,gdfd2d52018+8225f2b331,ge365c994fd+375fc21c71,ge410e46f29+abcf9c3559,geaed405ab2+562b3308c0,gf9a733ac38+8c5ae1fdc5,w.2025.35
LSST Data Management Base Package
Loading...
Searching...
No Matches
lsst.dax.apdb.cassandra.cassandra_utils Namespace Reference

Classes

class  ApdbCassandraTableData
 
class  PreparedStatementCache
 

Functions

pandas.DataFrame pandas_dataframe_factory (list[str] colnames, list[tuple] rows)
 
ApdbCassandraTableData raw_data_factory (list[str] colnames, list[tuple] rows)
 
None execute_concurrent (Session session, list[tuple] statements, *, object execution_profile=EXEC_PROFILE_DEFAULT, int concurrency=100)
 
pandas.DataFrame|ApdbCassandraTableData|list select_concurrent (Session session, list[tuple] statements, str execution_profile, int concurrency)
 
Any literal (Any v)
 
str quote_id (str columnName)
 

Variables

bool CASSANDRA_IMPORTED = True
 
 EXEC_PROFILE_DEFAULT = object()
 
 _LOG = logging.getLogger(__name__)
 

Function Documentation

◆ execute_concurrent()

None lsst.dax.apdb.cassandra.cassandra_utils.execute_concurrent ( Session session,
list[tuple] statements,
* ,
object execution_profile = EXEC_PROFILE_DEFAULT,
int concurrency = 100 )
Wrap the call to `cassandra.concurrent.execute_concurrent` to avoid
importing cassandra in other modules.

Definition at line 194 of file cassandra_utils.py.

200) -> None:
201 """Wrap the call to `cassandra.concurrent.execute_concurrent` to avoid
202 importing cassandra in other modules.
203 """
204 cassandra.concurrent.execute_concurrent(
205 session,
206 statements,
207 concurrency=concurrency,
208 execution_profile=execution_profile,
209 )
210
211

◆ literal()

Any lsst.dax.apdb.cassandra.cassandra_utils.literal ( Any v)
Transform object into a value for the query.

Definition at line 305 of file cassandra_utils.py.

305def literal(v: Any) -> Any:
306 """Transform object into a value for the query."""
307 if v is None or v is pandas.NA:
308 v = None
309 elif isinstance(v, datetime):
310 v = int((v - datetime(1970, 1, 1)) / timedelta(seconds=1) * 1000)
311 elif isinstance(v, bytes | str | UUID | int):
312 pass
313 else:
314 try:
315 if not np.isfinite(v):
316 v = None
317 except TypeError:
318 pass
319 return v
320
321

◆ pandas_dataframe_factory()

pandas.DataFrame lsst.dax.apdb.cassandra.cassandra_utils.pandas_dataframe_factory ( list[str] colnames,
list[tuple] rows )
Create pandas DataFrame from Cassandra result set.

Parameters
----------
colnames : `list` [ `str` ]
    Names of the columns.
rows : `list` of `tuple`
    Result rows.

Returns
-------
catalog : `pandas.DataFrame`
    DataFrame with the result set.

Notes
-----
When using this method as row factory for Cassandra, the resulting
DataFrame should be accessed in a non-standard way using
`ResultSet._current_rows` attribute.

Definition at line 145 of file cassandra_utils.py.

145def pandas_dataframe_factory(colnames: list[str], rows: list[tuple]) -> pandas.DataFrame:
146 """Create pandas DataFrame from Cassandra result set.
147
148 Parameters
149 ----------
150 colnames : `list` [ `str` ]
151 Names of the columns.
152 rows : `list` of `tuple`
153 Result rows.
154
155 Returns
156 -------
157 catalog : `pandas.DataFrame`
158 DataFrame with the result set.
159
160 Notes
161 -----
162 When using this method as row factory for Cassandra, the resulting
163 DataFrame should be accessed in a non-standard way using
164 `ResultSet._current_rows` attribute.
165 """
166 return pandas.DataFrame.from_records(rows, columns=colnames)
167
168

◆ quote_id()

str lsst.dax.apdb.cassandra.cassandra_utils.quote_id ( str columnName)
Smart quoting for column names. Lower-case names are not quoted.

Definition at line 322 of file cassandra_utils.py.

322def quote_id(columnName: str) -> str:
323 """Smart quoting for column names. Lower-case names are not quoted."""
324 if not columnName.islower():
325 columnName = '"' + columnName + '"'
326 return columnName

◆ raw_data_factory()

ApdbCassandraTableData lsst.dax.apdb.cassandra.cassandra_utils.raw_data_factory ( list[str] colnames,
list[tuple] rows )
Make 2-element tuple containing unmodified data: list of column names
and list of rows.

Parameters
----------
colnames : `list` [ `str` ]
    Names of the columns.
rows : `list` of `tuple`
    Result rows.

Returns
-------
data : `ApdbCassandraTableData`
    Input data wrapped into ApdbCassandraTableData.

Notes
-----
When using this method as row factory for Cassandra, the resulting
object should be accessed in a non-standard way using
`ResultSet._current_rows` attribute.

Definition at line 169 of file cassandra_utils.py.

169def raw_data_factory(colnames: list[str], rows: list[tuple]) -> ApdbCassandraTableData:
170 """Make 2-element tuple containing unmodified data: list of column names
171 and list of rows.
172
173 Parameters
174 ----------
175 colnames : `list` [ `str` ]
176 Names of the columns.
177 rows : `list` of `tuple`
178 Result rows.
179
180 Returns
181 -------
182 data : `ApdbCassandraTableData`
183 Input data wrapped into ApdbCassandraTableData.
184
185 Notes
186 -----
187 When using this method as row factory for Cassandra, the resulting
188 object should be accessed in a non-standard way using
189 `ResultSet._current_rows` attribute.
190 """
191 return ApdbCassandraTableData(colnames, rows)
192
193

◆ select_concurrent()

pandas.DataFrame | ApdbCassandraTableData | list lsst.dax.apdb.cassandra.cassandra_utils.select_concurrent ( Session session,
list[tuple] statements,
str execution_profile,
int concurrency )
Execute a bunch of queries concurrently and merge their results into
a single result.

Parameters
----------
statements : `list` [ `tuple` ]
    List of statements and their parameters, passed directly to
    ``execute_concurrent()``.
execution_profile : `str`
    Execution profile name.

Returns
-------
result
    Combined result of multiple statements, type of the result depends on
    specific row factory defined in execution profile. If row factory is
    `pandas_dataframe_factory` then pandas DataFrame is created from a
    combined result. If row factory is `raw_data_factory` then
    `ApdbCassandraTableData` is built from all records. Otherwise a list of
    rows is returned, type of each row is determined by the row factory.

Notes
-----
This method can raise any exception that is raised by one of the provided
statements.

Definition at line 212 of file cassandra_utils.py.

214) -> pandas.DataFrame | ApdbCassandraTableData | list:
215 """Execute bunch of queries concurrently and merge their results into
216 a single result.
217
218 Parameters
219 ----------
220 statements : `list` [ `tuple` ]
221 List of statements and their parameters, passed directly to
222 ``execute_concurrent()``.
223 execution_profile : `str`
224 Execution profile name.
225
226 Returns
227 -------
228 result
229 Combined result of multiple statements, type of the result depends on
230 specific row factory defined in execution profile. If row factory is
231 `pandas_dataframe_factory` then pandas DataFrame is created from a
232 combined result. If row factory is `raw_data_factory` then
233 `ApdbCassandraTableData` is built from all records. Otherwise a list of
234 rows is returned, type of each row is determined by the row factory.
235
236 Notes
237 -----
238 This method can raise any exception that is raised by one of the provided
239 statements.
240 """
241 results = cassandra.concurrent.execute_concurrent(
242 session,
243 statements,
244 results_generator=True,
245 raise_on_first_error=False,
246 concurrency=concurrency,
247 execution_profile=execution_profile,
248 )
249
250 ep = session.get_execution_profile(execution_profile)
251 if ep.row_factory is raw_data_factory:
252 # Collect rows into a single list and build Dataframe out of that
253 _LOG.debug("making raw data out of rows/columns")
254 table_data: ApdbCassandraTableData | None = None
255 for success, result in results:
256 if success:
257 data = result._current_rows
258 assert isinstance(data, ApdbCassandraTableData)
259 if table_data is None:
260 table_data = data
261 else:
262 table_data.append(data)
263 else:
264 _LOG.error("error returned by query: %s", result)
265 raise result
266 if table_data is None:
267 table_data = ApdbCassandraTableData([], [])
268 return table_data
269
270 elif ep.row_factory is pandas_dataframe_factory:
271 # Merge multiple DataFrames into one
272 _LOG.debug("making pandas data frame out of set of data frames")
273 dataframes = []
274 for success, result in results:
275 if success:
276 dataframes.append(result._current_rows)
277 else:
278 _LOG.error("error returned by query: %s", result)
279 raise result
280 # Concatenate all frames, but skip empty ones.
281 non_empty = [df for df in dataframes if not df.empty]
282 if not non_empty:
283 # If all frames are empty, return the first one.
284 catalog = dataframes[0]
285 elif len(non_empty) == 1:
286 catalog = non_empty[0]
287 else:
288 catalog = pandas.concat(non_empty)
289 _LOG.debug("pandas catalog shape: %s", catalog.shape)
290 return catalog
291
292 else:
293 # Just concatenate all rows into a single collection.
294 rows = []
295 for success, result in results:
296 if success:
297 rows.extend(result)
298 else:
299 _LOG.error("error returned by query: %s", result)
300 raise result
301 _LOG.debug("number of rows: %s", len(rows))
302 return rows
303
304

Variable Documentation

◆ _LOG

lsst.dax.apdb.cassandra.cassandra_utils._LOG = logging.getLogger(__name__)
protected

Definition at line 58 of file cassandra_utils.py.

◆ CASSANDRA_IMPORTED

bool lsst.dax.apdb.cassandra.cassandra_utils.CASSANDRA_IMPORTED = True

Definition at line 51 of file cassandra_utils.py.

◆ EXEC_PROFILE_DEFAULT

lsst.dax.apdb.cassandra.cassandra_utils.EXEC_PROFILE_DEFAULT = object()

Definition at line 54 of file cassandra_utils.py.