LSST Applications g00274db5b6+edbf708997,g00d0e8bbd7+edbf708997,g199a45376c+5137f08352,g1fd858c14a+1d4b6db739,g262e1987ae+f4d9505c4f,g29ae962dfc+7156fb1a53,g2cef7863aa+73c82f25e4,g35bb328faa+edbf708997,g3e17d7035e+5b3adc59f5,g3fd5ace14f+852fa6fbcb,g47891489e3+6dc8069a4c,g53246c7159+edbf708997,g64539dfbff+9f17e571f4,g67b6fd64d1+6dc8069a4c,g74acd417e5+ae494d68d9,g786e29fd12+af89c03590,g7ae74a0b1c+a25e60b391,g7aefaa3e3d+536efcc10a,g7cc15d900a+d121454f8d,g87389fa792+a4172ec7da,g89139ef638+6dc8069a4c,g8d7436a09f+28c28d8d6d,g8ea07a8fe4+db21c37724,g92c671f44c+9f17e571f4,g98df359435+b2e6376b13,g99af87f6a8+b0f4ad7b8d,gac66b60396+966efe6077,gb88ae4c679+7dec8f19df,gbaa8f7a6c5+38b34f4976,gbf99507273+edbf708997,gc24b5d6ed1+9f17e571f4,gca7fc764a6+6dc8069a4c,gcc769fe2a4+97d0256649,gd7ef33dd92+6dc8069a4c,gdab6d2f7ff+ae494d68d9,gdbb4c4dda9+9f17e571f4,ge410e46f29+6dc8069a4c,geaed405ab2+e194be0d2b,w.2025.47
LSST Data Management Base Package
Loading...
Searching...
No Matches
lsst.dax.apdb.cassandra.apdbCassandraSchema.ApdbCassandraSchema Class Reference

Public Member Functions

 __init__ (self, cassandra.cluster.Session session, str keyspace, Mapping[ApdbTables, Table] table_schemas, str prefix="", bool time_partition_tables=False, bool enable_replica=False, bool replica_skips_diaobjects=False, bool has_chunk_sub_partitions=True, bool has_visit_detector_table=True)
 
bool replication_enabled (self)
 
bool empty (self)
 
dict[ApdbTables, list[str]] existing_tables (self, *ApdbTables args)
 
bool check_column (self, ApdbTables|ExtraTables table_enum, str column)
 
str tableName (self, ApdbTables|ExtraTables table_name, int|None time_partition=None)
 
str keyspace (self)
 
Mapping[str, schema_model.Column] getColumnMap (self, ApdbTables|ExtraTables table_name)
 
list[str] apdbColumnNames (self, ApdbTables|ExtraTables table_name)
 
list[str] partitionColumns (self, ApdbTables|ExtraTables table_name)
 
list[str] clusteringColumns (self, ApdbTables|ExtraTables table_name)
 
None makeSchema (self, *, bool drop=False, ApdbCassandraTimePartitionRange|None part_range=None, int|None replication_factor=None, CreateTableOptions|None table_options=None)
 
int table_row_size (self, ApdbTables|ExtraTables table)
 
list[ApdbTables] time_partitioned_tables (self)
 

Protected Member Functions

Mapping[ApdbTables, schema_model.Table] _apdb_tables_schema (self, bool time_partition_tables)
 
Mapping[ExtraTables, schema_model.Table] _extra_tables_schema (self)
 
CreateTableOptions|None _update_table_options (self, CreateTableOptions|None options)
 
None _makeTableSchema (self, ApdbTables|ExtraTables table, bool drop=False, ApdbCassandraTimePartitionRange|None part_range=None, CreateTableOptions|None table_options=None)
 
list[str] _tableColumns (self, ApdbTables|ExtraTables table_name)
 
schema_model.Table _table_schema (self, ApdbTables|ExtraTables table)
 

Protected Attributes

 _session = session
 
str _keyspace = keyspace
 
 _table_schemas = table_schemas
 
 _prefix = prefix
 
 _time_partition_tables = time_partition_tables
 
 _enable_replica = enable_replica
 
 _replica_skips_diaobjects = replica_skips_diaobjects
 
 _has_chunk_sub_partitions = has_chunk_sub_partitions
 
 _has_visit_detector_table = has_visit_detector_table
 
Mapping[ApdbTables, schema_model.Table] _apdb_tables = self._apdb_tables_schema(time_partition_tables)
 
Mapping[ExtraTables, schema_model.Table] _extra_tables = self._extra_tables_schema()
 

Static Protected Attributes

dict _type_map
 
list _time_partitioned_tables
 
list _spatially_partitioned_tables = [ApdbTables.DiaObjectLast]
 

Detailed Description

Class for management of APDB schema.

Parameters
----------
session : `cassandra.cluster.Session`
    Cassandra session object
keyspace : `str`
    Keyspace name for all tables.
table_schemas : `Mapping` [`ApdbTables`, `Table`]
    Schema definitions of the APDB tables, indexed by table enum.
prefix : `str`, optional
    Prefix to add to all schema elements.
time_partition_tables : `bool`
    If `True` then schema will have a separate table for each time
    partition.
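enable_replica : `bool`, optional
    If `True` then use additional tables for replica chunks.
replica_skips_diaobjects : `bool`, optional
    If `True` and ``enable_replica`` is `True` then the DiaObject table is
    not created by `makeSchema`.
has_chunk_sub_partitions : `bool`, optional
    If `True` then replica chunk tables have sub-partition columns. Only
    used if ``enable_replica`` is `True`.
has_visit_detector_table : `bool`, optional
    If `True` then the schema includes the ApdbVisitDetector table.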

Definition at line 154 of file apdbCassandraSchema.py.

Constructor & Destructor Documentation

◆ __init__()

lsst.dax.apdb.cassandra.apdbCassandraSchema.ApdbCassandraSchema.__init__ ( self,
cassandra.cluster.Session session,
str keyspace,
Mapping[ApdbTables, Table] table_schemas,
str prefix = "",
bool time_partition_tables = False,
bool enable_replica = False,
bool replica_skips_diaobjects = False,
bool has_chunk_sub_partitions = True,
bool has_visit_detector_table = True )

Definition at line 204 of file apdbCassandraSchema.py.

215 ):
216 self._session = session
217 self._keyspace = keyspace
218 self._table_schemas = table_schemas
219 self._prefix = prefix
220 self._time_partition_tables = time_partition_tables
221 self._enable_replica = enable_replica
222 self._replica_skips_diaobjects = replica_skips_diaobjects
223 self._has_chunk_sub_partitions = has_chunk_sub_partitions
224 self._has_visit_detector_table = has_visit_detector_table
225
226 self._apdb_tables = self._apdb_tables_schema(time_partition_tables)
227 self._extra_tables = self._extra_tables_schema()
228

Member Function Documentation

◆ _apdb_tables_schema()

Mapping[ApdbTables, schema_model.Table] lsst.dax.apdb.cassandra.apdbCassandraSchema.ApdbCassandraSchema._apdb_tables_schema ( self,
bool time_partition_tables )
protected
Generate schema for regular APDB tables.

Definition at line 229 of file apdbCassandraSchema.py.

229 def _apdb_tables_schema(self, time_partition_tables: bool) -> Mapping[ApdbTables, schema_model.Table]:
230 """Generate schema for regular APDB tables."""
231 apdb_tables: dict[ApdbTables, schema_model.Table] = {}
232
233 # add columns and index for partitioning.
234 for table, apdb_table_def in self._table_schemas.items():
235 part_columns = []
236 add_columns = []
237 primary_key = apdb_table_def.primary_key[:]
238 if table in self._spatially_partitioned_tables:
239 # DiaObjectLast does not need temporal partitioning
240 part_columns = ["apdb_part"]
241 add_columns = part_columns
242 elif table in self._time_partitioned_tables:
243 if time_partition_tables:
244 part_columns = ["apdb_part"]
245 else:
246 part_columns = ["apdb_part", "apdb_time_part"]
247 add_columns = part_columns
248 elif table is ApdbTables.SSObject:
249 # For SSObject there is no natural partition key but we have
250 # to partition it because there are too many of them. I'm
251 # going to partition on its primary key (and drop separate
252 # primary key index).
253 part_columns = ["ssObjectId"]
254 primary_key = []
255 elif table is ApdbTables.metadata:
256 # Metadata is in one partition because we want to read all of
257 # it in one query, add an extra column for partition.
258 part_columns = ["meta_part"]
259 add_columns = part_columns
260 else:
261 # TODO: Do not know what to do with the other tables
262 continue
263
264 column_defs = []
265 if add_columns:
266 column_defs = [
267 schema_model.Column(
268 id=f"#{name}", name=name, datatype=felis.datamodel.DataType.long, nullable=False
269 )
270 for name in add_columns
271 ]
272
273 if table is ApdbTables.DiaObjectLast:
274 # In the past DiaObjectLast table did not have validityStart.
275 validity_start_column = "validityStartMjdTai"
276 try:
277 if not self.check_column(ApdbTables.DiaObjectLast, validity_start_column):
278 for column in apdb_table_def.columns:
279 if column.name == validity_start_column:
280 apdb_table_def.columns.remove(column)
281 break
282 except LookupError:
283 # Table has not been created yet.
284 pass
285
286 annotations = dict(apdb_table_def.annotations)
287 annotations["cassandra:apdb_column_names"] = [column.name for column in apdb_table_def.columns]
288 if part_columns:
289 annotations["cassandra:partitioning_columns"] = part_columns
290
291 apdb_tables[table] = schema_model.Table(
292 id=apdb_table_def.id,
293 name=apdb_table_def.name,
294 columns=column_defs + apdb_table_def.columns,
295 primary_key=primary_key,
296 indexes=[],
297 constraints=[],
298 annotations=annotations,
299 )
300
301 return apdb_tables
302

◆ _extra_tables_schema()

Mapping[ExtraTables, schema_model.Table] lsst.dax.apdb.cassandra.apdbCassandraSchema.ApdbCassandraSchema._extra_tables_schema ( self)
protected
Generate schema for extra tables.

Definition at line 303 of file apdbCassandraSchema.py.

303 def _extra_tables_schema(self) -> Mapping[ExtraTables, schema_model.Table]:
304 """Generate schema for extra tables."""
305 extra_tables: dict[ExtraTables, schema_model.Table] = {}
306
307 if self._has_visit_detector_table:
308 columns = [
309 schema_model.Column(
310 id="#visit",
311 name="visit",
312 datatype=felis.datamodel.DataType.long,
313 nullable=False,
314 ),
315 schema_model.Column(
316 id="#detector",
317 name="detector",
318 datatype=felis.datamodel.DataType.short,
319 nullable=False,
320 ),
321 ]
322 extra_tables[ExtraTables.ApdbVisitDetector] = schema_model.Table(
323 id="#" + ExtraTables.ApdbVisitDetector.value,
324 name=ExtraTables.ApdbVisitDetector.table_name(self._prefix),
325 columns=columns,
326 primary_key=[],
327 indexes=[],
328 constraints=[],
329 annotations={"cassandra:partitioning_columns": ["visit", "detector"]},
330 )
331
332 # This table maps DiaSource ID to its partitions in DiaSource table and
333 # DiaSourceChunks tables.
334 extra_tables[ExtraTables.DiaSourceToPartition] = schema_model.Table(
335 id="#" + ExtraTables.DiaSourceToPartition.value,
336 name=ExtraTables.DiaSourceToPartition.table_name(self._prefix),
337 columns=[
338 schema_model.Column(
339 id="#diaSourceId",
340 name="diaSourceId",
341 datatype=felis.datamodel.DataType.long,
342 nullable=False,
343 ),
344 schema_model.Column(
345 id="#apdb_part", name="apdb_part", datatype=felis.datamodel.DataType.long, nullable=False
346 ),
347 schema_model.Column(
348 id="#apdb_time_part",
349 name="apdb_time_part",
350 datatype=felis.datamodel.DataType.int,
351 nullable=False,
352 ),
353 schema_model.Column(
354 id="#apdb_replica_chunk",
355 name="apdb_replica_chunk",
356 datatype=felis.datamodel.DataType.long,
357 nullable=True,
358 ),
359 schema_model.Column(
360 id="#apdb_replica_subchunk",
361 name="apdb_replica_subchunk",
362 datatype=felis.datamodel.DataType.int,
363 nullable=True,
364 ),
365 ],
366 primary_key=[],
367 indexes=[],
368 constraints=[],
369 annotations={"cassandra:partitioning_columns": ["diaSourceId"]},
370 )
371
372 # This table maps diaObjectId to its partition in DiaObjectLast table.
373 extra_tables[ExtraTables.DiaObjectLastToPartition] = schema_model.Table(
374 id="#" + ExtraTables.DiaObjectLastToPartition.value,
375 name=ExtraTables.DiaObjectLastToPartition.table_name(self._prefix),
376 columns=[
377 schema_model.Column(
378 id="#diaObjectId",
379 name="diaObjectId",
380 datatype=felis.datamodel.DataType.long,
381 nullable=False,
382 ),
383 schema_model.Column(
384 id="#apdb_part", name="apdb_part", datatype=felis.datamodel.DataType.long, nullable=False
385 ),
386 ],
387 primary_key=[],
388 indexes=[],
389 constraints=[],
390 annotations={"cassandra:partitioning_columns": ["diaObjectId"]},
391 )
392
393 if not self._enable_replica:
394 return extra_tables
395
396 replica_chunk_column = schema_model.Column(
397 id="#apdb_replica_chunk",
398 name="apdb_replica_chunk",
399 datatype=felis.datamodel.DataType.long,
400 nullable=False,
401 )
402
403 replica_chunk_columns = [replica_chunk_column]
404 if self._has_chunk_sub_partitions:
405 replica_chunk_columns.append(
406 schema_model.Column(
407 id="#apdb_replica_subchunk",
408 name="apdb_replica_subchunk",
409 datatype=felis.datamodel.DataType.int,
410 nullable=False,
411 )
412 )
413
414 # Table containing replica chunks, this one is not partitioned, but
415 # partition key must be defined.
416 extra_tables[ExtraTables.ApdbReplicaChunks] = schema_model.Table(
417 id="#" + ExtraTables.ApdbReplicaChunks.value,
418 name=ExtraTables.ApdbReplicaChunks.table_name(self._prefix),
419 columns=[
420 schema_model.Column(
421 id="#partition", name="partition", datatype=felis.datamodel.DataType.int, nullable=False
422 ),
423 replica_chunk_column,
424 schema_model.Column(
425 id="#last_update_time",
426 name="last_update_time",
427 datatype=felis.datamodel.DataType.timestamp,
428 nullable=False,
429 ),
430 schema_model.Column(
431 id="#unique_id",
432 name="unique_id",
433 datatype=schema_model.ExtraDataTypes.UUID,
434 nullable=False,
435 ),
436 schema_model.Column(
437 id="#has_subchunks",
438 name="has_subchunks",
439 datatype=felis.datamodel.DataType.boolean,
440 nullable=True,
441 ),
442 ],
443 primary_key=[replica_chunk_column],
444 indexes=[],
445 constraints=[],
446 annotations={"cassandra:partitioning_columns": ["partition"]},
447 )
448
449 replica_chunk_tables = ExtraTables.replica_chunk_tables(self._has_chunk_sub_partitions)
450 for apdb_table_enum, chunk_table_enum in replica_chunk_tables.items():
451 apdb_table_def = self._table_schemas[apdb_table_enum]
452
453 extra_tables[chunk_table_enum] = schema_model.Table(
454 id="#" + chunk_table_enum.value,
455 name=chunk_table_enum.table_name(self._prefix),
456 columns=replica_chunk_columns + apdb_table_def.columns,
457 primary_key=apdb_table_def.primary_key[:],
458 indexes=[],
459 constraints=[],
460 annotations={
461 "cassandra:partitioning_columns": [column.name for column in replica_chunk_columns],
462 "cassandra:apdb_column_names": [column.name for column in apdb_table_def.columns],
463 },
464 )
465
466 # Table with replica chunk data for ApdbUpdateRecord.
467 columns = [
468 schema_model.Column(
469 id=f"#{ExtraTables.ApdbUpdateRecordChunks.value}.update_time_ns",
470 name="update_time_ns",
471 datatype=felis.datamodel.DataType.long,
472 nullable=False,
473 ),
474 schema_model.Column(
475 id=f"#{ExtraTables.ApdbUpdateRecordChunks.value}.update_order",
476 name="update_order",
477 datatype=felis.datamodel.DataType.int,
478 nullable=False,
479 ),
480 schema_model.Column(
481 id=f"#{ExtraTables.ApdbUpdateRecordChunks.value}.update_unique_id",
482 name="update_unique_id",
483 datatype=schema_model.ExtraDataTypes.UUID,
484 nullable=False,
485 ),
486 schema_model.Column(
487 id=f"#{ExtraTables.ApdbUpdateRecordChunks.value}.update_payload",
488 name="update_payload",
489 datatype=felis.datamodel.DataType.string,
490 nullable=False,
491 ),
492 ]
493 extra_tables[ExtraTables.ApdbUpdateRecordChunks] = schema_model.Table(
494 id=f"#{ExtraTables.ApdbUpdateRecordChunks.value}",
495 name=ExtraTables.ApdbUpdateRecordChunks.table_name(self._prefix),
496 columns=replica_chunk_columns + columns,
497 primary_key=columns[:3],
498 indexes=[],
499 constraints=[],
500 annotations={
501 "cassandra:partitioning_columns": [column.name for column in replica_chunk_columns],
502 },
503 )
504
505 return extra_tables
506

◆ _makeTableSchema()

None lsst.dax.apdb.cassandra.apdbCassandraSchema.ApdbCassandraSchema._makeTableSchema ( self,
ApdbTables | ExtraTables table,
bool drop = False,
ApdbCassandraTimePartitionRange | None part_range = None,
CreateTableOptions | None table_options = None )
protected

Definition at line 801 of file apdbCassandraSchema.py.

807 ) -> None:
808 _LOG.debug("Making table %s", table)
809
810 fullTable = table.table_name(self._prefix)
811
812 table_list = [fullTable]
813 if part_range is not None:
814 if table in self._time_partitioned_tables:
815 table_list = [table.table_name(self._prefix, part) for part in part_range.range()]
816
817 if drop:
818 queries = [f'DROP TABLE IF EXISTS "{self._keyspace}"."{table_name}"' for table_name in table_list]
819 futures = [self._session.execute_async(query, timeout=None) for query in queries]
820 for future in futures:
821 _LOG.debug("wait for query: %s", future.query)
822 future.result()
823 _LOG.debug("query finished: %s", future.query)
824
825 queries = []
826 options = table_options.get_options(fullTable).strip() if table_options else None
827 for table_name in table_list:
828 if_not_exists = "" if drop else "IF NOT EXISTS"
829 columns = ", ".join(self._tableColumns(table))
830 query = f'CREATE TABLE {if_not_exists} "{self._keyspace}"."{table_name}" ({columns})'
831 if options:
832 query = f"{query} WITH {options}"
833 _LOG.debug("query: %s", query)
834 queries.append(query)
835 futures = [self._session.execute_async(query, timeout=None) for query in queries]
836 for future in futures:
837 _LOG.debug("wait for query: %s", future.query)
838 future.result()
839 _LOG.debug("query finished: %s", future.query)
840

◆ _table_schema()

schema_model.Table lsst.dax.apdb.cassandra.apdbCassandraSchema.ApdbCassandraSchema._table_schema ( self,
ApdbTables | ExtraTables table )
protected
Return schema definition for a table.

Definition at line 882 of file apdbCassandraSchema.py.

882 def _table_schema(self, table: ApdbTables | ExtraTables) -> schema_model.Table:
883 """Return schema definition for a table."""
884 if isinstance(table, ApdbTables):
885 table_schema = self._apdb_tables[table]
886 else:
887 table_schema = self._extra_tables[table]
888 return table_schema
889

◆ _tableColumns()

list[str] lsst.dax.apdb.cassandra.apdbCassandraSchema.ApdbCassandraSchema._tableColumns ( self,
ApdbTables | ExtraTables table_name )
protected
Return the list of column definitions for a table, followed by its PRIMARY KEY clause.

Parameters
----------
table_name : `ApdbTables`
    Name of the table.

Returns
-------
column_defs : `list`
    List of strings in the format "column_name type".

Definition at line 841 of file apdbCassandraSchema.py.

841 def _tableColumns(self, table_name: ApdbTables | ExtraTables) -> list[str]:
842 """Return set of columns in a table
843
844 Parameters
845 ----------
846 table_name : `ApdbTables`
847 Name of the table.
848
849 Returns
850 -------
851 column_defs : `list`
852 List of strings in the format "column_name type".
853 """
854 table_schema = self._table_schema(table_name)
855
856 # must have partition columns and clustering columns
857 part_columns = table_schema.annotations.get("cassandra:partitioning_columns", [])
858 clust_columns = [column.name for column in table_schema.primary_key]
859 _LOG.debug("part_columns: %s", part_columns)
860 _LOG.debug("clust_columns: %s", clust_columns)
861 if not part_columns:
862 raise ValueError(f"Table {table_name} configuration is missing partition index")
863
864 # all columns
865 column_defs = []
866 for column in table_schema.columns:
867 ctype = self._type_map[column.datatype]
868 column_defs.append(f'"{column.name}" {ctype}')
869
870 # primary key definition
871 part_columns = [f'"{col}"' for col in part_columns]
872 clust_columns = [f'"{col}"' for col in clust_columns]
873 if len(part_columns) > 1:
874 columns = ", ".join(part_columns)
875 part_columns = [f"({columns})"]
876 pkey = ", ".join(part_columns + clust_columns)
877 _LOG.debug("pkey: %s", pkey)
878 column_defs.append(f"PRIMARY KEY ({pkey})")
879
880 return column_defs
881

◆ _update_table_options()

CreateTableOptions | None lsst.dax.apdb.cassandra.apdbCassandraSchema.ApdbCassandraSchema._update_table_options ( self,
CreateTableOptions | None options )
protected
Extend table options with options for internal tables.

Definition at line 782 of file apdbCassandraSchema.py.

782 def _update_table_options(self, options: CreateTableOptions | None) -> CreateTableOptions | None:
783 """Extend table options with options for internal tables."""
784 # We want to add TTL option to ApdbVisitDetector table.
785 if not self._has_visit_detector_table:
786 return options
787
788 if not options:
789 options = CreateTableOptions()
790
791 # set both TTL and gc_grace_seconds to 24h.
792 options.table_options.append(
793 TableOptions(
794 tables=[ExtraTables.ApdbVisitDetector.table_name(self._prefix)],
795 options="default_time_to_live=86400 AND gc_grace_seconds=86400",
796 )
797 )
798
799 return options
800

◆ apdbColumnNames()

list[str] lsst.dax.apdb.cassandra.apdbCassandraSchema.ApdbCassandraSchema.apdbColumnNames ( self,
ApdbTables | ExtraTables table_name )
Return a list of column names for a table as defined in APDB
schema.

Parameters
----------
table_name : `ApdbTables` or `ExtraTables`
    Enum for a table in APDB schema.

Returns
-------
columns : `list` of `str`
    Names of regular columns in the table.

Definition at line 671 of file apdbCassandraSchema.py.

671 def apdbColumnNames(self, table_name: ApdbTables | ExtraTables) -> list[str]:
672 """Return a list of columns names for a table as defined in APDB
673 schema.
674
675 Parameters
676 ----------
677 table_name : `ApdbTables` or `ExtraTables`
678 Enum for a table in APDB schema.
679
680 Returns
681 -------
682 columns : `list` of `str`
683 Names of regular columns in the table.
684 """
685 table_schema = self._table_schema(table_name)
686 return table_schema.annotations["cassandra:apdb_column_names"]
687

◆ check_column()

bool lsst.dax.apdb.cassandra.apdbCassandraSchema.ApdbCassandraSchema.check_column ( self,
ApdbTables | ExtraTables table_enum,
str column )
Check for the existence of the column in a given table.

Parameters
----------
table_enum : `ApdbTables` or `ExtraTables`
    Table to check for a column.
column : `str`
    Name of the column to check.

Returns
-------
exists : `bool`
    True if column exists, False otherwise.

Raises
------
LookupError
    Raised if table does not exist.

Definition at line 586 of file apdbCassandraSchema.py.

586 def check_column(self, table_enum: ApdbTables | ExtraTables, column: str) -> bool:
587 """Check for the existence of the column in a given table.
588
589 Parameters
590 ----------
591 table_enum : `ApdbTables` or `ExtraTables`
592 Table to check for a column.
593 column : `str`
594 Name of the column to check.
595
596 Returns
597 -------
598 exists : `bool`
599 True if column exists, False otherwise.
600
601 Raises
602 ------
603 LookupError
604 Raised if table does not exist.
605 """
606 if self._time_partition_tables and table_enum in self._time_partitioned_tables:
607 query = (
608 "SELECT table_name FROM system_schema.columns WHERE keyspace_name = %s AND column_name = %s "
609 "ALLOW FILTERING"
610 )
611 result = self._session.execute(query, (self._keyspace, column))
612 base_name = table_enum.table_name(self._prefix)
613 for row in result.all():
614 table_name = row[0]
615 if table_name.startswith(f"{base_name}_"):
616 return True
617 # Check that there is any table with matching name.
618 assert isinstance(table_enum, ApdbTables), "Can only be ApdbTables"
619 tables = self.existing_tables(table_enum)
620 if not tables[table_enum]:
621 raise LookupError(f"Table {base_name} does not exist.")
622 return False
623 else:
624 table_name = table_enum.table_name(self._prefix)
625 query = (
626 "SELECT column_name FROM system_schema.columns WHERE keyspace_name = %s AND table_name = %s"
627 )
628 result = self._session.execute(query, (self._keyspace, table_name))
629 rows = list(result)
630 if not rows:
631 raise LookupError(f"Table {table_name} does not exist.")
632 for row in rows:
633 if row.column_name == column:
634 return True
635 return False
636

◆ clusteringColumns()

list[str] lsst.dax.apdb.cassandra.apdbCassandraSchema.ApdbCassandraSchema.clusteringColumns ( self,
ApdbTables | ExtraTables table_name )
Return a list of columns used for clustering.

Parameters
----------
table_name : `ApdbTables`
    Table name in APDB schema

Returns
-------
columns : `list` of `str`
    Names of columns used for clustering.

Definition at line 704 of file apdbCassandraSchema.py.

704 def clusteringColumns(self, table_name: ApdbTables | ExtraTables) -> list[str]:
705 """Return a list of columns used for clustering.
706
707 Parameters
708 ----------
709 table_name : `ApdbTables`
710 Table name in APDB schema
711
712 Returns
713 -------
714 columns : `list` of `str`
715 Names of columns for used for clustering.
716 """
717 table_schema = self._table_schema(table_name)
718 return [column.name for column in table_schema.primary_key]
719

◆ empty()

bool lsst.dax.apdb.cassandra.apdbCassandraSchema.ApdbCassandraSchema.empty ( self)
Return True if database schema is empty.

Returns
-------
empty : `bool`
    `True` if none of the required APDB tables exist in the database,
    `False` if all required tables exist.

Raises
------
InconsistentSchemaError
    Raised when some of the required tables exist but not all.

Definition at line 512 of file apdbCassandraSchema.py.

512 def empty(self) -> bool:
513 """Return True if database schema is empty.
514
515 Returns
516 -------
517 empty : `bool`
518 `True` if none of the required APDB tables exist in the database,
519 `False` if all required tables exist.
520
521 Raises
522 ------
523 InconsistentSchemaError
524 Raised when some of the required tables exist but not all.
525 """
526 query = "SELECT table_name FROM system_schema.tables WHERE keyspace_name = %s"
527 result = self._session.execute(query, (self._keyspace,))
528 table_names = {row[0] for row in result.all()}
529
530 existing_tables = []
531 missing_tables = []
532 for table_enum in self._apdb_tables:
533 table_name = table_enum.table_name(self._prefix)
534 if self._time_partition_tables and table_enum in self._time_partitioned_tables:
535 # Check prefix for time-partitioned tables.
536 exists = any(table.startswith(f"{table_name}_") for table in table_names)
537 else:
538 exists = table_name in table_names
539 if exists:
540 existing_tables.append(table_name)
541 else:
542 missing_tables.append(table_name)
543
544 if not missing_tables:
545 return False
546 elif not existing_tables:
547 return True
548 else:
549 raise InconsistentSchemaError(
550 f"Only some required APDB tables exist: {existing_tables}, missing tables: {missing_tables}"
551 )
552

◆ existing_tables()

dict[ApdbTables, list[str]] lsst.dax.apdb.cassandra.apdbCassandraSchema.ApdbCassandraSchema.existing_tables ( self,
*ApdbTables args )
Return the list of existing table names for given table.

Parameters
----------
*args : `ApdbTables`
    Tables for which to return their existing table names.

Returns
-------
tables : `dict` [`ApdbTables`, `list`[`str`]]
    Mapping of the APDB table to the list of the existing table names.
    More than one name can be present in the list if configuration
    specifies per-partition tables.

Definition at line 553 of file apdbCassandraSchema.py.

553 def existing_tables(self, *args: ApdbTables) -> dict[ApdbTables, list[str]]:
554 """Return the list of existing table names for given table.
555
556 Parameters
557 ----------
558 *args : `ApdbTables`
559 Tables for which to return their existing table names.
560
561 Returns
562 -------
563 tables : `dict` [`ApdbTables`, `list`[`str`]]
564 Mapping of the APDB table to the list of the existing table names.
565 More than one name can be present in the list if configuration
566 specifies per-partition tables.
567 """
568 if self._time_partition_tables and not set(args).isdisjoint(self._time_partitioned_tables):
569 # Some of the tables should have per-partition tables.
570 query = "SELECT table_name FROM system_schema.tables WHERE keyspace_name = %s"
571 result = self._session.execute(query, (self._keyspace,))
572 table_names = {row[0] for row in result.all()}
573
574 tables = {}
575 for table_enum in args:
576 base_name = table_enum.table_name(self._prefix)
577 if table_enum in self._time_partitioned_tables:
578 tables[table_enum] = [table for table in table_names if table.startswith(f"{base_name}_")]
579 else:
580 tables[table_enum] = [base_name]
581 return tables
582 else:
583 # Do not check that they exist, we know that they should.
584 return {table_enum: [table_enum.table_name(self._prefix)] for table_enum in args}
585

◆ getColumnMap()

Mapping[str, schema_model.Column] lsst.dax.apdb.cassandra.apdbCassandraSchema.ApdbCassandraSchema.getColumnMap ( self,
ApdbTables | ExtraTables table_name )
Return mapping of column names to Column definitions.

Parameters
----------
table_name : `ApdbTables`
    One of known APDB table names.

Returns
-------
column_map : `dict`
    Mapping of column names to `schema_model.Column` instances.

Definition at line 654 of file apdbCassandraSchema.py.

654 def getColumnMap(self, table_name: ApdbTables | ExtraTables) -> Mapping[str, schema_model.Column]:
655 """Return mapping of column names to Column definitions.
656
657 Parameters
658 ----------
659 table_name : `ApdbTables`
660 One of known APDB table names.
661
662 Returns
663 -------
664 column_map : `dict`
665 Mapping of column names to `ColumnDef` instances.
666 """
667 table_schema = self._table_schema(table_name)
668 cmap = {column.name: column for column in table_schema.columns}
669 return cmap
670

◆ keyspace()

str lsst.dax.apdb.cassandra.apdbCassandraSchema.ApdbCassandraSchema.keyspace ( self)
Return Cassandra keyspace for APDB tables.

Definition at line 650 of file apdbCassandraSchema.py.

650 def keyspace(self) -> str:
651 """Return Cassandra keyspace for APDB tables."""
652 return self._keyspace
653

◆ makeSchema()

None lsst.dax.apdb.cassandra.apdbCassandraSchema.ApdbCassandraSchema.makeSchema ( self,
* ,
bool drop = False,
ApdbCassandraTimePartitionRange | None part_range = None,
int | None replication_factor = None,
CreateTableOptions | None table_options = None )
Create or re-create all tables.

Parameters
----------
drop : `bool`
    If True then drop tables before creating new ones. Note that
    only tables are dropped and not the whole keyspace.
part_range : `ApdbCassandraTimePartitionRange` or `None`
    Start and end partition number for time partitions. Used to create
    per-partition DiaObject, DiaSource, and DiaForcedSource tables. If
    `None` then per-partition tables are not created.
replication_factor : `int`, optional
    Replication factor used when creating new keyspace, if keyspace
    already exists its replication factor is not changed.

Definition at line 720 of file apdbCassandraSchema.py.

727 ) -> None:
728 """Create or re-create all tables.
729
730 Parameters
731 ----------
732 drop : `bool`
733 If True then drop tables before creating new ones. Note that
734 only tables are dropped and not the whole keyspace.
735 part_range : `ApdbCassandraTimePartitionRange` or `None`
736 Start and end partition number for time partitions. Used to create
737 per-partition DiaObject, DiaSource, and DiaForcedSource tables. If
738 `None` then per-partition tables are not created.
739 replication_factor : `int`, optional
740 Replication factor used when creating new keyspace, if keyspace
741 already exists its replication factor is not changed.
742 """
743 # Try to create keyspace if it does not exist
744 if replication_factor is None:
745 replication_factor = 1
746
747 # If keyspace exists check its replication factor.
748 query = "SELECT replication FROM system_schema.keyspaces WHERE keyspace_name = %s"
749 result = self._session.execute(query, (self._keyspace,))
750 if row := result.one():
751 # Check replication factor, ignore strategy class.
752 repl_config = cast(Mapping[str, str], row[0])
753 current_repl = int(repl_config["replication_factor"])
754 if replication_factor != current_repl:
755 raise ValueError(
756 f"New replication factor {replication_factor} differs from the replication factor "
757 f"for already existing keyspace: {current_repl}"
758 )
759 else:
760 # Need a new keyspace.
761 query = (
762 f'CREATE KEYSPACE "{self._keyspace}"'
763 " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': "
764 f"{replication_factor}"
765 "}"
766 )
767 self._session.execute(query)
768
769 table_options = self._update_table_options(table_options)
770 for table in self._apdb_tables:
771 if table is ApdbTables.DiaObject and self._enable_replica and self._replica_skips_diaobjects:
772 continue
773 if table in (ApdbTables.SSObject, ApdbTables.SSSource):
774 # SSObject/SSSource do not exist in APDB, but are defined in
775 # ApdbTables. The reason is that AP wants to have schema of
776 # these tables for alert-related business.
777 continue
778 self._makeTableSchema(table, drop, part_range, table_options)
779 for extra_table in self._extra_tables:
780 self._makeTableSchema(extra_table, drop, part_range, table_options)
781

◆ partitionColumns()

list[str] lsst.dax.apdb.cassandra.apdbCassandraSchema.ApdbCassandraSchema.partitionColumns ( self,
ApdbTables | ExtraTables table_name )
Return a list of columns used for table partitioning.

Parameters
----------
table_name : `ApdbTables`
    Table name in APDB schema

Returns
-------
columns : `list` of `str`
    Names of columns used for partitioning.

Definition at line 688 of file apdbCassandraSchema.py.

688 def partitionColumns(self, table_name: ApdbTables | ExtraTables) -> list[str]:
689 """Return a list of columns used for table partitioning.
690
691 Parameters
692 ----------
693 table_name : `ApdbTables`
694 Table name in APDB schema
695
696 Returns
697 -------
698 columns : `list` of `str`
699 Names of columns used for partitioning.
700 """
701 table_schema = self._table_schema(table_name)
702 return table_schema.annotations.get("cassandra:partitioning_columns", [])
703

◆ replication_enabled()

bool lsst.dax.apdb.cassandra.apdbCassandraSchema.ApdbCassandraSchema.replication_enabled ( self)
True when replication is enabled (`bool`).

Definition at line 508 of file apdbCassandraSchema.py.

508 def replication_enabled(self) -> bool:
509 """True when replication is enabled (`bool`)."""
510 return self._enable_replica
511

◆ table_row_size()

int lsst.dax.apdb.cassandra.apdbCassandraSchema.ApdbCassandraSchema.table_row_size ( self,
ApdbTables | ExtraTables table )
Return an estimate of the row size of a given table.

Parameters
----------
table : `ApdbTables` or `ExtraTables`

Returns
-------
size : `int`
    An estimate of a table row size.

Notes
-----
The returned size is not exact. When a table has variable-size columns (e.g.
strings), the estimate may be incorrect. Stored data size or wire-level protocol
size can be smaller if some columns are not set or are set to NULL.

Definition at line 890 of file apdbCassandraSchema.py.

890 def table_row_size(self, table: ApdbTables | ExtraTables) -> int:
891 """Return an estimate of the row size of a given table.
892
893 Parameters
894 ----------
895 table : `ApdbTables` or `ExtraTables`
896
897 Returns
898 -------
899 size : `int`
900 An estimate of a table row size.
901
902 Notes
903 -----
904 Returned size is not exact. When table has variable-size columns (e.g.
905 strings) may be incorrect. Stored data size or wire-level protocol size
906 can be smaller if some columns are not set or set to NULL.
907 """
908 table_schema = self._table_schema(table)
909 size = sum(column.size() for column in table_schema.columns)
910 return size
911

◆ tableName()

str lsst.dax.apdb.cassandra.apdbCassandraSchema.ApdbCassandraSchema.tableName ( self,
ApdbTables | ExtraTables table_name,
int | None time_partition = None )
Return Cassandra table name for APDB table.

Parameters
----------
table_name : `ApdbTables` or `ExtraTables`
    Table enum for which to generate table name.
time_partition : `int`, optional
    Optional time partition, only used for tables that support time
    partitioning.

Definition at line 637 of file apdbCassandraSchema.py.

637 def tableName(self, table_name: ApdbTables | ExtraTables, time_partition: int | None = None) -> str:
638 """Return Cassandra table name for APDB table.
639
640 Parameters
641 ----------
642 table_name : `ApdbTables` or `ExtraTables`
643 Table enum for which to generate table name.
644 time_partition : `int`, optional
645 Optional time partition, only used for tables that support time
646 partitioning.
647 """
648 return table_name.table_name(self._prefix, time_partition)
649

◆ time_partitioned_tables()

list[ApdbTables] lsst.dax.apdb.cassandra.apdbCassandraSchema.ApdbCassandraSchema.time_partitioned_tables ( self)
Make the list of time-partitioned tables.

Returns
-------
tables : `list` [`ApdbTables`]
    Tables that are time-partitioned.

Definition at line 912 of file apdbCassandraSchema.py.

912 def time_partitioned_tables(self) -> list[ApdbTables]:
913 """Make the list of time-partitioned tables.
914
915 Returns
916 -------
917 tables : `list` [`ApdbTables`]
918 Tables that are time-partitioned.
919 """
920 if not self._time_partition_tables:
921 return []
922 has_dia_object_table = not (self._enable_replica and self._replica_skips_diaobjects)
923 tables = [ApdbTables.DiaSource, ApdbTables.DiaForcedSource]
924 if has_dia_object_table:
925 tables.append(ApdbTables.DiaObject)
926 return tables

Member Data Documentation

◆ _apdb_tables

Mapping[ApdbTables, schema_model.Table] lsst.dax.apdb.cassandra.apdbCassandraSchema.ApdbCassandraSchema._apdb_tables = self._apdb_tables_schema(time_partition_tables)
protected

Definition at line 226 of file apdbCassandraSchema.py.

◆ _enable_replica

lsst.dax.apdb.cassandra.apdbCassandraSchema.ApdbCassandraSchema._enable_replica = enable_replica
protected

Definition at line 221 of file apdbCassandraSchema.py.

◆ _extra_tables

Mapping[ExtraTables, schema_model.Table] lsst.dax.apdb.cassandra.apdbCassandraSchema.ApdbCassandraSchema._extra_tables = self._extra_tables_schema()
protected

Definition at line 227 of file apdbCassandraSchema.py.

◆ _has_chunk_sub_partitions

lsst.dax.apdb.cassandra.apdbCassandraSchema.ApdbCassandraSchema._has_chunk_sub_partitions = has_chunk_sub_partitions
protected

Definition at line 223 of file apdbCassandraSchema.py.

◆ _has_visit_detector_table

lsst.dax.apdb.cassandra.apdbCassandraSchema.ApdbCassandraSchema._has_visit_detector_table = has_visit_detector_table
protected

Definition at line 224 of file apdbCassandraSchema.py.

◆ _keyspace

str lsst.dax.apdb.cassandra.apdbCassandraSchema.ApdbCassandraSchema._keyspace = keyspace
protected

Definition at line 217 of file apdbCassandraSchema.py.

◆ _prefix

lsst.dax.apdb.cassandra.apdbCassandraSchema.ApdbCassandraSchema._prefix = prefix
protected

Definition at line 219 of file apdbCassandraSchema.py.

◆ _replica_skips_diaobjects

lsst.dax.apdb.cassandra.apdbCassandraSchema.ApdbCassandraSchema._replica_skips_diaobjects = replica_skips_diaobjects
protected

Definition at line 222 of file apdbCassandraSchema.py.

◆ _session

lsst.dax.apdb.cassandra.apdbCassandraSchema.ApdbCassandraSchema._session = session
protected

Definition at line 216 of file apdbCassandraSchema.py.

◆ _spatially_partitioned_tables

list lsst.dax.apdb.cassandra.apdbCassandraSchema.ApdbCassandraSchema._spatially_partitioned_tables = [ApdbTables.DiaObjectLast]
staticprotected

Definition at line 202 of file apdbCassandraSchema.py.

◆ _table_schemas

lsst.dax.apdb.cassandra.apdbCassandraSchema.ApdbCassandraSchema._table_schemas = table_schemas
protected

Definition at line 218 of file apdbCassandraSchema.py.

◆ _time_partition_tables

lsst.dax.apdb.cassandra.apdbCassandraSchema.ApdbCassandraSchema._time_partition_tables = time_partition_tables
protected

Definition at line 220 of file apdbCassandraSchema.py.

◆ _time_partitioned_tables

lsst.dax.apdb.cassandra.apdbCassandraSchema.ApdbCassandraSchema._time_partitioned_tables
staticprotected
Initial value:
= [
ApdbTables.DiaObject,
ApdbTables.DiaSource,
ApdbTables.DiaForcedSource,
]

Definition at line 197 of file apdbCassandraSchema.py.

◆ _type_map

dict lsst.dax.apdb.cassandra.apdbCassandraSchema.ApdbCassandraSchema._type_map
staticprotected
Initial value:
= {
felis.datamodel.DataType.double: "DOUBLE",
felis.datamodel.DataType.float: "FLOAT",
felis.datamodel.DataType.timestamp: "TIMESTAMP",
felis.datamodel.DataType.long: "BIGINT",
felis.datamodel.DataType.int: "INT",
felis.datamodel.DataType.short: "SMALLINT",
felis.datamodel.DataType.byte: "TINYINT",
felis.datamodel.DataType.binary: "BLOB",
felis.datamodel.DataType.char: "TEXT",
felis.datamodel.DataType.string: "TEXT",
felis.datamodel.DataType.unicode: "TEXT",
felis.datamodel.DataType.text: "TEXT",
felis.datamodel.DataType.boolean: "BOOLEAN",
schema_model.ExtraDataTypes.UUID: "UUID",
}

Definition at line 179 of file apdbCassandraSchema.py.


The documentation for this class was generated from the following file: