LSST Applications g0603fd7c41+501e3db9f9,g0aad566f14+23d8574c86,g0dd44d6229+a1a4c8b791,g2079a07aa2+86d27d4dc4,g2305ad1205+a62672bbc1,g2bbee38e9b+047b288a59,g337abbeb29+047b288a59,g33d1c0ed96+047b288a59,g3a166c0a6a+047b288a59,g3d1719c13e+23d8574c86,g487adcacf7+cb7fd919b2,g4be5004598+23d8574c86,g50ff169b8f+96c6868917,g52b1c1532d+585e252eca,g591dd9f2cf+4a9e435310,g63cd9335cc+585e252eca,g858d7b2824+23d8574c86,g88963caddf+0cb8e002cc,g99cad8db69+43388bcaec,g9ddcbc5298+9a081db1e4,ga1e77700b3+a912195c07,gae0086650b+585e252eca,gb0e22166c9+60f28cb32d,gb2522980b2+793639e996,gb3a676b8dc+b4feba26a1,gb4b16eec92+63f8520565,gba4ed39666+c2a2e4ac27,gbb8dafda3b+a5d255a82e,gc120e1dc64+d820f8acdb,gc28159a63d+047b288a59,gc3e9b769f7+f4f1cc6b50,gcf0d15dbbd+a1a4c8b791,gdaeeff99f8+f9a426f77a,gdb0af172c8+b6d5496702,ge79ae78c31+047b288a59,w.2024.19
LSST Data Management Base Package
Loading...
Searching...
No Matches
Classes | Functions | Variables
lsst.dax.apdb.cassandra.cassandra_utils Namespace Reference

Classes

class  ApdbCassandraTableData
 
class  PreparedStatementCache
 
class  SessionWrapper
 

Functions

pandas.DataFrame pandas_dataframe_factory (list[str] colnames, list[tuple] rows)
 
ApdbCassandraTableData raw_data_factory (list[str] colnames, list[tuple] rows)
 
pandas.DataFrame|ApdbCassandraTableData|list select_concurrent (Session session, list[tuple] statements, str execution_profile, int concurrency)
 
Any literal (Any v)
 
str quote_id (str columnName)
 

Variables

bool CASSANDRA_IMPORTED = True
 
 _LOG = logging.getLogger(__name__)
 

Function Documentation

◆ literal()

Any lsst.dax.apdb.cassandra.cassandra_utils.literal ( Any v)
Transform object into a value for the query.

Definition at line 276 of file cassandra_utils.py.

def literal(v: Any) -> Any:
    """Transform object into a value for the query."""
    # Pass-through types that the driver handles natively, and None.
    if v is None or isinstance(v, (bytes, str, UUID, int)):
        return v
    if isinstance(v, datetime):
        # Milliseconds since the Unix epoch (naive datetime assumed).
        return int((v - datetime(1970, 1, 1)) / timedelta(seconds=1)) * 1000
    # Anything else: map non-finite numbers (NaN/inf) to None; objects that
    # numpy cannot test (TypeError) are passed through unchanged.
    try:
        if not np.isfinite(v):
            return None
    except TypeError:
        pass
    return v

◆ pandas_dataframe_factory()

pandas.DataFrame lsst.dax.apdb.cassandra.cassandra_utils.pandas_dataframe_factory ( list[str] colnames,
list[tuple] rows )
Create pandas DataFrame from Cassandra result set.

Parameters
----------
colnames : `list` [ `str` ]
    Names of the columns.
rows : `list` of `tuple`
    Result rows.

Returns
-------
catalog : `pandas.DataFrame`
    DataFrame with the result set.

Notes
-----
When using this method as row factory for Cassandra, the resulting
DataFrame should be accessed in a non-standard way using
`ResultSet._current_rows` attribute.

Definition at line 134 of file cassandra_utils.py.

def pandas_dataframe_factory(colnames: list[str], rows: list[tuple]) -> pandas.DataFrame:
    """Build a pandas DataFrame out of a Cassandra result set.

    Parameters
    ----------
    colnames : `list` [ `str` ]
        Names of the columns.
    rows : `list` of `tuple`
        Result rows.

    Returns
    -------
    catalog : `pandas.DataFrame`
        DataFrame with the result set.

    Notes
    -----
    When used as a Cassandra row factory, the resulting DataFrame has to be
    retrieved in a non-standard way via the `ResultSet._current_rows`
    attribute.
    """
    # pandas can construct the frame directly from the list of row tuples.
    frame = pandas.DataFrame.from_records(rows, columns=colnames)
    return frame

◆ quote_id()

str lsst.dax.apdb.cassandra.cassandra_utils.quote_id ( str columnName)
Smart quoting for column names. Lower-case names are not quoted.

Definition at line 293 of file cassandra_utils.py.

def quote_id(columnName: str) -> str:
    """Smart quoting for column names. Lower-case names are not quoted."""
    # Identifiers that are not entirely lower-case must be double-quoted.
    return columnName if columnName.islower() else f'"{columnName}"'

◆ raw_data_factory()

ApdbCassandraTableData lsst.dax.apdb.cassandra.cassandra_utils.raw_data_factory ( list[str] colnames,
list[tuple] rows )
Wrap unmodified data (list of column names and list of rows) into an
`ApdbCassandraTableData` object.

Parameters
----------
colnames : `list` [ `str` ]
    Names of the columns.
rows : `list` of `tuple`
    Result rows.

Returns
-------
data : `ApdbCassandraTableData`
    Input data wrapped into ApdbCassandraTableData.

Notes
-----
When using this method as row factory for Cassandra, the resulting
object should be accessed in a non-standard way using
`ResultSet._current_rows` attribute.

