LSST Data Management Base Package
lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra Class Reference
Inheritance diagram for lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra:
lsst.dax.apdb.apdb.Apdb

Public Member Functions

 __init__ (self, ApdbCassandraConfig config)
 
None __del__ (self)
 
VersionTuple apdbImplementationVersion (cls)
 
VersionTuple apdbSchemaVersion (self)
 
Table|None tableDef (self, ApdbTables table)
 
ApdbCassandraConfig init_database (cls, list[str] hosts, str keyspace, *str|None schema_file=None, str|None schema_name=None, int|None read_sources_months=None, int|None read_forced_sources_months=None, bool use_insert_id=False, bool replica_skips_diaobjects=False, int|None port=None, str|None username=None, str|None prefix=None, str|None part_pixelization=None, int|None part_pix_level=None, bool time_partition_tables=True, str|None time_partition_start=None, str|None time_partition_end=None, str|None read_consistency=None, str|None write_consistency=None, int|None read_timeout=None, int|None write_timeout=None, list[str]|None ra_dec_columns=None, int|None replication_factor=None, bool drop=False)
 
ApdbCassandraReplica get_replica (self)
 
pandas.DataFrame getDiaObjects (self, sphgeom.Region region)
 
pandas.DataFrame|None getDiaSources (self, sphgeom.Region region, Iterable[int]|None object_ids, astropy.time.Time visit_time)
 
pandas.DataFrame|None getDiaForcedSources (self, sphgeom.Region region, Iterable[int]|None object_ids, astropy.time.Time visit_time)
 
bool containsVisitDetector (self, int visit, int detector)
 
pandas.DataFrame getSSObjects (self)
 
None store (self, astropy.time.Time visit_time, pandas.DataFrame objects, pandas.DataFrame|None sources=None, pandas.DataFrame|None forced_sources=None)
 
None storeSSObjects (self, pandas.DataFrame objects)
 
None reassignDiaSources (self, Mapping[int, int] idMap)
 
None dailyJob (self)
 
int countUnassociatedObjects (self)
 
ApdbMetadata metadata (self)
 

Public Attributes

 config
 
 metadataSchemaVersionKey
 
 metadataCodeVersionKey
 
 metadataReplicaVersionKey
 
 metadataConfigKey
 

Static Public Attributes

str metadataSchemaVersionKey = "version:schema"
 
str metadataCodeVersionKey = "version:ApdbCassandra"
 
str metadataReplicaVersionKey = "version:ApdbCassandraReplica"
 
str metadataConfigKey = "config:apdb-cassandra.json"
 
 partition_zero_epoch = astropy.time.Time(0, format="unix_tai")
 

Protected Member Functions

tuple[Cluster, Session] _make_session (cls, ApdbCassandraConfig config)
 
AuthProvider|None _make_auth_provider (cls, ApdbCassandraConfig config)
 
None _versionCheck (self, ApdbMetadataCassandra metadata)
 
None _makeSchema (cls, ApdbConfig config, *bool drop=False, int|None replication_factor=None)
 
Mapping[Any, ExecutionProfile] _makeProfiles (cls, ApdbCassandraConfig config)
 
pandas.DataFrame _getSources (self, sphgeom.Region region, Iterable[int]|None object_ids, float mjd_start, float mjd_end, ApdbTables table_name)
 
None _storeReplicaChunk (self, ReplicaChunk replica_chunk, astropy.time.Time visit_time)
 
None _storeDiaObjects (self, pandas.DataFrame objs, astropy.time.Time visit_time, ReplicaChunk|None replica_chunk)
 
None _storeDiaSources (self, ApdbTables table_name, pandas.DataFrame sources, astropy.time.Time visit_time, ReplicaChunk|None replica_chunk)
 
None _storeDiaSourcesPartitions (self, pandas.DataFrame sources, astropy.time.Time visit_time, ReplicaChunk|None replica_chunk)
 
None _storeObjectsPandas (self, pandas.DataFrame records, ApdbTables|ExtraTables table_name, Mapping|None extra_columns=None, int|None time_part=None)
 
pandas.DataFrame _add_obj_part (self, pandas.DataFrame df)
 
pandas.DataFrame _add_src_part (self, pandas.DataFrame sources, pandas.DataFrame objs)
 
pandas.DataFrame _add_fsrc_part (self, pandas.DataFrame sources, pandas.DataFrame objs)
 
int _time_partition_cls (cls, float|astropy.time.Time time, float epoch_mjd, int part_days)
 
int _time_partition (self, float|astropy.time.Time time)
 
pandas.DataFrame _make_empty_catalog (self, ApdbTables table_name)
 
Iterator[tuple[cassandra.query.Statement, tuple]] _combine_where (self, str prefix, list[tuple[str, tuple]] where1, list[tuple[str, tuple]] where2, str|None suffix=None)
 
list[tuple[str, tuple]] _spatial_where (self, sphgeom.Region|None region, bool use_ranges=False)
 
tuple[list[str], list[tuple[str, tuple]]] _temporal_where (self, ApdbTables table, float|astropy.time.Time start_time, float|astropy.time.Time end_time, bool|None query_per_time_part=None)
 

Protected Attributes

 _keyspace
 
 _cluster
 
 _session
 
 _metadata
 
 _pixelization
 
 _schema
 
 _partition_zero_epoch_mjd
 
 _preparer
 

Static Protected Attributes

tuple _frozen_parameters
 

Detailed Description

Implementation of APDB database on top of Apache Cassandra.

The implementation is configured via the standard ``pex_config`` mechanism
using the `ApdbCassandraConfig` configuration class. For examples of
different configurations, see the ``config/`` folder.

Parameters
----------
config : `ApdbCassandraConfig`
    Configuration object.

Definition at line 250 of file apdbCassandra.py.
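
A minimal construction sketch, assuming a reachable Cassandra cluster whose keyspace already holds an initialized APDB schema; the host name and keyspace below are hypothetical, and the import path for `ApdbCassandraConfig` is an assumption:

from lsst.dax.apdb.cassandra.apdbCassandra import ApdbCassandra, ApdbCassandraConfig

config = ApdbCassandraConfig()
config.contact_points = ["cassandra-host1.example.org"]  # hypothetical host
config.keyspace = "apdb"  # hypothetical keyspace with an existing APDB schema
apdb = ApdbCassandra(config)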

Constructor & Destructor Documentation

◆ __init__()

lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.__init__ ( self,
ApdbCassandraConfig config )

Definition at line 289 of file apdbCassandra.py.

289 def __init__(self, config: ApdbCassandraConfig):
290 if not CASSANDRA_IMPORTED:
291 raise CassandraMissingError()
292
293 self._keyspace = config.keyspace
294
295 self._cluster, self._session = self._make_session(config)
296
297 meta_table_name = ApdbTables.metadata.table_name(config.prefix)
298 self._metadata = ApdbMetadataCassandra(
299 self._session, meta_table_name, config.keyspace, "read_tuples", "write"
300 )
301
302 # Read frozen config from metadata.
303 config_json = self._metadata.get(self.metadataConfigKey)
304 if config_json is not None:
305 # Update config from metadata.
306 freezer = ApdbConfigFreezer[ApdbCassandraConfig](self._frozen_parameters)
307 self.config = freezer.update(config, config_json)
308 else:
309 self.config = config
310 self.config.validate()
311
312 self._pixelization = Pixelization(
313 self.config.part_pixelization,
314 self.config.part_pix_level,
315 config.part_pix_max_ranges,
316 )
317
318 self._schema = ApdbCassandraSchema(
319 session=self._session,
320 keyspace=self._keyspace,
321 schema_file=self.config.schema_file,
322 schema_name=self.config.schema_name,
323 prefix=self.config.prefix,
324 time_partition_tables=self.config.time_partition_tables,
325 enable_replica=self.config.use_insert_id,
326 )
327 self._partition_zero_epoch_mjd = float(self.partition_zero_epoch.mjd)
328
329 if self._metadata.table_exists():
330 self._versionCheck(self._metadata)
331
332 # Cache for prepared statements
333 self._preparer = PreparedStatementCache(self._session)
334
335 _LOG.debug("ApdbCassandra Configuration:")
336 for key, value in self.config.items():
337 _LOG.debug(" %s: %s", key, value)
338

◆ __del__()

None lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.__del__ ( self)

Definition at line 339 of file apdbCassandra.py.

339 def __del__(self) -> None:
340 if hasattr(self, "_cluster"):
341 self._cluster.shutdown()
342

Member Function Documentation

◆ _add_fsrc_part()

pandas.DataFrame lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._add_fsrc_part ( self,
pandas.DataFrame sources,
pandas.DataFrame objs )
protected
Add ``apdb_part`` column to a DiaForcedSource catalog.

Notes
-----
This method copies the ``apdb_part`` value from the matching DiaObject
record. The DiaObject catalog needs to have an ``apdb_part`` column
filled by the ``_add_obj_part`` method, and source records need to be
associated with DiaObjects via the ``diaObjectId`` column.

This overrides any existing column with the same name (``apdb_part``)
in the DataFrame. The original DataFrame is not changed; a copy of the
DataFrame is returned.

Definition at line 1251 of file apdbCassandra.py.

1251 def _add_fsrc_part(self, sources: pandas.DataFrame, objs: pandas.DataFrame) -> pandas.DataFrame:
1252 """Add apdb_part column to DiaForcedSource catalog.
1253
1254 Notes
1255 -----
1256 This method copies apdb_part value from a matching DiaObject record.
1257 DiaObject catalog needs to have a apdb_part column filled by
1258 ``_add_obj_part`` method and DiaSource records need to be
1259 associated to DiaObjects via ``diaObjectId`` column.
1260
1261 This overrides any existing column in a DataFrame with the same name
1262 (apdb_part). Original DataFrame is not changed, copy of a DataFrame is
1263 returned.
1264 """
1265 pixel_id_map: dict[int, int] = {
1266 diaObjectId: apdb_part for diaObjectId, apdb_part in zip(objs["diaObjectId"], objs["apdb_part"])
1267 }
1268 apdb_part = np.zeros(sources.shape[0], dtype=np.int64)
1269 for i, diaObjId in enumerate(sources["diaObjectId"]):
1270 apdb_part[i] = pixel_id_map[diaObjId]
1271 sources = sources.copy()
1272 sources["apdb_part"] = apdb_part
1273 return sources
1274

◆ _add_obj_part()

pandas.DataFrame lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._add_obj_part ( self,
pandas.DataFrame df )
protected
Calculate the spatial partition for each record and add it to the
DataFrame.

Notes
-----
This overrides any existing column with the same name (``apdb_part``)
in the DataFrame. The original DataFrame is not changed; a copy of the
DataFrame is returned.

Definition at line 1195 of file apdbCassandra.py.

1195 def _add_obj_part(self, df: pandas.DataFrame) -> pandas.DataFrame:
1196 """Calculate spatial partition for each record and add it to a
1197 DataFrame.
1198
1199 Notes
1200 -----
1201 This overrides any existing column in a DataFrame with the same name
1202 (apdb_part). Original DataFrame is not changed, copy of a DataFrame is
1203 returned.
1204 """
1205 # calculate HTM index for every DiaObject
1206 apdb_part = np.zeros(df.shape[0], dtype=np.int64)
1207 ra_col, dec_col = self.config.ra_dec_columns
1208 for i, (ra, dec) in enumerate(zip(df[ra_col], df[dec_col])):
1209 uv3d = sphgeom.UnitVector3d(sphgeom.LonLat.fromDegrees(ra, dec))
1210 idx = self._pixelization.pixel(uv3d)
1211 apdb_part[i] = idx
1212 df = df.copy()
1213 df["apdb_part"] = apdb_part
1214 return df
1215

◆ _add_src_part()

pandas.DataFrame lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._add_src_part ( self,
pandas.DataFrame sources,
pandas.DataFrame objs )
protected
Add ``apdb_part`` column to a DiaSource catalog.

Notes
-----
This method copies the ``apdb_part`` value from the matching DiaObject
record. The DiaObject catalog needs to have an ``apdb_part`` column
filled by the ``_add_obj_part`` method, and DiaSource records need to
be associated with DiaObjects via the ``diaObjectId`` column.

This overrides any existing column with the same name (``apdb_part``)
in the DataFrame. The original DataFrame is not changed; a copy of the
DataFrame is returned.

Definition at line 1216 of file apdbCassandra.py.

1216 def _add_src_part(self, sources: pandas.DataFrame, objs: pandas.DataFrame) -> pandas.DataFrame:
1217 """Add apdb_part column to DiaSource catalog.
1218
1219 Notes
1220 -----
1221 This method copies apdb_part value from a matching DiaObject record.
1222 DiaObject catalog needs to have a apdb_part column filled by
1223 ``_add_obj_part`` method and DiaSource records need to be
1224 associated to DiaObjects via ``diaObjectId`` column.
1225
1226 This overrides any existing column in a DataFrame with the same name
1227 (apdb_part). Original DataFrame is not changed, copy of a DataFrame is
1228 returned.
1229 """
1230 pixel_id_map: dict[int, int] = {
1231 diaObjectId: apdb_part for diaObjectId, apdb_part in zip(objs["diaObjectId"], objs["apdb_part"])
1232 }
1233 apdb_part = np.zeros(sources.shape[0], dtype=np.int64)
1234 ra_col, dec_col = self.config.ra_dec_columns
1235 for i, (diaObjId, ra, dec) in enumerate(
1236 zip(sources["diaObjectId"], sources[ra_col], sources[dec_col])
1237 ):
1238 if diaObjId == 0:
1239 # DiaSources associated with SolarSystemObjects do not have an
1240 # associated DiaObject hence we skip them and set partition
1241 # based on its own ra/dec
1242 uv3d = sphgeom.UnitVector3d(sphgeom.LonLat.fromDegrees(ra, dec))
1243 idx = self._pixelization.pixel(uv3d)
1244 apdb_part[i] = idx
1245 else:
1246 apdb_part[i] = pixel_id_map[diaObjId]
1247 sources = sources.copy()
1248 sources["apdb_part"] = apdb_part
1249 return sources
1250

◆ _combine_where()

Iterator[tuple[cassandra.query.Statement, tuple]] lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._combine_where ( self,
str prefix,
list[tuple[str, tuple]] where1,
list[tuple[str, tuple]] where2,
str | None suffix = None )
protected
Make cartesian product of two parts of WHERE clause into a series
of statements to execute.

Parameters
----------
prefix : `str`
    Initial statement prefix that comes before WHERE clause, e.g.
    "SELECT * from Table"

Definition at line 1345 of file apdbCassandra.py.

1351 ) -> Iterator[tuple[cassandra.query.Statement, tuple]]:
1352 """Make cartesian product of two parts of WHERE clause into a series
1353 of statements to execute.
1354
1355 Parameters
1356 ----------
1357 prefix : `str`
1358 Initial statement prefix that comes before WHERE clause, e.g.
1359 "SELECT * from Table"
1360 """
1361 # If lists are empty use special sentinels.
1362 if not where1:
1363 where1 = [("", ())]
1364 if not where2:
1365 where2 = [("", ())]
1366
1367 for expr1, params1 in where1:
1368 for expr2, params2 in where2:
1369 full_query = prefix
1370 wheres = []
1371 if expr1:
1372 wheres.append(expr1)
1373 if expr2:
1374 wheres.append(expr2)
1375 if wheres:
1376 full_query += " WHERE " + " AND ".join(wheres)
1377 if suffix:
1378 full_query += " " + suffix
1379 params = params1 + params2
1380 if params:
1381 statement = self._preparer.prepare(full_query)
1382 else:
1383 # If there are no params then it is likely that query
1384 # has a bunch of literals rendered already, no point
1385 # trying to prepare it.
1386 statement = cassandra.query.SimpleStatement(full_query)
1387 yield (statement, params)
1388
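
The cartesian combination can be illustrated with a standalone sketch (the query prefix, pixel, and partition values below are hypothetical):

prefix = 'SELECT * from "keyspace"."DiaSource_663"'
where1 = [('"apdb_part" = ?', (1024,)), ('"apdb_part" = ?', (1025,))]
where2 = [('"apdb_time_part" = ?', (663,))]

for expr1, params1 in where1 or [("", ())]:
    for expr2, params2 in where2 or [("", ())]:
        wheres = [expr for expr in (expr1, expr2) if expr]
        query = prefix + (" WHERE " + " AND ".join(wheres) if wheres else "")
        print(query, params1 + params2)
# Produces one statement per spatial pixel, each carrying the shared
# time-partition constraint.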

◆ _getSources()

pandas.DataFrame lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._getSources ( self,
sphgeom.Region region,
Iterable[int] | None object_ids,
float mjd_start,
float mjd_end,
ApdbTables table_name )
protected
Return catalog of DiaSource instances given a set of DiaObject IDs.

Parameters
----------
region : `lsst.sphgeom.Region`
    Spherical region.
object_ids : `Iterable` [`int`] or `None`
    Collection of DiaObject IDs.
mjd_start : `float`
    Lower bound of time interval.
mjd_end : `float`
    Upper bound of time interval.
table_name : `ApdbTables`
    Name of the table.

Returns
-------
catalog : `pandas.DataFrame`
    Catalog containing DiaSource records. An empty catalog is returned
    if ``object_ids`` is empty.

Definition at line 923 of file apdbCassandra.py.

930 ) -> pandas.DataFrame:
931 """Return catalog of DiaSource instances given set of DiaObject IDs.
932
933 Parameters
934 ----------
935 region : `lsst.sphgeom.Region`
936 Spherical region.
937 object_ids :
938 Collection of DiaObject IDs
939 mjd_start : `float`
940 Lower bound of time interval.
941 mjd_end : `float`
942 Upper bound of time interval.
943 table_name : `ApdbTables`
944 Name of the table.
945
946 Returns
947 -------
948 catalog : `pandas.DataFrame`, or `None`
949 Catalog containing DiaSource records. Empty catalog is returned if
950 ``object_ids`` is empty.
951 """
952 object_id_set: Set[int] = set()
953 if object_ids is not None:
954 object_id_set = set(object_ids)
955 if len(object_id_set) == 0:
956 return self._make_empty_catalog(table_name)
957
958 sp_where = self._spatial_where(region)
959 tables, temporal_where = self._temporal_where(table_name, mjd_start, mjd_end)
960
961 # We need to exclude extra partitioning columns from result.
962 column_names = self._schema.apdbColumnNames(table_name)
963 what = ",".join(quote_id(column) for column in column_names)
964
965 # Build all queries
966 statements: list[tuple] = []
967 for table in tables:
968 prefix = f'SELECT {what} from "{self._keyspace}"."{table}"'
969 statements += list(self._combine_where(prefix, sp_where, temporal_where))
970 _LOG.debug("_getSources %s: #queries: %s", table_name, len(statements))
971
972 with Timer(table_name.name + " select", self.config.timer):
973 catalog = cast(
974 pandas.DataFrame,
975 select_concurrent(
976 self._session, statements, "read_pandas_multi", self.config.read_concurrency
977 ),
978 )
979
980 # filter by given object IDs
981 if len(object_id_set) > 0:
982 catalog = cast(pandas.DataFrame, catalog[catalog["diaObjectId"].isin(object_id_set)])
983
984 # precise filtering on midpointMjdTai
985 catalog = cast(pandas.DataFrame, catalog[catalog["midpointMjdTai"] > mjd_start])
986
987 _LOG.debug("found %d %ss", catalog.shape[0], table_name.name)
988 return catalog
989

◆ _make_auth_provider()

AuthProvider | None lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._make_auth_provider ( cls,
ApdbCassandraConfig config )
protected
Make Cassandra authentication provider instance.

Definition at line 365 of file apdbCassandra.py.

365 def _make_auth_provider(cls, config: ApdbCassandraConfig) -> AuthProvider | None:
366 """Make Cassandra authentication provider instance."""
367 try:
368 dbauth = DbAuth(DB_AUTH_PATH, DB_AUTH_ENVVAR)
369 except DbAuthNotFoundError:
370 # Credentials file doesn't exist, use anonymous login.
371 return None
372
373 empty_username = True
374 # Try every contact point in turn.
375 for hostname in config.contact_points:
376 try:
377 username, password = dbauth.getAuth(
378 "cassandra", config.username, hostname, config.port, config.keyspace
379 )
380 if not username:
381 # Password without user name, try next hostname, but give
382 # warning later if no better match is found.
383 empty_username = True
384 else:
385 return PlainTextAuthProvider(username=username, password=password)
386 except DbAuthNotFoundError:
387 pass
388
389 if empty_username:
390 _LOG.warning(
391 f"Credentials file ({DB_AUTH_PATH} or ${DB_AUTH_ENVVAR}) provided password but not "
392 f"user name, anonymous Cassandra logon will be attempted."
393 )
394
395 return None
396

◆ _make_empty_catalog()

pandas.DataFrame lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._make_empty_catalog ( self,
ApdbTables table_name )
protected
Make an empty catalog for a table with a given name.

Parameters
----------
table_name : `ApdbTables`
    Name of the table.

Returns
-------
catalog : `pandas.DataFrame`
    An empty catalog.

Definition at line 1324 of file apdbCassandra.py.

1324 def _make_empty_catalog(self, table_name: ApdbTables) -> pandas.DataFrame:
1325 """Make an empty catalog for a table with a given name.
1326
1327 Parameters
1328 ----------
1329 table_name : `ApdbTables`
1330 Name of the table.
1331
1332 Returns
1333 -------
1334 catalog : `pandas.DataFrame`
1335 An empty catalog.
1336 """
1337 table = self._schema.tableSchemas[table_name]
1338
1339 data = {
1340 columnDef.name: pandas.Series(dtype=self._schema.column_dtype(columnDef.datatype))
1341 for columnDef in table.columns
1342 }
1343 return pandas.DataFrame(data)
1344

◆ _make_session()

tuple[Cluster, Session] lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._make_session ( cls,
ApdbCassandraConfig config )
protected
Make Cassandra session.

Definition at line 344 of file apdbCassandra.py.

344 def _make_session(cls, config: ApdbCassandraConfig) -> tuple[Cluster, Session]:
345 """Make Cassandra session."""
346 addressTranslator: AddressTranslator | None = None
347 if config.private_ips:
348 addressTranslator = _AddressTranslator(list(config.contact_points), list(config.private_ips))
349
350 cluster = Cluster(
351 execution_profiles=cls._makeProfiles(config),
352 contact_points=config.contact_points,
353 port=config.port,
354 address_translator=addressTranslator,
355 protocol_version=config.protocol_version,
356 auth_provider=cls._make_auth_provider(config),
357 )
358 session = cluster.connect()
359 # Disable result paging
360 session.default_fetch_size = None
361
362 return cluster, session
363

◆ _makeProfiles()

Mapping[Any, ExecutionProfile] lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._makeProfiles ( cls,
ApdbCassandraConfig config )
protected
Make all execution profiles used in the code.

Definition at line 864 of file apdbCassandra.py.

864 def _makeProfiles(cls, config: ApdbCassandraConfig) -> Mapping[Any, ExecutionProfile]:
865 """Make all execution profiles used in the code."""
866 if config.private_ips:
867 loadBalancePolicy = WhiteListRoundRobinPolicy(hosts=config.contact_points)
868 else:
869 loadBalancePolicy = RoundRobinPolicy()
870
871 read_tuples_profile = ExecutionProfile(
872 consistency_level=getattr(cassandra.ConsistencyLevel, config.read_consistency),
873 request_timeout=config.read_timeout,
874 row_factory=cassandra.query.tuple_factory,
875 load_balancing_policy=loadBalancePolicy,
876 )
877 read_pandas_profile = ExecutionProfile(
878 consistency_level=getattr(cassandra.ConsistencyLevel, config.read_consistency),
879 request_timeout=config.read_timeout,
880 row_factory=pandas_dataframe_factory,
881 load_balancing_policy=loadBalancePolicy,
882 )
883 read_raw_profile = ExecutionProfile(
884 consistency_level=getattr(cassandra.ConsistencyLevel, config.read_consistency),
885 request_timeout=config.read_timeout,
886 row_factory=raw_data_factory,
887 load_balancing_policy=loadBalancePolicy,
888 )
889 # Profile to use with select_concurrent to return pandas data frame
890 read_pandas_multi_profile = ExecutionProfile(
891 consistency_level=getattr(cassandra.ConsistencyLevel, config.read_consistency),
892 request_timeout=config.read_timeout,
893 row_factory=pandas_dataframe_factory,
894 load_balancing_policy=loadBalancePolicy,
895 )
896 # Profile to use with select_concurrent to return raw data (columns and
897 # rows)
898 read_raw_multi_profile = ExecutionProfile(
899 consistency_level=getattr(cassandra.ConsistencyLevel, config.read_consistency),
900 request_timeout=config.read_timeout,
901 row_factory=raw_data_factory,
902 load_balancing_policy=loadBalancePolicy,
903 )
904 write_profile = ExecutionProfile(
905 consistency_level=getattr(cassandra.ConsistencyLevel, config.write_consistency),
906 request_timeout=config.write_timeout,
907 load_balancing_policy=loadBalancePolicy,
908 )
909 # To replace default DCAwareRoundRobinPolicy
910 default_profile = ExecutionProfile(
911 load_balancing_policy=loadBalancePolicy,
912 )
913 return {
914 "read_tuples": read_tuples_profile,
915 "read_pandas": read_pandas_profile,
916 "read_raw": read_raw_profile,
917 "read_pandas_multi": read_pandas_multi_profile,
918 "read_raw_multi": read_raw_multi_profile,
919 "write": write_profile,
920 EXEC_PROFILE_DEFAULT: default_profile,
921 }
922

◆ _makeSchema()

None lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._makeSchema ( cls,
ApdbConfig config,
*bool drop = False,
int | None replication_factor = None )
protected

Definition at line 593 of file apdbCassandra.py.

595 ) -> None:
596 # docstring is inherited from a base class
597
598 if not isinstance(config, ApdbCassandraConfig):
599 raise TypeError(f"Unexpected type of configuration object: {type(config)}")
600
601 cluster, session = cls._make_session(config)
602
603 schema = ApdbCassandraSchema(
604 session=session,
605 keyspace=config.keyspace,
606 schema_file=config.schema_file,
607 schema_name=config.schema_name,
608 prefix=config.prefix,
609 time_partition_tables=config.time_partition_tables,
610 enable_replica=config.use_insert_id,
611 )
612
613 # Ask schema to create all tables.
614 if config.time_partition_tables:
615 time_partition_start = astropy.time.Time(config.time_partition_start, format="isot", scale="tai")
616 time_partition_end = astropy.time.Time(config.time_partition_end, format="isot", scale="tai")
617 part_epoch = float(cls.partition_zero_epoch.mjd)
618 part_days = config.time_partition_days
619 part_range = (
620 cls._time_partition_cls(time_partition_start, part_epoch, part_days),
621 cls._time_partition_cls(time_partition_end, part_epoch, part_days) + 1,
622 )
623 schema.makeSchema(drop=drop, part_range=part_range, replication_factor=replication_factor)
624 else:
625 schema.makeSchema(drop=drop, replication_factor=replication_factor)
626
627 meta_table_name = ApdbTables.metadata.table_name(config.prefix)
628 metadata = ApdbMetadataCassandra(session, meta_table_name, config.keyspace, "read_tuples", "write")
629
630 # Fill version numbers, overrides if they existed before.
631 if metadata.table_exists():
632 metadata.set(cls.metadataSchemaVersionKey, str(schema.schemaVersion()), force=True)
633 metadata.set(cls.metadataCodeVersionKey, str(cls.apdbImplementationVersion()), force=True)
634
635 if config.use_insert_id:
636 # Only store replica code version if replica is enabled.
637 metadata.set(
638 cls.metadataReplicaVersionKey,
639 str(ApdbCassandraReplica.apdbReplicaImplementationVersion()),
640 force=True,
641 )
642
643 # Store frozen part of a configuration in metadata.
644 freezer = ApdbConfigFreezer[ApdbCassandraConfig](cls._frozen_parameters)
645 metadata.set(cls.metadataConfigKey, freezer.to_json(config), force=True)
646
647 cluster.shutdown()
648

◆ _spatial_where()

list[tuple[str, tuple]] lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._spatial_where ( self,
sphgeom.Region | None region,
bool use_ranges = False )
protected
Generate expressions for spatial part of WHERE clause.

Parameters
----------
region : `sphgeom.Region`
    Spatial region for query results.
use_ranges : `bool`
    If True then use pixel ranges ("apdb_part >= p1 AND apdb_part <=
    p2") instead of exact list of pixels. Should be set to True for
    large regions covering very many pixels.

Returns
-------
expressions : `list` [ `tuple` ]
    Empty list is returned if ``region`` is `None`, otherwise a list
    of one or more (expression, parameters) tuples.

Definition at line 1389 of file apdbCassandra.py.

1391 ) -> list[tuple[str, tuple]]:
1392 """Generate expressions for spatial part of WHERE clause.
1393
1394 Parameters
1395 ----------
1396 region : `sphgeom.Region`
1397 Spatial region for query results.
1398 use_ranges : `bool`
1399 If True then use pixel ranges ("apdb_part >= p1 AND apdb_part <=
1400 p2") instead of exact list of pixels. Should be set to True for
1401 large regions covering very many pixels.
1402
1403 Returns
1404 -------
1405 expressions : `list` [ `tuple` ]
1406 Empty list is returned if ``region`` is `None`, otherwise a list
1407 of one or more (expression, parameters) tuples
1408 """
1409 if region is None:
1410 return []
1411 if use_ranges:
1412 pixel_ranges = self._pixelization.envelope(region)
1413 expressions: list[tuple[str, tuple]] = []
1414 for lower, upper in pixel_ranges:
1415 upper -= 1
1416 if lower == upper:
1417 expressions.append(('"apdb_part" = ?', (lower,)))
1418 else:
1419 expressions.append(('"apdb_part" >= ? AND "apdb_part" <= ?', (lower, upper)))
1420 return expressions
1421 else:
1422 pixels = self._pixelization.pixels(region)
1423 if self.config.query_per_spatial_part:
1424 return [('"apdb_part" = ?', (pixel,)) for pixel in pixels]
1425 else:
1426 pixels_str = ",".join([str(pix) for pix in pixels])
1427 return [(f'"apdb_part" IN ({pixels_str})', ())]
1428
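
The two expression styles produced for the non-range branch can be sketched standalone (pixel IDs are hypothetical):

pixels = [1024, 1025, 1026]
# query_per_spatial_part=False: a single IN expression with rendered literals
in_style = [('"apdb_part" IN ({})'.format(",".join(str(p) for p in pixels)), ())]
# query_per_spatial_part=True: one parametrized expression per pixel
per_pixel_style = [('"apdb_part" = ?', (p,)) for p in pixels]
print(in_style)
print(per_pixel_style)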

◆ _storeDiaObjects()

None lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._storeDiaObjects ( self,
pandas.DataFrame objs,
astropy.time.Time visit_time,
ReplicaChunk | None replica_chunk )
protected
Store catalog of DiaObjects from the current visit.

Parameters
----------
objs : `pandas.DataFrame`
    Catalog with DiaObject records.
visit_time : `astropy.time.Time`
    Time of the current visit.
replica_chunk : `ReplicaChunk` or `None`
    Replica chunk identifier if replication is configured.

Definition at line 1011 of file apdbCassandra.py.

1013 ) -> None:
1014 """Store catalog of DiaObjects from current visit.
1015
1016 Parameters
1017 ----------
1018 objs : `pandas.DataFrame`
1019 Catalog with DiaObject records
1020 visit_time : `astropy.time.Time`
1021 Time of the current visit.
1022 replica_chunk : `ReplicaChunk` or `None`
1023 Replica chunk identifier if replication is configured.
1024 """
1025 if len(objs) == 0:
1026 _LOG.debug("No objects to write to database.")
1027 return
1028
1029 visit_time_dt = visit_time.datetime
1030 extra_columns = dict(lastNonForcedSource=visit_time_dt)
1031 self._storeObjectsPandas(objs, ApdbTables.DiaObjectLast, extra_columns=extra_columns)
1032
1033 extra_columns["validityStart"] = visit_time_dt
1034 time_part: int | None = self._time_partition(visit_time)
1035 if not self.config.time_partition_tables:
1036 extra_columns["apdb_time_part"] = time_part
1037 time_part = None
1038
1039 # Only store DiaObects if not doing replication or explicitly
1040 # configured to always store them.
1041 if replica_chunk is None or not self.config.use_insert_id_skips_diaobjects:
1042 self._storeObjectsPandas(
1043 objs, ApdbTables.DiaObject, extra_columns=extra_columns, time_part=time_part
1044 )
1045
1046 if replica_chunk is not None:
1047 extra_columns = dict(apdb_replica_chunk=replica_chunk.id, validityStart=visit_time_dt)
1048 self._storeObjectsPandas(objs, ExtraTables.DiaObjectChunks, extra_columns=extra_columns)
1049

◆ _storeDiaSources()

None lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._storeDiaSources ( self,
ApdbTables table_name,
pandas.DataFrame sources,
astropy.time.Time visit_time,
ReplicaChunk | None replica_chunk )
protected
Store catalog of DiaSources or DiaForcedSources from the current visit.

Parameters
----------
table_name : `ApdbTables`
    Table where to store the data.
sources : `pandas.DataFrame`
    Catalog containing DiaSource records.
visit_time : `astropy.time.Time`
    Time of the current visit.
replica_chunk : `ReplicaChunk` or `None`
    Replica chunk identifier if replication is configured.

Definition at line 1050 of file apdbCassandra.py.

1056 ) -> None:
1057 """Store catalog of DIASources or DIAForcedSources from current visit.
1058
1059 Parameters
1060 ----------
1061 table_name : `ApdbTables`
1062 Table where to store the data.
1063 sources : `pandas.DataFrame`
1064 Catalog containing DiaSource records
1065 visit_time : `astropy.time.Time`
1066 Time of the current visit.
1067 replica_chunk : `ReplicaChunk` or `None`
1068 Replica chunk identifier if replication is configured.
1069 """
1070 time_part: int | None = self._time_partition(visit_time)
1071 extra_columns: dict[str, Any] = {}
1072 if not self.config.time_partition_tables:
1073 extra_columns["apdb_time_part"] = time_part
1074 time_part = None
1075
1076 self._storeObjectsPandas(sources, table_name, extra_columns=extra_columns, time_part=time_part)
1077
1078 if replica_chunk is not None:
1079 extra_columns = dict(apdb_replica_chunk=replica_chunk.id)
1080 if table_name is ApdbTables.DiaSource:
1081 extra_table = ExtraTables.DiaSourceChunks
1082 else:
1083 extra_table = ExtraTables.DiaForcedSourceChunks
1084 self._storeObjectsPandas(sources, extra_table, extra_columns=extra_columns)
1085

◆ _storeDiaSourcesPartitions()

None lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._storeDiaSourcesPartitions ( self,
pandas.DataFrame sources,
astropy.time.Time visit_time,
ReplicaChunk | None replica_chunk )
protected
Store mapping of diaSourceId to its partitioning values.

Parameters
----------
sources : `pandas.DataFrame`
    Catalog containing DiaSource records.
visit_time : `astropy.time.Time`
    Time of the current visit.
replica_chunk : `ReplicaChunk` or `None`
    Replica chunk identifier if replication is configured.

Definition at line 1086 of file apdbCassandra.py.

1088 ) -> None:
1089 """Store mapping of diaSourceId to its partitioning values.
1090
1091 Parameters
1092 ----------
1093 sources : `pandas.DataFrame`
1094 Catalog containing DiaSource records
1095 visit_time : `astropy.time.Time`
1096 Time of the current visit.
1097 """
1098 id_map = cast(pandas.DataFrame, sources[["diaSourceId", "apdb_part"]])
1099 extra_columns = {
1100 "apdb_time_part": self._time_partition(visit_time),
1101 "apdb_replica_chunk": replica_chunk.id if replica_chunk is not None else None,
1102 }
1103
1104 self._storeObjectsPandas(
1105 id_map, ExtraTables.DiaSourceToPartition, extra_columns=extra_columns, time_part=None
1106 )
1107

◆ _storeObjectsPandas()

None lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._storeObjectsPandas ( self,
pandas.DataFrame records,
ApdbTables | ExtraTables table_name,
Mapping | None extra_columns = None,
int | None time_part = None )
protected
Store generic objects.

Takes a Pandas catalog and stores its records in a table.

Parameters
----------
records : `pandas.DataFrame`
    Catalog containing object records.
table_name : `ApdbTables` or `ExtraTables`
    Name of the table as defined in APDB schema.
extra_columns : `dict`, optional
    Mapping (column_name, column_value) which gives fixed values for
    columns in each row, overriding values in ``records`` if matching
    columns exist there.
time_part : `int`, optional
    If not `None` then insert into a per-partition table.

Notes
-----
If the Pandas catalog contains additional columns not defined in the
table schema, they are ignored. The catalog does not have to contain
all columns defined in a table, but partition and clustering keys must
be present in the catalog or in ``extra_columns``.

Definition at line 1108 of file apdbCassandra.py.

1114 ) -> None:
1115 """Store generic objects.
1116
1117 Takes Pandas catalog and stores a bunch of records in a table.
1118
1119 Parameters
1120 ----------
1121 records : `pandas.DataFrame`
1122 Catalog containing object records
1123 table_name : `ApdbTables`
1124 Name of the table as defined in APDB schema.
1125 extra_columns : `dict`, optional
1126 Mapping (column_name, column_value) which gives fixed values for
1127 columns in each row, overrides values in ``records`` if matching
1128 columns exist there.
1129 time_part : `int`, optional
1130 If not `None` then insert into a per-partition table.
1131
1132 Notes
1133 -----
1134 If Pandas catalog contains additional columns not defined in table
1135 schema they are ignored. Catalog does not have to contain all columns
1136 defined in a table, but partition and clustering keys must be present
1137 in a catalog or ``extra_columns``.
1138 """
1139 # use extra columns if specified
1140 if extra_columns is None:
1141 extra_columns = {}
1142 extra_fields = list(extra_columns.keys())
1143
1144 # Fields that will come from dataframe.
1145 df_fields = [column for column in records.columns if column not in extra_fields]
1146
1147 column_map = self._schema.getColumnMap(table_name)
1148 # list of columns (as in felis schema)
1149 fields = [column_map[field].name for field in df_fields if field in column_map]
1150 fields += extra_fields
1151
1152 # check that all partitioning and clustering columns are defined
1153 required_columns = self._schema.partitionColumns(table_name) + self._schema.clusteringColumns(
1154 table_name
1155 )
1156 missing_columns = [column for column in required_columns if column not in fields]
1157 if missing_columns:
1158 raise ValueError(f"Primary key columns are missing from catalog: {missing_columns}")
1159
1160 qfields = [quote_id(field) for field in fields]
1161 qfields_str = ",".join(qfields)
1162
1163 with Timer(table_name.name + " query build", self.config.timer):
1164 table = self._schema.tableName(table_name)
1165 if time_part is not None:
1166 table = f"{table}_{time_part}"
1167
1168 holders = ",".join(["?"] * len(qfields))
1169 query = f'INSERT INTO "{self._keyspace}"."{table}" ({qfields_str}) VALUES ({holders})'
1170 statement = self._preparer.prepare(query)
1171 queries = cassandra.query.BatchStatement()
1172 for rec in records.itertuples(index=False):
1173 values = []
1174 for field in df_fields:
1175 if field not in column_map:
1176 continue
1177 value = getattr(rec, field)
1178 if column_map[field].datatype is felis.datamodel.DataType.timestamp:
1179 if isinstance(value, pandas.Timestamp):
1180 value = literal(value.to_pydatetime())
1181 else:
1182 # Assume it's seconds since epoch, Cassandra
1183 # datetime is in milliseconds
1184 value = int(value * 1000)
1185 values.append(literal(value))
1186 for field in extra_fields:
1187 value = extra_columns[field]
1188 values.append(literal(value))
1189 queries.add(statement, values)
1190
1191 _LOG.debug("%s: will store %d records", self._schema.tableName(table_name), records.shape[0])
1192 with Timer(table_name.name + " insert", self.config.timer):
1193 self._session.execute(queries, timeout=self.config.write_timeout, execution_profile="write")
1194
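
The split between DataFrame-supplied and fixed-value columns can be sketched standalone (column names and values below are hypothetical):

import pandas

records = pandas.DataFrame({"diaSourceId": [1, 2], "apdb_part": [1024, 1025]})
extra_columns = {"apdb_time_part": 663}
extra_fields = list(extra_columns.keys())
df_fields = [c for c in records.columns if c not in extra_fields]
# The INSERT covers df_fields + extra_fields; each extra_columns value is
# repeated on every row and overrides a same-named DataFrame column.
print(df_fields + extra_fields)  # ['diaSourceId', 'apdb_part', 'apdb_time_part']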

◆ _storeReplicaChunk()

None lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._storeReplicaChunk ( self,
ReplicaChunk replica_chunk,
astropy.time.Time visit_time )
protected

Definition at line 990 of file apdbCassandra.py.

990 def _storeReplicaChunk(self, replica_chunk: ReplicaChunk, visit_time: astropy.time.Time) -> None:
991 # Cassandra timestamp uses milliseconds since epoch
992 timestamp = int(replica_chunk.last_update_time.unix_tai * 1000)
993
994 # everything goes into a single partition
995 partition = 0
996
997 table_name = self._schema.tableName(ExtraTables.ApdbReplicaChunks)
998 query = (
999 f'INSERT INTO "{self._keyspace}"."{table_name}" '
1000 "(partition, apdb_replica_chunk, last_update_time, unique_id) "
1001 "VALUES (?, ?, ?, ?)"
1002 )
1003
1004 self._session.execute(
1005 self._preparer.prepare(query),
1006 (partition, replica_chunk.id, timestamp, replica_chunk.unique_id),
1007 timeout=self.config.write_timeout,
1008 execution_profile="write",
1009 )
1010

◆ _temporal_where()

tuple[list[str], list[tuple[str, tuple]]] lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._temporal_where ( self,
ApdbTables table,
float | astropy.time.Time start_time,
float | astropy.time.Time end_time,
bool | None query_per_time_part = None )
protected
Generate table names and expressions for temporal part of WHERE
clauses.

Parameters
----------
table : `ApdbTables`
    Table to select from.
start_time : `astropy.time.Time` or `float`
    Starting datetime or MJD value of the time range.
end_time : `astropy.time.Time` or `float`
    Ending datetime or MJD value of the time range.
query_per_time_part : `bool`, optional
    If None then use ``query_per_time_part`` from configuration.

Returns
-------
tables : `list` [ `str` ]
    List of the table names to query.
expressions : `list` [ `tuple` ]
    A list of zero or more (expression, parameters) tuples.

Definition at line 1429 of file apdbCassandra.py.

1435 ) -> tuple[list[str], list[tuple[str, tuple]]]:
1436 """Generate table names and expressions for temporal part of WHERE
1437 clauses.
1438
1439 Parameters
1440 ----------
1441 table : `ApdbTables`
1442 Table to select from.
1443 start_time : `astropy.time.Time` or `float`
1444 Starting Datetime of MJD value of the time range.
1445 end_time : `astropy.time.Time` or `float`
1446 Starting Datetime of MJD value of the time range.
1447 query_per_time_part : `bool`, optional
1448 If None then use ``query_per_time_part`` from configuration.
1449
1450 Returns
1451 -------
1452 tables : `list` [ `str` ]
1453 List of the table names to query.
1454 expressions : `list` [ `tuple` ]
1455 A list of zero or more (expression, parameters) tuples.
1456 """
1457 tables: list[str]
1458 temporal_where: list[tuple[str, tuple]] = []
1459 table_name = self._schema.tableName(table)
1460 time_part_start = self._time_partition(start_time)
1461 time_part_end = self._time_partition(end_time)
1462 time_parts = list(range(time_part_start, time_part_end + 1))
1463 if self.config.time_partition_tables:
1464 tables = [f"{table_name}_{part}" for part in time_parts]
1465 else:
1466 tables = [table_name]
1467 if query_per_time_part is None:
1468 query_per_time_part = self.config.query_per_time_part
1469 if query_per_time_part:
1470 temporal_where = [('"apdb_time_part" = ?', (time_part,)) for time_part in time_parts]
1471 else:
1472 time_part_list = ",".join([str(part) for part in time_parts])
1473 temporal_where = [(f'"apdb_time_part" IN ({time_part_list})', ())]
1474
1475 return tables, temporal_where
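
A standalone sketch of the two temporal query styles (table name and partition numbers are hypothetical):

table_name = "DiaSource"
time_parts = [663, 664, 665]
# time_partition_tables=True: one physical table per time partition
tables = [f"{table_name}_{part}" for part in time_parts]
# time_partition_tables=False, query_per_time_part=False: single IN expression
temporal_where = [('"apdb_time_part" IN ({})'.format(",".join(str(p) for p in time_parts)), ())]
print(tables)          # ['DiaSource_663', 'DiaSource_664', 'DiaSource_665']
print(temporal_where)  # [('"apdb_time_part" IN (663,664,665)', ())]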

◆ _time_partition()

int lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._time_partition ( self,
float | astropy.time.Time time )
protected
Calculate time partition number for a given time.

Parameters
----------
time : `float` or `astropy.time.Time`
    Time for which to calculate the partition number, either an MJD
    value as `float` or an `astropy.time.Time`.

Returns
-------
partition : `int`
    Partition number for a given time.

Definition at line 1302 of file apdbCassandra.py.

1302 def _time_partition(self, time: float | astropy.time.Time) -> int:
1303 """Calculate time partition number for a given time.
1304
1305 Parameters
1306 ----------
1307 time : `float` or `astropy.time.Time`
1308 Time for which to calculate partition number. Can be float to mean
1309 MJD or `astropy.time.Time`
1310
1311 Returns
1312 -------
1313 partition : `int`
1314 Partition number for a given time.
1315 """
1316 if isinstance(time, astropy.time.Time):
1317 mjd = float(time.mjd)
1318 else:
1319 mjd = time
1320 days_since_epoch = mjd - self._partition_zero_epoch_mjd
1321 partition = int(days_since_epoch) // self.config.time_partition_days
1322 return partition
1323
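
A worked example of the partition arithmetic; the epoch corresponds to ``astropy.time.Time(0, format="unix_tai")`` and the partition length is an assumed configuration value:

epoch_mjd = 40587.0       # MJD of the unix_tai zero point (1970-01-01)
time_partition_days = 30  # assumed value of config.time_partition_days
mjd = 60500.0             # hypothetical observation time as MJD
partition = int(mjd - epoch_mjd) // time_partition_days
print(partition)          # -> 663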

◆ _time_partition_cls()

int lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._time_partition_cls ( cls,
float | astropy.time.Time time,
float epoch_mjd,
int part_days )
protected
Calculate time partition number for a given time.

Parameters
----------
time : `float` or `astropy.time.Time`
    Time for which to calculate the partition number, either an MJD
    value as `float` or an `astropy.time.Time`.
epoch_mjd : `float`
    Epoch time for partition 0.
part_days : `int`
    Number of days per partition.

Returns
-------
partition : `int`
    Partition number for a given time.

Definition at line 1276 of file apdbCassandra.py.

1276 def _time_partition_cls(cls, time: float | astropy.time.Time, epoch_mjd: float, part_days: int) -> int:
1277 """Calculate time partition number for a given time.
1278
1279 Parameters
1280 ----------
1281 time : `float` or `astropy.time.Time`
1282 Time for which to calculate partition number. Can be float to mean
1283 MJD or `astropy.time.Time`
1284 epoch_mjd : `float`
1285 Epoch time for partition 0.
1286 part_days : `int`
1287 Number of days per partition.
1288
1289 Returns
1290 -------
1291 partition : `int`
1292 Partition number for a given time.
1293 """
1294 if isinstance(time, astropy.time.Time):
1295 mjd = float(time.mjd)
1296 else:
1297 mjd = time
1298 days_since_epoch = mjd - epoch_mjd
1299 partition = int(days_since_epoch) // part_days
1300 return partition
1301

◆ _versionCheck()

None lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._versionCheck ( self,
ApdbMetadataCassandra metadata )
protected
Check schema version compatibility.

Definition at line 397 of file apdbCassandra.py.

397 def _versionCheck(self, metadata: ApdbMetadataCassandra) -> None:
398 """Check schema version compatibility."""
399
400 def _get_version(key: str, default: VersionTuple) -> VersionTuple:
401 """Retrieve version number from given metadata key."""
402 if metadata.table_exists():
403 version_str = metadata.get(key)
404 if version_str is None:
405 # Should not happen with existing metadata table.
406 raise RuntimeError(f"Version key {key!r} does not exist in metadata table.")
407 return VersionTuple.fromString(version_str)
408 return default
409
410 # For old databases where metadata table does not exist we assume that
411 # version of both code and schema is 0.1.0.
412 initial_version = VersionTuple(0, 1, 0)
413 db_schema_version = _get_version(self.metadataSchemaVersionKey, initial_version)
414 db_code_version = _get_version(self.metadataCodeVersionKey, initial_version)
415
416 # For now there is no way to make read-only APDB instances, assume that
417 # any access can do updates.
418 if not self._schema.schemaVersion().checkCompatibility(db_schema_version, True):
419 raise IncompatibleVersionError(
420 f"Configured schema version {self._schema.schemaVersion()} "
421 f"is not compatible with database version {db_schema_version}"
422 )
423 if not self.apdbImplementationVersion().checkCompatibility(db_code_version, True):
424 raise IncompatibleVersionError(
425 f"Current code version {self.apdbImplementationVersion()} "
426 f"is not compatible with database version {db_code_version}"
427 )
428
429 # Check replica code version only if replica is enabled.
430 if self._schema.has_replica_chunks:
431 db_replica_version = _get_version(self.metadataReplicaVersionKey, initial_version)
432 code_replica_version = ApdbCassandraReplica.apdbReplicaImplementationVersion()
433 if not code_replica_version.checkCompatibility(db_replica_version, True):
434 raise IncompatibleVersionError(
435 f"Current replication code version {code_replica_version} "
436 f"is not compatible with database version {db_replica_version}"
437 )
438

◆ apdbImplementationVersion()

VersionTuple lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.apdbImplementationVersion ( cls)
Return version number for current APDB implementation.

Returns
-------
version : `VersionTuple`
    Version of the code defined in implementation class.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 440 of file apdbCassandra.py.

440 def apdbImplementationVersion(cls) -> VersionTuple:
441 # Docstring inherited from base class.
442 return VERSION
443

◆ apdbSchemaVersion()

VersionTuple lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.apdbSchemaVersion ( self)
Return schema version number as defined in config file.

Returns
-------
version : `VersionTuple`
    Version of the schema defined in schema config file.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 444 of file apdbCassandra.py.

444 def apdbSchemaVersion(self) -> VersionTuple:
445 # Docstring inherited from base class.
446 return self._schema.schemaVersion()
447

◆ containsVisitDetector()

bool lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.containsVisitDetector ( self,
int visit,
int detector )
Test whether data for a given visit-detector is present in the APDB.

Parameters
----------
visit, detector : `int`
    The ID of the visit-detector to search for.

Returns
-------
present : `bool`
    `True` if some DiaObject, DiaSource, or DiaForcedSource records
    exist for the specified observation, `False` otherwise.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 709 of file apdbCassandra.py.

709 def containsVisitDetector(self, visit: int, detector: int) -> bool:
710 # docstring is inherited from a base class
711 raise NotImplementedError()
712

◆ countUnassociatedObjects()

int lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.countUnassociatedObjects ( self)
Return the number of DiaObjects that have only one DiaSource
associated with them.

Used as part of ap_verify metrics.

Returns
-------
count : `int`
    Number of DiaObjects with exactly one associated DiaSource.

Notes
-----
This method can be very inefficient or slow in some implementations.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 850 of file apdbCassandra.py.

850 def countUnassociatedObjects(self) -> int:
851 # docstring is inherited from a base class
852
853 # It's too inefficient to implement it for Cassandra in current schema.
854 raise NotImplementedError()
855

◆ dailyJob()

None lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.dailyJob ( self)
Implement daily activities like cleanup/vacuum.

What should be done during daily activities is determined by the
specific implementation.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 846 of file apdbCassandra.py.

846 def dailyJob(self) -> None:
847 # docstring is inherited from a base class
848 pass
849

◆ get_replica()

ApdbCassandraReplica lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.get_replica ( self)
Return `ApdbReplica` instance for this database.

Definition at line 586 of file apdbCassandra.py.

586 def get_replica(self) -> ApdbCassandraReplica:
587 """Return `ApdbReplica` instance for this database."""
588 # Note that this instance has to stay alive while replica exists, so
589 # we pass reference to self.
590 return ApdbCassandraReplica(self, self._schema, self._session)
591

◆ getDiaForcedSources()

pandas.DataFrame | None lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.getDiaForcedSources ( self,
sphgeom.Region region,
Iterable[int] | None object_ids,
astropy.time.Time visit_time )
Return catalog of DiaForcedSource instances from a given region.

Parameters
----------
region : `lsst.sphgeom.Region`
    Region to search for DIASources.
object_ids : iterable [ `int` ], optional
    List of DiaObject IDs to further constrain the set of returned
    sources. If the list is empty then an empty catalog is returned
    with a correct schema. If `None` then returned sources are not
    constrained. Some implementations may not support the latter case.
visit_time : `astropy.time.Time`
    Time of the current visit.

Returns
-------
catalog : `pandas.DataFrame`, or `None`
    Catalog containing DiaSource records. `None` is returned if
    ``read_forced_sources_months`` configuration parameter is set to 0.

Raises
------
NotImplementedError
    May be raised by some implementations if ``object_ids`` is `None`.

Notes
-----
This method returns a DiaForcedSource catalog for a region with
additional filtering based on DiaObject IDs. Only a subset of the
DiaForcedSource history is returned, limited by the
``read_forced_sources_months`` config parameter w.r.t. ``visit_time``.
If ``object_ids`` is empty then an empty catalog is always returned
with the correct schema (columns/types). If ``object_ids`` is `None`
then no filtering is performed and some of the returned records may be
outside the specified region.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 697 of file apdbCassandra.py.

699 ) -> pandas.DataFrame | None:
700 # docstring is inherited from a base class
701 months = self.config.read_forced_sources_months
702 if months == 0:
703 return None
704 mjd_end = visit_time.mjd
705 mjd_start = mjd_end - months * 30
706
707 return self._getSources(region, object_ids, mjd_start, mjd_end, ApdbTables.DiaForcedSource)
708

◆ getDiaObjects()

pandas.DataFrame lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.getDiaObjects ( self,
sphgeom.Region region )
Return catalog of DiaObject instances from a given region.

This method returns only the last version of each DiaObject. Some
records in the returned catalog may be outside the specified region;
it is up to the client to ignore those records or clean up the catalog
before further use.

Parameters
----------
region : `lsst.sphgeom.Region`
    Region to search for DIAObjects.

Returns
-------
catalog : `pandas.DataFrame`
    Catalog containing DiaObject records for a region that may be a
    superset of the specified region.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 649 of file apdbCassandra.py.

649 def getDiaObjects(self, region: sphgeom.Region) -> pandas.DataFrame:
650 # docstring is inherited from a base class
651
652 sp_where = self._spatial_where(region)
653 _LOG.debug("getDiaObjects: #partitions: %s", len(sp_where))
654
655 # We need to exclude extra partitioning columns from result.
656 column_names = self._schema.apdbColumnNames(ApdbTables.DiaObjectLast)
657 what = ",".join(quote_id(column) for column in column_names)
658
659 table_name = self._schema.tableName(ApdbTables.DiaObjectLast)
660 query = f'SELECT {what} from "{self._keyspace}"."{table_name}"'
661 statements: list[tuple] = []
662 for where, params in sp_where:
663 full_query = f"{query} WHERE {where}"
664 if params:
665 statement = self._preparer.prepare(full_query)
666 else:
667 # If there are no params then it is likely that query has a
668 # bunch of literals rendered already, no point trying to
669 # prepare it because it's not reusable.
670 statement = cassandra.query.SimpleStatement(full_query)
671 statements.append((statement, params))
672 _LOG.debug("getDiaObjects: #queries: %s", len(statements))
673
674 with Timer("DiaObject select", self.config.timer):
675 objects = cast(
676 pandas.DataFrame,
677 select_concurrent(
678 self._session, statements, "read_pandas_multi", self.config.read_concurrency
679 ),
680 )
681
682 _LOG.debug("found %s DiaObjects", objects.shape[0])
683 return objects
684
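
A hedged usage sketch, assuming an ``ApdbCassandra`` instance ``apdb`` as constructed earlier; the region follows typical ``lsst.sphgeom`` usage and the coordinates are hypothetical:

from lsst import sphgeom

center = sphgeom.UnitVector3d(sphgeom.LonLat.fromDegrees(45.0, -30.0))
region = sphgeom.Circle(center, sphgeom.Angle.fromDegrees(0.1))
objects = apdb.getDiaObjects(region)
# objects is a pandas.DataFrame and may be a superset of the region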

◆ getDiaSources()

pandas.DataFrame | None lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.getDiaSources ( self,
sphgeom.Region region,
Iterable[int] | None object_ids,
astropy.time.Time visit_time )
Return catalog of DiaSource instances from a given region.

Parameters
----------
region : `lsst.sphgeom.Region`
    Region to search for DIASources.
object_ids : iterable [ `int` ], optional
    List of DiaObject IDs to further constrain the set of returned
    sources. If `None` then returned sources are not constrained. If
    the list is empty then an empty catalog is returned with a correct
    schema.
visit_time : `astropy.time.Time`
    Time of the current visit.

Returns
-------
catalog : `pandas.DataFrame`, or `None`
    Catalog containing DiaSource records. `None` is returned if
    ``read_sources_months`` configuration parameter is set to 0.

Notes
-----
This method returns a DiaSource catalog for a region with additional
filtering based on DiaObject IDs. Only a subset of the DiaSource
history is returned, limited by the ``read_sources_months`` config
parameter w.r.t. ``visit_time``. If ``object_ids`` is empty then an
empty catalog is always returned with the correct schema
(columns/types). If ``object_ids`` is `None` then no filtering is
performed and some of the returned records may be outside the
specified region.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 685 of file apdbCassandra.py.

687 ) -> pandas.DataFrame | None:
688 # docstring is inherited from a base class
689 months = self.config.read_sources_months
690 if months == 0:
691 return None
692 mjd_end = visit_time.mjd
693 mjd_start = mjd_end - months * 30
694
695 return self._getSources(region, object_ids, mjd_start, mjd_end, ApdbTables.DiaSource)
696
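
A hedged usage sketch continuing the example above (``apdb`` and ``region`` as before; the visit time is hypothetical):

import astropy.time

visit_time = astropy.time.Time("2025-01-01T00:00:00", format="isot", scale="tai")
sources = apdb.getDiaSources(region, object_ids=None, visit_time=visit_time)
if sources is None:
    print("read_sources_months is configured as 0")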

◆ getSSObjects()

pandas.DataFrame lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.getSSObjects ( self)
Return catalog of SSObject instances.

Returns
-------
catalog : `pandas.DataFrame`
    Catalog containing SSObject records, all existing records are
    returned.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 713 of file apdbCassandra.py.

713 def getSSObjects(self) -> pandas.DataFrame:
714 # docstring is inherited from a base class
715 tableName = self._schema.tableName(ApdbTables.SSObject)
716 query = f'SELECT * from "{self._keyspace}"."{tableName}"'
717
718 objects = None
719 with Timer("SSObject select", self.config.timer):
720 result = self._session.execute(query, execution_profile="read_pandas")
721 objects = result._current_rows
722
723 _LOG.debug("found %s SSObjects", objects.shape[0])
724 return objects
725

◆ init_database()

ApdbCassandraConfig lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.init_database ( cls,
list[str] hosts,
str keyspace,
*str | None schema_file = None,
str | None schema_name = None,
int | None read_sources_months = None,
int | None read_forced_sources_months = None,
bool use_insert_id = False,
bool replica_skips_diaobjects = False,
int | None port = None,
str | None username = None,
str | None prefix = None,
str | None part_pixelization = None,
int | None part_pix_level = None,
bool time_partition_tables = True,
str | None time_partition_start = None,
str | None time_partition_end = None,
str | None read_consistency = None,
str | None write_consistency = None,
int | None read_timeout = None,
int | None write_timeout = None,
list[str] | None ra_dec_columns = None,
int | None replication_factor = None,
bool drop = False )
Initialize new APDB instance and make configuration object for it.

Parameters
----------
hosts : `list` [`str`]
    List of host names or IP addresses for Cassandra cluster.
keyspace : `str`
    Name of the keyspace for APDB tables.
schema_file : `str`, optional
    Location of (YAML) configuration file with APDB schema. If not
    specified then default location will be used.
schema_name : `str`, optional
    Name of the schema in YAML configuration file. If not specified
    then default name will be used.
read_sources_months : `int`, optional
    Number of months of history to read from DiaSource.
read_forced_sources_months : `int`, optional
    Number of months of history to read from DiaForcedSource.
use_insert_id : `bool`, optional
    If `True`, create additional tables used for replication to PPDB.
replica_skips_diaobjects : `bool`, optional
    If `True` then do not fill regular ``DiaObject`` table when
    ``use_insert_id`` is `True`.
port : `int`, optional
    Port number to use for Cassandra connections.
username : `str`, optional
    User name for Cassandra connections.
prefix : `str`, optional
    Optional prefix for all table names.
part_pixelization : `str`, optional
    Name of the MOC pixelization used for partitioning.
part_pix_level : `int`, optional
    Pixelization level.
time_partition_tables : `bool`, optional
    Create per-partition tables.
time_partition_start : `str`, optional
    Starting time for per-partition tables, in yyyy-mm-ddThh:mm:ss
    format, in TAI.
time_partition_end : `str`, optional
    Ending time for per-partition tables, in yyyy-mm-ddThh:mm:ss
    format, in TAI.
read_consistency : `str`, optional
    Name of the consistency level for read operations.
write_consistency : `str`, optional
    Name of the consistency level for write operations.
read_timeout : `int`, optional
    Read timeout in seconds.
write_timeout : `int`, optional
    Write timeout in seconds.
ra_dec_columns : `list` [`str`], optional
    Names of ra/dec columns in DiaObject table.
replication_factor : `int`, optional
    Replication factor used when creating a new keyspace; if the
    keyspace already exists its replication factor is not changed.
drop : `bool`, optional
    If `True` then drop existing tables before re-creating the schema.

Returns
-------
config : `ApdbCassandraConfig`
    Resulting configuration object for a created APDB instance.

Definition at line 453 of file apdbCassandra.py.

479 ) -> ApdbCassandraConfig:
480 """Initialize new APDB instance and make configuration object for it.
481
482 Parameters
483 ----------
484 hosts : `list` [`str`]
485 List of host names or IP addresses for Cassandra cluster.
486 keyspace : `str`
487 Name of the keyspace for APDB tables.
488 schema_file : `str`, optional
489 Location of (YAML) configuration file with APDB schema. If not
490 specified then default location will be used.
491 schema_name : `str`, optional
492 Name of the schema in YAML configuration file. If not specified
493 then default name will be used.
494 read_sources_months : `int`, optional
495 Number of months of history to read from DiaSource.
496 read_forced_sources_months : `int`, optional
497 Number of months of history to read from DiaForcedSource.
498 use_insert_id : `bool`, optional
499 If `True`, create additional tables used for replication to PPDB.
500 replica_skips_diaobjects : `bool`, optional
501 If `True` then do not fill regular ``DiaObject`` table when
502 ``use_insert_id`` is `True`.
503 port : `int`, optional
504 Port number to use for Cassandra connections.
505 username : `str`, optional
506 User name for Cassandra connections.
507 prefix : `str`, optional
508 Optional prefix for all table names.
509 part_pixelization : `str`, optional
510 Name of the MOC pixelization used for partitioning.
511 part_pix_level : `int`, optional
512 Pixelization level.
513 time_partition_tables : `bool`, optional
514 Create per-partition tables.
515 time_partition_start : `str`, optional
516 Starting time for per-partition tables, in yyyy-mm-ddThh:mm:ss
517 format, in TAI.
518 time_partition_end : `str`, optional
519 Ending time for per-partition tables, in yyyy-mm-ddThh:mm:ss
520 format, in TAI.
521 read_consistency : `str`, optional
522 Name of the consistency level for read operations.
523 write_consistency : `str`, optional
524 Name of the consistency level for write operations.
525 read_timeout : `int`, optional
526 Read timeout in seconds.
527 write_timeout : `int`, optional
528 Write timeout in seconds.
529 ra_dec_columns : `list` [`str`], optional
530 Names of ra/dec columns in DiaObject table.
531 replication_factor : `int`, optional
532 Replication factor used when creating a new keyspace; if the
533 keyspace already exists its replication factor is not changed.
534 drop : `bool`, optional
535 If `True` then drop existing tables before re-creating the schema.
536
537 Returns
538 -------
539 config : `ApdbCassandraConfig`
540 Resulting configuration object for a created APDB instance.
541 """
542 config = ApdbCassandraConfig(
543 contact_points=hosts,
544 keyspace=keyspace,
545 use_insert_id=use_insert_id,
546 use_insert_id_skips_diaobjects=replica_skips_diaobjects,
547 time_partition_tables=time_partition_tables,
548 )
549 if schema_file is not None:
550 config.schema_file = schema_file
551 if schema_name is not None:
552 config.schema_name = schema_name
553 if read_sources_months is not None:
554 config.read_sources_months = read_sources_months
555 if read_forced_sources_months is not None:
556 config.read_forced_sources_months = read_forced_sources_months
557 if port is not None:
558 config.port = port
559 if username is not None:
560 config.username = username
561 if prefix is not None:
562 config.prefix = prefix
563 if part_pixelization is not None:
564 config.part_pixelization = part_pixelization
565 if part_pix_level is not None:
566 config.part_pix_level = part_pix_level
567 if time_partition_start is not None:
568 config.time_partition_start = time_partition_start
569 if time_partition_end is not None:
570 config.time_partition_end = time_partition_end
571 if read_consistency is not None:
572 config.read_consistency = read_consistency
573 if write_consistency is not None:
574 config.write_consistency = write_consistency
575 if read_timeout is not None:
576 config.read_timeout = read_timeout
577 if write_timeout is not None:
578 config.write_timeout = write_timeout
579 if ra_dec_columns is not None:
580 config.ra_dec_columns = ra_dec_columns
581
582 cls._makeSchema(config, drop=drop, replication_factor=replication_factor)
583
584 return config
585
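
A minimal bootstrap sketch; host names, keyspace, and option values are placeholders for a real deployment:

    from lsst.dax.apdb.cassandra.apdbCassandra import ApdbCassandra

    config = ApdbCassandra.init_database(
        hosts=["cassandra-1.example.org", "cassandra-2.example.org"],
        keyspace="apdb_dev",
        read_sources_months=12,
        replication_factor=3,  # only applied when the keyspace is created
    )

    # The resulting configuration object can be used to construct an instance.
    apdb = ApdbCassandra(config)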

◆ metadata()

ApdbMetadata lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.metadata ( self)
Object controlling access to APDB metadata (`ApdbMetadata`).

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 857 of file apdbCassandra.py.

857 def metadata(self) -> ApdbMetadata:
858 # docstring is inherited from a base class
859 if self._metadata is None:
860 raise RuntimeError("Database schema was not initialized.")
861 return self._metadata
862
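
A defensive sketch; the accessor raises `RuntimeError` when the schema was never initialized, and the `get()` call on the returned object is an assumption about the `ApdbMetadata` interface (drop the parentheses on `metadata` if your version exposes it as a property):

    try:
        meta = apdb.metadata()
    except RuntimeError:
        meta = None  # database schema was not initialized

    if meta is not None:
        # metadataSchemaVersionKey is "version:schema" (see Member Data below).
        print(meta.get("version:schema"))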

◆ reassignDiaSources()

None lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.reassignDiaSources ( self,
Mapping[int, int] idMap )
Associate DiaSources with SSObjects, disassociating them
from DiaObjects.

Parameters
----------
idMap : `Mapping`
    Maps DiaSource IDs to their new SSObject IDs.

Raises
------
ValueError
    Raised if a DiaSource ID does not exist in the database.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 758 of file apdbCassandra.py.

758 def reassignDiaSources(self, idMap: Mapping[int, int]) -> None:
759 # docstring is inherited from a base class
760
761 # To update a record we need to know its exact primary key (including
762 # partition key) so we start by querying for diaSourceId to find the
763 # primary keys.
764
765 table_name = self._schema.tableName(ExtraTables.DiaSourceToPartition)
766 # split it into 1k IDs per query
767 selects: list[tuple] = []
768 for ids in chunk_iterable(idMap.keys(), 1_000):
769 ids_str = ",".join(str(item) for item in ids)
770 selects.append(
771 (
772 (
773 'SELECT "diaSourceId", "apdb_part", "apdb_time_part", "apdb_replica_chunk" '
774 f'FROM "{self._keyspace}"."{table_name}" WHERE "diaSourceId" IN ({ids_str})'
775 ),
776 {},
777 )
778 )
779
780 # No need for DataFrame here, read data as tuples.
781 result = cast(
782 list[tuple[int, int, int, int | None]],
783 select_concurrent(self._session, selects, "read_tuples", self.config.read_concurrency),
784 )
785
786 # Make mapping from source ID to its partition.
787 id2partitions: dict[int, tuple[int, int]] = {}
788 id2chunk_id: dict[int, int] = {}
789 for row in result:
790 id2partitions[row[0]] = row[1:3]
791 if row[3] is not None:
792 id2chunk_id[row[0]] = row[3]
793
794 # make sure we know partitions for each ID
795 if set(id2partitions) != set(idMap):
796 missing = ",".join(str(item) for item in set(idMap) - set(id2partitions))
797 raise ValueError(f"Following DiaSource IDs do not exist in the database: {missing}")
798
799 # Reassign in standard tables
800 queries = cassandra.query.BatchStatement()
801 table_name = self._schema.tableName(ApdbTables.DiaSource)
802 for diaSourceId, ssObjectId in idMap.items():
803 apdb_part, apdb_time_part = id2partitions[diaSourceId]
804 values: tuple
805 if self.config.time_partition_tables:
806 query = (
807 f'UPDATE "{self._keyspace}"."{table_name}_{apdb_time_part}"'
808 ' SET "ssObjectId" = ?, "diaObjectId" = NULL'
809 ' WHERE "apdb_part" = ? AND "diaSourceId" = ?'
810 )
811 values = (ssObjectId, apdb_part, diaSourceId)
812 else:
813 query = (
814 f'UPDATE "{self._keyspace}"."{table_name}"'
815 ' SET "ssObjectId" = ?, "diaObjectId" = NULL'
816 ' WHERE "apdb_part" = ? AND "apdb_time_part" = ? AND "diaSourceId" = ?'
817 )
818 values = (ssObjectId, apdb_part, apdb_time_part, diaSourceId)
819 queries.add(self._preparer.prepare(query), values)
820
821 # Reassign in replica tables, only if replication is enabled
822 if id2chunk_id:
823 # Filter out chunks that have been deleted already. There is a
824 # potential race with concurrent removal of chunks, but it
825 # should be handled by WHERE in UPDATE.
826 known_ids = set()
827 if replica_chunks := self.get_replica().getReplicaChunks():
828 known_ids = set(replica_chunk.id for replica_chunk in replica_chunks)
829 id2chunk_id = {key: value for key, value in id2chunk_id.items() if value in known_ids}
830 if id2chunk_id:
831 table_name = self._schema.tableName(ExtraTables.DiaSourceChunks)
832 for diaSourceId, ssObjectId in idMap.items():
833 if replica_chunk := id2chunk_id.get(diaSourceId):
834 query = (
835 f'UPDATE "{self._keyspace}"."{table_name}" '
836 ' SET "ssObjectId" = ?, "diaObjectId" = NULL '
837 'WHERE "apdb_replica_chunk" = ? AND "diaSourceId" = ?'
838 )
839 values = (ssObjectId, replica_chunk, diaSourceId)
840 queries.add(self._preparer.prepare(query), values)
841
842 _LOG.debug("%s: will update %d records", table_name, len(idMap))
843 with Timer(table_name + " update", self.config.timer):
844 self._session.execute(queries, execution_profile="write")
845
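
A minimal sketch; the ID values are illustrative:

    # Map DiaSource IDs to their new SSObject IDs.
    id_map = {3001: 9001, 3002: 9001, 3003: 9002}
    try:
        apdb.reassignDiaSources(id_map)
    except ValueError as exc:
        # Raised when some DiaSource IDs are absent from the database.
        print(exc)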

◆ store()

None lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.store ( self,
astropy.time.Time visit_time,
pandas.DataFrame objects,
pandas.DataFrame | None sources = None,
pandas.DataFrame | None forced_sources = None )
Store all three types of catalogs in the database.

Parameters
----------
visit_time : `astropy.time.Time`
    Time of the visit.
objects : `pandas.DataFrame`
    Catalog with DiaObject records.
sources : `pandas.DataFrame`, optional
    Catalog with DiaSource records.
forced_sources : `pandas.DataFrame`, optional
    Catalog with DiaForcedSource records.

Notes
-----
This method takes DataFrame catalogs whose schema must be
compatible with the schema of the corresponding APDB table:

  - column names must correspond to database table columns
  - types and units of the columns must match database definitions;
    no unit conversion is performed presently
  - columns that have default values in the database schema can be
    omitted from the catalog
  - this method knows how to fill the interval-related columns of
    DiaObject (validityStart, validityEnd), so they do not need to
    appear in the catalog
  - source catalogs must have a ``diaObjectId`` column associating
    sources with objects

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 726 of file apdbCassandra.py.

732 ) -> None:
733 # docstring is inherited from a base class
734
735 replica_chunk: ReplicaChunk | None = None
736 if self._schema.has_replica_chunks:
737 replica_chunk = ReplicaChunk.make_replica_chunk(visit_time, self.config.replica_chunk_seconds)
738 self._storeReplicaChunk(replica_chunk, visit_time)
739
740 # fill region partition column for DiaObjects
741 objects = self._add_obj_part(objects)
742 self._storeDiaObjects(objects, visit_time, replica_chunk)
743
744 if sources is not None:
745 # copy apdb_part column from DiaObjects to DiaSources
746 sources = self._add_src_part(sources, objects)
747 self._storeDiaSources(ApdbTables.DiaSource, sources, visit_time, replica_chunk)
748 self._storeDiaSourcesPartitions(sources, visit_time, replica_chunk)
749
750 if forced_sources is not None:
751 forced_sources = self._add_fsrc_part(forced_sources, objects)
752 self._storeDiaSources(ApdbTables.DiaForcedSource, forced_sources, visit_time, replica_chunk)
753
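
A hedged sketch of the calling convention; the column sets below are illustrative only, since a real catalog must carry every non-defaulted column of the APDB schema with matching types:

    import astropy.time
    import pandas

    visit_time = astropy.time.Time("2025-01-15T06:00:00", scale="tai")

    # Illustrative subset of DiaObject columns (ra/dec names per ra_dec_columns).
    objects = pandas.DataFrame(
        {"diaObjectId": [1, 2], "ra": [35.10, 35.20], "dec": [-4.10, -4.20]}
    )
    # Source catalogs carry diaObjectId to associate sources with objects.
    sources = pandas.DataFrame(
        {"diaSourceId": [101, 102], "diaObjectId": [1, 2]}
    )

    apdb.store(visit_time, objects, sources=sources)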

◆ storeSSObjects()

None lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.storeSSObjects ( self,
pandas.DataFrame objects )
Store or update SSObject catalog.

Parameters
----------
objects : `pandas.DataFrame`
    Catalog with SSObject records.

Notes
-----
If SSObjects with matching IDs already exist in the database, their
records will be updated with the information from the provided records.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 754 of file apdbCassandra.py.

754 def storeSSObjects(self, objects: pandas.DataFrame) -> None:
755 # docstring is inherited from a base class
756 self._storeObjectsPandas(objects, ApdbTables.SSObject)
757
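
A minimal sketch; a real catalog would carry the full SSObject column set, only the ID column is shown here:

    import pandas

    ssobjects = pandas.DataFrame({"ssObjectId": [9001, 9002]})
    apdb.storeSSObjects(ssobjects)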

◆ tableDef()

Table | None lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.tableDef ( self,
ApdbTables table )
Return table schema definition for a given table.

Parameters
----------
table : `ApdbTables`
    One of the known APDB tables.

Returns
-------
tableSchema : `.schema_model.Table` or `None`
    Table schema description; `None` is returned if the table is not
    defined by this implementation.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 448 of file apdbCassandra.py.

448 def tableDef(self, table: ApdbTables) -> Table | None:
449 # docstring is inherited from a base class
450 return self._schema.tableSchemas.get(table)
451
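
A short sketch; the import path for `ApdbTables` and the `columns` attribute on the returned `.schema_model.Table` are assumptions:

    from lsst.dax.apdb import ApdbTables  # import path assumed

    table = apdb.tableDef(ApdbTables.DiaObject)
    if table is not None:
        print([column.name for column in table.columns])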

Member Data Documentation

◆ _cluster

lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._cluster
protected

Definition at line 295 of file apdbCassandra.py.

◆ _frozen_parameters

tuple lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._frozen_parameters
staticprotected
Initial value:
= (
"use_insert_id",
"part_pixelization",
"part_pix_level",
"ra_dec_columns",
"time_partition_tables",
"time_partition_days",
"use_insert_id_skips_diaobjects",
)

Definition at line 275 of file apdbCassandra.py.

◆ _keyspace

lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._keyspace
protected

Definition at line 293 of file apdbCassandra.py.

◆ _metadata

lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._metadata
protected

Definition at line 298 of file apdbCassandra.py.

◆ _partition_zero_epoch_mjd

lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._partition_zero_epoch_mjd
protected

Definition at line 327 of file apdbCassandra.py.

◆ _pixelization

lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._pixelization
protected

Definition at line 312 of file apdbCassandra.py.

◆ _preparer

lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._preparer
protected

Definition at line 333 of file apdbCassandra.py.

◆ _schema

lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._schema
protected

Definition at line 318 of file apdbCassandra.py.

◆ _session

lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._session
protected

Definition at line 295 of file apdbCassandra.py.

◆ config

lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.config

Definition at line 307 of file apdbCassandra.py.

◆ metadataCodeVersionKey [1/2]

str lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.metadataCodeVersionKey = "version:ApdbCassandra"
static

Definition at line 266 of file apdbCassandra.py.

◆ metadataCodeVersionKey [2/2]

lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.metadataCodeVersionKey

Definition at line 633 of file apdbCassandra.py.

◆ metadataConfigKey [1/2]

str lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.metadataConfigKey = "config:apdb-cassandra.json"
static

Definition at line 272 of file apdbCassandra.py.

◆ metadataConfigKey [2/2]

lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.metadataConfigKey

Definition at line 645 of file apdbCassandra.py.

◆ metadataReplicaVersionKey [1/2]

str lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.metadataReplicaVersionKey = "version:ApdbCassandraReplica"
static

Definition at line 269 of file apdbCassandra.py.

◆ metadataReplicaVersionKey [2/2]

lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.metadataReplicaVersionKey

Definition at line 638 of file apdbCassandra.py.

◆ metadataSchemaVersionKey [1/2]

str lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.metadataSchemaVersionKey = "version:schema"
static

Definition at line 263 of file apdbCassandra.py.

◆ metadataSchemaVersionKey [2/2]

lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.metadataSchemaVersionKey

Definition at line 632 of file apdbCassandra.py.

◆ partition_zero_epoch

lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.partition_zero_epoch = astropy.time.Time(0, format="unix_tai")
static

Definition at line 286 of file apdbCassandra.py.


The documentation for this class was generated from the following file: