LSST Applications g0f08755f38+c89d42e150,g1635faa6d4+b6cf076a36,g1653933729+a8ce1bb630,g1a0ca8cf93+4c08b13bf7,g28da252d5a+f33f8200ef,g29321ee8c0+0187be18b1,g2bbee38e9b+9634bc57db,g2bc492864f+9634bc57db,g2cdde0e794+c2c89b37c4,g3156d2b45e+41e33cbcdc,g347aa1857d+9634bc57db,g35bb328faa+a8ce1bb630,g3a166c0a6a+9634bc57db,g3e281a1b8c+9f2c4e2fc3,g414038480c+077ccc18e7,g41af890bb2+e740673f1a,g5fbc88fb19+17cd334064,g7642f7d749+c89d42e150,g781aacb6e4+a8ce1bb630,g80478fca09+f8b2ab54e1,g82479be7b0+e2bd23ab8b,g858d7b2824+c89d42e150,g9125e01d80+a8ce1bb630,g9726552aa6+10f999ec6a,ga5288a1d22+065360aec4,gacf8899fa4+9553554aa7,gae0086650b+a8ce1bb630,gb58c049af0+d64f4d3760,gbd46683f8f+ac57cbb13d,gc28159a63d+9634bc57db,gcf0d15dbbd+e37acf7834,gda3e153d99+c89d42e150,gda6a2b7d83+e37acf7834,gdaeeff99f8+1711a396fd,ge2409df99d+cb1e6652d6,ge79ae78c31+9634bc57db,gf0baf85859+147a0692ba,gf3967379c6+02b11634a5,w.2024.45
LSST Data Management Base Package
Loading...
Searching...
No Matches
Classes | Functions | Variables
lsst.dax.apdb.cassandra.cassandra_utils Namespace Reference

Classes

class  ApdbCassandraTableData
 
class  PreparedStatementCache
 

Functions

pandas.DataFrame pandas_dataframe_factory (list[str] colnames, list[tuple] rows)
 
ApdbCassandraTableData raw_data_factory (list[str] colnames, list[tuple] rows)
 
pandas.DataFrame|ApdbCassandraTableData|list select_concurrent (Session session, list[tuple] statements, str execution_profile, int concurrency)
 
Any literal (Any v)
 
str quote_id (str columnName)
 

Variables

bool CASSANDRA_IMPORTED = True
 
 _LOG = logging.getLogger(__name__)
 

Function Documentation

◆ literal()

Any lsst.dax.apdb.cassandra.cassandra_utils.literal ( Any v)
Transform object into a value for the query.

Definition at line 243 of file cassandra_utils.py.

def literal(v: Any) -> Any:
    """Transform object into a value for the query.

    Parameters
    ----------
    v : `Any`
        Value to convert; `None`, `bytes`, `str`, `UUID`, and `int` pass
        through unchanged.

    Returns
    -------
    value : `Any`
        Value suitable for use as a query parameter; `datetime` is converted
        to integer milliseconds since the Unix epoch, non-finite floats
        become `None`.
    """
    if v is None:
        pass
    elif isinstance(v, datetime):
        # Cassandra timestamps are milliseconds since the Unix epoch.
        # NOTE(review): assumes a naive datetime on the same timeline as
        # datetime(1970, 1, 1) — a timezone-aware value would raise here;
        # confirm callers always pass naive UTC datetimes.
        v = int((v - datetime(1970, 1, 1)) / timedelta(seconds=1) * 1000)
    elif isinstance(v, (bytes, str, UUID, int)):
        pass
    else:
        try:
            # Map NaN/inf to None (NULL); np.isfinite raises TypeError for
            # non-numeric objects, which are passed through unchanged.
            if not np.isfinite(v):
                v = None
        except TypeError:
            pass
    return v

◆ pandas_dataframe_factory()

pandas.DataFrame lsst.dax.apdb.cassandra.cassandra_utils.pandas_dataframe_factory ( list[str] colnames,
list[tuple] rows )
Create pandas DataFrame from Cassandra result set.

Parameters
----------
colnames : `list` [ `str` ]
    Names of the columns.
rows : `list` of `tuple`
    Result rows.

Returns
-------
catalog : `pandas.DataFrame`
    DataFrame with the result set.

Notes
-----
When using this method as row factory for Cassandra, the resulting
DataFrame should be accessed in a non-standard way using
`ResultSet._current_rows` attribute.

Definition at line 101 of file cassandra_utils.py.

def pandas_dataframe_factory(colnames: list[str], rows: list[tuple]) -> pandas.DataFrame:
    """Create pandas DataFrame from Cassandra result set.

    Parameters
    ----------
    colnames : `list` [ `str` ]
        Names of the columns.
    rows : `list` of `tuple`
        Result rows.

    Returns
    -------
    catalog : `pandas.DataFrame`
        DataFrame with the result set.

    Notes
    -----
    When using this method as row factory for Cassandra, the resulting
    DataFrame should be accessed in a non-standard way using
    `ResultSet._current_rows` attribute.
    """
    return pandas.DataFrame.from_records(rows, columns=colnames)

◆ quote_id()

str lsst.dax.apdb.cassandra.cassandra_utils.quote_id ( str columnName)
Smart quoting for column names. Lower-case names are not quoted.

Definition at line 260 of file cassandra_utils.py.

def quote_id(columnName: str) -> str:
    """Smart quoting for column names. Lower-case names are not quoted.

    Parameters
    ----------
    columnName : `str`
        Column name to quote.

    Returns
    -------
    name : `str`
        The name unchanged if it is entirely lower-case, otherwise the name
        wrapped in double quotes.
    """
    if not columnName.islower():
        # NOTE(review): embedded double quotes are not escaped — assumes
        # identifiers never contain '"'; confirm against schema rules.
        columnName = '"' + columnName + '"'
    return columnName

◆ raw_data_factory()

ApdbCassandraTableData lsst.dax.apdb.cassandra.cassandra_utils.raw_data_factory ( list[str] colnames,
list[tuple] rows )
Wrap unmodified data — the list of column names and the list of rows —
into `ApdbCassandraTableData`.

Parameters
----------
colnames : `list` [ `str` ]
    Names of the columns.
rows : `list` of `tuple`
    Result rows.

Returns
-------
data : `ApdbCassandraTableData`
    Input data wrapped into ApdbCassandraTableData.

Notes
-----
When using this method as row factory for Cassandra, the resulting
object should be accessed in a non-standard way using
`ResultSet._current_rows` attribute.

Definition at line 125 of file cassandra_utils.py.

def raw_data_factory(colnames: list[str], rows: list[tuple]) -> ApdbCassandraTableData:
    """Wrap unmodified data — the list of column names and the list of
    rows — into `ApdbCassandraTableData`.

    Parameters
    ----------
    colnames : `list` [ `str` ]
        Names of the columns.
    rows : `list` of `tuple`
        Result rows.

    Returns
    -------
    data : `ApdbCassandraTableData`
        Input data wrapped into ApdbCassandraTableData.

    Notes
    -----
    When using this method as row factory for Cassandra, the resulting
    object should be accessed in a non-standard way using
    `ResultSet._current_rows` attribute.
    """
    return ApdbCassandraTableData(colnames, rows)

◆ select_concurrent()

pandas.DataFrame | ApdbCassandraTableData | list lsst.dax.apdb.cassandra.cassandra_utils.select_concurrent ( Session session,
list[tuple] statements,
str execution_profile,
int concurrency )
Execute bunch of queries concurrently and merge their results into
a single result.

Parameters
----------
session : `Session`
    Cassandra session used to execute the statements.
statements : `list` [ `tuple` ]
    List of statements and their parameters, passed directly to
    ``execute_concurrent()``.
execution_profile : `str`
    Execution profile name.
concurrency : `int`
    Concurrency level passed to ``execute_concurrent()``.

Returns
-------
result
    Combined result of multiple statements, type of the result depends on
    specific row factory defined in execution profile. If row factory is
    `pandas_dataframe_factory` then pandas DataFrame is created from a
    combined result. If row factory is `raw_data_factory` then
    `ApdbCassandraTableData` is built from all records. Otherwise a list of
    rows is returned, type of each row is determined by the row factory.

Notes
-----
This method can raise any exception that is raised by one of the provided
statements.

Definition at line 150 of file cassandra_utils.py.

def select_concurrent(
    session: Session, statements: list[tuple], execution_profile: str, concurrency: int
) -> pandas.DataFrame | ApdbCassandraTableData | list:
    """Execute bunch of queries concurrently and merge their results into
    a single result.

    Parameters
    ----------
    session : `Session`
        Cassandra session used to execute the statements.
    statements : `list` [ `tuple` ]
        List of statements and their parameters, passed directly to
        ``execute_concurrent()``.
    execution_profile : `str`
        Execution profile name.
    concurrency : `int`
        Concurrency level passed to ``execute_concurrent()``.

    Returns
    -------
    result
        Combined result of multiple statements, type of the result depends on
        specific row factory defined in execution profile. If row factory is
        `pandas_dataframe_factory` then pandas DataFrame is created from a
        combined result. If row factory is `raw_data_factory` then
        `ApdbCassandraTableData` is built from all records. Otherwise a list of
        rows is returned, type of each row is determined by the row factory.

    Notes
    -----
    This method can raise any exception that is raised by one of the provided
    statements.
    """
    results = execute_concurrent(
        session,
        statements,
        results_generator=True,
        raise_on_first_error=False,
        concurrency=concurrency,
        execution_profile=execution_profile,
    )

    # The row factory of the execution profile determines how per-query
    # results are merged.
    ep = session.get_execution_profile(execution_profile)
    if ep.row_factory is raw_data_factory:
        # Append all per-query ApdbCassandraTableData into a single one.
        _LOG.debug("making raw data out of rows/columns")
        table_data: ApdbCassandraTableData | None = None
        for success, result in results:
            if success:
                data = result._current_rows
                assert isinstance(data, ApdbCassandraTableData)
                if table_data is None:
                    table_data = data
                else:
                    table_data.append(data)
            else:
                _LOG.error("error returned by query: %s", result)
                raise result
        if table_data is None:
            # No statements were executed — return an empty container.
            table_data = ApdbCassandraTableData([], [])
        return table_data

    elif ep.row_factory is pandas_dataframe_factory:
        # Merge multiple DataFrames into one
        _LOG.debug("making pandas data frame out of set of data frames")
        dataframes = []
        for success, result in results:
            if success:
                dataframes.append(result._current_rows)
            else:
                _LOG.error("error returned by query: %s", result)
                raise result
        if not dataframes:
            # No statements were executed — return an empty frame instead of
            # failing on dataframes[0] below.
            return pandas.DataFrame()
        # Concatenate all frames, but skip empty ones.
        non_empty = [df for df in dataframes if not df.empty]
        if not non_empty:
            # If all frames are empty, return the first one.
            catalog = dataframes[0]
        elif len(non_empty) == 1:
            catalog = non_empty[0]
        else:
            catalog = pandas.concat(non_empty)
        _LOG.debug("pandas catalog shape: %s", catalog.shape)
        return catalog

    else:
        # Just concatenate all rows into a single collection.
        rows = []
        for success, result in results:
            if success:
                rows.extend(result)
            else:
                _LOG.error("error returned by query: %s", result)
                raise result
        _LOG.debug("number of rows: %s", len(rows))
        return rows
242

Variable Documentation

◆ _LOG

lsst.dax.apdb.cassandra.cassandra_utils._LOG = logging.getLogger(__name__)
protected

Definition at line 56 of file cassandra_utils.py.

◆ CASSANDRA_IMPORTED

bool lsst.dax.apdb.cassandra.cassandra_utils.CASSANDRA_IMPORTED = True

Definition at line 50 of file cassandra_utils.py.