Loading [MathJax]/extensions/tex2jax.js
LSST Applications g0485b4d2cb+c8d56b10d4,g0fba68d861+c513f10703,g1ec0fe41b4+3e153770da,g1fd858c14a+fca1f36da6,g2440f9efcc+8c5ae1fdc5,g35bb328faa+8c5ae1fdc5,g4d2262a081+73db4aee98,g53246c7159+8c5ae1fdc5,g56a49b3a55+8699aedcf1,g60b5630c4e+0e13f5d8e4,g65f11fe441+d5838b1573,g67b6fd64d1+035c836e50,g6ba5f4ee81+5218eae6c4,g78460c75b0+7e33a9eb6d,g786e29fd12+668abc6043,g8352419a5c+8c5ae1fdc5,g8852436030+5e76f8a839,g89139ef638+035c836e50,g94187f82dc+0e13f5d8e4,g989de1cb63+035c836e50,g9d31334357+0e13f5d8e4,g9f33ca652e+190f04d267,gabe3b4be73+8856018cbb,gabf8522325+21619da9f3,gb1101e3267+e14fd59f59,gb89ab40317+035c836e50,gc91f06edcd+a47578e525,gcef618a4dd+0e13f5d8e4,gcf25f946ba+5e76f8a839,gd6cbbdb0b4+958adf5c1f,gdb8242c116+18fb55f1cc,gde0f65d7ad+fe97e93be4,ge278dab8ac+83c63f4893,ge410e46f29+035c836e50,gf35d7ec915+97dd712d81,gf5e32f922b+8c5ae1fdc5,gf67bdafdda+035c836e50,gf6800124b1+a9633b82fa,w.2025.19
LSST Data Management Base Package
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
lsst.dax.apdb.cassandra.cassandra_utils Namespace Reference

Classes

class  ApdbCassandraTableData
 
class  PreparedStatementCache
 

Functions

pandas.DataFrame pandas_dataframe_factory (list[str] colnames, list[tuple] rows)
 
ApdbCassandraTableData raw_data_factory (list[str] colnames, list[tuple] rows)
 
None execute_concurrent (Session session, list[tuple] statements, *, object execution_profile=EXEC_PROFILE_DEFAULT, int concurrency=100)
 
pandas.DataFrame|ApdbCassandraTableData|list select_concurrent (Session session, list[tuple] statements, str execution_profile, int concurrency)
 
Any literal (Any v)
 
str quote_id (str columnName)
 

Variables

bool CASSANDRA_IMPORTED = True
 
 EXEC_PROFILE_DEFAULT = object()
 
 _LOG = logging.getLogger(__name__)
 

Function Documentation

◆ execute_concurrent()

None lsst.dax.apdb.cassandra.cassandra_utils.execute_concurrent ( Session session,
list[tuple] statements,
* ,
object execution_profile = EXEC_PROFILE_DEFAULT,
int concurrency = 100 )
Wrap a call to `cassandra.concurrent.execute_concurrent` to avoid
importing cassandra in other modules.

Definition at line 173 of file cassandra_utils.py.

179) -> None:
180 """Wrapp call to `cassandra.concurrent.execute_concurrent` to avoid
181 importing cassandra in other modules.
182 """
183 cassandra.concurrent.execute_concurrent(
184 session,
185 statements,
186 concurrency=concurrency,
187 execution_profile=execution_profile,
188 )
189
190

◆ literal()

Any lsst.dax.apdb.cassandra.cassandra_utils.literal ( Any v)
Transform object into a value for the query.

Definition at line 284 of file cassandra_utils.py.

def literal(v: Any) -> Any:
    """Transform object into a value for the query.

    Parameters
    ----------
    v : `Any`
        Value to convert.

    Returns
    -------
    value : `Any`
        ``None`` is returned unchanged. A `datetime` is converted to an
        integer number of milliseconds since the naive epoch
        ``datetime(1970, 1, 1)``; aware datetimes are not supported
        (subtracting the naive epoch raises `TypeError`). `bytes`, `str`,
        `UUID` and `int` values are returned unchanged. Any other value is
        checked with ``numpy.isfinite``: NaN/infinite values become ``None``,
        finite values are returned unchanged, and values that
        ``numpy.isfinite`` rejects with `TypeError` are returned unchanged.
    """
    if v is None:
        pass
    elif isinstance(v, datetime):
        # Exact integer arithmetic on timedelta. The float form
        # ``delta / timedelta(seconds=1) * 1000`` can round just below an
        # integer (1.001 s -> 1000.9999999999999) and truncate to the wrong
        # millisecond. Floor division rounds toward negative infinity, which
        # only matters for datetimes before the epoch.
        v = (v - datetime(1970, 1, 1)) // timedelta(milliseconds=1)
    elif isinstance(v, (bytes, str, UUID, int)):
        pass
    else:
        try:
            if not np.isfinite(v):
                # Non-finite numbers (NaN, +/-inf) are mapped to NULL.
                v = None
        except TypeError:
            # Not something numpy can test for finiteness; pass it through.
            pass
    return v

◆ pandas_dataframe_factory()

pandas.DataFrame lsst.dax.apdb.cassandra.cassandra_utils.pandas_dataframe_factory ( list[str] colnames,
list[tuple] rows )
Create pandas DataFrame from Cassandra result set.

Parameters
----------
colnames : `list` [ `str` ]
    Names of the columns.
rows : `list` of `tuple`
    Result rows.

Returns
-------
catalog : `pandas.DataFrame`
    DataFrame with the result set.

Notes
-----
When using this method as row factory for Cassandra, the resulting
DataFrame should be accessed in a non-standard way using
`ResultSet._current_rows` attribute.

Definition at line 124 of file cassandra_utils.py.

def pandas_dataframe_factory(colnames: list[str], rows: list[tuple]) -> pandas.DataFrame:
    """Build a pandas DataFrame from a Cassandra result set.

    Parameters
    ----------
    colnames : `list` [ `str` ]
        Column names, in the same order as the values in each row.
    rows : `list` of `tuple`
        Result rows.

    Returns
    -------
    catalog : `pandas.DataFrame`
        DataFrame with one row per input row and columns named after
        ``colnames``.

    Notes
    -----
    When this function is installed as a Cassandra row factory, the returned
    DataFrame has to be retrieved in a non-standard way, through the
    `ResultSet._current_rows` attribute.
    """
    catalog = pandas.DataFrame.from_records(data=rows, columns=colnames)
    return catalog

◆ quote_id()

str lsst.dax.apdb.cassandra.cassandra_utils.quote_id ( str columnName)
Smart quoting for column names. Lower-case names are not quoted.

Definition at line 301 of file cassandra_utils.py.

def quote_id(columnName: str) -> str:
    """Smart quoting for column names. Lower-case names are not quoted.

    Parameters
    ----------
    columnName : `str`
        Column (identifier) name.

    Returns
    -------
    quoted : `str`
        ``columnName`` unchanged if `str.islower` is true for it; otherwise
        the name wrapped in double quotes, with any embedded double quote
        doubled as CQL requires inside quoted identifiers.
    """
    if columnName.islower():
        return columnName
    # Inside a CQL quoted identifier a literal double quote is written as "".
    escaped = columnName.replace('"', '""')
    return f'"{escaped}"'

◆ raw_data_factory()

ApdbCassandraTableData lsst.dax.apdb.cassandra.cassandra_utils.raw_data_factory ( list[str] colnames,
list[tuple] rows )
Wrap unmodified result data (list of column names and list of rows)
into an `ApdbCassandraTableData` instance.

Parameters
----------
colnames : `list` [ `str` ]
    Names of the columns.
rows : `list` of `tuple`
    Result rows.

Returns
-------
data : `ApdbCassandraTableData`
    Input data wrapped into ApdbCassandraTableData.

Notes
-----
When using this method as row factory for Cassandra, the resulting
object should be accessed in a non-standard way using
`ResultSet._current_rows` attribute.

Definition at line 148 of file cassandra_utils.py.

def raw_data_factory(colnames: list[str], rows: list[tuple]) -> ApdbCassandraTableData:
    """Wrap unmodified result data (list of column names and list of rows)
    into an `ApdbCassandraTableData` instance.

    Parameters
    ----------
    colnames : `list` [ `str` ]
        Names of the columns.
    rows : `list` of `tuple`
        Result rows.

    Returns
    -------
    data : `ApdbCassandraTableData`
        Input data wrapped into ApdbCassandraTableData; column names and rows
        are passed to its constructor as given.

    Notes
    -----
    When using this method as row factory for Cassandra, the resulting
    object should be accessed in a non-standard way using
    `ResultSet._current_rows` attribute.
    """
    return ApdbCassandraTableData(colnames, rows)

◆ select_concurrent()

pandas.DataFrame | ApdbCassandraTableData | list lsst.dax.apdb.cassandra.cassandra_utils.select_concurrent ( Session session,
list[tuple] statements,
str execution_profile,
int concurrency )
Execute a batch of queries concurrently and merge their results into
a single result.

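Parameters
----------
session : `Session`
    Cassandra session used to execute the statements.
statements : `list` [ `tuple` ]
    List of statements and their parameters, passed directly to
    ``execute_concurrent()``.
execution_profile : `str`
    Execution profile name.
concurrency : `int`
    Number of statements executed concurrently, passed directly to
    ``execute_concurrent()``.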

Returns
-------
result
    Combined result of multiple statements, type of the result depends on
    specific row factory defined in execution profile. If row factory is
    `pandas_dataframe_factory` then pandas DataFrame is created from a
    combined result. If row factory is `raw_data_factory` then
    `ApdbCassandraTableData` is built from all records. Otherwise a list of
    rows is returned, type of each row is determined by the row factory.

Notes
-----
This method can raise any exception that is raised by one of the provided
statements.

Definition at line 191 of file cassandra_utils.py.

193) -> pandas.DataFrame | ApdbCassandraTableData | list:
194 """Execute bunch of queries concurrently and merge their results into
195 a single result.
196
197 Parameters
198 ----------
199 statements : `list` [ `tuple` ]
200 List of statements and their parameters, passed directly to
201 ``execute_concurrent()``.
202 execution_profile : `str`
203 Execution profile name.
204
205 Returns
206 -------
207 result
208 Combined result of multiple statements, type of the result depends on
209 specific row factory defined in execution profile. If row factory is
210 `pandas_dataframe_factory` then pandas DataFrame is created from a
211 combined result. If row factory is `raw_data_factory` then
212 `ApdbCassandraTableData` is built from all records. Otherwise a list of
213 rows is returned, type of each row is determined by the row factory.
214
215 Notes
216 -----
217 This method can raise any exception that is raised by one of the provided
218 statements.
219 """
220 results = cassandra.concurrent.execute_concurrent(
221 session,
222 statements,
223 results_generator=True,
224 raise_on_first_error=False,
225 concurrency=concurrency,
226 execution_profile=execution_profile,
227 )
228
229 ep = session.get_execution_profile(execution_profile)
230 if ep.row_factory is raw_data_factory:
231 # Collect rows into a single list and build Dataframe out of that
232 _LOG.debug("making raw data out of rows/columns")
233 table_data: ApdbCassandraTableData | None = None
234 for success, result in results:
235 if success:
236 data = result._current_rows
237 assert isinstance(data, ApdbCassandraTableData)
238 if table_data is None:
239 table_data = data
240 else:
241 table_data.append(data)
242 else:
243 _LOG.error("error returned by query: %s", result)
244 raise result
245 if table_data is None:
246 table_data = ApdbCassandraTableData([], [])
247 return table_data
248
249 elif ep.row_factory is pandas_dataframe_factory:
250 # Merge multiple DataFrames into one
251 _LOG.debug("making pandas data frame out of set of data frames")
252 dataframes = []
253 for success, result in results:
254 if success:
255 dataframes.append(result._current_rows)
256 else:
257 _LOG.error("error returned by query: %s", result)
258 raise result
259 # Concatenate all frames, but skip empty ones.
260 non_empty = [df for df in dataframes if not df.empty]
261 if not non_empty:
262 # If all frames are empty, return the first one.
263 catalog = dataframes[0]
264 elif len(non_empty) == 1:
265 catalog = non_empty[0]
266 else:
267 catalog = pandas.concat(non_empty)
268 _LOG.debug("pandas catalog shape: %s", catalog.shape)
269 return catalog
270
271 else:
272 # Just concatenate all rows into a single collection.
273 rows = []
274 for success, result in results:
275 if success:
276 rows.extend(result)
277 else:
278 _LOG.error("error returned by query: %s", result)
279 raise result
280 _LOG.debug("number of rows: %s", len(rows))
281 return rows
282
283

Variable Documentation

◆ _LOG

lsst.dax.apdb.cassandra.cassandra_utils._LOG = logging.getLogger(__name__)
protected

Definition at line 57 of file cassandra_utils.py.

◆ CASSANDRA_IMPORTED

bool lsst.dax.apdb.cassandra.cassandra_utils.CASSANDRA_IMPORTED = True

Definition at line 50 of file cassandra_utils.py.

◆ EXEC_PROFILE_DEFAULT

lsst.dax.apdb.cassandra.cassandra_utils.EXEC_PROFILE_DEFAULT = object()

Definition at line 53 of file cassandra_utils.py.