LSST Data Management Base Package
lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra Class Reference
Inheritance diagram for lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra:
lsst.dax.apdb.apdb.Apdb

Public Member Functions

 __init__ (self, ApdbCassandraConfig config)
 
None __del__ (self)
 
VersionTuple apdbImplementationVersion (cls)
 
VersionTuple apdbSchemaVersion (self)
 
Table|None tableDef (self, ApdbTables table)
 
ApdbCassandraConfig init_database (cls, list[str] hosts, str keyspace, *str|None schema_file=None, str|None schema_name=None, int|None read_sources_months=None, int|None read_forced_sources_months=None, bool use_insert_id=False, bool use_insert_id_skips_diaobjects=False, int|None port=None, str|None username=None, str|None prefix=None, str|None part_pixelization=None, int|None part_pix_level=None, bool time_partition_tables=True, str|None time_partition_start=None, str|None time_partition_end=None, str|None read_consistency=None, str|None write_consistency=None, int|None read_timeout=None, int|None write_timeout=None, list[str]|None ra_dec_columns=None, int|None replication_factor=None, bool drop=False)
 
ApdbCassandraReplica get_replica (self)
 
pandas.DataFrame getDiaObjects (self, sphgeom.Region region)
 
pandas.DataFrame|None getDiaSources (self, sphgeom.Region region, Iterable[int]|None object_ids, astropy.time.Time visit_time)
 
pandas.DataFrame|None getDiaForcedSources (self, sphgeom.Region region, Iterable[int]|None object_ids, astropy.time.Time visit_time)
 
bool containsVisitDetector (self, int visit, int detector)
 
pandas.DataFrame getSSObjects (self)
 
None store (self, astropy.time.Time visit_time, pandas.DataFrame objects, pandas.DataFrame|None sources=None, pandas.DataFrame|None forced_sources=None)
 
None storeSSObjects (self, pandas.DataFrame objects)
 
None reassignDiaSources (self, Mapping[int, int] idMap)
 
None dailyJob (self)
 
int countUnassociatedObjects (self)
 
ApdbMetadata metadata (self)
 

Public Attributes

 config
 
 metadataSchemaVersionKey
 
 metadataCodeVersionKey
 
 metadataReplicaVersionKey
 
 metadataConfigKey
 

Static Public Attributes

str metadataSchemaVersionKey = "version:schema"
 
str metadataCodeVersionKey = "version:ApdbCassandra"
 
str metadataReplicaVersionKey = "version:ApdbCassandraReplica"
 
str metadataConfigKey = "config:apdb-cassandra.json"
 
 partition_zero_epoch = astropy.time.Time(0, format="unix_tai")
 

Protected Member Functions

Timer _timer (self, str name, *Mapping[str, str|int]|None tags=None)
 
tuple[Cluster, Session] _make_session (cls, ApdbCassandraConfig config)
 
AuthProvider|None _make_auth_provider (cls, ApdbCassandraConfig config)
 
None _versionCheck (self, ApdbMetadataCassandra metadata)
 
None _makeSchema (cls, ApdbConfig config, *bool drop=False, int|None replication_factor=None)
 
Mapping[Any, ExecutionProfile] _makeProfiles (cls, ApdbCassandraConfig config)
 
pandas.DataFrame _getSources (self, sphgeom.Region region, Iterable[int]|None object_ids, float mjd_start, float mjd_end, ApdbTables table_name)
 
None _storeReplicaChunk (self, ReplicaChunk replica_chunk, astropy.time.Time visit_time)
 
None _storeDiaObjects (self, pandas.DataFrame objs, astropy.time.Time visit_time, ReplicaChunk|None replica_chunk)
 
None _storeDiaSources (self, ApdbTables table_name, pandas.DataFrame sources, astropy.time.Time visit_time, ReplicaChunk|None replica_chunk)
 
None _storeDiaSourcesPartitions (self, pandas.DataFrame sources, astropy.time.Time visit_time, ReplicaChunk|None replica_chunk)
 
None _storeObjectsPandas (self, pandas.DataFrame records, ApdbTables|ExtraTables table_name, Mapping|None extra_columns=None, int|None time_part=None)
 
pandas.DataFrame _add_obj_part (self, pandas.DataFrame df)
 
pandas.DataFrame _add_src_part (self, pandas.DataFrame sources, pandas.DataFrame objs)
 
pandas.DataFrame _add_fsrc_part (self, pandas.DataFrame sources, pandas.DataFrame objs)
 
int _time_partition_cls (cls, float|astropy.time.Time time, float epoch_mjd, int part_days)
 
int _time_partition (self, float|astropy.time.Time time)
 
pandas.DataFrame _make_empty_catalog (self, ApdbTables table_name)
 
Iterator[tuple[cassandra.query.Statement, tuple]] _combine_where (self, str prefix, list[tuple[str, tuple]] where1, list[tuple[str, tuple]] where2, str|None suffix=None)
 
list[tuple[str, tuple]] _spatial_where (self, sphgeom.Region|None region, bool use_ranges=False)
 
tuple[list[str], list[tuple[str, tuple]]] _temporal_where (self, ApdbTables table, float|astropy.time.Time start_time, float|astropy.time.Time end_time, bool|None query_per_time_part=None)
 

Protected Attributes

 _keyspace
 
 _cluster
 
 _session
 
 _metadata
 
 _pixelization
 
 _schema
 
 _partition_zero_epoch_mjd
 
 _preparer
 
 _timer_args
 

Static Protected Attributes

tuple _frozen_parameters
 

Detailed Description

Implementation of the APDB database on top of Apache Cassandra.

The implementation is configured via the standard ``pex_config`` mechanism
using the `ApdbCassandraConfig` configuration class. For examples of
different configurations, check the config/ folder.

Parameters
----------
config : `ApdbCassandraConfig`
    Configuration object.

Definition at line 253 of file apdbCassandra.py.
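
A minimal usage sketch, assuming a reachable Cassandra cluster; the host
names and keyspace below are hypothetical placeholders:

    from lsst.dax.apdb.cassandra.apdbCassandra import ApdbCassandra

    # Create a new APDB instance (keyspace and tables) and obtain its
    # configuration; init_database() is the classmethod documented below.
    config = ApdbCassandra.init_database(
        ["cassandra1.example.com", "cassandra2.example.com"],
        "apdb_test",
        replication_factor=3,
    )
    apdb = ApdbCassandra(config)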

Constructor & Destructor Documentation

◆ __init__()

lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.__init__ ( self,
ApdbCassandraConfig config )

Definition at line 292 of file apdbCassandra.py.

292 def __init__(self, config: ApdbCassandraConfig):
293 if not CASSANDRA_IMPORTED:
294 raise CassandraMissingError()
295
296 self._keyspace = config.keyspace
297
298 self._cluster, self._session = self._make_session(config)
299
300 meta_table_name = ApdbTables.metadata.table_name(config.prefix)
301 self._metadata = ApdbMetadataCassandra(
302 self._session, meta_table_name, config.keyspace, "read_tuples", "write"
303 )
304
305 # Read frozen config from metadata.
306 config_json = self._metadata.get(self.metadataConfigKey)
307 if config_json is not None:
308 # Update config from metadata.
309 freezer = ApdbConfigFreezer[ApdbCassandraConfig](self._frozen_parameters)
310 self.config = freezer.update(config, config_json)
311 else:
312 self.config = config
313 self.config.validate()
314
315 self._pixelization = Pixelization(
316 self.config.part_pixelization,
317 self.config.part_pix_level,
318 config.part_pix_max_ranges,
319 )
320
321 self._schema = ApdbCassandraSchema(
322 session=self._session,
323 keyspace=self._keyspace,
324 schema_file=self.config.schema_file,
325 schema_name=self.config.schema_name,
326 prefix=self.config.prefix,
327 time_partition_tables=self.config.time_partition_tables,
328 enable_replica=self.config.use_insert_id,
329 )
330 self._partition_zero_epoch_mjd = float(self.partition_zero_epoch.mjd)
331
332 if self._metadata.table_exists():
333 self._versionCheck(self._metadata)
334
335 # Cache for prepared statements
336 self._preparer = PreparedStatementCache(self._session)
337
338 _LOG.debug("ApdbCassandra Configuration:")
339 for key, value in self.config.items():
340 _LOG.debug(" %s: %s", key, value)
341
342 self._timer_args: list[MonAgent | logging.Logger] = [_MON]
343 if self.config.timer:
344 self._timer_args.append(_LOG)
345

◆ __del__()

None lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.__del__ ( self)

Definition at line 346 of file apdbCassandra.py.

346 def __del__(self) -> None:
347 if hasattr(self, "_cluster"):
348 self._cluster.shutdown()
349

Member Function Documentation

◆ _add_fsrc_part()

pandas.DataFrame lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._add_fsrc_part ( self,
pandas.DataFrame sources,
pandas.DataFrame objs )
protected
Add an apdb_part column to the DiaForcedSource catalog.

Notes
-----
This method copies the apdb_part value from the matching DiaObject
record. The DiaObject catalog needs to have an apdb_part column filled
by the ``_add_obj_part`` method, and DiaForcedSource records need to be
associated with DiaObjects via the ``diaObjectId`` column.

This overrides any existing column with the same name (apdb_part). The
original DataFrame is not changed; a copy is returned.

Definition at line 1270 of file apdbCassandra.py.

1270 def _add_fsrc_part(self, sources: pandas.DataFrame, objs: pandas.DataFrame) -> pandas.DataFrame:
1271 """Add apdb_part column to DiaForcedSource catalog.
1272
1273 Notes
1274 -----
1275 This method copies apdb_part value from a matching DiaObject record.
1276 DiaObject catalog needs to have a apdb_part column filled by
1277 ``_add_obj_part`` method and DiaSource records need to be
1278 associated to DiaObjects via ``diaObjectId`` column.
1279
1280 This overrides any existing column in a DataFrame with the same name
1281 (apdb_part). Original DataFrame is not changed, copy of a DataFrame is
1282 returned.
1283 """
1284 pixel_id_map: dict[int, int] = {
1285 diaObjectId: apdb_part for diaObjectId, apdb_part in zip(objs["diaObjectId"], objs["apdb_part"])
1286 }
1287 apdb_part = np.zeros(sources.shape[0], dtype=np.int64)
1288 for i, diaObjId in enumerate(sources["diaObjectId"]):
1289 apdb_part[i] = pixel_id_map[diaObjId]
1290 sources = sources.copy()
1291 sources["apdb_part"] = apdb_part
1292 return sources
1293

◆ _add_obj_part()

pandas.DataFrame lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._add_obj_part ( self,
pandas.DataFrame df )
protected
Calculate the spatial partition for each record and add it to the
DataFrame.

Notes
-----
This overrides any existing column with the same name (apdb_part). The
original DataFrame is not changed; a copy is returned.

Definition at line 1214 of file apdbCassandra.py.

1214 def _add_obj_part(self, df: pandas.DataFrame) -> pandas.DataFrame:
1215 """Calculate spatial partition for each record and add it to a
1216 DataFrame.
1217
1218 Notes
1219 -----
1220 This overrides any existing column in a DataFrame with the same name
1221 (apdb_part). Original DataFrame is not changed, copy of a DataFrame is
1222 returned.
1223 """
1224 # calculate HTM index for every DiaObject
1225 apdb_part = np.zeros(df.shape[0], dtype=np.int64)
1226 ra_col, dec_col = self.config.ra_dec_columns
1227 for i, (ra, dec) in enumerate(zip(df[ra_col], df[dec_col])):
1228 uv3d = sphgeom.UnitVector3d(sphgeom.LonLat.fromDegrees(ra, dec))
1229 idx = self._pixelization.pixel(uv3d)
1230 apdb_part[i] = idx
1231 df = df.copy()
1232 df["apdb_part"] = apdb_part
1233 return df
1234
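
For illustration, the same partitioning computation can be reproduced
with ``lsst.sphgeom`` directly. This is a sketch assuming HTM
pixelization at a hypothetical level 20; the real pixelization scheme
and level come from ``part_pixelization`` and ``part_pix_level`` in the
configuration:

    import numpy as np
    import pandas
    from lsst import sphgeom

    # Hypothetical catalog using the default ra/dec column names.
    df = pandas.DataFrame({"ra": [10.0, 10.1], "dec": [-5.0, -5.05]})
    pixelization = sphgeom.HtmPixelization(20)  # assumed scheme and level
    apdb_part = np.zeros(df.shape[0], dtype=np.int64)
    for i, (ra, dec) in enumerate(zip(df["ra"], df["dec"])):
        uv3d = sphgeom.UnitVector3d(sphgeom.LonLat.fromDegrees(ra, dec))
        apdb_part[i] = pixelization.index(uv3d)
    df = df.copy()
    df["apdb_part"] = apdb_part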

◆ _add_src_part()

pandas.DataFrame lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._add_src_part ( self,
pandas.DataFrame sources,
pandas.DataFrame objs )
protected
Add an apdb_part column to the DiaSource catalog.

Notes
-----
This method copies the apdb_part value from the matching DiaObject
record. The DiaObject catalog needs to have an apdb_part column filled
by the ``_add_obj_part`` method, and DiaSource records need to be
associated with DiaObjects via the ``diaObjectId`` column.

This overrides any existing column with the same name (apdb_part). The
original DataFrame is not changed; a copy is returned.

Definition at line 1235 of file apdbCassandra.py.

1235 def _add_src_part(self, sources: pandas.DataFrame, objs: pandas.DataFrame) -> pandas.DataFrame:
1236 """Add apdb_part column to DiaSource catalog.
1237
1238 Notes
1239 -----
1240 This method copies apdb_part value from a matching DiaObject record.
1241 DiaObject catalog needs to have a apdb_part column filled by
1242 ``_add_obj_part`` method and DiaSource records need to be
1243 associated to DiaObjects via ``diaObjectId`` column.
1244
1245 This overrides any existing column in a DataFrame with the same name
1246 (apdb_part). Original DataFrame is not changed, copy of a DataFrame is
1247 returned.
1248 """
1249 pixel_id_map: dict[int, int] = {
1250 diaObjectId: apdb_part for diaObjectId, apdb_part in zip(objs["diaObjectId"], objs["apdb_part"])
1251 }
1252 apdb_part = np.zeros(sources.shape[0], dtype=np.int64)
1253 ra_col, dec_col = self.config.ra_dec_columns
1254 for i, (diaObjId, ra, dec) in enumerate(
1255 zip(sources["diaObjectId"], sources[ra_col], sources[dec_col])
1256 ):
1257 if diaObjId == 0:
1258 # DiaSources associated with SolarSystemObjects do not have an
1259 # associated DiaObject hence we skip them and set partition
1260 # based on its own ra/dec
1261 uv3d = sphgeom.UnitVector3d(sphgeom.LonLat.fromDegrees(ra, dec))
1262 idx = self._pixelization.pixel(uv3d)
1263 apdb_part[i] = idx
1264 else:
1265 apdb_part[i] = pixel_id_map[diaObjId]
1266 sources = sources.copy()
1267 sources["apdb_part"] = apdb_part
1268 return sources
1269

◆ _combine_where()

Iterator[tuple[cassandra.query.Statement, tuple]] lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._combine_where ( self,
str prefix,
list[tuple[str, tuple]] where1,
list[tuple[str, tuple]] where2,
str | None suffix = None )
protected
Make the cartesian product of two parts of a WHERE clause into a series
of statements to execute.

Parameters
----------
prefix : `str`
    Initial statement prefix that comes before WHERE clause, e.g.
    "SELECT * from Table"

Definition at line 1364 of file apdbCassandra.py.

1370 ) -> Iterator[tuple[cassandra.query.Statement, tuple]]:
1371 """Make cartesian product of two parts of WHERE clause into a series
1372 of statements to execute.
1373
1374 Parameters
1375 ----------
1376 prefix : `str`
1377 Initial statement prefix that comes before WHERE clause, e.g.
1378 "SELECT * from Table"
1379 """
1380 # If lists are empty use special sentinels.
1381 if not where1:
1382 where1 = [("", ())]
1383 if not where2:
1384 where2 = [("", ())]
1385
1386 for expr1, params1 in where1:
1387 for expr2, params2 in where2:
1388 full_query = prefix
1389 wheres = []
1390 if expr1:
1391 wheres.append(expr1)
1392 if expr2:
1393 wheres.append(expr2)
1394 if wheres:
1395 full_query += " WHERE " + " AND ".join(wheres)
1396 if suffix:
1397 full_query += " " + suffix
1398 params = params1 + params2
1399 if params:
1400 statement = self._preparer.prepare(full_query)
1401 else:
1402 # If there are no params then it is likely that query
1403 # has a bunch of literals rendered already, no point
1404 # trying to prepare it.
1405 statement = cassandra.query.SimpleStatement(full_query)
1406 yield (statement, params)
1407
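
As a sketch of what this cartesian product produces, consider
hypothetical spatial and temporal fragments:

    prefix = 'SELECT * from "apdb"."DiaSource_100"'  # hypothetical names
    where1 = [('"apdb_part" = ?', (1234,)), ('"apdb_part" = ?', (1235,))]
    where2 = [('"apdb_time_part" = ?', (100,))]
    for expr1, params1 in where1:
        for expr2, params2 in where2:
            full_query = f"{prefix} WHERE {expr1} AND {expr2}"
            print(full_query, params1 + params2)
    # Yields two statements, one per spatial pixel, each carrying its own
    # bind parameters.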

◆ _getSources()

pandas.DataFrame lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._getSources ( self,
sphgeom.Region region,
Iterable[int] | None object_ids,
float mjd_start,
float mjd_end,
ApdbTables table_name )
protected
Return catalog of DiaSource instances given a set of DiaObject IDs.

Parameters
----------
region : `lsst.sphgeom.Region`
    Spherical region.
object_ids : iterable [ `int` ] or `None`
    Collection of DiaObject IDs.
mjd_start : `float`
    Lower bound of time interval.
mjd_end : `float`
    Upper bound of time interval.
table_name : `ApdbTables`
    Name of the table.

Returns
-------
catalog : `pandas.DataFrame`
    Catalog containing DiaSource records. An empty catalog is returned
    if ``object_ids`` is empty.

Definition at line 938 of file apdbCassandra.py.

945 ) -> pandas.DataFrame:
946 """Return catalog of DiaSource instances given set of DiaObject IDs.
947
948 Parameters
949 ----------
950 region : `lsst.sphgeom.Region`
951 Spherical region.
952 object_ids :
953 Collection of DiaObject IDs
954 mjd_start : `float`
955 Lower bound of time interval.
956 mjd_end : `float`
957 Upper bound of time interval.
958 table_name : `ApdbTables`
959 Name of the table.
960
961 Returns
962 -------
963 catalog : `pandas.DataFrame`, or `None`
964 Catalog containing DiaSource records. Empty catalog is returned if
965 ``object_ids`` is empty.
966 """
967 object_id_set: Set[int] = set()
968 if object_ids is not None:
969 object_id_set = set(object_ids)
970 if len(object_id_set) == 0:
971 return self._make_empty_catalog(table_name)
972
973 sp_where = self._spatial_where(region)
974 tables, temporal_where = self._temporal_where(table_name, mjd_start, mjd_end)
975
976 # We need to exclude extra partitioning columns from result.
977 column_names = self._schema.apdbColumnNames(table_name)
978 what = ",".join(quote_id(column) for column in column_names)
979
980 # Build all queries
981 statements: list[tuple] = []
982 for table in tables:
983 prefix = f'SELECT {what} from "{self._keyspace}"."{table}"'
984 statements += list(self._combine_where(prefix, sp_where, temporal_where))
985 _LOG.debug("_getSources %s: #queries: %s", table_name, len(statements))
986
987 with _MON.context_tags({"table": table_name.name}):
988 _MON.add_record(
989 "select_query_stats", values={"num_sp_part": len(sp_where), "num_queries": len(statements)}
990 )
991 with self._timer("select_time"):
992 catalog = cast(
993 pandas.DataFrame,
994 select_concurrent(
995 self._session, statements, "read_pandas_multi", self.config.read_concurrency
996 ),
997 )
998
999 # filter by given object IDs
1000 if len(object_id_set) > 0:
1001 catalog = cast(pandas.DataFrame, catalog[catalog["diaObjectId"].isin(object_id_set)])
1002
1003 # precise filtering on midpointMjdTai
1004 catalog = cast(pandas.DataFrame, catalog[catalog["midpointMjdTai"] > mjd_start])
1005
1006 _LOG.debug("found %d %ss", catalog.shape[0], table_name.name)
1007 return catalog
1008

◆ _make_auth_provider()

AuthProvider | None lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._make_auth_provider ( cls,
ApdbCassandraConfig config )
protected
Make Cassandra authentication provider instance.

Definition at line 376 of file apdbCassandra.py.

376 def _make_auth_provider(cls, config: ApdbCassandraConfig) -> AuthProvider | None:
377 """Make Cassandra authentication provider instance."""
378 try:
379 dbauth = DbAuth(DB_AUTH_PATH, DB_AUTH_ENVVAR)
380 except DbAuthNotFoundError:
381 # Credentials file doesn't exist, use anonymous login.
382 return None
383
384 empty_username = True
385 # Try every contact point in turn.
386 for hostname in config.contact_points:
387 try:
388 username, password = dbauth.getAuth(
389 "cassandra", config.username, hostname, config.port, config.keyspace
390 )
391 if not username:
392 # Password without user name, try next hostname, but give
393 # warning later if no better match is found.
394 empty_username = True
395 else:
396 return PlainTextAuthProvider(username=username, password=password)
397 except DbAuthNotFoundError:
398 pass
399
400 if empty_username:
401 _LOG.warning(
402 f"Credentials file ({DB_AUTH_PATH} or ${DB_AUTH_ENVVAR}) provided password but not "
403 f"user name, anonymous Cassandra logon will be attempted."
404 )
405
406 return None
407
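
For reference, credentials are looked up through
``lsst.utils.db_auth.DbAuth``. A hypothetical ``~/.lsst/db-auth.yaml``
entry for a Cassandra cluster could look like this (host, keyspace, and
credentials are placeholders):

    - url: cassandra://cassandra1.example.com:9042/apdb_test
      username: apdb_user
      password: not-a-real-password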

◆ _make_empty_catalog()

pandas.DataFrame lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._make_empty_catalog ( self,
ApdbTables table_name )
protected
Make an empty catalog for a table with a given name.

Parameters
----------
table_name : `ApdbTables`
    Name of the table.

Returns
-------
catalog : `pandas.DataFrame`
    An empty catalog.

Definition at line 1343 of file apdbCassandra.py.

1343 def _make_empty_catalog(self, table_name: ApdbTables) -> pandas.DataFrame:
1344 """Make an empty catalog for a table with a given name.
1345
1346 Parameters
1347 ----------
1348 table_name : `ApdbTables`
1349 Name of the table.
1350
1351 Returns
1352 -------
1353 catalog : `pandas.DataFrame`
1354 An empty catalog.
1355 """
1356 table = self._schema.tableSchemas[table_name]
1357
1358 data = {
1359 columnDef.name: pandas.Series(dtype=self._schema.column_dtype(columnDef.datatype))
1360 for columnDef in table.columns
1361 }
1362 return pandas.DataFrame(data)
1363
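
The same typed-empty-catalog pattern in isolation, with a hypothetical
column-to-dtype mapping standing in for the felis schema:

    import pandas

    # Hypothetical dtypes; the real ones come from column_dtype() applied
    # to the felis column definitions.
    dtypes = {"diaSourceId": "int64", "ra": "float64", "midpointMjdTai": "float64"}
    catalog = pandas.DataFrame(
        {name: pandas.Series(dtype=dtype) for name, dtype in dtypes.items()}
    )
    assert catalog.empty and list(catalog.columns) == list(dtypes)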

◆ _make_session()

tuple[Cluster, Session] lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._make_session ( cls,
ApdbCassandraConfig config )
protected
Make Cassandra session.

Definition at line 355 of file apdbCassandra.py.

355 def _make_session(cls, config: ApdbCassandraConfig) -> tuple[Cluster, Session]:
356 """Make Cassandra session."""
357 addressTranslator: AddressTranslator | None = None
358 if config.private_ips:
359 addressTranslator = _AddressTranslator(list(config.contact_points), list(config.private_ips))
360
361 cluster = Cluster(
362 execution_profiles=cls._makeProfiles(config),
363 contact_points=config.contact_points,
364 port=config.port,
365 address_translator=addressTranslator,
366 protocol_version=config.protocol_version,
367 auth_provider=cls._make_auth_provider(config),
368 )
369 session = cluster.connect()
370 # Disable result paging
371 session.default_fetch_size = None
372
373 return cluster, session
374

◆ _makeProfiles()

Mapping[Any, ExecutionProfile] lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._makeProfiles ( cls,
ApdbCassandraConfig config )
protected
Make all execution profiles used in the code.

Definition at line 879 of file apdbCassandra.py.

879 def _makeProfiles(cls, config: ApdbCassandraConfig) -> Mapping[Any, ExecutionProfile]:
880 """Make all execution profiles used in the code."""
881 if config.private_ips:
882 loadBalancePolicy = WhiteListRoundRobinPolicy(hosts=config.contact_points)
883 else:
884 loadBalancePolicy = RoundRobinPolicy()
885
886 read_tuples_profile = ExecutionProfile(
887 consistency_level=getattr(cassandra.ConsistencyLevel, config.read_consistency),
888 request_timeout=config.read_timeout,
889 row_factory=cassandra.query.tuple_factory,
890 load_balancing_policy=loadBalancePolicy,
891 )
892 read_pandas_profile = ExecutionProfile(
893 consistency_level=getattr(cassandra.ConsistencyLevel, config.read_consistency),
894 request_timeout=config.read_timeout,
895 row_factory=pandas_dataframe_factory,
896 load_balancing_policy=loadBalancePolicy,
897 )
898 read_raw_profile = ExecutionProfile(
899 consistency_level=getattr(cassandra.ConsistencyLevel, config.read_consistency),
900 request_timeout=config.read_timeout,
901 row_factory=raw_data_factory,
902 load_balancing_policy=loadBalancePolicy,
903 )
904 # Profile to use with select_concurrent to return pandas data frame
905 read_pandas_multi_profile = ExecutionProfile(
906 consistency_level=getattr(cassandra.ConsistencyLevel, config.read_consistency),
907 request_timeout=config.read_timeout,
908 row_factory=pandas_dataframe_factory,
909 load_balancing_policy=loadBalancePolicy,
910 )
911 # Profile to use with select_concurrent to return raw data (columns and
912 # rows)
913 read_raw_multi_profile = ExecutionProfile(
914 consistency_level=getattr(cassandra.ConsistencyLevel, config.read_consistency),
915 request_timeout=config.read_timeout,
916 row_factory=raw_data_factory,
917 load_balancing_policy=loadBalancePolicy,
918 )
919 write_profile = ExecutionProfile(
920 consistency_level=getattr(cassandra.ConsistencyLevel, config.write_consistency),
921 request_timeout=config.write_timeout,
922 load_balancing_policy=loadBalancePolicy,
923 )
924 # To replace default DCAwareRoundRobinPolicy
925 default_profile = ExecutionProfile(
926 load_balancing_policy=loadBalancePolicy,
927 )
928 return {
929 "read_tuples": read_tuples_profile,
930 "read_pandas": read_pandas_profile,
931 "read_raw": read_raw_profile,
932 "read_pandas_multi": read_pandas_multi_profile,
933 "read_raw_multi": read_raw_multi_profile,
934 "write": write_profile,
935 EXEC_PROFILE_DEFAULT: default_profile,
936 }
937

◆ _makeSchema()

None lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._makeSchema ( cls,
ApdbConfig config,
*bool drop = False,
int | None replication_factor = None )
protected

Definition at line 604 of file apdbCassandra.py.

606 ) -> None:
607 # docstring is inherited from a base class
608
609 if not isinstance(config, ApdbCassandraConfig):
610 raise TypeError(f"Unexpected type of configuration object: {type(config)}")
611
612 cluster, session = cls._make_session(config)
613
614 schema = ApdbCassandraSchema(
615 session=session,
616 keyspace=config.keyspace,
617 schema_file=config.schema_file,
618 schema_name=config.schema_name,
619 prefix=config.prefix,
620 time_partition_tables=config.time_partition_tables,
621 enable_replica=config.use_insert_id,
622 )
623
624 # Ask schema to create all tables.
625 if config.time_partition_tables:
626 time_partition_start = astropy.time.Time(config.time_partition_start, format="isot", scale="tai")
627 time_partition_end = astropy.time.Time(config.time_partition_end, format="isot", scale="tai")
628 part_epoch = float(cls.partition_zero_epoch.mjd)
629 part_days = config.time_partition_days
630 part_range = (
631 cls._time_partition_cls(time_partition_start, part_epoch, part_days),
632 cls._time_partition_cls(time_partition_end, part_epoch, part_days) + 1,
633 )
634 schema.makeSchema(drop=drop, part_range=part_range, replication_factor=replication_factor)
635 else:
636 schema.makeSchema(drop=drop, replication_factor=replication_factor)
637
638 meta_table_name = ApdbTables.metadata.table_name(config.prefix)
639 metadata = ApdbMetadataCassandra(session, meta_table_name, config.keyspace, "read_tuples", "write")
640
641 # Fill version numbers, overrides if they existed before.
642 if metadata.table_exists():
643 metadata.set(cls.metadataSchemaVersionKey, str(schema.schemaVersion()), force=True)
644 metadata.set(cls.metadataCodeVersionKey, str(cls.apdbImplementationVersion()), force=True)
645
646 if config.use_insert_id:
647 # Only store replica code version if replica is enabled.
648 metadata.set(
649 cls.metadataReplicaVersionKey,
650 str(ApdbCassandraReplica.apdbReplicaImplementationVersion()),
651 force=True,
652 )
653
654 # Store frozen part of a configuration in metadata.
655 freezer = ApdbConfigFreezer[ApdbCassandraConfig](cls._frozen_parameters)
656 metadata.set(cls.metadataConfigKey, freezer.to_json(config), force=True)
657
658 cluster.shutdown()
659

◆ _spatial_where()

list[tuple[str, tuple]] lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._spatial_where ( self,
sphgeom.Region | None region,
bool use_ranges = False )
protected
Generate expressions for the spatial part of a WHERE clause.

Parameters
----------
region : `sphgeom.Region`
    Spatial region for query results.
use_ranges : `bool`
    If True then use pixel ranges ("apdb_part >= p1 AND apdb_part <=
    p2") instead of an exact list of pixels. Should be set to True for
    large regions covering very many pixels.

Returns
-------
expressions : `list` [ `tuple` ]
    An empty list is returned if ``region`` is `None`; otherwise a list
    of one or more (expression, parameters) tuples.

Definition at line 1408 of file apdbCassandra.py.

1410 ) -> list[tuple[str, tuple]]:
1411 """Generate expressions for spatial part of WHERE clause.
1412
1413 Parameters
1414 ----------
1415 region : `sphgeom.Region`
1416 Spatial region for query results.
1417 use_ranges : `bool`
1418 If True then use pixel ranges ("apdb_part >= p1 AND apdb_part <=
1419 p2") instead of exact list of pixels. Should be set to True for
1420 large regions covering very many pixels.
1421
1422 Returns
1423 -------
1424 expressions : `list` [ `tuple` ]
1425 Empty list is returned if ``region`` is `None`, otherwise a list
1426 of one or more (expression, parameters) tuples
1427 """
1428 if region is None:
1429 return []
1430 if use_ranges:
1431 pixel_ranges = self._pixelization.envelope(region)
1432 expressions: list[tuple[str, tuple]] = []
1433 for lower, upper in pixel_ranges:
1434 upper -= 1
1435 if lower == upper:
1436 expressions.append(('"apdb_part" = ?', (lower,)))
1437 else:
1438 expressions.append(('"apdb_part" >= ? AND "apdb_part" <= ?', (lower, upper)))
1439 return expressions
1440 else:
1441 pixels = self._pixelization.pixels(region)
1442 if self.config.query_per_spatial_part:
1443 return [('"apdb_part" = ?', (pixel,)) for pixel in pixels]
1444 else:
1445 pixels_str = ",".join([str(pix) for pix in pixels])
1446 return [(f'"apdb_part" IN ({pixels_str})', ())]
1447
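
Illustrative return values for the three modes (pixel IDs are
hypothetical):

    # use_ranges=False, query_per_spatial_part=False (single IN query):
    #     [('"apdb_part" IN (1234,1235,1240)', ())]
    # use_ranges=False, query_per_spatial_part=True (query per pixel):
    #     [('"apdb_part" = ?', (1234,)), ('"apdb_part" = ?', (1235,)), ...]
    # use_ranges=True (range queries over the envelope):
    #     [('"apdb_part" >= ? AND "apdb_part" <= ?', (1234, 1240))]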

◆ _storeDiaObjects()

None lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._storeDiaObjects ( self,
pandas.DataFrame objs,
astropy.time.Time visit_time,
ReplicaChunk | None replica_chunk )
protected
Store catalog of DiaObjects from the current visit.

Parameters
----------
objs : `pandas.DataFrame`
    Catalog with DiaObject records.
visit_time : `astropy.time.Time`
    Time of the current visit.
replica_chunk : `ReplicaChunk` or `None`
    Replica chunk identifier if replication is configured.

Definition at line 1030 of file apdbCassandra.py.

1032 ) -> None:
1033 """Store catalog of DiaObjects from current visit.
1034
1035 Parameters
1036 ----------
1037 objs : `pandas.DataFrame`
1038 Catalog with DiaObject records
1039 visit_time : `astropy.time.Time`
1040 Time of the current visit.
1041 replica_chunk : `ReplicaChunk` or `None`
1042 Replica chunk identifier if replication is configured.
1043 """
1044 if len(objs) == 0:
1045 _LOG.debug("No objects to write to database.")
1046 return
1047
1048 visit_time_dt = visit_time.datetime
1049 extra_columns = dict(lastNonForcedSource=visit_time_dt)
1050 self._storeObjectsPandas(objs, ApdbTables.DiaObjectLast, extra_columns=extra_columns)
1051
1052 extra_columns["validityStart"] = visit_time_dt
1053 time_part: int | None = self._time_partition(visit_time)
1054 if not self.config.time_partition_tables:
1055 extra_columns["apdb_time_part"] = time_part
1056 time_part = None
1057
1058 # Only store DiaObects if not doing replication or explicitly
1059 # configured to always store them.
1060 if replica_chunk is None or not self.config.use_insert_id_skips_diaobjects:
1061 self._storeObjectsPandas(
1062 objs, ApdbTables.DiaObject, extra_columns=extra_columns, time_part=time_part
1063 )
1064
1065 if replica_chunk is not None:
1066 extra_columns = dict(apdb_replica_chunk=replica_chunk.id, validityStart=visit_time_dt)
1067 self._storeObjectsPandas(objs, ExtraTables.DiaObjectChunks, extra_columns=extra_columns)
1068

◆ _storeDiaSources()

None lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._storeDiaSources ( self,
ApdbTables table_name,
pandas.DataFrame sources,
astropy.time.Time visit_time,
ReplicaChunk | None replica_chunk )
protected
Store catalog of DiaSources or DiaForcedSources from the current visit.

Parameters
----------
table_name : `ApdbTables`
    Table where to store the data.
sources : `pandas.DataFrame`
    Catalog containing DiaSource records.
visit_time : `astropy.time.Time`
    Time of the current visit.
replica_chunk : `ReplicaChunk` or `None`
    Replica chunk identifier if replication is configured.

Definition at line 1069 of file apdbCassandra.py.

1075 ) -> None:
1076 """Store catalog of DIASources or DIAForcedSources from current visit.
1077
1078 Parameters
1079 ----------
1080 table_name : `ApdbTables`
1081 Table where to store the data.
1082 sources : `pandas.DataFrame`
1083 Catalog containing DiaSource records
1084 visit_time : `astropy.time.Time`
1085 Time of the current visit.
1086 replica_chunk : `ReplicaChunk` or `None`
1087 Replica chunk identifier if replication is configured.
1088 """
1089 time_part: int | None = self._time_partition(visit_time)
1090 extra_columns: dict[str, Any] = {}
1091 if not self.config.time_partition_tables:
1092 extra_columns["apdb_time_part"] = time_part
1093 time_part = None
1094
1095 self._storeObjectsPandas(sources, table_name, extra_columns=extra_columns, time_part=time_part)
1096
1097 if replica_chunk is not None:
1098 extra_columns = dict(apdb_replica_chunk=replica_chunk.id)
1099 if table_name is ApdbTables.DiaSource:
1100 extra_table = ExtraTables.DiaSourceChunks
1101 else:
1102 extra_table = ExtraTables.DiaForcedSourceChunks
1103 self._storeObjectsPandas(sources, extra_table, extra_columns=extra_columns)
1104

◆ _storeDiaSourcesPartitions()

None lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._storeDiaSourcesPartitions ( self,
pandas.DataFrame sources,
astropy.time.Time visit_time,
ReplicaChunk | None replica_chunk )
protected
Store mapping of diaSourceId to its partitioning values.

Parameters
----------
sources : `pandas.DataFrame`
    Catalog containing DiaSource records
visit_time : `astropy.time.Time`
    Time of the current visit.

Definition at line 1105 of file apdbCassandra.py.

1107 ) -> None:
1108 """Store mapping of diaSourceId to its partitioning values.
1109
1110 Parameters
1111 ----------
1112 sources : `pandas.DataFrame`
1113 Catalog containing DiaSource records
1114 visit_time : `astropy.time.Time`
1115 Time of the current visit.
1116 """
1117 id_map = cast(pandas.DataFrame, sources[["diaSourceId", "apdb_part"]])
1118 extra_columns = {
1119 "apdb_time_part": self._time_partition(visit_time),
1120 "apdb_replica_chunk": replica_chunk.id if replica_chunk is not None else None,
1121 }
1122
1123 self._storeObjectsPandas(
1124 id_map, ExtraTables.DiaSourceToPartition, extra_columns=extra_columns, time_part=None
1125 )
1126

◆ _storeObjectsPandas()

None lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._storeObjectsPandas ( self,
pandas.DataFrame records,
ApdbTables | ExtraTables table_name,
Mapping | None extra_columns = None,
int | None time_part = None )
protected
Store generic objects.

Takes a Pandas catalog and stores its records in a table.

Parameters
----------
records : `pandas.DataFrame`
    Catalog containing object records.
table_name : `ApdbTables` or `ExtraTables`
    Name of the table as defined in APDB schema.
extra_columns : `dict`, optional
    Mapping (column_name, column_value) which gives fixed values for
    columns in each row, overrides values in ``records`` if matching
    columns exist there.
time_part : `int`, optional
    If not `None` then insert into a per-partition table.

Notes
-----
If the Pandas catalog contains additional columns not defined in the
table schema, they are ignored. The catalog does not have to contain
all columns defined in the table, but partition and clustering keys
must be present in the catalog or in ``extra_columns``.

Definition at line 1127 of file apdbCassandra.py.

1133 ) -> None:
1134 """Store generic objects.
1135
1136 Takes Pandas catalog and stores a bunch of records in a table.
1137
1138 Parameters
1139 ----------
1140 records : `pandas.DataFrame`
1141 Catalog containing object records
1142 table_name : `ApdbTables`
1143 Name of the table as defined in APDB schema.
1144 extra_columns : `dict`, optional
1145 Mapping (column_name, column_value) which gives fixed values for
1146 columns in each row, overrides values in ``records`` if matching
1147 columns exist there.
1148 time_part : `int`, optional
1149 If not `None` then insert into a per-partition table.
1150
1151 Notes
1152 -----
1153 If Pandas catalog contains additional columns not defined in table
1154 schema they are ignored. Catalog does not have to contain all columns
1155 defined in a table, but partition and clustering keys must be present
1156 in a catalog or ``extra_columns``.
1157 """
1158 # use extra columns if specified
1159 if extra_columns is None:
1160 extra_columns = {}
1161 extra_fields = list(extra_columns.keys())
1162
1163 # Fields that will come from dataframe.
1164 df_fields = [column for column in records.columns if column not in extra_fields]
1165
1166 column_map = self._schema.getColumnMap(table_name)
1167 # list of columns (as in felis schema)
1168 fields = [column_map[field].name for field in df_fields if field in column_map]
1169 fields += extra_fields
1170
1171 # check that all partitioning and clustering columns are defined
1172 required_columns = self._schema.partitionColumns(table_name) + self._schema.clusteringColumns(
1173 table_name
1174 )
1175 missing_columns = [column for column in required_columns if column not in fields]
1176 if missing_columns:
1177 raise ValueError(f"Primary key columns are missing from catalog: {missing_columns}")
1178
1179 qfields = [quote_id(field) for field in fields]
1180 qfields_str = ",".join(qfields)
1181
1182 with self._timer("insert_build_time", tags={"table": table_name.name}):
1183 table = self._schema.tableName(table_name)
1184 if time_part is not None:
1185 table = f"{table}_{time_part}"
1186
1187 holders = ",".join(["?"] * len(qfields))
1188 query = f'INSERT INTO "{self._keyspace}"."{table}" ({qfields_str}) VALUES ({holders})'
1189 statement = self._preparer.prepare(query)
1190 queries = cassandra.query.BatchStatement()
1191 for rec in records.itertuples(index=False):
1192 values = []
1193 for field in df_fields:
1194 if field not in column_map:
1195 continue
1196 value = getattr(rec, field)
1197 if column_map[field].datatype is felis.datamodel.DataType.timestamp:
1198 if isinstance(value, pandas.Timestamp):
1199 value = literal(value.to_pydatetime())
1200 else:
1201 # Assume it's seconds since epoch, Cassandra
1202 # datetime is in milliseconds
1203 value = int(value * 1000)
1204 values.append(literal(value))
1205 for field in extra_fields:
1206 value = extra_columns[field]
1207 values.append(literal(value))
1208 queries.add(statement, values)
1209
1210 _LOG.debug("%s: will store %d records", self._schema.tableName(table_name), records.shape[0])
1211 with self._timer("insert_time", tags={"table": table_name.name}):
1212 self._session.execute(queries, timeout=self.config.write_timeout, execution_profile="write")
1213
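
A hypothetical call mirroring what ``_storeDiaObjects`` does, with a
fixed per-row value supplied through ``extra_columns``. It assumes
``apdb`` and ``objects`` already exist (an ApdbCassandra instance and a
DiaObject catalog), that ``ApdbTables`` is importable from
``lsst.dax.apdb``, and the partition number is made up:

    import astropy.time
    from lsst.dax.apdb import ApdbTables

    visit_time = astropy.time.Time("2025-01-01T00:00:00", format="isot", scale="tai")
    extra_columns = {"validityStart": visit_time.datetime}
    apdb._storeObjectsPandas(
        objects, ApdbTables.DiaObject, extra_columns=extra_columns, time_part=42
    )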

◆ _storeReplicaChunk()

None lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._storeReplicaChunk ( self,
ReplicaChunk replica_chunk,
astropy.time.Time visit_time )
protected

Definition at line 1009 of file apdbCassandra.py.

1009 def _storeReplicaChunk(self, replica_chunk: ReplicaChunk, visit_time: astropy.time.Time) -> None:
1010 # Cassandra timestamp uses milliseconds since epoch
1011 timestamp = int(replica_chunk.last_update_time.unix_tai * 1000)
1012
1013 # everything goes into a single partition
1014 partition = 0
1015
1016 table_name = self._schema.tableName(ExtraTables.ApdbReplicaChunks)
1017 query = (
1018 f'INSERT INTO "{self._keyspace}"."{table_name}" '
1019 "(partition, apdb_replica_chunk, last_update_time, unique_id) "
1020 "VALUES (?, ?, ?, ?)"
1021 )
1022
1023 self._session.execute(
1024 self._preparer.prepare(query),
1025 (partition, replica_chunk.id, timestamp, replica_chunk.unique_id),
1026 timeout=self.config.write_timeout,
1027 execution_profile="write",
1028 )
1029

◆ _temporal_where()

tuple[list[str], list[tuple[str, tuple]]] lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._temporal_where ( self,
ApdbTables table,
float | astropy.time.Time start_time,
float | astropy.time.Time end_time,
bool | None query_per_time_part = None )
protected
Generate table names and expressions for temporal part of WHERE
clauses.

Parameters
----------
table : `ApdbTables`
    Table to select from.
start_time : `astropy.time.Time` or `float`
    Starting datetime or MJD value of the time range.
end_time : `astropy.time.Time` or `float`
    Ending datetime or MJD value of the time range.
query_per_time_part : `bool`, optional
    If None then use ``query_per_time_part`` from configuration.

Returns
-------
tables : `list` [ `str` ]
    List of the table names to query.
expressions : `list` [ `tuple` ]
    A list of zero or more (expression, parameters) tuples.

Definition at line 1448 of file apdbCassandra.py.

1454 ) -> tuple[list[str], list[tuple[str, tuple]]]:
1455 """Generate table names and expressions for temporal part of WHERE
1456 clauses.
1457
1458 Parameters
1459 ----------
1460 table : `ApdbTables`
1461 Table to select from.
1462 start_time : `astropy.time.Time` or `float`
1463 Starting Datetime of MJD value of the time range.
1464 end_time : `astropy.time.Time` or `float`
1465 Starting Datetime of MJD value of the time range.
1466 query_per_time_part : `bool`, optional
1467 If None then use ``query_per_time_part`` from configuration.
1468
1469 Returns
1470 -------
1471 tables : `list` [ `str` ]
1472 List of the table names to query.
1473 expressions : `list` [ `tuple` ]
1474 A list of zero or more (expression, parameters) tuples.
1475 """
1476 tables: list[str]
1477 temporal_where: list[tuple[str, tuple]] = []
1478 table_name = self._schema.tableName(table)
1479 time_part_start = self._time_partition(start_time)
1480 time_part_end = self._time_partition(end_time)
1481 time_parts = list(range(time_part_start, time_part_end + 1))
1482 if self.config.time_partition_tables:
1483 tables = [f"{table_name}_{part}" for part in time_parts]
1484 else:
1485 tables = [table_name]
1486 if query_per_time_part is None:
1487 query_per_time_part = self.config.query_per_time_part
1488 if query_per_time_part:
1489 temporal_where = [('"apdb_time_part" = ?', (time_part,)) for time_part in time_parts]
1490 else:
1491 time_part_list = ",".join([str(part) for part in time_parts])
1492 temporal_where = [(f'"apdb_time_part" IN ({time_part_list})', ())]
1493
1494 return tables, temporal_where

◆ _time_partition()

int lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._time_partition ( self,
float | astropy.time.Time time )
protected
Calculate time partition number for a given time.

Parameters
----------
time : `float` or `astropy.time.Time`
    Time for which to calculate the partition number, either as a
    float MJD value or an `astropy.time.Time`.

Returns
-------
partition : `int`
    Partition number for a given time.

Definition at line 1321 of file apdbCassandra.py.

1321 def _time_partition(self, time: float | astropy.time.Time) -> int:
1322 """Calculate time partition number for a given time.
1323
1324 Parameters
1325 ----------
1326 time : `float` or `astropy.time.Time`
1327 Time for which to calculate partition number. Can be float to mean
1328 MJD or `astropy.time.Time`
1329
1330 Returns
1331 -------
1332 partition : `int`
1333 Partition number for a given time.
1334 """
1335 if isinstance(time, astropy.time.Time):
1336 mjd = float(time.mjd)
1337 else:
1338 mjd = time
1339 days_since_epoch = mjd - self._partition_zero_epoch_mjd
1340 partition = int(days_since_epoch) // self.config.time_partition_days
1341 return partition
1342
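
A worked example, assuming hypothetical 30-day partitions
(``time_partition_days``) and the unix-TAI zero epoch (MJD 40587.0):

    # mjd = 60676.0 corresponds to 2025-01-01
    # days_since_epoch = 60676.0 - 40587.0 = 20089.0
    # partition = 20089 // 30 = 669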

◆ _time_partition_cls()

int lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._time_partition_cls ( cls,
float | astropy.time.Time time,
float epoch_mjd,
int part_days )
protected
Calculate time partition number for a given time.

Parameters
----------
time : `float` or `astropy.time.Time`
    Time for which to calculate the partition number, either as a
    float MJD value or an `astropy.time.Time`.
epoch_mjd : `float`
    Epoch time for partition 0.
part_days : `int`
    Number of days per partition.

Returns
-------
partition : `int`
    Partition number for a given time.

Definition at line 1295 of file apdbCassandra.py.

1295 def _time_partition_cls(cls, time: float | astropy.time.Time, epoch_mjd: float, part_days: int) -> int:
1296 """Calculate time partition number for a given time.
1297
1298 Parameters
1299 ----------
1300 time : `float` or `astropy.time.Time`
1301 Time for which to calculate partition number. Can be float to mean
1302 MJD or `astropy.time.Time`
1303 epoch_mjd : `float`
1304 Epoch time for partition 0.
1305 part_days : `int`
1306 Number of days per partition.
1307
1308 Returns
1309 -------
1310 partition : `int`
1311 Partition number for a given time.
1312 """
1313 if isinstance(time, astropy.time.Time):
1314 mjd = float(time.mjd)
1315 else:
1316 mjd = time
1317 days_since_epoch = mjd - epoch_mjd
1318 partition = int(days_since_epoch) // part_days
1319 return partition
1320

◆ _timer()

Timer lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._timer ( self,
str name,
*Mapping[str, str | int] | None tags = None )
protected
Create `Timer` instance given its name.

Definition at line 350 of file apdbCassandra.py.

350 def _timer(self, name: str, *, tags: Mapping[str, str | int] | None = None) -> Timer:
351 """Create `Timer` instance given its name."""
352 return Timer(name, *self._timer_args, tags=tags)
353

◆ _versionCheck()

None lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._versionCheck ( self,
ApdbMetadataCassandra metadata )
protected
Check schema version compatibility.

Definition at line 408 of file apdbCassandra.py.

408 def _versionCheck(self, metadata: ApdbMetadataCassandra) -> None:
409 """Check schema version compatibility."""
410
411 def _get_version(key: str, default: VersionTuple) -> VersionTuple:
412 """Retrieve version number from given metadata key."""
413 if metadata.table_exists():
414 version_str = metadata.get(key)
415 if version_str is None:
416 # Should not happen with existing metadata table.
417 raise RuntimeError(f"Version key {key!r} does not exist in metadata table.")
418 return VersionTuple.fromString(version_str)
419 return default
420
421 # For old databases where metadata table does not exist we assume that
422 # version of both code and schema is 0.1.0.
423 initial_version = VersionTuple(0, 1, 0)
424 db_schema_version = _get_version(self.metadataSchemaVersionKey, initial_version)
425 db_code_version = _get_version(self.metadataCodeVersionKey, initial_version)
426
427 # For now there is no way to make read-only APDB instances, assume that
428 # any access can do updates.
429 if not self._schema.schemaVersion().checkCompatibility(db_schema_version, True):
430 raise IncompatibleVersionError(
431 f"Configured schema version {self._schema.schemaVersion()} "
432 f"is not compatible with database version {db_schema_version}"
433 )
434 if not self.apdbImplementationVersion().checkCompatibility(db_code_version, True):
435 raise IncompatibleVersionError(
436 f"Current code version {self.apdbImplementationVersion()} "
437 f"is not compatible with database version {db_code_version}"
438 )
439
440 # Check replica code version only if replica is enabled.
441 if self._schema.has_replica_chunks:
442 db_replica_version = _get_version(self.metadataReplicaVersionKey, initial_version)
443 code_replica_version = ApdbCassandraReplica.apdbReplicaImplementationVersion()
444 if not code_replica_version.checkCompatibility(db_replica_version, True):
445 raise IncompatibleVersionError(
446 f"Current replication code version {code_replica_version} "
447 f"is not compatible with database version {db_replica_version}"
448 )
449
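
A minimal sketch of the compatibility check itself, assuming
``VersionTuple`` is importable from ``lsst.dax.apdb``:

    from lsst.dax.apdb import VersionTuple

    db_version = VersionTuple.fromString("0.1.0")
    code_version = VersionTuple(0, 1, 1)
    # The second argument mirrors the "updates allowed" flag used above.
    if not code_version.checkCompatibility(db_version, True):
        raise RuntimeError("incompatible versions")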

◆ apdbImplementationVersion()

VersionTuple lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.apdbImplementationVersion ( cls)
Return version number for current APDB implementation.

Returns
-------
version : `VersionTuple`
    Version of the code defined in implementation class.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 451 of file apdbCassandra.py.

451 def apdbImplementationVersion(cls) -> VersionTuple:
452 # Docstring inherited from base class.
453 return VERSION
454

◆ apdbSchemaVersion()

VersionTuple lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.apdbSchemaVersion ( self)
Return schema version number as defined in config file.

Returns
-------
version : `VersionTuple`
    Version of the schema defined in schema config file.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 455 of file apdbCassandra.py.

455 def apdbSchemaVersion(self) -> VersionTuple:
456 # Docstring inherited from base class.
457 return self._schema.schemaVersion()
458

◆ containsVisitDetector()

bool lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.containsVisitDetector ( self,
int visit,
int detector )
Test whether data for a given visit-detector is present in the APDB.

Parameters
----------
visit, detector : `int`
    The ID of the visit-detector to search for.

Returns
-------
present : `bool`
    `True` if some DiaObject, DiaSource, or DiaForcedSource records
    exist for the specified observation, `False` otherwise.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 724 of file apdbCassandra.py.

724 def containsVisitDetector(self, visit: int, detector: int) -> bool:
725 # docstring is inherited from a base class
726 raise NotImplementedError()
727

◆ countUnassociatedObjects()

int lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.countUnassociatedObjects ( self)
Return the number of DiaObjects that have only one DiaSource
associated with them.

Used as part of ap_verify metrics.

Returns
-------
count : `int`
    Number of DiaObjects with exactly one associated DiaSource.

Notes
-----
This method can be very inefficient or slow in some implementations.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 865 of file apdbCassandra.py.

865 def countUnassociatedObjects(self) -> int:
866 # docstring is inherited from a base class
867
868 # It's too inefficient to implement it for Cassandra in current schema.
869 raise NotImplementedError()
870

◆ dailyJob()

None lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.dailyJob ( self)
Implement daily activities like cleanup/vacuum.

What should be done during daily activities is determined by the
specific implementation.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 861 of file apdbCassandra.py.

861 def dailyJob(self) -> None:
862 # docstring is inherited from a base class
863 pass
864

◆ get_replica()

ApdbCassandraReplica lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.get_replica ( self)
Return `ApdbReplica` instance for this database.

Definition at line 597 of file apdbCassandra.py.

597 def get_replica(self) -> ApdbCassandraReplica:
598 """Return `ApdbReplica` instance for this database."""
599 # Note that this instance has to stay alive while replica exists, so
600 # we pass reference to self.
601 return ApdbCassandraReplica(self, self._schema, self._session)
602

◆ getDiaForcedSources()

pandas.DataFrame | None lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.getDiaForcedSources ( self,
sphgeom.Region region,
Iterable[int] | None object_ids,
astropy.time.Time visit_time )
Return catalog of DiaForcedSource instances from a given region.

Parameters
----------
region : `lsst.sphgeom.Region`
    Region to search for DIASources.
object_ids : iterable [ `int` ], optional
    List of DiaObject IDs to further constrain the set of returned
    sources. If the list is empty then an empty catalog with the
    correct schema is returned. If `None` then returned sources are
    not constrained. Some implementations may not support the latter
    case.
visit_time : `astropy.time.Time`
    Time of the current visit.

Returns
-------
catalog : `pandas.DataFrame`, or `None`
    Catalog containing DiaSource records. `None` is returned if
    ``read_forced_sources_months`` configuration parameter is set to 0.

Raises
------
NotImplementedError
    May be raised by some implementations if ``object_ids`` is `None`.

Notes
-----
This method returns the DiaForcedSource catalog for a region with
additional filtering based on DiaObject IDs. Only a subset of the
DiaForcedSource history is returned, limited by the
``read_forced_sources_months`` config parameter w.r.t. ``visit_time``.
If ``object_ids`` is empty then an empty catalog is always returned
with the correct schema (columns/types). If ``object_ids`` is `None`
then no filtering is performed and some of the returned records may be
outside the specified region.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 712 of file apdbCassandra.py.

714 ) -> pandas.DataFrame | None:
715 # docstring is inherited from a base class
716 months = self.config.read_forced_sources_months
717 if months == 0:
718 return None
719 mjd_end = visit_time.mjd
720 mjd_start = mjd_end - months * 30
721
722 return self._getSources(region, object_ids, mjd_start, mjd_end, ApdbTables.DiaForcedSource)
723

◆ getDiaObjects()

pandas.DataFrame lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.getDiaObjects ( self,
sphgeom.Region region )
Return catalog of DiaObject instances from a given region.

This method returns only the last version of each DiaObject. Some
records in the returned catalog may be outside the specified region; it
is up to the client to ignore those records or clean up the catalog
before further use.

Parameters
----------
region : `lsst.sphgeom.Region`
    Region to search for DIAObjects.

Returns
-------
catalog : `pandas.DataFrame`
    Catalog containing DiaObject records for a region that may be a
    superset of the specified region.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 660 of file apdbCassandra.py.

660 def getDiaObjects(self, region: sphgeom.Region) -> pandas.DataFrame:
661 # docstring is inherited from a base class
662
663 sp_where = self._spatial_where(region)
664 _LOG.debug("getDiaObjects: #partitions: %s", len(sp_where))
665
666 # We need to exclude extra partitioning columns from result.
667 column_names = self._schema.apdbColumnNames(ApdbTables.DiaObjectLast)
668 what = ",".join(quote_id(column) for column in column_names)
669
670 table_name = self._schema.tableName(ApdbTables.DiaObjectLast)
671 query = f'SELECT {what} from "{self._keyspace}"."{table_name}"'
672 statements: list[tuple] = []
673 for where, params in sp_where:
674 full_query = f"{query} WHERE {where}"
675 if params:
676 statement = self._preparer.prepare(full_query)
677 else:
678 # If there are no params then it is likely that query has a
679 # bunch of literals rendered already, no point trying to
680 # prepare it because it's not reusable.
681 statement = cassandra.query.SimpleStatement(full_query)
682 statements.append((statement, params))
683 _LOG.debug("getDiaObjects: #queries: %s", len(statements))
684
685 with _MON.context_tags({"table": "DiaObject"}):
686 _MON.add_record(
687 "select_query_stats", values={"num_sp_part": len(sp_where), "num_queries": len(statements)}
688 )
689 with self._timer("select_time"):
690 objects = cast(
691 pandas.DataFrame,
692 select_concurrent(
693 self._session, statements, "read_pandas_multi", self.config.read_concurrency
694 ),
695 )
696
697 _LOG.debug("found %s DiaObjects", objects.shape[0])
698 return objects
699
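
A hypothetical region query combining this method with
``getDiaSources`` (``apdb`` is an existing ApdbCassandra instance):

    import astropy.time
    from lsst import sphgeom

    # 0.1-degree circular region around a made-up position.
    center = sphgeom.UnitVector3d(sphgeom.LonLat.fromDegrees(35.0, -4.0))
    region = sphgeom.Circle(center, sphgeom.Angle.fromDegrees(0.1))
    objects = apdb.getDiaObjects(region)
    sources = apdb.getDiaSources(
        region, objects["diaObjectId"], astropy.time.Time.now()
    )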

◆ getDiaSources()

pandas.DataFrame | None lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.getDiaSources ( self,
sphgeom.Region region,
Iterable[int] | None object_ids,
astropy.time.Time visit_time )
Return catalog of DiaSource instances from a given region.

Parameters
----------
region : `lsst.sphgeom.Region`
    Region to search for DIASources.
object_ids : iterable [ `int` ], optional
    List of DiaObject IDs to further constrain the set of returned
    sources. If `None` then returned sources are not constrained. If
    the list is empty then an empty catalog with the correct schema is
    returned.
visit_time : `astropy.time.Time`
    Time of the current visit.

Returns
-------
catalog : `pandas.DataFrame`, or `None`
    Catalog containing DiaSource records. `None` is returned if
    ``read_sources_months`` configuration parameter is set to 0.

Notes
-----
This method returns the DiaSource catalog for a region with additional
filtering based on DiaObject IDs. Only a subset of the DiaSource
history is returned, limited by the ``read_sources_months`` config
parameter w.r.t. ``visit_time``. If ``object_ids`` is empty then an
empty catalog is always returned with the correct schema
(columns/types). If ``object_ids`` is `None` then no filtering is
performed and some of the returned records may be outside the
specified region.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 700 of file apdbCassandra.py.

702 ) -> pandas.DataFrame | None:
703 # docstring is inherited from a base class
704 months = self.config.read_sources_months
705 if months == 0:
706 return None
707 mjd_end = visit_time.mjd
708 mjd_start = mjd_end - months * 30
709
710 return self._getSources(region, object_ids, mjd_start, mjd_end, ApdbTables.DiaSource)
711

◆ getSSObjects()

pandas.DataFrame lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.getSSObjects ( self)
Return catalog of SSObject instances.

Returns
-------
catalog : `pandas.DataFrame`
    Catalog containing SSObject records; all existing records are
    returned.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 728 of file apdbCassandra.py.

def getSSObjects(self) -> pandas.DataFrame:
    # docstring is inherited from a base class
    tableName = self._schema.tableName(ApdbTables.SSObject)
    query = f'SELECT * from "{self._keyspace}"."{tableName}"'

    objects = None
    with self._timer("select_time", tags={"table": "SSObject"}):
        result = self._session.execute(query, execution_profile="read_pandas")
        objects = result._current_rows

    _LOG.debug("found %s SSObjects", objects.shape[0])
    return objects
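
Since the whole table is returned by a single call, usage is straightforward; a minimal sketch, reusing ``apdb`` from the earlier sketch (the ``ssObjectId`` column name follows the reassignDiaSources documentation below):

ssobjects = apdb.getSSObjects()
print(f"{len(ssobjects)} SSObjects in total")
print(ssobjects["ssObjectId"].head())  # result is a plain pandas.DataFrame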

◆ init_database()

ApdbCassandraConfig lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.init_database ( cls,
list[str] hosts,
str keyspace,
*str | None schema_file = None,
str | None schema_name = None,
int | None read_sources_months = None,
int | None read_forced_sources_months = None,
bool use_insert_id = False,
bool use_insert_id_skips_diaobjects = False,
int | None port = None,
str | None username = None,
str | None prefix = None,
str | None part_pixelization = None,
int | None part_pix_level = None,
bool time_partition_tables = True,
str | None time_partition_start = None,
str | None time_partition_end = None,
str | None read_consistency = None,
str | None write_consistency = None,
int | None read_timeout = None,
int | None write_timeout = None,
list[str] | None ra_dec_columns = None,
int | None replication_factor = None,
bool drop = False )
Initialize new APDB instance and make configuration object for it.

Parameters
----------
hosts : `list` [`str`]
    List of host names or IP addresses for Cassandra cluster.
keyspace : `str`
    Name of the keyspace for APDB tables.
schema_file : `str`, optional
    Location of the (YAML) configuration file with the APDB schema. If
    not specified then the default location will be used.
schema_name : `str`, optional
    Name of the schema in the YAML configuration file. If not specified
    then the default name will be used.
read_sources_months : `int`, optional
    Number of months of history to read from DiaSource.
read_forced_sources_months : `int`, optional
    Number of months of history to read from DiaForcedSource.
use_insert_id : `bool`, optional
    If `True`, create additional tables used for replication to PPDB.
use_insert_id_skips_diaobjects : `bool`, optional
    If `True` then do not fill regular ``DiaObject`` table when
    ``use_insert_id`` is `True`.
port : `int`, optional
    Port number to use for Cassandra connections.
username : `str`, optional
    User name for Cassandra connections.
prefix : `str`, optional
    Optional prefix for all table names.
part_pixelization : `str`, optional
    Name of the pixelization used for partitioning.
part_pix_level : `int`, optional
    Pixelization level.
time_partition_tables : `bool`, optional
    Create per-partition tables.
time_partition_start : `str`, optional
    Starting time for per-partition tables, in yyyy-mm-ddThh:mm:ss
    format, in TAI.
time_partition_end : `str`, optional
    Ending time for per-partition tables, in yyyy-mm-ddThh:mm:ss
    format, in TAI.
read_consistency : `str`, optional
    Name of the consistency level for read operations.
write_consistency : `str`, optional
    Name of the consistency level for write operations.
read_timeout : `int`, optional
    Read timeout in seconds.
write_timeout : `int`, optional
    Write timeout in seconds.
ra_dec_columns : `list` [`str`], optional
    Names of ra/dec columns in DiaObject table.
replication_factor : `int`, optional
    Replication factor used when creating a new keyspace; if the
    keyspace already exists its replication factor is not changed.
drop : `bool`, optional
    If `True` then drop existing tables before re-creating the schema.

Returns
-------
config : `ApdbCassandraConfig`
    Resulting configuration object for a created APDB instance.

Definition at line 464 of file apdbCassandra.py.

) -> ApdbCassandraConfig:
    """Initialize new APDB instance and make configuration object for it.

    Parameters and return value are documented in the section above.
    """
    config = ApdbCassandraConfig(
        contact_points=hosts,
        keyspace=keyspace,
        use_insert_id=use_insert_id,
        use_insert_id_skips_diaobjects=use_insert_id_skips_diaobjects,
        time_partition_tables=time_partition_tables,
    )
    if schema_file is not None:
        config.schema_file = schema_file
    if schema_name is not None:
        config.schema_name = schema_name
    if read_sources_months is not None:
        config.read_sources_months = read_sources_months
    if read_forced_sources_months is not None:
        config.read_forced_sources_months = read_forced_sources_months
    if port is not None:
        config.port = port
    if username is not None:
        config.username = username
    if prefix is not None:
        config.prefix = prefix
    if part_pixelization is not None:
        config.part_pixelization = part_pixelization
    if part_pix_level is not None:
        config.part_pix_level = part_pix_level
    if time_partition_start is not None:
        config.time_partition_start = time_partition_start
    if time_partition_end is not None:
        config.time_partition_end = time_partition_end
    if read_consistency is not None:
        config.read_consistency = read_consistency
    if write_consistency is not None:
        config.write_consistency = write_consistency
    if read_timeout is not None:
        config.read_timeout = read_timeout
    if write_timeout is not None:
        config.write_timeout = write_timeout
    if ra_dec_columns is not None:
        config.ra_dec_columns = ra_dec_columns

    cls._makeSchema(config, drop=drop, replication_factor=replication_factor)

    return config
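
A hedged sketch of one-time database initialization with this method; the host names, keyspace, and all tuning values below are placeholders, not recommendations.

from lsst.dax.apdb.cassandra.apdbCassandra import ApdbCassandra

config = ApdbCassandra.init_database(
    hosts=["cassandra-1.example.org", "cassandra-2.example.org"],
    keyspace="apdb_dev",
    replication_factor=3,  # only applied if the keyspace is new
    time_partition_tables=True,
    time_partition_start="2025-01-01T00:00:00",  # TAI
    time_partition_end="2027-01-01T00:00:00",    # TAI
    use_insert_id=True,  # enable replication tables for PPDB
    drop=False,          # keep any existing tables
)

# The returned config can then be used to construct a working instance.
apdb = ApdbCassandra(config)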

◆ metadata()

ApdbMetadata lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.metadata ( self)
Object controlling access to APDB metadata (`ApdbMetadata`).

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 872 of file apdbCassandra.py.

def metadata(self) -> ApdbMetadata:
    # docstring is inherited from a base class
    if self._metadata is None:
        raise RuntimeError("Database schema was not initialized.")
    return self._metadata
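
A small sketch of reading the version keys stored in metadata, reusing ``apdb`` and the class key constants from this page; it assumes ``metadata`` is exposed as a Python property and that ApdbMetadata provides a ``get(key)`` accessor, neither of which is spelled out here.

meta = apdb.metadata  # assumed to be a property in the Python API
schema_version = meta.get(ApdbCassandra.metadataSchemaVersionKey)
code_version = meta.get(ApdbCassandra.metadataCodeVersionKey)
print(f"schema version: {schema_version}, code version: {code_version}")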

◆ reassignDiaSources()

None lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.reassignDiaSources ( self,
Mapping[int, int] idMap )
Associate DiaSources with SSObjects, dissociating them
from DiaObjects.

Parameters
----------
idMap : `Mapping`
    Maps DiaSource IDs to their new SSObject IDs.

Raises
------
ValueError
    Raised if a DiaSource ID does not exist in the database.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 773 of file apdbCassandra.py.

def reassignDiaSources(self, idMap: Mapping[int, int]) -> None:
    # docstring is inherited from a base class

    # To update a record we need to know its exact primary key (including
    # partition key), so we start by querying for diaSourceId to find the
    # primary keys.

    table_name = self._schema.tableName(ExtraTables.DiaSourceToPartition)
    # Split it into 1k IDs per query.
    selects: list[tuple] = []
    for ids in chunk_iterable(idMap.keys(), 1_000):
        ids_str = ",".join(str(item) for item in ids)
        selects.append(
            (
                (
                    'SELECT "diaSourceId", "apdb_part", "apdb_time_part", "apdb_replica_chunk" '
                    f'FROM "{self._keyspace}"."{table_name}" WHERE "diaSourceId" IN ({ids_str})'
                ),
                {},
            )
        )

    # No need for DataFrame here, read data as tuples.
    result = cast(
        list[tuple[int, int, int, int | None]],
        select_concurrent(self._session, selects, "read_tuples", self.config.read_concurrency),
    )

    # Make mapping from source ID to its partition.
    id2partitions: dict[int, tuple[int, int]] = {}
    id2chunk_id: dict[int, int] = {}
    for row in result:
        id2partitions[row[0]] = row[1:3]
        if row[3] is not None:
            id2chunk_id[row[0]] = row[3]

    # Make sure we know partitions for each ID.
    if set(id2partitions) != set(idMap):
        missing = ",".join(str(item) for item in set(idMap) - set(id2partitions))
        raise ValueError(f"Following DiaSource IDs do not exist in the database: {missing}")

    # Reassign in standard tables.
    queries = cassandra.query.BatchStatement()
    table_name = self._schema.tableName(ApdbTables.DiaSource)
    for diaSourceId, ssObjectId in idMap.items():
        apdb_part, apdb_time_part = id2partitions[diaSourceId]
        values: tuple
        if self.config.time_partition_tables:
            query = (
                f'UPDATE "{self._keyspace}"."{table_name}_{apdb_time_part}"'
                ' SET "ssObjectId" = ?, "diaObjectId" = NULL'
                ' WHERE "apdb_part" = ? AND "diaSourceId" = ?'
            )
            values = (ssObjectId, apdb_part, diaSourceId)
        else:
            query = (
                f'UPDATE "{self._keyspace}"."{table_name}"'
                ' SET "ssObjectId" = ?, "diaObjectId" = NULL'
                ' WHERE "apdb_part" = ? AND "apdb_time_part" = ? AND "diaSourceId" = ?'
            )
            values = (ssObjectId, apdb_part, apdb_time_part, diaSourceId)
        queries.add(self._preparer.prepare(query), values)

    # Reassign in replica tables, only if replication is enabled.
    if id2chunk_id:
        # Filter out chunks that have been deleted already. There is a
        # potential race with concurrent removal of chunks, but it
        # should be handled by WHERE in UPDATE.
        known_ids = set()
        if replica_chunks := self.get_replica().getReplicaChunks():
            known_ids = set(replica_chunk.id for replica_chunk in replica_chunks)
        id2chunk_id = {key: value for key, value in id2chunk_id.items() if value in known_ids}
        if id2chunk_id:
            table_name = self._schema.tableName(ExtraTables.DiaSourceChunks)
            for diaSourceId, ssObjectId in idMap.items():
                if replica_chunk := id2chunk_id.get(diaSourceId):
                    query = (
                        f'UPDATE "{self._keyspace}"."{table_name}" '
                        'SET "ssObjectId" = ?, "diaObjectId" = NULL '
                        'WHERE "apdb_replica_chunk" = ? AND "diaSourceId" = ?'
                    )
                    values = (ssObjectId, replica_chunk, diaSourceId)
                    queries.add(self._preparer.prepare(query), values)

    _LOG.debug("%s: will update %d records", table_name, len(idMap))
    with self._timer("update_time", tags={"table": table_name}):
        self._session.execute(queries, execution_profile="write")
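
For illustration, a minimal calling sketch reusing ``apdb`` from the earlier sketches; all ID values are fabricated placeholders, and the ValueError handling mirrors the Raises section above.

# Map DiaSource IDs to the SSObject IDs they should now be associated with.
id_map = {
    123456789: 4001,
    123456790: 4001,
    123456791: 4002,
}
try:
    apdb.reassignDiaSources(id_map)
except ValueError as exc:
    # Raised when some DiaSource IDs are not present in the database.
    print(f"reassignment failed: {exc}")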

◆ store()

None lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.store ( self,
astropy.time.Time visit_time,
pandas.DataFrame objects,
pandas.DataFrame | None sources = None,
pandas.DataFrame | None forced_sources = None )
Store all three types of catalogs in the database.

Parameters
----------
visit_time : `astropy.time.Time`
    Time of the visit.
objects : `pandas.DataFrame`
    Catalog with DiaObject records.
sources : `pandas.DataFrame`, optional
    Catalog with DiaSource records.
forced_sources : `pandas.DataFrame`, optional
    Catalog with DiaForcedSource records.

Notes
-----
This method takes DataFrame catalogs whose schema must be compatible
with the schema of the corresponding APDB table:

  - column names must correspond to database table columns
  - types and units of the columns must match database definitions;
    no unit conversion is performed presently
  - columns that have default values in the database schema can be
    omitted from the catalog
  - this method knows how to fill the interval-related columns of
    DiaObject (validityStart, validityEnd); they do not need to
    appear in the catalog
  - source catalogs must have a ``diaObjectId`` column associating
    sources with objects

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 741 of file apdbCassandra.py.

) -> None:
    # docstring is inherited from a base class

    replica_chunk: ReplicaChunk | None = None
    if self._schema.has_replica_chunks:
        replica_chunk = ReplicaChunk.make_replica_chunk(visit_time, self.config.replica_chunk_seconds)
        self._storeReplicaChunk(replica_chunk, visit_time)

    # fill region partition column for DiaObjects
    objects = self._add_obj_part(objects)
    self._storeDiaObjects(objects, visit_time, replica_chunk)

    if sources is not None:
        # copy apdb_part column from DiaObjects to DiaSources
        sources = self._add_src_part(sources, objects)
        self._storeDiaSources(ApdbTables.DiaSource, sources, visit_time, replica_chunk)
        self._storeDiaSourcesPartitions(sources, visit_time, replica_chunk)

    if forced_sources is not None:
        forced_sources = self._add_fsrc_part(forced_sources, objects)
        self._storeDiaSources(ApdbTables.DiaForcedSource, forced_sources, visit_time, replica_chunk)
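
A hedged sketch of a store() call; the columns shown are illustrative placeholders only, since the required columns come from the APDB YAML schema (see tableDef()) and must match the database definitions.

import astropy.time
import pandas

visit_time = astropy.time.Time("2025-01-15T06:30:00", format="isot", scale="tai")

# Illustrative column subset only; the actual required columns are
# defined by the APDB schema.
objects = pandas.DataFrame(
    {"diaObjectId": [1, 2], "ra": [35.0001, 35.0002], "dec": [-4.7001, -4.7002]}
)
sources = pandas.DataFrame(
    {
        "diaSourceId": [10, 11],
        "diaObjectId": [1, 2],  # associates each source with an object
        "ra": [35.0001, 35.0002],
        "dec": [-4.7001, -4.7002],
    }
)

# DiaObject interval columns (validityStart/validityEnd) are filled by store().
apdb.store(visit_time, objects, sources=sources)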

◆ storeSSObjects()

None lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.storeSSObjects ( self,
pandas.DataFrame objects )
Store or update SSObject catalog.

Parameters
----------
objects : `pandas.DataFrame`
    Catalog with SSObject records.

Notes
-----
If SSObjects with matching IDs already exist in the database, their
records will be updated with the information from the provided records.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 769 of file apdbCassandra.py.

def storeSSObjects(self, objects: pandas.DataFrame) -> None:
    # docstring is inherited from a base class
    self._storeObjectsPandas(objects, ApdbTables.SSObject)
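
A short upsert sketch following the Notes above, reusing ``apdb``; any column other than ``ssObjectId`` is a hypothetical placeholder.

import pandas

# Rows with an existing ssObjectId update the stored record; new IDs insert.
ssobjects = pandas.DataFrame(
    {"ssObjectId": [4001, 4002], "arc": [12.5, 3.2]}  # "arc" is illustrative
)
apdb.storeSSObjects(ssobjects)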

◆ tableDef()

Table | None lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.tableDef ( self,
ApdbTables table )
Return table schema definition for a given table.

Parameters
----------
table : `ApdbTables`
    One of the known APDB tables.

Returns
-------
tableSchema : `.schema_model.Table` or `None`
    Table schema description; `None` is returned if the table is not
    defined by this implementation.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 459 of file apdbCassandra.py.

def tableDef(self, table: ApdbTables) -> Table | None:
    # docstring is inherited from a base class
    return self._schema.tableSchemas.get(table)
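
A sketch of schema introspection with this method; it assumes ApdbTables is importable from lsst.dax.apdb and that the returned schema_model.Table exposes ``name`` and ``columns`` attributes (with a per-column ``name``), which this page does not document.

from lsst.dax.apdb import ApdbTables

for table_enum in (ApdbTables.DiaObject, ApdbTables.DiaSource, ApdbTables.SSObject):
    table = apdb.tableDef(table_enum)
    if table is None:
        continue  # table not defined by this implementation
    # Assumed attributes on schema_model.Table and its columns.
    print(table.name, [column.name for column in table.columns])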

Member Data Documentation

◆ _cluster

lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._cluster
protected

Definition at line 298 of file apdbCassandra.py.

◆ _frozen_parameters

tuple lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._frozen_parameters
staticprotected
Initial value:
= (
    "use_insert_id",
    "part_pixelization",
    "part_pix_level",
    "ra_dec_columns",
    "time_partition_tables",
    "time_partition_days",
    "use_insert_id_skips_diaobjects",
)

Definition at line 278 of file apdbCassandra.py.

◆ _keyspace

lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._keyspace
protected

Definition at line 296 of file apdbCassandra.py.

◆ _metadata

lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._metadata
protected

Definition at line 301 of file apdbCassandra.py.

◆ _partition_zero_epoch_mjd

lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._partition_zero_epoch_mjd
protected

Definition at line 330 of file apdbCassandra.py.

◆ _pixelization

lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._pixelization
protected

Definition at line 315 of file apdbCassandra.py.

◆ _preparer

lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._preparer
protected

Definition at line 336 of file apdbCassandra.py.

◆ _schema

lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._schema
protected

Definition at line 321 of file apdbCassandra.py.

◆ _session

lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._session
protected

Definition at line 298 of file apdbCassandra.py.

◆ _timer_args

lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra._timer_args
protected

Definition at line 352 of file apdbCassandra.py.

◆ config

lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.config

Definition at line 310 of file apdbCassandra.py.

◆ metadataCodeVersionKey [1/2]

str lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.metadataCodeVersionKey = "version:ApdbCassandra"
static

Definition at line 269 of file apdbCassandra.py.

◆ metadataCodeVersionKey [2/2]

lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.metadataCodeVersionKey

Definition at line 644 of file apdbCassandra.py.

◆ metadataConfigKey [1/2]

str lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.metadataConfigKey = "config:apdb-cassandra.json"
static

Definition at line 275 of file apdbCassandra.py.

◆ metadataConfigKey [2/2]

lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.metadataConfigKey

Definition at line 656 of file apdbCassandra.py.

◆ metadataReplicaVersionKey [1/2]

str lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.metadataReplicaVersionKey = "version:ApdbCassandraReplica"
static

Definition at line 272 of file apdbCassandra.py.

◆ metadataReplicaVersionKey [2/2]

lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.metadataReplicaVersionKey

Definition at line 649 of file apdbCassandra.py.

◆ metadataSchemaVersionKey [1/2]

str lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.metadataSchemaVersionKey = "version:schema"
static

Definition at line 266 of file apdbCassandra.py.

◆ metadataSchemaVersionKey [2/2]

lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.metadataSchemaVersionKey

Definition at line 643 of file apdbCassandra.py.

◆ partition_zero_epoch

lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.partition_zero_epoch = astropy.time.Time(0, format="unix_tai")
static

Definition at line 289 of file apdbCassandra.py.


The documentation for this class was generated from the following file:
apdbCassandra.py