_apdb.py
# This file is part of dax_apdb.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

__all__ = ["ApdbSchemaUpdateTest", "ApdbTest", "update_schema_yaml"]

import contextlib
import datetime
import os
import tempfile
import unittest
from abc import ABC, abstractmethod
from collections.abc import Iterator
from tempfile import TemporaryDirectory
from typing import TYPE_CHECKING, Any

import astropy.time
import pandas
import yaml
from lsst.sphgeom import Angle, Circle, LonLat, Region, UnitVector3d

from .. import (
    Apdb,
    ApdbConfig,
    ApdbReplica,
    ApdbTableData,
    ApdbTables,
    IncompatibleVersionError,
    ReplicaChunk,
    VersionTuple,
)
from .data_factory import makeForcedSourceCatalog, makeObjectCatalog, makeSourceCatalog, makeSSObjectCatalog

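# Under TYPE_CHECKING the mixin base class inherits from unittest.TestCase so
# that static type checkers see the assert* methods used by the mixins below;
# at run time it is an empty class instead, so the abstract test base classes
# are not themselves collected and run as TestCases.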
if TYPE_CHECKING:
    from ..pixelization import Pixelization

    class TestCaseMixin(unittest.TestCase):
        """Base class for mixin test classes that use TestCase methods."""

else:

    class TestCaseMixin:
        """Do-nothing definition of mixin base class for regular execution."""


def _make_region(xyz: tuple[float, float, float] = (1.0, 1.0, -1.0)) -> Region:
    """Make a region to use in tests"""
    pointing_v = UnitVector3d(*xyz)
    fov = 0.05  # radians
    region = Circle(pointing_v, Angle(fov / 2))
    return region


@contextlib.contextmanager
def update_schema_yaml(
    schema_file: str,
    drop_metadata: bool = False,
    version: str | None = None,
) -> Iterator[str]:
    """Update schema definition and return name of the new schema file.

    Parameters
    ----------
    schema_file : `str`
        Path for the existing YAML file with APDB schema.
    drop_metadata : `bool`
        If `True` then remove metadata table from the list of tables.
    version : `str` or `None`
        If a non-empty string, set the schema version to this value; if an
        empty string, remove the schema version; if `None`, leave the version
        unchanged.

    Yields
    ------
    output_path : `str`
        Path of the updated schema file.
    """
    with open(schema_file) as yaml_stream:
        schemas_list = list(yaml.load_all(yaml_stream, Loader=yaml.SafeLoader))
    # Edit YAML contents.
    for schema in schemas_list:
        # Optionally drop metadata table.
        if drop_metadata:
            schema["tables"] = [table for table in schema["tables"] if table["name"] != "metadata"]
        if version is not None:
            if version == "":
                del schema["version"]
            else:
                schema["version"] = version

    with TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
        output_path = os.path.join(tmpdir, "schema.yaml")
        with open(output_path, "w") as yaml_stream:
            yaml.dump_all(schemas_list, stream=yaml_stream)
        yield output_path

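# A minimal usage sketch for update_schema_yaml(), mirroring how the tests
# below use it (the schema path is hypothetical):
#
#     with update_schema_yaml("tests/config/schema.yaml", version="99.0.0") as schema_file:
#         config = self.make_instance(schema_file=schema_file)
#         apdb = Apdb.from_config(config)
#
# The rewritten file lives in a temporary directory that is removed when the
# context manager exits, so the yielded path must not be used outside the
# block.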

class ApdbTest(TestCaseMixin, ABC):
    """Base class for Apdb tests that can be specialized for a concrete
    implementation.

    This can only be used as a mixin class for a `unittest.TestCase` because
    it calls various assert methods. A sketch of a concrete specialization is
    shown after this class definition.
    """

    time_partition_tables = False
    visit_time = astropy.time.Time("2021-01-01T00:00:00", format="isot", scale="tai")

    fsrc_requires_id_list = False
    """Should be set to True if getDiaForcedSources requires object IDs"""

    enable_replica: bool = False
    """Set to true when support for replication is configured"""

    schema_path: str
    """Location of the Felis schema file."""

    timestamp_type_name: str
    """Type name of timestamp columns in DataFrames returned from queries."""

    # number of columns as defined in tests/config/schema.yaml
    table_column_count = {
        ApdbTables.DiaObject: 8,
        ApdbTables.DiaObjectLast: 5,
        ApdbTables.DiaSource: 12,
        ApdbTables.DiaForcedSource: 8,
        ApdbTables.SSObject: 3,
    }

    @abstractmethod
    def make_instance(self, **kwargs: Any) -> ApdbConfig:
        """Make database instance and return configuration for it."""
        raise NotImplementedError()

    @abstractmethod
    def getDiaObjects_table(self) -> ApdbTables:
        """Return type of table returned from getDiaObjects method."""
        raise NotImplementedError()

    @abstractmethod
    def pixelization(self, config: ApdbConfig) -> Pixelization:
        """Return pixelization used by implementation."""
        raise NotImplementedError()

    def assert_catalog(self, catalog: Any, rows: int, table: ApdbTables) -> None:
        """Validate catalog type and size

        Parameters
        ----------
        catalog : `object`
            Expected type of this is ``pandas.DataFrame``.
        rows : `int`
            Expected number of rows in a catalog.
        table : `ApdbTables`
            APDB table type.
        """
        self.assertIsInstance(catalog, pandas.DataFrame)
        self.assertEqual(catalog.shape[0], rows)
        self.assertEqual(catalog.shape[1], self.table_column_count[table])

    def assert_table_data(self, catalog: Any, rows: int, table: ApdbTables) -> None:
        """Validate catalog type and size

        Parameters
        ----------
        catalog : `object`
            Expected type of this is `ApdbTableData`.
        rows : `int`
            Expected number of rows in a catalog.
        table : `ApdbTables`
            APDB table type.
        """
        self.assertIsInstance(catalog, ApdbTableData)
        n_rows = sum(1 for row in catalog.rows())
        self.assertEqual(n_rows, rows)
        # One extra column for replica chunk id
        self.assertEqual(len(catalog.column_names()), self.table_column_count[table] + 1)

    def test_makeSchema(self) -> None:
        """Test for making APDB schema."""
        config = self.make_instance()
        apdb = Apdb.from_config(config)

        self.assertIsNotNone(apdb.tableDef(ApdbTables.DiaObject))
        self.assertIsNotNone(apdb.tableDef(ApdbTables.DiaObjectLast))
        self.assertIsNotNone(apdb.tableDef(ApdbTables.DiaSource))
        self.assertIsNotNone(apdb.tableDef(ApdbTables.DiaForcedSource))
        self.assertIsNotNone(apdb.tableDef(ApdbTables.metadata))

        # Test from_uri factory method with the same config.
        with tempfile.NamedTemporaryFile() as tmpfile:
            config.save(tmpfile.name)
            apdb = Apdb.from_uri(tmpfile.name)

        self.assertIsNotNone(apdb.tableDef(ApdbTables.DiaObject))
        self.assertIsNotNone(apdb.tableDef(ApdbTables.DiaObjectLast))
        self.assertIsNotNone(apdb.tableDef(ApdbTables.DiaSource))
        self.assertIsNotNone(apdb.tableDef(ApdbTables.DiaForcedSource))
        self.assertIsNotNone(apdb.tableDef(ApdbTables.metadata))

    def test_empty_gets(self) -> None:
        """Test for getting data from empty database.

        All get() methods should return empty results, only useful for
        checking that code is not broken.
        """
        # use non-zero months for Forced/Source fetching
        config = self.make_instance()
        apdb = Apdb.from_config(config)

        region = _make_region()
        visit_time = self.visit_time

        res: pandas.DataFrame | None

        # get objects by region
        res = apdb.getDiaObjects(region)
        self.assert_catalog(res, 0, self.getDiaObjects_table())

        # get sources by region
        res = apdb.getDiaSources(region, None, visit_time)
        self.assert_catalog(res, 0, ApdbTables.DiaSource)

        res = apdb.getDiaSources(region, [], visit_time)
        self.assert_catalog(res, 0, ApdbTables.DiaSource)

        # get sources by object ID, non-empty object list
        res = apdb.getDiaSources(region, [1, 2, 3], visit_time)
        self.assert_catalog(res, 0, ApdbTables.DiaSource)

        # get forced sources by object ID, empty object list
        res = apdb.getDiaForcedSources(region, [], visit_time)
        self.assert_catalog(res, 0, ApdbTables.DiaForcedSource)

        # get forced sources by object ID, non-empty object list
        res = apdb.getDiaForcedSources(region, [1, 2, 3], visit_time)
        self.assert_catalog(res, 0, ApdbTables.DiaForcedSource)

        # data_factory's ccdVisitId generation corresponds to (1, 1)
        res = apdb.containsVisitDetector(visit=1, detector=1)
        self.assertFalse(res)

        # get forced sources by region
        if self.fsrc_requires_id_list:
            with self.assertRaises(NotImplementedError):
                apdb.getDiaForcedSources(region, None, visit_time)
        else:
            res = apdb.getDiaForcedSources(region, None, visit_time)
            self.assert_catalog(res, 0, ApdbTables.DiaForcedSource)

    def test_empty_gets_0months(self) -> None:
        """Test for getting data from empty database.

        All get() methods should return empty DataFrame or None.
        """
        # set read_sources_months to 0 so that Forced/Sources are None
        config = self.make_instance(read_sources_months=0, read_forced_sources_months=0)
        apdb = Apdb.from_config(config)

        region = _make_region()
        visit_time = self.visit_time

        res: pandas.DataFrame | None

        # get objects by region
        res = apdb.getDiaObjects(region)
        self.assert_catalog(res, 0, self.getDiaObjects_table())

        # get sources by region
        res = apdb.getDiaSources(region, None, visit_time)
        self.assertIs(res, None)

        # get sources by object ID, empty object list
        res = apdb.getDiaSources(region, [], visit_time)
        self.assertIs(res, None)

        # get forced sources by object ID, empty object list
        res = apdb.getDiaForcedSources(region, [], visit_time)
        self.assertIs(res, None)

        # Database is empty, no images exist.
        res = apdb.containsVisitDetector(visit=1, detector=1)
        self.assertFalse(res)

    def test_storeObjects(self) -> None:
        """Store and retrieve DiaObjects."""
        # don't care about sources.
        config = self.make_instance()
        apdb = Apdb.from_config(config)

        region = _make_region()
        visit_time = self.visit_time

        # make catalog with Objects
        catalog = makeObjectCatalog(region, 100, visit_time)

        # store catalog
        apdb.store(visit_time, catalog)

        # read it back and check sizes
        res = apdb.getDiaObjects(region)
        self.assert_catalog(res, len(catalog), self.getDiaObjects_table())

        # TODO: test apdb.contains with generic implementation from DM-41671

    def test_storeObjects_empty(self) -> None:
        """Test calling storeObject when there are no objects: see DM-43270."""
        config = self.make_instance()
        apdb = Apdb.from_config(config)
        region = _make_region()
        visit_time = self.visit_time
        # make catalog with no Objects
        catalog = makeObjectCatalog(region, 0, visit_time)

        with self.assertLogs("lsst.dax.apdb", level="DEBUG") as cm:
            apdb.store(visit_time, catalog)
        self.assertIn("No objects", "\n".join(cm.output))

    def test_storeMovingObject(self) -> None:
        """Store and retrieve DiaObject which changes its position."""
        # don't care about sources.
        config = self.make_instance()
        apdb = Apdb.from_config(config)
        pixelization = self.pixelization(config)

        lon_deg, lat_deg = 0.0, 0.0
        lonlat1 = LonLat.fromDegrees(lon_deg - 1.0, lat_deg)
        lonlat2 = LonLat.fromDegrees(lon_deg + 1.0, lat_deg)
        uv1 = UnitVector3d(lonlat1)
        uv2 = UnitVector3d(lonlat2)

        # Check that they fall into different pixels.
        self.assertNotEqual(pixelization.pixel(uv1), pixelization.pixel(uv2))

        # Store one object at two different positions.
        visit_time1 = self.visit_time
        catalog1 = makeObjectCatalog(lonlat1, 1, visit_time1)
        apdb.store(visit_time1, catalog1)

        visit_time2 = visit_time1 + astropy.time.TimeDelta(120.0, format="sec")
        catalog1 = makeObjectCatalog(lonlat2, 1, visit_time2)
        apdb.store(visit_time2, catalog1)

        # Make region covering both points.
        region = Circle(UnitVector3d(LonLat.fromDegrees(lon_deg, lat_deg)), Angle.fromDegrees(1.1))
        self.assertTrue(region.contains(uv1))
        self.assertTrue(region.contains(uv2))

        # Read it back, must return the latest one.
        res = apdb.getDiaObjects(region)
        self.assert_catalog(res, 1, self.getDiaObjects_table())

    def test_storeSources(self) -> None:
        """Store and retrieve DiaSources."""
        config = self.make_instance()
        apdb = Apdb.from_config(config)

        region = _make_region()
        visit_time = self.visit_time

        # have to store Objects first
        objects = makeObjectCatalog(region, 100, visit_time)
        oids = list(objects["diaObjectId"])
        sources = makeSourceCatalog(objects, visit_time)

        # save the objects and sources
        apdb.store(visit_time, objects, sources)

        # read it back, no ID filtering
        res = apdb.getDiaSources(region, None, visit_time)
        self.assert_catalog(res, len(sources), ApdbTables.DiaSource)

        # read it back and filter by ID
        res = apdb.getDiaSources(region, oids, visit_time)
        self.assert_catalog(res, len(sources), ApdbTables.DiaSource)

        # read it back to get schema
        res = apdb.getDiaSources(region, [], visit_time)
        self.assert_catalog(res, 0, ApdbTables.DiaSource)

        # test if a visit is present
        # data_factory's ccdVisitId generation corresponds to (1, 1)
        res = apdb.containsVisitDetector(visit=1, detector=1)
        self.assertTrue(res)
        # non-existent image
        res = apdb.containsVisitDetector(visit=2, detector=42)
        self.assertFalse(res)

    def test_storeForcedSources(self) -> None:
        """Store and retrieve DiaForcedSources."""
        config = self.make_instance()
        apdb = Apdb.from_config(config)

        region = _make_region()
        visit_time = self.visit_time

        # have to store Objects first
        objects = makeObjectCatalog(region, 100, visit_time)
        oids = list(objects["diaObjectId"])
        catalog = makeForcedSourceCatalog(objects, visit_time)

        apdb.store(visit_time, objects, forced_sources=catalog)

        # read it back and check sizes
        res = apdb.getDiaForcedSources(region, oids, visit_time)
        self.assert_catalog(res, len(catalog), ApdbTables.DiaForcedSource)

        # read it back to get schema
        res = apdb.getDiaForcedSources(region, [], visit_time)
        self.assert_catalog(res, 0, ApdbTables.DiaForcedSource)

        # data_factory's ccdVisitId generation corresponds to (1, 1)
        res = apdb.containsVisitDetector(visit=1, detector=1)
        self.assertTrue(res)
        # non-existent image
        res = apdb.containsVisitDetector(visit=2, detector=42)
        self.assertFalse(res)

    def test_timestamps(self) -> None:
        """Check that timestamp return type is as expected."""
        config = self.make_instance()
        apdb = Apdb.from_config(config)

        region = _make_region()
        visit_time = self.visit_time

        # have to store Objects first
        time_before = datetime.datetime.now()
        # Cassandra has millisecond precision, so subtract 1ms to allow for
        # truncated returned values.
        time_before -= datetime.timedelta(milliseconds=1)
        objects = makeObjectCatalog(region, 100, visit_time)
        oids = list(objects["diaObjectId"])
        catalog = makeForcedSourceCatalog(objects, visit_time)
        time_after = datetime.datetime.now()

        apdb.store(visit_time, objects, forced_sources=catalog)

        # read it back and check sizes
        res = apdb.getDiaForcedSources(region, oids, visit_time)
        assert res is not None
        self.assert_catalog(res, len(catalog), ApdbTables.DiaForcedSource)

        self.assertIn("time_processed", res.dtypes)
        dtype = res.dtypes["time_processed"]
        self.assertEqual(dtype.name, self.timestamp_type_name)
        # Verify that returned time is sensible.
        self.assertTrue(all(time_before <= dt <= time_after for dt in res["time_processed"]))

    def test_getChunks(self) -> None:
        """Store and retrieve replica chunks."""
        # don't care about sources.
        config = self.make_instance()
        apdb = Apdb.from_config(config)
        apdb_replica = ApdbReplica.from_config(config)
        visit_time = self.visit_time

        region1 = _make_region((1.0, 1.0, -1.0))
        region2 = _make_region((-1.0, -1.0, -1.0))
        nobj = 100
        objects1 = makeObjectCatalog(region1, nobj, visit_time)
        objects2 = makeObjectCatalog(region2, nobj, visit_time, start_id=nobj * 2)

        # With the default 10 minutes replica chunk window we should have 4
        # records.
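        # Visits at 00:01/00:02 fall into one chunk, 00:11/00:12 into a
        # second, 00:45/00:46 into a third, and the March visits into a
        # fourth.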
        visits = [
            (astropy.time.Time("2021-01-01T00:01:00", format="isot", scale="tai"), objects1),
            (astropy.time.Time("2021-01-01T00:02:00", format="isot", scale="tai"), objects2),
            (astropy.time.Time("2021-01-01T00:11:00", format="isot", scale="tai"), objects1),
            (astropy.time.Time("2021-01-01T00:12:00", format="isot", scale="tai"), objects2),
            (astropy.time.Time("2021-01-01T00:45:00", format="isot", scale="tai"), objects1),
            (astropy.time.Time("2021-01-01T00:46:00", format="isot", scale="tai"), objects2),
            (astropy.time.Time("2021-03-01T00:01:00", format="isot", scale="tai"), objects1),
            (astropy.time.Time("2021-03-01T00:02:00", format="isot", scale="tai"), objects2),
        ]

        start_id = 0
        for visit_time, objects in visits:
            sources = makeSourceCatalog(objects, visit_time, start_id=start_id)
            fsources = makeForcedSourceCatalog(objects, visit_time, visit=start_id)
            apdb.store(visit_time, objects, sources, fsources)
            start_id += nobj

        replica_chunks = apdb_replica.getReplicaChunks()
        if not self.enable_replica:
            self.assertIsNone(replica_chunks)

            with self.assertRaisesRegex(ValueError, "APDB is not configured for replication"):
                apdb_replica.getDiaObjectsChunks([])

        else:
            assert replica_chunks is not None
            self.assertEqual(len(replica_chunks), 4)

            def _check_chunks(replica_chunks: list[ReplicaChunk], n_records: int | None = None) -> None:
                if n_records is None:
                    n_records = len(replica_chunks) * nobj
                res = apdb_replica.getDiaObjectsChunks(chunk.id for chunk in replica_chunks)
                self.assert_table_data(res, n_records, ApdbTables.DiaObject)
                res = apdb_replica.getDiaSourcesChunks(chunk.id for chunk in replica_chunks)
                self.assert_table_data(res, n_records, ApdbTables.DiaSource)
                res = apdb_replica.getDiaForcedSourcesChunks(chunk.id for chunk in replica_chunks)
                self.assert_table_data(res, n_records, ApdbTables.DiaForcedSource)

            # read it back and check sizes
            _check_chunks(replica_chunks, 800)
            _check_chunks(replica_chunks[1:], 600)
            _check_chunks(replica_chunks[1:-1], 400)
            _check_chunks(replica_chunks[2:3], 200)
            _check_chunks([])

            # try to remove some of those
            deleted_chunks = replica_chunks[:1]
            apdb_replica.deleteReplicaChunks(chunk.id for chunk in deleted_chunks)

            # All queries on deleted ids should return empty set.
            _check_chunks(deleted_chunks, 0)

            replica_chunks = apdb_replica.getReplicaChunks()
            assert replica_chunks is not None
            self.assertEqual(len(replica_chunks), 3)

            _check_chunks(replica_chunks, 600)

    def test_storeSSObjects(self) -> None:
        """Store and retrieve SSObjects."""
        # don't care about sources.
        config = self.make_instance()
        apdb = Apdb.from_config(config)

        # make catalog with SSObjects
        catalog = makeSSObjectCatalog(100, flags=1)

        # store catalog
        apdb.storeSSObjects(catalog)

        # read it back and check sizes
        res = apdb.getSSObjects()
        self.assert_catalog(res, len(catalog), ApdbTables.SSObject)

        # check that override works, make catalog with SSObjects, ID = 51-150
        catalog = makeSSObjectCatalog(100, 51, flags=2)
        apdb.storeSSObjects(catalog)
        res = apdb.getSSObjects()
        self.assert_catalog(res, 150, ApdbTables.SSObject)
        self.assertEqual(len(res[res["flags"] == 1]), 50)
        self.assertEqual(len(res[res["flags"] == 2]), 100)

    def test_reassignObjects(self) -> None:
        """Reassign DiaSources to SSObjects."""
        # don't care about sources.
        config = self.make_instance()
        apdb = Apdb.from_config(config)

        region = _make_region()
        visit_time = self.visit_time
        objects = makeObjectCatalog(region, 100, visit_time)
        oids = list(objects["diaObjectId"])
        sources = makeSourceCatalog(objects, visit_time)
        apdb.store(visit_time, objects, sources)

        catalog = makeSSObjectCatalog(100)
        apdb.storeSSObjects(catalog)

        # read it back and filter by ID
        res = apdb.getDiaSources(region, oids, visit_time)
        self.assert_catalog(res, len(sources), ApdbTables.DiaSource)

        apdb.reassignDiaSources({1: 1, 2: 2, 5: 5})
        res = apdb.getDiaSources(region, oids, visit_time)
        self.assert_catalog(res, len(sources) - 3, ApdbTables.DiaSource)

        with self.assertRaisesRegex(ValueError, r"do not exist.*\D1000"):
            apdb.reassignDiaSources(
                {
                    1000: 1,
                    7: 3,
                }
            )
        self.assert_catalog(res, len(sources) - 3, ApdbTables.DiaSource)

    def test_midpointMjdTai_src(self) -> None:
        """Test for time filtering of DiaSources."""
        config = self.make_instance()
        apdb = Apdb.from_config(config)

        region = _make_region()
        # 2021-01-01 plus 360 days is 2021-12-27
        src_time1 = astropy.time.Time("2021-01-01T00:00:00", format="isot", scale="tai")
        src_time2 = astropy.time.Time("2021-01-01T00:00:02", format="isot", scale="tai")
        visit_time0 = astropy.time.Time("2021-12-26T23:59:59", format="isot", scale="tai")
        visit_time1 = astropy.time.Time("2021-12-27T00:00:01", format="isot", scale="tai")
        visit_time2 = astropy.time.Time("2021-12-27T00:00:03", format="isot", scale="tai")

        objects = makeObjectCatalog(region, 100, visit_time0)
        oids = list(objects["diaObjectId"])
        sources = makeSourceCatalog(objects, src_time1, 0)
        apdb.store(src_time1, objects, sources)

        sources = makeSourceCatalog(objects, src_time2, 100)
        apdb.store(src_time2, objects, sources)

        # reading at time of last save should read all
        res = apdb.getDiaSources(region, oids, src_time2)
        self.assert_catalog(res, 200, ApdbTables.DiaSource)

        # one second before the 12-month window closes should still read all
        res = apdb.getDiaSources(region, oids, visit_time0)
        self.assert_catalog(res, 200, ApdbTables.DiaSource)

        # reading at a later time should return only the newer sources
        res = apdb.getDiaSources(region, oids, visit_time1)
        self.assert_catalog(res, 100, ApdbTables.DiaSource)

        # reading at an even later time should return nothing
        res = apdb.getDiaSources(region, oids, visit_time2)
        self.assert_catalog(res, 0, ApdbTables.DiaSource)

    def test_midpointMjdTai_fsrc(self) -> None:
        """Test for time filtering of DiaForcedSources."""
        config = self.make_instance()
        apdb = Apdb.from_config(config)

        region = _make_region()
        src_time1 = astropy.time.Time("2021-01-01T00:00:00", format="isot", scale="tai")
        src_time2 = astropy.time.Time("2021-01-01T00:00:02", format="isot", scale="tai")
        visit_time0 = astropy.time.Time("2021-12-26T23:59:59", format="isot", scale="tai")
        visit_time1 = astropy.time.Time("2021-12-27T00:00:01", format="isot", scale="tai")
        visit_time2 = astropy.time.Time("2021-12-27T00:00:03", format="isot", scale="tai")

        objects = makeObjectCatalog(region, 100, visit_time0)
        oids = list(objects["diaObjectId"])
        sources = makeForcedSourceCatalog(objects, src_time1, 1)
        apdb.store(src_time1, objects, forced_sources=sources)

        sources = makeForcedSourceCatalog(objects, src_time2, 2)
        apdb.store(src_time2, objects, forced_sources=sources)

        # reading at time of last save should read all
        res = apdb.getDiaForcedSources(region, oids, src_time2)
        self.assert_catalog(res, 200, ApdbTables.DiaForcedSource)

        # one second before the 12-month window closes should still read all
        res = apdb.getDiaForcedSources(region, oids, visit_time0)
        self.assert_catalog(res, 200, ApdbTables.DiaForcedSource)

        # reading at a later time should return only the newer sources
        res = apdb.getDiaForcedSources(region, oids, visit_time1)
        self.assert_catalog(res, 100, ApdbTables.DiaForcedSource)

        # reading at an even later time should return nothing
        res = apdb.getDiaForcedSources(region, oids, visit_time2)
        self.assert_catalog(res, 0, ApdbTables.DiaForcedSource)

    def test_metadata(self) -> None:
        """Simple test for writing/reading metadata table"""
        config = self.make_instance()
        apdb = Apdb.from_config(config)
        metadata = apdb.metadata

        # APDB writes two or three version-number items plus a frozen JSON
        # config, i.e. three or four metadata rows in total.
        self.assertFalse(metadata.empty())
        expected_rows = 4 if self.enable_replica else 3
        self.assertEqual(len(list(metadata.items())), expected_rows)

        metadata.set("meta", "data")
        metadata.set("data", "meta")

        self.assertFalse(metadata.empty())
        self.assertTrue(set(metadata.items()) >= {("meta", "data"), ("data", "meta")})

        with self.assertRaisesRegex(KeyError, "Metadata key 'meta' already exists"):
            metadata.set("meta", "data1")

        metadata.set("meta", "data2", force=True)
        self.assertTrue(set(metadata.items()) >= {("meta", "data2"), ("data", "meta")})

        self.assertTrue(metadata.delete("meta"))
        self.assertIsNone(metadata.get("meta"))
        self.assertFalse(metadata.delete("meta"))

        self.assertEqual(metadata.get("data"), "meta")
        self.assertEqual(metadata.get("meta", "meta"), "meta")

    def test_nometadata(self) -> None:
        """Test case for when metadata table is missing"""
        # We expect that schema includes metadata table, drop it.
        with update_schema_yaml(self.schema_path, drop_metadata=True) as schema_file:
            config = self.make_instance(schema_file=schema_file)
            apdb = Apdb.from_config(config)
            metadata = apdb.metadata

            self.assertTrue(metadata.empty())
            self.assertEqual(list(metadata.items()), [])
            with self.assertRaisesRegex(RuntimeError, "Metadata table does not exist"):
                metadata.set("meta", "data")

            self.assertTrue(metadata.empty())
            self.assertIsNone(metadata.get("meta"))

        # Also check what happens when the configured schema has a metadata
        # table but the database does not: the database was initialized inside
        # the above context without a metadata table, while here we use a
        # schema config that includes one.
        config.schema_file = self.schema_path
        apdb = Apdb.from_config(config)
        metadata = apdb.metadata
        self.assertTrue(metadata.empty())

    def test_schemaVersionFromYaml(self) -> None:
        """Check version number handling for reading schema from YAML."""
        config = self.make_instance()
        default_schema = config.schema_file
        apdb = Apdb.from_config(config)
        self.assertEqual(apdb._schema.schemaVersion(), VersionTuple(0, 1, 1))  # type: ignore[attr-defined]

        with update_schema_yaml(default_schema, version="") as schema_file:
            config = self.make_instance(schema_file=schema_file)
            apdb = Apdb.from_config(config)
            self.assertEqual(
                apdb._schema.schemaVersion(), VersionTuple(0, 1, 0)  # type: ignore[attr-defined]
            )

        with update_schema_yaml(default_schema, version="99.0.0") as schema_file:
            config = self.make_instance(schema_file=schema_file)
            apdb = Apdb.from_config(config)
            self.assertEqual(
                apdb._schema.schemaVersion(), VersionTuple(99, 0, 0)  # type: ignore[attr-defined]
            )

    def test_config_freeze(self) -> None:
        """Test that some config fields are correctly frozen in database."""
        config = self.make_instance()

        # `use_insert_id` is the only parameter that is frozen in all
        # implementations.
        config.use_insert_id = not self.enable_replica
        apdb = Apdb.from_config(config)
        frozen_config = apdb.config  # type: ignore[attr-defined]
        self.assertEqual(frozen_config.use_insert_id, self.enable_replica)

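# A minimal sketch of a concrete specialization of ApdbTest (class, config,
# and attribute values here are hypothetical; the real ones live in the
# per-backend test modules):
#
#     class ApdbSQLTestCase(ApdbTest, unittest.TestCase):
#         schema_path = "tests/config/schema.yaml"
#         timestamp_type_name = "datetime64[ns]"
#
#         def make_instance(self, **kwargs: Any) -> ApdbConfig:
#             return ApdbSql.init_database(db_url="sqlite://", **kwargs)
#
#         def getDiaObjects_table(self) -> ApdbTables:
#             return ApdbTables.DiaObjectLast
#
#         def pixelization(self, config: ApdbConfig) -> Pixelization:
#             return ...  # backend-specific pixelization object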

class ApdbSchemaUpdateTest(TestCaseMixin, ABC):
    """Base class for unit tests that verify how schema changes work."""

    visit_time = astropy.time.Time("2021-01-01T00:00:00", format="isot", scale="tai")

    @abstractmethod
    def make_instance(self, **kwargs: Any) -> ApdbConfig:
        """Make config class instance used in all tests.

        This method should return configuration that points to the identical
        database instance on each call (i.e. ``db_url`` must be the same,
        which also means for sqlite it has to use on-disk storage).
        """
        raise NotImplementedError()

    def test_schema_add_replica(self) -> None:
        """Check that new code can work with old schema without replica
        tables.
        """
        # Make schema without replica tables.
        config = self.make_instance(use_insert_id=False)
        apdb = Apdb.from_config(config)
        apdb_replica = ApdbReplica.from_config(config)

        # Make APDB instance configured for replication.
        config.use_insert_id = True
        apdb = Apdb.from_config(config)

        # Try to insert something, should work OK.
        region = _make_region()
        visit_time = self.visit_time

        # have to store Objects first
        objects = makeObjectCatalog(region, 100, visit_time)
        sources = makeSourceCatalog(objects, visit_time)
        fsources = makeForcedSourceCatalog(objects, visit_time)
        apdb.store(visit_time, objects, sources, fsources)

        # There should be no replica chunks.
        replica_chunks = apdb_replica.getReplicaChunks()
        self.assertIsNone(replica_chunks)

    def test_schemaVersionCheck(self) -> None:
        """Check version number compatibility."""
        config = self.make_instance()
        apdb = Apdb.from_config(config)

        self.assertEqual(apdb._schema.schemaVersion(), VersionTuple(0, 1, 1))  # type: ignore[attr-defined]

        # Claim that schema version is now 99.0.0, must raise an exception.
        with update_schema_yaml(config.schema_file, version="99.0.0") as schema_file:
            config.schema_file = schema_file
            with self.assertRaises(IncompatibleVersionError):
                apdb = Apdb.from_config(config)
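
# A minimal sketch of specializing ApdbSchemaUpdateTest (names and paths are
# hypothetical). Note the on-disk SQLite URL: make_instance() must return a
# config pointing at the same database on every call, so in-memory storage
# would not work here:
#
#     class ApdbSchemaUpdateSQLTestCase(ApdbSchemaUpdateTest, unittest.TestCase):
#         def setUp(self) -> None:
#             self.tempdir = tempfile.mkdtemp()
#
#         def make_instance(self, **kwargs: Any) -> ApdbConfig:
#             return ApdbSql.init_database(
#                 db_url=f"sqlite:///{self.tempdir}/apdb.sqlite3", **kwargs
#             )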