Definition at line 158 of file cassandra_utils.py.

def raw_data_factory(colnames: list[str], rows: list[tuple]) -> ApdbCassandraTableData:
    """Wrap unmodified result data (column names and rows) into
    `ApdbCassandraTableData`.

    Parameters
    ----------
    colnames : `list` [ `str` ]
        Names of the columns.
    rows : `list` of `tuple`
        Result rows.

    Returns
    -------
    data : `ApdbCassandraTableData`
        Input data wrapped into ApdbCassandraTableData.

    Notes
    -----
    When used as a Cassandra row factory, the resulting object has to be
    retrieved in a non-standard way via the `ResultSet._current_rows`
    attribute.
    """
    wrapped = ApdbCassandraTableData(colnames, rows)
    return wrapped

◆ select_concurrent()

pandas.DataFrame | ApdbCassandraTableData | list lsst.dax.apdb.cassandra.cassandra_utils.select_concurrent ( Session session,
list[tuple] statements,
str execution_profile,
int concurrency )
Execute bunch of queries concurrently and merge their results into
a single result.

Parameters
----------
session : `cassandra.cluster.Session`
    Session used to execute the statements.
statements : `list` [ `tuple` ]
    List of statements and their parameters, passed directly to
    ``execute_concurrent()``.
execution_profile : `str`
    Execution profile name.
concurrency : `int`
    Concurrency level, passed directly to ``execute_concurrent()``.

Returns
-------
result
    Combined result of multiple statements, type of the result depends on
    specific row factory defined in execution profile. If row factory is
    `pandas_dataframe_factory` then pandas DataFrame is created from a
    combined result. If row factory is `raw_data_factory` then
    `ApdbCassandraTableData` is built from all records. Otherwise a list of
    rows is returned, type of each row is determined by the row factory.

Notes
-----
This method can raise any exception that is raised by one of the provided
statements.

Definition at line 183 of file cassandra_utils.py.

def select_concurrent(
    session: Session, statements: list[tuple], execution_profile: str, concurrency: int
) -> pandas.DataFrame | ApdbCassandraTableData | list:
    """Execute bunch of queries concurrently and merge their results into
    a single result.

    Parameters
    ----------
    session : `cassandra.cluster.Session`
        Session used to execute the statements.
    statements : `list` [ `tuple` ]
        List of statements and their parameters, passed directly to
        ``execute_concurrent()``.
    execution_profile : `str`
        Execution profile name.
    concurrency : `int`
        Concurrency level, passed directly to ``execute_concurrent()``.

    Returns
    -------
    result
        Combined result of multiple statements, type of the result depends on
        specific row factory defined in execution profile. If row factory is
        `pandas_dataframe_factory` then pandas DataFrame is created from a
        combined result. If row factory is `raw_data_factory` then
        `ApdbCassandraTableData` is built from all records. Otherwise a list of
        rows is returned, type of each row is determined by the row factory.

    Notes
    -----
    This method can raise any exception that is raised by one of the provided
    statements.
    """
    # Wrap the session so that statements run with the requested profile.
    session_wrap = SessionWrapper(session, execution_profile)
    results = execute_concurrent(
        session_wrap,
        statements,
        results_generator=True,
        raise_on_first_error=False,
        concurrency=concurrency,
    )

    ep = session.get_execution_profile(execution_profile)
    if ep.row_factory is raw_data_factory:
        # Merge all per-statement ApdbCassandraTableData objects into one.
        # (Fixed: the old comment/log message incorrectly mentioned pandas.)
        _LOG.debug("merging rows/columns into a single table data object")
        table_data: ApdbCassandraTableData | None = None
        for success, result in results:
            if success:
                data = result._current_rows
                assert isinstance(data, ApdbCassandraTableData)
                if table_data is None:
                    table_data = data
                else:
                    table_data.append(data)
            else:
                _LOG.error("error returned by query: %s", result)
                raise result
        if table_data is None:
            # No results at all; return an empty container.
            table_data = ApdbCassandraTableData([], [])
        return table_data

    elif ep.row_factory is pandas_dataframe_factory:
        # Merge multiple DataFrames into one
        _LOG.debug("making pandas data frame out of set of data frames")
        dataframes = []
        for success, result in results:
            if success:
                dataframes.append(result._current_rows)
            else:
                _LOG.error("error returned by query: %s", result)
                raise result
        # Concatenate all frames, but skip empty ones.
        non_empty = [df for df in dataframes if not df.empty]
        if not dataframes:
            # No statements were executed; return an empty frame for
            # consistency with the raw_data_factory branch (previously this
            # raised IndexError on dataframes[0]).
            catalog = pandas.DataFrame()
        elif not non_empty:
            # If all frames are empty, return the first one.
            catalog = dataframes[0]
        elif len(non_empty) == 1:
            catalog = non_empty[0]
        else:
            catalog = pandas.concat(non_empty)
        _LOG.debug("pandas catalog shape: %s", catalog.shape)
        return catalog

    else:
        # Just concatenate all rows into a single collection.
        rows = []
        for success, result in results:
            if success:
                rows.extend(result)
            else:
                _LOG.error("error returned by query: %s", result)
                raise result
        _LOG.debug("number of rows: %s", len(rows))
        return rows

Variable Documentation

◆ _LOG

lsst.dax.apdb.cassandra.cassandra_utils._LOG = logging.getLogger(__name__)
protected

Definition at line 56 of file cassandra_utils.py.

◆ CASSANDRA_IMPORTED

bool lsst.dax.apdb.cassandra.cassandra_utils.CASSANDRA_IMPORTED = True

Definition at line 50 of file cassandra_utils.py.