427 def test_getChunks(self) -> None:
428 """Store and retrieve replica chunks."""
429
430 config = self.make_instance()
431 apdb = Apdb.from_config(config)
432 apdb_replica = ApdbReplica.from_config(config)
433 visit_time = self.visit_time
434
435 region1 = _make_region((1.0, 1.0, -1.0))
436 region2 = _make_region((-1.0, -1.0, -1.0))
437 nobj = 100
438 objects1 = makeObjectCatalog(region1, nobj, visit_time)
439 objects2 = makeObjectCatalog(region2, nobj, visit_time, start_id=nobj * 2)
440
441
442
443 visits = [
444 (astropy.time.Time("2021-01-01T00:01:00", format="isot", scale="tai"), objects1),
445 (astropy.time.Time("2021-01-01T00:02:00", format="isot", scale="tai"), objects2),
446 (astropy.time.Time("2021-01-01T00:11:00", format="isot", scale="tai"), objects1),
447 (astropy.time.Time("2021-01-01T00:12:00", format="isot", scale="tai"), objects2),
448 (astropy.time.Time("2021-01-01T00:45:00", format="isot", scale="tai"), objects1),
449 (astropy.time.Time("2021-01-01T00:46:00", format="isot", scale="tai"), objects2),
450 (astropy.time.Time("2021-03-01T00:01:00", format="isot", scale="tai"), objects1),
451 (astropy.time.Time("2021-03-01T00:02:00", format="isot", scale="tai"), objects2),
452 ]
453
454 start_id = 0
455 for visit_time, objects in visits:
456 sources = makeSourceCatalog(objects, visit_time, start_id=start_id)
457 fsources = makeForcedSourceCatalog(objects, visit_time, ccdVisitId=start_id)
458 apdb.store(visit_time, objects, sources, fsources)
459 start_id += nobj
460
461 replica_chunks = apdb_replica.getReplicaChunks()
462 if not self.enable_replica:
463 self.assertIsNone(replica_chunks)
464
465 with self.assertRaisesRegex(ValueError, "APDB is not configured for replication"):
466 apdb_replica.getDiaObjectsChunks([])
467
468 else:
469 assert replica_chunks is not None
470 self.assertEqual(len(replica_chunks), 4)
471
472 def _check_chunks(replica_chunks: list[ReplicaChunk], n_records: int | None = None) -> None:
473 if n_records is None:
474 n_records = len(replica_chunks) * nobj
475 res = apdb_replica.getDiaObjectsChunks(chunk.id for chunk in replica_chunks)
476 self.assert_table_data(res, n_records, ApdbTables.DiaObject)
477 res = apdb_replica.getDiaSourcesChunks(chunk.id for chunk in replica_chunks)
478 self.assert_table_data(res, n_records, ApdbTables.DiaSource)
479 res = apdb_replica.getDiaForcedSourcesChunks(chunk.id for chunk in replica_chunks)
480 self.assert_table_data(res, n_records, ApdbTables.DiaForcedSource)
481
482
483 _check_chunks(replica_chunks, 800)
484 _check_chunks(replica_chunks[1:], 600)
485 _check_chunks(replica_chunks[1:-1], 400)
486 _check_chunks(replica_chunks[2:3], 200)
487 _check_chunks([])
488
489
490 deleted_chunks = replica_chunks[:1]
491 apdb_replica.deleteReplicaChunks(chunk.id for chunk in deleted_chunks)
492
493
494 _check_chunks(deleted_chunks, 0)
495
496 replica_chunks = apdb_replica.getReplicaChunks()
497 assert replica_chunks is not None
498 self.assertEqual(len(replica_chunks), 3)
499
500 _check_chunks(replica_chunks, 600)
501