LSST Applications g00274db5b6+edbf708997,g00d0e8bbd7+edbf708997,g199a45376c+5137f08352,g1fd858c14a+1d4b6db739,g262e1987ae+f4d9505c4f,g29ae962dfc+7156fb1a53,g2cef7863aa+73c82f25e4,g35bb328faa+edbf708997,g3e17d7035e+5b3adc59f5,g3fd5ace14f+852fa6fbcb,g47891489e3+6dc8069a4c,g53246c7159+edbf708997,g64539dfbff+9f17e571f4,g67b6fd64d1+6dc8069a4c,g74acd417e5+ae494d68d9,g786e29fd12+af89c03590,g7ae74a0b1c+a25e60b391,g7aefaa3e3d+536efcc10a,g7cc15d900a+d121454f8d,g87389fa792+a4172ec7da,g89139ef638+6dc8069a4c,g8d7436a09f+28c28d8d6d,g8ea07a8fe4+db21c37724,g92c671f44c+9f17e571f4,g98df359435+b2e6376b13,g99af87f6a8+b0f4ad7b8d,gac66b60396+966efe6077,gb88ae4c679+7dec8f19df,gbaa8f7a6c5+38b34f4976,gbf99507273+edbf708997,gc24b5d6ed1+9f17e571f4,gca7fc764a6+6dc8069a4c,gcc769fe2a4+97d0256649,gd7ef33dd92+6dc8069a4c,gdab6d2f7ff+ae494d68d9,gdbb4c4dda9+9f17e571f4,ge410e46f29+6dc8069a4c,geaed405ab2+e194be0d2b,w.2025.47
LSST Data Management Base Package
Loading...
Searching...
No Matches
lsst.dax.apdb.cassandra.sessionFactory.SessionFactory Class Reference

Public Member Functions

 __init__ (self, ApdbCassandraConfig config)
 
None __del__ (self)
 
Session session (self)
 

Protected Member Functions

tuple[Cluster, Session] _make_session (self)
 
AuthProvider|None _make_auth_provider (self)
 
Mapping[Any, ExecutionProfile] _make_profiles (self)
 

Protected Attributes

 _config = config
 
Cluster|None _cluster = None
 
Session|None _session = None
 

Detailed Description

Implementation of SessionFactory that uses parameters from Apdb
configuration.

Parameters
----------
config : `ApdbCassandraConfig`
    Configuration object.

Definition at line 78 of file sessionFactory.py.

Constructor & Destructor Documentation

◆ __init__()

lsst.dax.apdb.cassandra.sessionFactory.SessionFactory.__init__ ( self,
ApdbCassandraConfig config )

Definition at line 88 of file sessionFactory.py.

def __init__(self, config: ApdbCassandraConfig):
    """Store the configuration; no connection is opened here.

    Parameters
    ----------
    config : `ApdbCassandraConfig`
        Configuration object.
    """
    # Both stay `None` until `session` is called for the first time.
    self._session: Session | None = None
    self._cluster: Cluster | None = None
    self._config = config

◆ __del__()

None lsst.dax.apdb.cassandra.sessionFactory.SessionFactory.__del__ ( self)

Definition at line 93 of file sessionFactory.py.

def __del__(self) -> None:
    """Shut down the cluster connection, if one was ever made."""
    # Need to call Cluster.shutdown() to avoid warnings. The attribute may
    # be missing altogether if __init__ did not run to completion.
    cluster = getattr(self, "_cluster", None)
    if cluster:
        cluster.shutdown()

Member Function Documentation

◆ _make_auth_provider()

AuthProvider | None lsst.dax.apdb.cassandra.sessionFactory.SessionFactory._make_auth_provider ( self)
protected
Make Cassandra authentication provider instance.

Definition at line 148 of file sessionFactory.py.

def _make_auth_provider(self) -> AuthProvider | None:
    """Make Cassandra authentication provider instance.

    Credentials are looked up via `DbAuth` for every configured contact
    point in turn; the first entry that yields a non-empty user name is
    used.

    Returns
    -------
    auth_provider : `AuthProvider` or `None`
        `PlainTextAuthProvider` built from the first usable credentials,
        or `None` (anonymous login) when no credentials file exists or no
        contact point has an entry with a user name.
    """
    try:
        dbauth = DbAuth()
    except DbAuthNotFoundError:
        # Credentials file doesn't exist, use anonymous login.
        return None

    # Becomes True only when some entry matched but carried no user name.
    # It has to start out False, otherwise the warning below would also be
    # issued when no entry matched any contact point at all.
    empty_username = False
    # Try every contact point in turn.
    for hostname in self._config.contact_points:
        try:
            username, password = dbauth.getAuth(
                "cassandra",
                self._config.connection_config.username,
                hostname,
                self._config.connection_config.port,
                self._config.keyspace,
            )
        except DbAuthNotFoundError:
            # No matching entry for this contact point, try the next one.
            continue

        if username:
            return PlainTextAuthProvider(username=username, password=password)

        # Password without user name, try next hostname, but give
        # warning later if no better match is found.
        empty_username = True

    if empty_username:
        _LOG.warning(
            f"Credentials file ({dbauth.db_auth_path}) provided password but not "
            "user name, anonymous Cassandra logon will be attempted."
        )

    return None

