LSST Applications g0b6bd0c080+a72a5dd7e6,g1182afd7b4+2a019aa3bb,g17e5ecfddb+2b8207f7de,g1d67935e3f+06cf436103,g38293774b4+ac198e9f13,g396055baef+6a2097e274,g3b44f30a73+6611e0205b,g480783c3b1+98f8679e14,g48ccf36440+89c08d0516,g4b93dc025c+98f8679e14,g5c4744a4d9+a302e8c7f0,g613e996a0d+e1c447f2e0,g6c8d09e9e7+25247a063c,g7271f0639c+98f8679e14,g7a9cd813b8+124095ede6,g9d27549199+a302e8c7f0,ga1cf026fa3+ac198e9f13,ga32aa97882+7403ac30ac,ga786bb30fb+7a139211af,gaa63f70f4e+9994eb9896,gabf319e997+ade567573c,gba47b54d5d+94dc90c3ea,gbec6a3398f+06cf436103,gc6308e37c7+07dd123edb,gc655b1545f+ade567573c,gcc9029db3c+ab229f5caf,gd01420fc67+06cf436103,gd877ba84e5+06cf436103,gdb4cecd868+6f279b5b48,ge2d134c3d5+cc4dbb2e3f,ge448b5faa6+86d1ceac1d,gecc7e12556+98f8679e14,gf3ee170dca+25247a063c,gf4ac96e456+ade567573c,gf9f5ea5b4d+ac198e9f13,gff490e6085+8c2580be5c,w.2022.27
LSST Data Management Base Package
Classes | Functions | Variables
lsst.dax.apdb.cassandra_utils Namespace Reference

Classes

class  SessionWrapper
 

Functions

pandas.DataFrame pandas_dataframe_factory (List[str] colnames, List[Tuple] rows)
 
Tuple[List[str], List[Tuple]] raw_data_factory (List[str] colnames, List[Tuple] rows)
 
Union[pandas.DataFrame, List] select_concurrent (Session session, List[Tuple] statements, str execution_profile, int concurrency)
 
Any literal (Any v)
 
str quote_id (str columnName)
 

Variables

bool CASSANDRA_IMPORTED = True
 

Function Documentation

◆ literal()

Any lsst.dax.apdb.cassandra_utils.literal ( Any  v)
Transform object into a value for the query.

Definition at line 245 of file cassandra_utils.py.

def literal(v: Any) -> Any:
    """Transform an object into a value suitable for a CQL query.

    Parameters
    ----------
    v : `Any`
        Value to transform.

    Returns
    -------
    value : `Any`
        - `None` is returned unchanged.
        - A `datetime.datetime` is converted to an integer number of
          milliseconds since 1970-01-01T00:00:00. Naive datetimes are
          measured against the naive epoch (i.e. treated as UTC);
          timezone-aware datetimes are first converted to UTC.
          Sub-millisecond parts are rounded down.
        - `str`, `bytes` and `int` values are returned unchanged.
        - Other values accepted by ``numpy.isfinite`` are returned
          unchanged when finite and replaced by `None` when NaN or infinite.
        - Anything numpy cannot check is returned unchanged.
    """
    if v is None:
        return None
    if isinstance(v, datetime):
        offset = v.utcoffset()
        if offset is not None:
            # Aware datetimes cannot be subtracted from the naive epoch
            # below; shift to UTC wall time and drop the tzinfo.
            v = v.replace(tzinfo=None) - offset
        # timedelta // timedelta is exact integer arithmetic and keeps
        # millisecond precision (converting via whole seconds would drop
        # the fractional second).
        return (v - datetime(1970, 1, 1)) // timedelta(milliseconds=1)
    if isinstance(v, (bytes, str, int)):
        # Python ints are always finite; very large ones would only fail
        # numpy's dtype conversion, so skip the check entirely.
        return v
    try:
        if not np.isfinite(v):
            return None
    except TypeError:
        # Not a numeric type numpy can check; pass it through as-is.
        pass
    return v
260
261

◆ pandas_dataframe_factory()

pandas.DataFrame lsst.dax.apdb.cassandra_utils.pandas_dataframe_factory ( List[str]  colnames,
List[Tuple]   rows 
)
Special non-standard row factory that creates a pandas DataFrame from
a Cassandra result set.

Parameters
----------
colnames : `list` [ `str` ]
    Names of the columns.
rows : `list` of `tuple`
    Result rows.

Returns
-------
catalog : `pandas.DataFrame`
    DataFrame with the result set.

Notes
-----
When this function is used as the Cassandra row factory, the resulting
DataFrame should be accessed in a non-standard way, through the
`ResultSet._current_rows` attribute.

Definition at line 89 of file cassandra_utils.py.

91) -> pandas.DataFrame:
92 """Special non-standard row factory that creates pandas DataFrame from
93 Cassandra result set.
94
95 Parameters
96 ----------
97 colnames : `list` [ `str` ]
98 Names of the columns.
99 rows : `list` of `tuple`
100 Result rows.
101
102 Returns
103 -------
104 catalog : `pandas.DataFrame`
105 DataFrame with the result set.
106
107 Notes
108 -----
109 When using this method as row factory for Cassandra, the resulting
110 DataFrame should be accessed in a non-standard way using
111 `ResultSet._current_rows` attribute.
112 """
113 return pandas.DataFrame.from_records(rows, columns=colnames)
114
115

◆ quote_id()

str lsst.dax.apdb.cassandra_utils.quote_id ( str  columnName)
Smart quoting for column names. Lower-case names are not quoted.

Definition at line 262 of file cassandra_utils.py.

def quote_id(columnName: str) -> str:
    """Smart quoting for column names.

    Names made only of lower-case ASCII letters, digits and underscores,
    and starting with a letter, are valid unquoted CQL identifiers and are
    returned unchanged. Every other name is wrapped in double quotes, with
    embedded double quotes doubled as CQL requires, so that its exact case
    and characters are preserved.

    Parameters
    ----------
    columnName : `str`
        Column name.

    Returns
    -------
    quoted : `str`
        Name that can be embedded in a CQL statement.

    Notes
    -----
    Reserved CQL keywords are not detected; a lower-case keyword used as a
    column name is still returned unquoted.
    """
    first = columnName[:1]
    if "a" <= first <= "z" and all(
        "a" <= ch <= "z" or "0" <= ch <= "9" or ch == "_" for ch in columnName
    ):
        # Unquoted identifiers are case-folded to lower case, so leaving an
        # all-lower-case name unquoted is equivalent to quoting it.
        return columnName
    return '"' + columnName.replace('"', '""') + '"'
str quote_id(str columnName)

◆ raw_data_factory()

Tuple[List[str], List[Tuple]] lsst.dax.apdb.cassandra_utils.raw_data_factory ( List[str]  colnames,
List[Tuple]   rows 
)
Special non-standard row factory that returns a 2-element tuple containing
the unmodified data: the list of column names and the list of rows.

Parameters
----------
colnames : `list` [ `str` ]
    Names of the columns.
rows : `list` of `tuple`
    Result rows.

Returns
-------
colnames : `list` [ `str` ]
    Names of the columns.
rows : `list` of `tuple`
    Result rows.

Notes
-----
When this function is used as the Cassandra row factory, the resulting
2-element tuple should be accessed in a non-standard way, through the
`ResultSet._current_rows` attribute. This factory is used to build
pandas DataFrames in the `select_concurrent` function.

Definition at line 116 of file cassandra_utils.py.

118) -> Tuple[List[str], List[Tuple]]:
119 """Special non-standard row factory that makes 2-element tuple containing
120 unmodified data: list of column names and list of rows.
121
122 Parameters
123 ----------
124 colnames : `list` [ `str` ]
125 Names of the columns.
126 rows : `list` of `tuple`
127 Result rows.
128
129 Returns
130 -------
131 colnames : `list` [ `str` ]
132 Names of the columns.
133 rows : `list` of `tuple`
134 Result rows
135
136 Notes
137 -----
138 When using this method as row factory for Cassandra, the resulting
139 2-element tuple should be accessed in a non-standard way using
140 `ResultSet._current_rows` attribute. This factory is used to build
141 pandas DataFrames in `select_concurrent` method.
142 """
143 return (colnames, rows)
144
145

◆ select_concurrent()

Union[pandas.DataFrame, List] lsst.dax.apdb.cassandra_utils.select_concurrent ( Session  session,
List[Tuple]  statements,
str  execution_profile,
int   concurrency 
)
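Execute a batch of queries concurrently and merge their results into
a single result.

Parameters
----------
session : `Session`
    Cassandra session used to execute the statements.
statements : `list` [ `tuple` ]
    List of statements and their parameters, passed directly to
    ``execute_concurrent()``.
execution_profile : `str`
    Execution profile name.
concurrency : `int`
    Number of statements to execute concurrently, passed as the
    ``concurrency`` argument of ``execute_concurrent()``.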

Returns
-------
result
    Combined result of multiple statements, type of the result depends on
    specific row factory defined in execution profile. If row factory is
    one of `pandas_dataframe_factory` or `raw_data_factory` then pandas
    DataFrame is created from a combined result. Otherwise a list of
    rows is returned, type of each row is determined by the row factory.

Notes
-----
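This function can raise any exception that is raised by one of the provided
statements; the first failure encountered while iterating over the results
is re-raised. When the row factory is `raw_data_factory`, a `ValueError` is
raised if the statements return different sets of columns.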

Definition at line 146 of file cassandra_utils.py.

148) -> Union[pandas.DataFrame, List]:
149 """Execute bunch of queries concurrently and merge their results into
150 a single result.
151
152 Parameters
153 ----------
154 statements : `list` [ `tuple` ]
155 List of statements and their parameters, passed directly to
156 ``execute_concurrent()``.
157 execution_profile : `str`
158 Execution profile name.
159
160 Returns
161 -------
162 result
163 Combined result of multiple statements, type of the result depends on
164 specific row factory defined in execution profile. If row factory is
165 one of `pandas_dataframe_factory` or `raw_data_factory` then pandas
166 DataFrame is created from a combined result. Otherwise a list of
167 rows is returned, type of each row is determined by the row factory.
168
169 Notes
170 -----
171 This method can raise any exception that is raised by one of the provided
172 statements.
173 """
174 session_wrap = SessionWrapper(session, execution_profile)
175 results = execute_concurrent(
176 session_wrap,
177 statements,
178 results_generator=True,
179 raise_on_first_error=False,
180 concurrency=concurrency,
181 )
182
183 ep = session.get_execution_profile(execution_profile)
184 if ep.row_factory is raw_data_factory:
185
186 # Collect rows into a single list and build Dataframe out of that
187 _LOG.debug("making pandas data frame out of rows/columns")
188 columns: Any = None
189 rows = []
190 for success, result in results:
191 if success:
192 result = result._current_rows
193 if columns is None:
194 columns = result[0]
195 elif columns != result[0]:
196 _LOG.error(
197 "different columns returned by queries: %s and %s",
198 columns,
199 result[0],
200 )
201 raise ValueError(
202 f"different columns returned by queries: {columns} and {result[0]}"
203 )
204 rows += result[1]
205 else:
206 _LOG.error("error returned by query: %s", result)
207 raise result
208 catalog = pandas_dataframe_factory(columns, rows)
209 _LOG.debug("pandas catalog shape: %s", catalog.shape)
210 return catalog
211
212 elif ep.row_factory is pandas_dataframe_factory:
213
214 # Merge multiple DataFrames into one
215 _LOG.debug("making pandas data frame out of set of data frames")
216 dataframes = []
217 for success, result in results:
218 if success:
219 dataframes.append(result._current_rows)
220 else:
221 _LOG.error("error returned by query: %s", result)
222 raise result
223 # concatenate all frames
224 if len(dataframes) == 1:
225 catalog = dataframes[0]
226 else:
227 catalog = pandas.concat(dataframes)
228 _LOG.debug("pandas catalog shape: %s", catalog.shape)
229 return catalog
230
231 else:
232
233 # Just concatenate all rows into a single collection.
234 rows = []
235 for success, result in results:
236 if success:
237 rows.extend(result)
238 else:
239 _LOG.error("error returned by query: %s", result)
240 raise result
241 _LOG.debug("number of rows: %s", len(rows))
242 return rows
243
244
pandas.DataFrame pandas_dataframe_factory(List[str] colnames, List[Tuple] rows)

Variable Documentation

◆ CASSANDRA_IMPORTED

bool lsst.dax.apdb.cassandra_utils.CASSANDRA_IMPORTED = True

Definition at line 44 of file cassandra_utils.py.