101 """Return the list of keyspaces with APDB databases.
        Parameters
        ----------
        host : `str`
            Name of one of the hosts in the Cassandra cluster.
        Returns
        -------
        databases : `~collections.abc.Iterable` [`DatabaseInfo`]
            Information about databases that contain an APDB instance.
        """
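        # Every APDB keyspace defines a DiaSource table; look its name up in
        # the system schema to find candidate keyspaces.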
        table_name = ApdbTables.DiaSource.table_name()
        query = "select keyspace_name from system_schema.tables where table_name = %s ALLOW FILTERING"
        result = session.execute(query, (table_name,))
        keyspaces = [row[0] for row in result.all()]
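        # Permissions are stored per "data/<keyspace>" resource; build one
        # "%s" placeholder per keyspace for the IN clause below.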
        template = ", ".join(["%s"] * len(keyspaces))
129 "SELECT resource, role, permissions FROM system_auth.role_permissions "
130 f
"WHERE resource IN ({template}) ALLOW FILTERING"
        resources = [f"data/{keyspace}" for keyspace in keyspaces]
        try:
            result = session.execute(query, resources)
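            # Start every keyspace with an empty permission map; rows from
            # role_permissions then fill in per-role permission sets.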
            infos = {keyspace: DatabaseInfo(name=keyspace, permissions={}) for keyspace in keyspaces}
            for row in result:
                _, _, keyspace = row[0].partition("/")
                role = row[1]
                role_permissions: set[str] = set(row[2])
                infos[keyspace].permissions[role] = role_permissions
        except cassandra.Unauthorized as exc:
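            # Access to system_auth is often restricted; fall back to
            # reporting the databases without permission information.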
            warnings.warn(
                f"Authentication information is not accessible to current user - {exc}", stacklevel=2
            )
            infos = {keyspace: DatabaseInfo(name=keyspace) for keyspace in keyspaces}
        return infos.values()
    def delete_records(
        self,
        objects: Iterable[DiaObjectLocator],
        sources: Iterable[DiaSourceLocator],
        forced_sources: Iterable[DiaForcedSourceLocator],
    ) -> None:
        context = self._apdb._context
        config = context.config
        keyspace = self._apdb._keyspace
        has_dia_object_table = not (config.enable_replica and config.replica_skips_diaobjects)
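        # Group object IDs by spatial partition; DiaObjectLast is keyed by
        # apdb_part with diaObjectId as the clustering column.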
        partitions = defaultdict(list)
        for obj in objects:
            apdb_part = self.apdb_part(obj.ra, obj.dec)
            partitions[apdb_part].append(obj.diaObjectId)
        object_ids = set(itertools.chain.from_iterable(partitions.values()))
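        # Only sources belonging to the objects being deleted matter; group
        # them by object ID so temporal partitions can be derived per object.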
        source_groups = defaultdict(list)
        for source in sources:
            if source.diaObjectId in object_ids:
                source_groups[source.diaObjectId].append(source)
        object_deletes = []
        object_count = 0
        for apdb_part, oids in partitions.items():
            object_count += len(oids)
            for oid_chunk in chunk_iterable(oids, 1000):
                oids_str = ",".join(str(oid) for oid in oid_chunk)
                object_deletes.append(
                    (
                        f'DELETE FROM "{keyspace}"."DiaObjectLast" '
                        f'WHERE apdb_part = {apdb_part} AND "diaObjectId" IN ({oids_str});',
                        (),
                    )
                )
        if has_dia_object_table:
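            # DiaObject rows live in (spatial, temporal) partitions; derive
            # each object's candidate temporal partitions from the times of
            # its matching sources.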
            oids_by_partition: dict[tuple[int, int], list[int]] = defaultdict(list)
            for apdb_part, oids in partitions.items():
                for oid in oids:
                    temporal_partitions = {
                        self.apdb_time_part(src.midpointMjdTai) for src in source_groups.get(oid, [])
                    }
                    for time_part in temporal_partitions:
                        oids_by_partition[(apdb_part, time_part)].append(oid)
            for (apdb_part, time_part), oids in oids_by_partition.items():
                for oid_chunk in chunk_iterable(oids, 1000):
                    oids_str = ",".join(str(oid) for oid in oid_chunk)
                    if config.partitioning.time_partition_tables:
                        table_name = context.schema.tableName(ApdbTables.DiaObject, time_part)
                        object_deletes.append(
                            (
                                f'DELETE FROM "{keyspace}"."{table_name}" '
                                f'WHERE apdb_part = {apdb_part} AND "diaObjectId" IN ({oids_str})',
                                (),
                            )
                        )
                    else:
                        table_name = context.schema.tableName(ApdbTables.DiaObject)
                        object_deletes.append(
                            (
                                f'DELETE FROM "{keyspace}"."{table_name}" '
                                f"WHERE apdb_part = {apdb_part} AND apdb_time_part = {time_part} "
                                f'AND "diaObjectId" IN ({oids_str})',
                                (),
                            )
                        )
        for oid_chunk in chunk_iterable(sorted(object_ids), 1000):
            oids_str = ",".join(str(oid) for oid in oid_chunk)
            object_deletes.append(
                (
                    f'DELETE FROM "{keyspace}"."DiaObjectLastToPartition" '
                    f'WHERE "diaObjectId" IN ({oids_str})',
                    (),
                )
            )
        source_partitions = defaultdict(list)
        for source in itertools.chain.from_iterable(source_groups.values()):
            apdb_part = self.apdb_part(source.ra, source.dec)
            apdb_time_part = self.apdb_time_part(source.midpointMjdTai)
            source_partitions[(apdb_part, apdb_time_part)].append(source)
        source_deletes = []
        source_count = 0
        for (apdb_part, apdb_time_part), source_list in source_partitions.items():
            source_ids = sorted(source.diaSourceId for source in source_list)
            source_count += len(source_ids)
            for id_chunk in chunk_iterable(source_ids, 1000):
                ids_str = ",".join(str(src_id) for src_id in id_chunk)
                if config.partitioning.time_partition_tables:
                    table_name = context.schema.tableName(ApdbTables.DiaSource, apdb_time_part)
                    source_deletes.append(
                        (
                            f'DELETE FROM "{keyspace}"."{table_name}" '
                            f'WHERE apdb_part = {apdb_part} AND "diaSourceId" IN ({ids_str})',
                            (),
                        )
                    )
                else:
                    table_name = context.schema.tableName(ApdbTables.DiaSource)
                    source_deletes.append(
                        (
                            f'DELETE FROM "{keyspace}"."{table_name}" '
                            f"WHERE apdb_part = {apdb_part} AND apdb_time_part = {apdb_time_part} "
                            f'AND "diaSourceId" IN ({ids_str})',
                            (),
                        )
                    )
        forced_source_partitions = defaultdict(list)
        for forced_source in forced_sources:
            if forced_source.diaObjectId in object_ids:
                apdb_part = self.apdb_part(forced_source.ra, forced_source.dec)
                apdb_time_part = self.apdb_time_part(forced_source.midpointMjdTai)
                forced_source_partitions[(apdb_part, apdb_time_part)].append(forced_source)
        forced_source_deletes = []
        forced_source_count = 0
        for (apdb_part, apdb_time_part), forced_source_list in forced_source_partitions.items():
            clustering_keys = sorted(
                (fsource.diaObjectId, fsource.visit, fsource.detector) for fsource in forced_source_list
            )
            forced_source_count += len(clustering_keys)
            for key_chunk in chunk_iterable(clustering_keys, 1000):
                cl_str = ",".join(f"({oid}, {v}, {d})" for oid, v, d in key_chunk)
                if config.partitioning.time_partition_tables:
                    table_name = context.schema.tableName(ApdbTables.DiaForcedSource, apdb_time_part)
                    forced_source_deletes.append(
                        (
                            f'DELETE FROM "{keyspace}"."{table_name}" '
                            f"WHERE apdb_part = {apdb_part} "
                            f'AND ("diaObjectId", visit, detector) IN ({cl_str})',
                            (),
                        )
                    )
                else:
                    table_name = context.schema.tableName(ApdbTables.DiaForcedSource)
                    forced_source_deletes.append(
                        (
                            f'DELETE FROM "{keyspace}"."{table_name}" '
                            f"WHERE apdb_part = {apdb_part} "
                            f"AND apdb_time_part = {apdb_time_part} "
                            f'AND ("diaObjectId", visit, detector) IN ({cl_str})',
                            (),
                        )
                    )
362 "Deleting %d objects, %d sources, and %d forced sources",
        with self._timer("delete_forced_sources"):
            execute_concurrent(context.session, forced_source_deletes)
        with self._timer("delete_sources"):
            execute_concurrent(context.session, source_deletes)
        with self._timer("delete_objects"):
            execute_concurrent(context.session, object_deletes)
    def extend_time_partitions(
        self,
        time: astropy.time.Time,
        forward: bool = True,
        max_delta: astropy.time.TimeDelta | None = None,
    ) -> list[int]:
401 """Extend set of time-partitioned tables to include specified time.
        Parameters
        ----------
        time : `astropy.time.Time`
            Time to which to extend partitions.
        forward : `bool`, optional
            If `True`, extend partitions into the future; ``time`` should be
            later than the end time of the last existing partition. If
            `False`, extend partitions into the past; ``time`` should be
            earlier than the start time of the first existing partition.
        max_delta : `astropy.time.TimeDelta`, optional
            Maximum possible extension of the partitions; default is 365 days.
        Returns
        -------
        partitions : `list` [`int`]
            List of partitions added to the database; an empty list is
            returned if ``time`` is already within the existing partition
            range.
        Raises
        ------
        TypeError
            Raised if this APDB instance does not use time-partitioned
            tables.
        ValueError
            Raised if the extension request exceeds the ``max_delta`` limit.
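
        Examples
        --------
        A minimal sketch, not from the original source; ``admin`` is assumed
        to be an admin object for an existing APDB instance that uses
        time-partitioned tables:

        >>> import astropy.time
        >>> future = astropy.time.Time("2030-01-01T00:00:00", format="isot")
        >>> admin.extend_time_partitions(future)  # doctest: +SKIP
        """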
        if max_delta is None:
            max_delta = astropy.time.TimeDelta(365, format="jd")
        context = self._apdb._context
        part_range = context.time_partitions_range
        if part_range is None:
            raise TypeError("This APDB instance does not use time-partitioned tables.")
        partitions = self._time_partitions_to_add(time, forward, max_delta)
        if not partitions:
            return []
        _LOG.debug("New partitions to create: %s", partitions)
        keyspace = self._apdb._keyspace
        tables = context.schema.time_partitioned_tables()
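        # Use an existing partition's table as a schema template: DESCRIBE
        # returns its CREATE statement, in which the concrete table name is
        # replaced by a placeholder token.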
        table_name_token = "%TABLE_NAME%"
        table_schemas = {}
        for table in tables:
            existing_table_name = context.schema.tableName(table, part_range.end)
            query = f'DESCRIBE TABLE "{keyspace}"."{existing_table_name}"'
            result = context.session.execute(query).one()
            if result is None:
                raise LookupError(f'Failed to read schema for table "{keyspace}"."{existing_table_name}"')
            schema: str = result.create_statement
            schema = schema.replace(existing_table_name, table_name_token)
            table_schemas[table] = schema
        existing_tables = context.schema.existing_tables(*tables)
        for table in tables:
            new_tables = {context.schema.tableName(table, partition) for partition in partitions}
            old_tables = new_tables.intersection(existing_tables[table])
            if old_tables:
                raise ValueError(f"Some tables to be created already exist: {old_tables}")
        for table, schema in table_schemas.items():
            for partition in partitions:
                new_table_name = context.schema.tableName(table, partition)
                _LOG.debug("Creating table %s", new_table_name)
                new_ddl = schema.replace(table_name_token, new_table_name)
                context.session.execute(new_ddl)
        if context.has_time_partition_meta:
            if forward:
                part_range.end = max(partitions)
            else:
                part_range.start = min(partitions)
            part_range.save_to_meta(context.metadata)
        return partitions
    def _time_partitions_to_add(
        self,
        time: astropy.time.Time,
        forward: bool,
        max_delta: astropy.time.TimeDelta,
    ) -> list[int]:
        """Make the list of time partitions to add to the current range."""
        context = self._apdb._context
        part_range = context.time_partitions_range
        assert part_range is not None
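        # The partitioner maps a time to its integer partition number and a
        # partition number back to its (start, end) time period.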
        new_partition = context.partitioner.time_partition(time)
        if forward:
            if new_partition <= part_range.end:
                _LOG.debug(
                    "Partition for time=%s (%d) is below existing end (%d)",
                    time.isot,
                    new_partition,
                    part_range.end,
                )
                return []
            _, end = context.partitioner.partition_period(part_range.end)
            if time - end > max_delta:
                raise ValueError(
                    f"Extension exceeds limit: current end time = {end.isot}, new end time = {time.isot}, "
                    f"limit = {max_delta.jd} days"
                )
            partitions = list(range(part_range.end + 1, new_partition + 1))
        else:
            if new_partition >= part_range.start:
                _LOG.debug(
                    "Partition for time=%s (%d) is above existing start (%d)",
                    time.isot,
                    new_partition,
                    part_range.start,
                )
                return []
            start, _ = context.partitioner.partition_period(part_range.start)
            if start - time > max_delta:
                raise ValueError(
                    f"Extension exceeds limit: current start time = {start.isot}, "
                    f"new start time = {time.isot}, "
                    f"limit = {max_delta.jd} days"
                )
            partitions = list(range(new_partition, part_range.start))
        return partitions
    def delete_time_partitions(
        self, time: astropy.time.Time, after: bool = False, *, confirm: ConfirmDeletePartitions | None = None
    ) -> list[int]:
540 """Delete time-partitioned tables before or after specified time.
        Parameters
        ----------
        time : `astropy.time.Time`
            Time before or after which to remove partitions. The partition
            that includes this time is not deleted.
        after : `bool`, optional
            If `True`, delete partitions after the specified time; the
            default is to delete partitions before this time.
        confirm : `~collections.abc.Callable`, optional
            A callable that is called to confirm deletion of the partitions.
            The callable needs to accept three keyword arguments:

            - `partitions` - a list of partition numbers to be deleted,
            - `tables` - a list of table names to be deleted,
            - `partitioner` - a `Partitioner` instance.

            Partitions are deleted only if the callable returns `True`.
        Returns
        -------
        partitions : `list` [`int`]
            List of partitions deleted from the database; an empty list is
            returned if nothing was deleted.
        Raises
        ------
        TypeError
            Raised if this APDB instance does not use time-partitioned
            tables.
        ValueError
            Raised when requested to delete all partitions.
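
        Examples
        --------
        A minimal sketch of a ``confirm`` callable, not from the original
        source; ``admin`` is assumed to be an admin object for an existing
        APDB instance:

        >>> def confirm_small(*, partitions, tables, partitioner):
        ...     # Veto anything that would drop more than two partitions.
        ...     return len(partitions) <= 2
        >>> import astropy.time
        >>> cutoff = astropy.time.Time("2025-01-01T00:00:00", format="isot")
        >>> admin.delete_time_partitions(cutoff, confirm=confirm_small)  # doctest: +SKIP
        """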
        context = self._apdb._context
        part_range = context.time_partitions_range
        if part_range is None:
            raise TypeError("This APDB instance does not use time-partitioned tables.")
        if min(partitions) == part_range.start and max(partitions) == part_range.end:
            raise ValueError("Cannot delete all partitions.")
        keyspace = self._apdb._keyspace
        tables = context.schema.time_partitioned_tables()
        table_names = []
        for table in tables:
            for partition in partitions:
                table_names.append(context.schema.tableName(table, partition))
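        # Give the caller a chance to veto the deletion before any DROP runs.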
        if confirm is not None:
            answer = confirm(partitions=partitions, tables=table_names, partitioner=context.partitioner)
            if not answer:
                return []
        for table_name in table_names:
            _LOG.debug("Dropping table %s", table_name)
            query = f'DROP TABLE IF EXISTS "{keyspace}"."{table_name}"'
            context.session.execute(query)
        if context.has_time_partition_meta:
            if after:
                part_range.end = min(partitions) - 1
            else:
                part_range.start = max(partitions) + 1
            part_range.save_to_meta(context.metadata)
        return partitions