LSST Applications g0265f82a02+d6b5cd48b5,g02d81e74bb+80768bd682,g04242d3e92+8eaa23c173,g06b2ea86fd+734f9505a2,g2079a07aa2+14824f138e,g212a7c68fe+5f4fc2ea00,g2305ad1205+293ab1327e,g2bbee38e9b+d6b5cd48b5,g337abbeb29+d6b5cd48b5,g3ddfee87b4+8eaa23c173,g487adcacf7+abec5a19c5,g50ff169b8f+5929b3527e,g52b1c1532d+a6fc98d2e7,g591dd9f2cf+97ef3b4495,g5a732f18d5+66d966b544,g5d7b63bc56+636c3c3fd8,g64a986408d+80768bd682,g858d7b2824+80768bd682,g8a8a8dda67+a6fc98d2e7,g99cad8db69+6282a5f541,g9ddcbc5298+d4bad12328,ga1e77700b3+246acaaf9c,ga8c6da7877+9e3c062e8e,gb0e22166c9+3863383f4c,gb6a65358fc+d6b5cd48b5,gba4ed39666+9664299f35,gbb8dafda3b+60f904e7bc,gc120e1dc64+1bf26d0180,gc28159a63d+d6b5cd48b5,gcf0d15dbbd+8eaa23c173,gd2a12a3803+f8351bc914,gdaeeff99f8+a38ce5ea23,ge79ae78c31+d6b5cd48b5,gee10cc3b42+a6fc98d2e7,gf1cff7945b+80768bd682,v24.1.5.rc1
LSST Data Management Base Package
Loading...
Searching...
No Matches
Classes | Functions | Variables
lsst.dax.apdb.cassandra_utils Namespace Reference

Classes

class  ApdbCassandraTableData
 
class  PreparedStatementCache
 
class  SessionWrapper
 

Functions

pandas.DataFrame pandas_dataframe_factory (list[str] colnames, list[tuple] rows)
 
ApdbCassandraTableData raw_data_factory (list[str] colnames, list[tuple] rows)
 
pandas.DataFrame|ApdbCassandraTableData|list select_concurrent (Session session, list[tuple] statements, str execution_profile, int concurrency)
 
Any literal (Any v)
 
str quote_id (str columnName)
 

Variables

bool CASSANDRA_IMPORTED = True
 
 _LOG = logging.getLogger(__name__)
 

Function Documentation

◆ literal()

Any lsst.dax.apdb.cassandra_utils.literal ( Any v)
Transform object into a value for the query.

Definition at line 276 of file cassandra_utils.py.

def literal(v: Any) -> Any:
    """Transform object into a value for the query.

    Parameters
    ----------
    v : `Any`
        Value to transform.

    Returns
    -------
    value : `Any`
        - ``None`` is returned unchanged.
        - A `datetime` is converted to an integer number of milliseconds since
          1970-01-01T00:00:00. Naive datetimes are assumed to already be in
          UTC. Timezone-aware datetimes are converted to UTC first.
        - `bytes`, `str`, `UUID` and `int` values are returned unchanged.
        - Any other value that ``np.isfinite`` reports as non-finite (NaN,
          +/-inf) is replaced with ``None``.
        - All remaining values, including those ``np.isfinite`` rejects with
          `TypeError`, are returned unchanged.
    """
    if v is None:
        pass
    elif isinstance(v, datetime):
        offset = v.utcoffset()
        if offset is not None:
            # Subtracting the naive epoch from an aware datetime raises
            # TypeError, so convert to a naive UTC datetime first.
            v = v.replace(tzinfo=None) - offset
        # timedelta // timedelta is exact integer arithmetic, so the
        # sub-second part of the timestamp is kept at millisecond resolution.
        v = (v - datetime(1970, 1, 1)) // timedelta(milliseconds=1)
    elif isinstance(v, (bytes, str, UUID, int)):
        pass
    else:
        try:
            if not np.isfinite(v):
                v = None
        except TypeError:
            # Not a numeric value numpy understands; pass it through as is.
            pass
    return v
291
292

◆ pandas_dataframe_factory()

pandas.DataFrame lsst.dax.apdb.cassandra_utils.pandas_dataframe_factory ( list[str] colnames,
list[tuple] rows )
Create pandas DataFrame from Cassandra result set.

Parameters
----------
colnames : `list` [ `str` ]
    Names of the columns.
rows : `list` of `tuple`
    Result rows.

Returns
-------
catalog : `pandas.DataFrame`
    DataFrame with the result set.

Notes
-----
When using this method as row factory for Cassandra, the resulting
DataFrame should be accessed in a non-standard way using
`ResultSet._current_rows` attribute.

Definition at line 134 of file cassandra_utils.py.

def pandas_dataframe_factory(colnames: list[str], rows: list[tuple]) -> pandas.DataFrame:
    """Build a `pandas.DataFrame` out of a Cassandra result set.

    Parameters
    ----------
    colnames : `list` [ `str` ]
        Column names, given positionally for the values in each row.
    rows : `list` of `tuple`
        Result rows.

    Returns
    -------
    catalog : `pandas.DataFrame`
        Frame with one row per input row and columns named by ``colnames``.

    Notes
    -----
    When this function is installed as a Cassandra row factory, the frame it
    produces has to be retrieved through the non-public
    `ResultSet._current_rows` attribute.
    """
    frame = pandas.DataFrame.from_records(data=rows, columns=colnames)
    return frame
156
157

◆ quote_id()

str lsst.dax.apdb.cassandra_utils.quote_id ( str columnName)
Smart quoting for column names. Lower-case names are not quoted.

Definition at line 293 of file cassandra_utils.py.

def quote_id(columnName: str) -> str:
    """Smart quoting for column names. Lower-case names are not quoted.

    Parameters
    ----------
    columnName : `str`
        Column name to quote.

    Returns
    -------
    name : `str`
        The name unchanged if `str.islower` is true for it and it contains no
        double quote. Otherwise the name is wrapped in double quotes (a CQL
        quoted identifier), with any embedded double quote escaped by
        doubling it.
    """
    if columnName.islower() and '"' not in columnName:
        return columnName
    # CQL escapes a double quote inside a quoted identifier by doubling it.
    escaped = columnName.replace('"', '""')
    return f'"{escaped}"'

◆ raw_data_factory()

ApdbCassandraTableData lsst.dax.apdb.cassandra_utils.raw_data_factory ( list[str] colnames,
list[tuple] rows )
Wrap unmodified result data (list of column names and list of rows)
into an `ApdbCassandraTableData` instance.

Parameters
----------
colnames : `list` [ `str` ]
    Names of the columns.
rows : `list` of `tuple`
    Result rows.

Returns
-------
data : `ApdbCassandraTableData`
    Input data wrapped into ApdbCassandraTableData.

Notes
-----
When using this method as row factory for Cassandra, the resulting
object should be accessed in a non-standard way using
`ResultSet._current_rows` attribute.

Definition at line 158 of file cassandra_utils.py.

def raw_data_factory(colnames: list[str], rows: list[tuple]) -> ApdbCassandraTableData:
    """Wrap unmodified result data (column names and rows) into an
    `ApdbCassandraTableData` instance.

    Parameters
    ----------
    colnames : `list` [ `str` ]
        Names of the columns.
    rows : `list` of `tuple`
        Result rows.

    Returns
    -------
    data : `ApdbCassandraTableData`
        Input data wrapped into ApdbCassandraTableData.

    Notes
    -----
    When using this method as row factory for Cassandra, the resulting
    object should be accessed in a non-standard way using
    `ResultSet._current_rows` attribute.
    """
    return ApdbCassandraTableData(colnames, rows)
181
182

◆ select_concurrent()

pandas.DataFrame | ApdbCassandraTableData | list lsst.dax.apdb.cassandra_utils.select_concurrent ( Session session,
list[tuple] statements,
str execution_profile,
int concurrency )
Execute a batch of queries concurrently and merge their results into
a single result.

Parameters
----------
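session : `Session`
    Cassandra session used to execute the statements.
statements : `list` [ `tuple` ]
    List of statements and their parameters, passed directly to
    ``execute_concurrent()``.
execution_profile : `str`
    Execution profile name.
concurrency : `int`
    Concurrency level, passed directly to ``execute_concurrent()``.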

Returns
-------
result
    Combined result of multiple statements, type of the result depends on
    specific row factory defined in execution profile. If row factory is
    `pandas_dataframe_factory` then pandas DataFrame is created from a
    combined result. If row factory is `raw_data_factory` then
    `ApdbCassandraTableData` is built from all records. Otherwise a list of
    rows is returned, type of each row is determined by the row factory.

Notes
-----
This method can raise any exception that is raised by one of the provided
statements.

Definition at line 183 of file cassandra_utils.py.

def _iter_query_results(results: Any) -> Any:
    """Yield successful query results, raising on the first failed query.

    Parameters
    ----------
    results
        Iterable of ``(success, result_or_exception)`` pairs as produced by
        ``execute_concurrent(..., raise_on_first_error=False)``.

    Yields
    ------
    result
        Result of each successful query, in iteration order.

    Raises
    ------
    Exception
        The exception object returned for the first failed query (it is
        logged before being raised).
    """
    for success, result in results:
        if not success:
            _LOG.error("error returned by query: %s", result)
            raise result
        yield result


def select_concurrent(
    session: Session, statements: list[tuple], execution_profile: str, concurrency: int
) -> pandas.DataFrame | ApdbCassandraTableData | list:
    """Execute a batch of queries concurrently and merge their results into
    a single result.

    Parameters
    ----------
    session : `Session`
        Cassandra session used to execute the statements.
    statements : `list` [ `tuple` ]
        List of statements and their parameters, passed directly to
        ``execute_concurrent()``.
    execution_profile : `str`
        Execution profile name.
    concurrency : `int`
        Concurrency level, passed directly to ``execute_concurrent()``.

    Returns
    -------
    result
        Combined result of multiple statements. The type of the result
        depends on the row factory defined in the execution profile:

        - `pandas_dataframe_factory`: a pandas DataFrame built from the
          combined result. It is empty, with no columns, if no statements
          were given.
        - `raw_data_factory`: an `ApdbCassandraTableData` built from all
          records.
        - Any other row factory: a list of rows, each row's type determined
          by that row factory.

    Notes
    -----
    This method can raise any exception that is raised by one of the provided
    statements.
    """
    session_wrap = SessionWrapper(session, execution_profile)
    results = execute_concurrent(
        session_wrap,
        statements,
        results_generator=True,
        raise_on_first_error=False,
        concurrency=concurrency,
    )

    ep = session.get_execution_profile(execution_profile)
    if ep.row_factory is raw_data_factory:
        # Merge every ApdbCassandraTableData chunk into the first one.
        _LOG.debug("making pandas data frame out of rows/columns")
        table_data: ApdbCassandraTableData | None = None
        for result in _iter_query_results(results):
            data = result._current_rows
            assert isinstance(data, ApdbCassandraTableData)
            if table_data is None:
                table_data = data
            else:
                table_data.append(data)
        if table_data is None:
            table_data = ApdbCassandraTableData([], [])
        return table_data

    elif ep.row_factory is pandas_dataframe_factory:
        # Merge multiple DataFrames into one.
        _LOG.debug("making pandas data frame out of set of data frames")
        dataframes = [result._current_rows for result in _iter_query_results(results)]
        # Concatenate all frames, but skip empty ones.
        non_empty = [df for df in dataframes if not df.empty]
        if not dataframes:
            # No statements were executed; indexing dataframes[0] would fail.
            catalog = pandas.DataFrame()
        elif not non_empty:
            # If all frames are empty, return the first one (keeps columns).
            catalog = dataframes[0]
        elif len(non_empty) == 1:
            catalog = non_empty[0]
        else:
            catalog = pandas.concat(non_empty)
        _LOG.debug("pandas catalog shape: %s", catalog.shape)
        return catalog

    else:
        # Just concatenate all rows into a single collection.
        rows = []
        for result in _iter_query_results(results):
            rows.extend(result)
        _LOG.debug("number of rows: %s", len(rows))
        return rows
274
275

Variable Documentation

◆ _LOG

lsst.dax.apdb.cassandra_utils._LOG = logging.getLogger(__name__)
protected

Definition at line 56 of file cassandra_utils.py.

◆ CASSANDRA_IMPORTED

bool lsst.dax.apdb.cassandra_utils.CASSANDRA_IMPORTED = True

Definition at line 50 of file cassandra_utils.py.