◆ _make_profiles()

Mapping[Any, ExecutionProfile] lsst.dax.apdb.cassandra.sessionFactory.SessionFactory._make_profiles ( self)
protected
Make all execution profiles used in the code.

Definition at line 184 of file sessionFactory.py.

def _make_profiles(self) -> Mapping[Any, ExecutionProfile]:
    """Make all execution profiles used in the code.

    Returns
    -------
    profiles : `~collections.abc.Mapping`
        Execution profiles keyed by profile name, plus a replacement for
        the driver's default profile under ``EXEC_PROFILE_DEFAULT``.
    """
    conn_config = self._config.connection_config

    # With private IPs configured only the listed contact points are used.
    if conn_config.private_ips:
        policy = WhiteListRoundRobinPolicy(hosts=self._config.contact_points)
    else:
        policy = RoundRobinPolicy()

    def _read_profile(row_factory: Any) -> ExecutionProfile:
        # All read profiles differ only in how result rows are returned.
        return ExecutionProfile(
            consistency_level=getattr(cassandra.ConsistencyLevel, conn_config.read_consistency),
            request_timeout=conn_config.read_timeout,
            row_factory=row_factory,
            load_balancing_policy=policy,
        )

    return {
        "read_tuples": _read_profile(cassandra.query.tuple_factory),
        "read_pandas": _read_profile(pandas_dataframe_factory),
        "read_raw": _read_profile(raw_data_factory),
        # Profile to use with select_concurrent to return pandas data frame
        "read_pandas_multi": _read_profile(pandas_dataframe_factory),
        # Profile to use with select_concurrent to return raw data (columns
        # and rows)
        "read_raw_multi": _read_profile(raw_data_factory),
        "write": ExecutionProfile(
            consistency_level=getattr(cassandra.ConsistencyLevel, conn_config.write_consistency),
            request_timeout=conn_config.write_timeout,
            load_balancing_policy=policy,
        ),
        # To replace default DCAwareRoundRobinPolicy
        EXEC_PROFILE_DEFAULT: ExecutionProfile(
            load_balancing_policy=policy,
        ),
    }

◆ _make_session()

tuple[Cluster, Session] lsst.dax.apdb.cassandra.sessionFactory.SessionFactory._make_session ( self)
protected
Make Cassandra session.

Returns
-------
cluster : `cassandra.cluster.Cluster`
    Cassandra Cluster object
session : `cassandra.cluster.Session`
    Cassandra session object

Definition at line 111 of file sessionFactory.py.

def _make_session(self) -> tuple[Cluster, Session]:
    """Connect to the Cassandra cluster and open a session.

    Returns
    -------
    cluster : `cassandra.cluster.Cluster`
        Cassandra Cluster object
    session : `cassandra.cluster.Session`
        Cassandra session object
    """
    config = self._config
    conn_config = config.connection_config

    # An address translator is only set up when private IPs are configured.
    translator: AddressTranslator | None = (
        _AddressTranslator(config.contact_points, conn_config.private_ips)
        if conn_config.private_ips
        else None
    )

    # Cluster construction and the connect call are timed together.
    with Timer("cluster_connect", _MON):
        cluster = Cluster(
            execution_profiles=self._make_profiles(),
            contact_points=config.contact_points,
            port=conn_config.port,
            address_translator=translator,
            protocol_version=conn_config.protocol_version,
            auth_provider=self._make_auth_provider(),
            **conn_config.extra_parameters,
        )
        new_session = cluster.connect()

    # Dump queries if debug level is enabled.
    if _LOG.isEnabledFor(logging.DEBUG):
        new_session.add_request_init_listener(_dump_query)

    # Disable result paging
    new_session.default_fetch_size = None

    return cluster, new_session

◆ session()

Session lsst.dax.apdb.cassandra.sessionFactory.SessionFactory.session ( self)
Return Cassandra Session, making new connection if necessary.

Returns
-------
session : `cassandra.cluster.Session`
    Cassandra session object.

Definition at line 99 of file sessionFactory.py.

def session(self) -> Session:
    """Return Cassandra Session, making new connection if necessary.

    The cluster and session are created on the first call and reused on
    every later call.

    Returns
    -------
    session : `cassandra.cluster.Session`
        Cassandra session object.
    """
    if self._session is not None:
        return self._session

    # First use: connect and remember both the cluster and the session.
    self._cluster, self._session = self._make_session()
    return self._session

Member Data Documentation

◆ _cluster

lsst.dax.apdb.cassandra.sessionFactory.SessionFactory._cluster = None
protected

Definition at line 90 of file sessionFactory.py.

◆ _config

lsst.dax.apdb.cassandra.sessionFactory.SessionFactory._config = config
protected

Definition at line 89 of file sessionFactory.py.

◆ _session

Session | None lsst.dax.apdb.cassandra.sessionFactory.SessionFactory._session = None
protected

Definition at line 91 of file sessionFactory.py.


The documentation for this class was generated from the following file: