Loading [MathJax]/extensions/tex2jax.js
LSST Applications g0fba68d861+83433b07ee,g16d25e1f1b+23bc9e47ac,g1ec0fe41b4+3ea9d11450,g1fd858c14a+9be2b0f3b9,g2440f9efcc+8c5ae1fdc5,g35bb328faa+8c5ae1fdc5,g4a4af6cd76+d25431c27e,g4d2262a081+c74e83464e,g53246c7159+8c5ae1fdc5,g55585698de+1e04e59700,g56a49b3a55+92a7603e7a,g60b5630c4e+1e04e59700,g67b6fd64d1+3fc8cb0b9e,g78460c75b0+7e33a9eb6d,g786e29fd12+668abc6043,g8352419a5c+8c5ae1fdc5,g8852436030+60e38ee5ff,g89139ef638+3fc8cb0b9e,g94187f82dc+1e04e59700,g989de1cb63+3fc8cb0b9e,g9d31334357+1e04e59700,g9f33ca652e+0a83e03614,gabe3b4be73+8856018cbb,gabf8522325+977d9fabaf,gb1101e3267+8b4b9c8ed7,gb89ab40317+3fc8cb0b9e,gc0af124501+57ccba3ad1,gcf25f946ba+60e38ee5ff,gd6cbbdb0b4+1cc2750d2e,gd794735e4e+7be992507c,gdb1c4ca869+be65c9c1d7,gde0f65d7ad+c7f52e58fe,ge278dab8ac+6b863515ed,ge410e46f29+3fc8cb0b9e,gf35d7ec915+97dd712d81,gf5e32f922b+8c5ae1fdc5,gf618743f1b+747388abfa,gf67bdafdda+3fc8cb0b9e,w.2025.18
LSST Data Management Base Package
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
_apdb.py
Go to the documentation of this file.
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
21
22from __future__ import annotations
23
24__all__ = ["ApdbSchemaUpdateTest", "ApdbTest", "update_schema_yaml"]
25
26import contextlib
27import datetime
28import os
29import tempfile
30import unittest
31from abc import ABC, abstractmethod
32from collections.abc import Iterator
33from tempfile import TemporaryDirectory
34from typing import TYPE_CHECKING, Any
35
36import astropy.time
37import pandas
38import yaml
39from lsst.sphgeom import Angle, Circle, LonLat, Region, UnitVector3d
40
41from .. import (
42 Apdb,
43 ApdbConfig,
44 ApdbReplica,
45 ApdbTableData,
46 ApdbTables,
47 IncompatibleVersionError,
48 ReplicaChunk,
49 VersionTuple,
50)
51from .data_factory import makeForcedSourceCatalog, makeObjectCatalog, makeSourceCatalog, makeSSObjectCatalog
52
if TYPE_CHECKING:
    from ..pixelization import Pixelization

    # For type checkers, pretend the mixin derives from TestCase so that
    # assert* method calls type-check in the mixin subclasses.
    class TestCaseMixin(unittest.TestCase):
        """Base class for mixin test classes that use TestCase methods."""

else:

    # At runtime the mixin must be an empty base class; the actual TestCase
    # base is supplied by the concrete test class that mixes this in.
    class TestCaseMixin:
        """Do-nothing definition of mixin base class for regular execution."""
64
def _make_region(xyz: tuple[float, float, float] = (1.0, 1.0, -1.0)) -> Region:
    """Make a small circular region to use in tests.

    Parameters
    ----------
    xyz : `tuple` [`float`]
        Components of the direction vector of the region center; does not
        need to be normalized.

    Returns
    -------
    region : `lsst.sphgeom.Region`
        Circle with an opening angle of half the nominal field of view.
    """
    pointing_v = UnitVector3d(*xyz)
    fov = 0.0013  # radians
    region = Circle(pointing_v, Angle(fov / 2))
    return region
72
@contextlib.contextmanager
def update_schema_yaml(
    schema_file: str,
    drop_metadata: bool = False,
    version: str | None = None,
) -> Iterator[str]:
    """Update schema definition and return name of the new schema file.

    Parameters
    ----------
    schema_file : `str`
        Path for the existing YAML file with APDB schema.
    drop_metadata : `bool`
        If `True` then remove metadata table from the list of tables.
    version : `str` or `None`
        If non-empty string then set schema version to this string, if empty
        string then remove schema version from config, if `None` - don't change
        the version in config.

    Yields
    ------
    Path for the updated configuration file.
    """
    with open(schema_file) as yaml_stream:
        schemas_list = list(yaml.load_all(yaml_stream, Loader=yaml.SafeLoader))
    # Edit YAML contents.
    for schema in schemas_list:
        # Optionally drop metadata table.
        if drop_metadata:
            schema["tables"] = [table for table in schema["tables"] if table["name"] != "metadata"]
        if version is not None:
            if version == "":
                del schema["version"]
            else:
                schema["version"] = version

    # Write the edited schema to a temporary location that lives for the
    # duration of the context.
    with TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
        output_path = os.path.join(tmpdir, "schema.yaml")
        with open(output_path, "w") as yaml_stream:
            yaml.dump_all(schemas_list, stream=yaml_stream)
        yield output_path
class ApdbTest(TestCaseMixin, ABC):
    """Base class for Apdb tests that can be specialized for concrete
    implementation.

    This can only be used as a mixin class for a unittest.TestCase and it
    calls various assert methods.
    """

    # Whether to create per-time-partition tables (implementation-specific).
    time_partition_tables = False
    # Nominal visit time used by most tests.
    visit_time = astropy.time.Time("2021-01-01T00:00:00", format="isot", scale="tai")

    fsrc_requires_id_list = False
    """Should be set to True if getDiaForcedSources requires object IDs"""

    enable_replica: bool = False
    """Set to true when support for replication is configured"""

    schema_path: str
    """Location of the Felis schema file."""

    timestamp_type_name: str
    """Type name of timestamp columns in DataFrames returned from queries."""

    extra_chunk_columns = 1
    """Number of additional columns in chunk tables."""

    # Number of columns as defined in tests/config/schema.yaml.
    table_column_count = {
        ApdbTables.DiaObject: 8,
        ApdbTables.DiaObjectLast: 5,
        ApdbTables.DiaSource: 12,
        ApdbTables.DiaForcedSource: 8,
        ApdbTables.SSObject: 3,
    }
150
151 @abstractmethod
152 def make_instance(self, **kwargs: Any) -> ApdbConfig:
153 """Make database instance and return configuration for it."""
154 raise NotImplementedError()
155
156 @abstractmethod
157 def getDiaObjects_table(self) -> ApdbTables:
158 """Return type of table returned from getDiaObjects method."""
159 raise NotImplementedError()
160
161 @abstractmethod
162 def pixelization(self, config: ApdbConfig) -> Pixelization:
163 """Return pixelization used by implementation."""
164 raise NotImplementedError()
165
166 def assert_catalog(self, catalog: Any, rows: int, table: ApdbTables) -> None:
167 """Validate catalog type and size
168
169 Parameters
170 ----------
171 catalog : `object`
172 Expected type of this is ``pandas.DataFrame``.
173 rows : `int`
174 Expected number of rows in a catalog.
175 table : `ApdbTables`
176 APDB table type.
177 """
178 self.assertIsInstance(catalog, pandas.DataFrame)
179 self.assertEqual(catalog.shape[0], rows)
180 self.assertEqual(catalog.shape[1], self.table_column_count[table])
181
182 def assert_table_data(self, catalog: Any, rows: int, table: ApdbTables) -> None:
183 """Validate catalog type and size
184
185 Parameters
186 ----------
187 catalog : `object`
188 Expected type of this is `ApdbTableData`.
189 rows : `int`
190 Expected number of rows in a catalog.
191 table : `ApdbTables`
192 APDB table type.
193 extra_columns : `int`
194 Count of additional columns expected in ``catalog``.
195 """
196 self.assertIsInstance(catalog, ApdbTableData)
197 n_rows = sum(1 for row in catalog.rows())
198 self.assertEqual(n_rows, rows)
199 # One extra column for replica chunk id
200 self.assertEqual(
201 len(catalog.column_names()), self.table_column_count[table] + self.extra_chunk_columns
202 )
203
204 def test_makeSchema(self) -> None:
205 """Test for making APDB schema."""
206 config = self.make_instance()
207 apdb = Apdb.from_config(config)
208
209 self.assertIsNotNone(apdb.tableDef(ApdbTables.DiaObject))
210 self.assertIsNotNone(apdb.tableDef(ApdbTables.DiaObjectLast))
211 self.assertIsNotNone(apdb.tableDef(ApdbTables.DiaSource))
212 self.assertIsNotNone(apdb.tableDef(ApdbTables.DiaForcedSource))
213 self.assertIsNotNone(apdb.tableDef(ApdbTables.metadata))
214
215 # Test from_uri factory method with the same config.
216 with tempfile.NamedTemporaryFile() as tmpfile:
217 config.save(tmpfile.name)
218 apdb = Apdb.from_uri(tmpfile.name)
219
220 self.assertIsNotNone(apdb.tableDef(ApdbTables.DiaObject))
221 self.assertIsNotNone(apdb.tableDef(ApdbTables.DiaObjectLast))
222 self.assertIsNotNone(apdb.tableDef(ApdbTables.DiaSource))
223 self.assertIsNotNone(apdb.tableDef(ApdbTables.DiaForcedSource))
224 self.assertIsNotNone(apdb.tableDef(ApdbTables.metadata))
225
226 def test_empty_gets(self) -> None:
227 """Test for getting data from empty database.
228
229 All get() methods should return empty results, only useful for
230 checking that code is not broken.
231 """
232 # use non-zero months for Forced/Source fetching
233 config = self.make_instance()
234 apdb = Apdb.from_config(config)
235
236 region = _make_region()
237 visit_time = self.visit_time
238
239 res: pandas.DataFrame | None
240
241 # get objects by region
242 res = apdb.getDiaObjects(region)
243 self.assert_catalog(res, 0, self.getDiaObjects_table())
244
245 # get sources by region
246 res = apdb.getDiaSources(region, None, visit_time)
247 self.assert_catalog(res, 0, ApdbTables.DiaSource)
248
249 res = apdb.getDiaSources(region, [], visit_time)
250 self.assert_catalog(res, 0, ApdbTables.DiaSource)
251
252 # get sources by object ID, non-empty object list
253 res = apdb.getDiaSources(region, [1, 2, 3], visit_time)
254 self.assert_catalog(res, 0, ApdbTables.DiaSource)
255
256 # get forced sources by object ID, empty object list
257 res = apdb.getDiaForcedSources(region, [], visit_time)
258 self.assert_catalog(res, 0, ApdbTables.DiaForcedSource)
259
260 # get sources by object ID, non-empty object list
261 res = apdb.getDiaForcedSources(region, [1, 2, 3], visit_time)
262 self.assert_catalog(res, 0, ApdbTables.DiaForcedSource)
263
264 # data_factory's ccdVisitId generation corresponds to (1, 1)
265 res = apdb.containsVisitDetector(visit=1, detector=1, region=region, visit_time=visit_time)
266 self.assertFalse(res)
267
268 # get sources by region
269 if self.fsrc_requires_id_list:
270 with self.assertRaises(NotImplementedError):
271 apdb.getDiaForcedSources(region, None, visit_time)
272 else:
273 res = apdb.getDiaForcedSources(region, None, visit_time)
274 self.assert_catalog(res, 0, ApdbTables.DiaForcedSource)
275
276 def test_empty_gets_0months(self) -> None:
277 """Test for getting data from empty database.
278
279 All get() methods should return empty DataFrame or None.
280 """
281 # set read_sources_months to 0 so that Forced/Sources are None
282 config = self.make_instance(read_sources_months=0, read_forced_sources_months=0)
283 apdb = Apdb.from_config(config)
284
285 region = _make_region()
286 visit_time = self.visit_time
287
288 res: pandas.DataFrame | None
289
290 # get objects by region
291 res = apdb.getDiaObjects(region)
292 self.assert_catalog(res, 0, self.getDiaObjects_table())
293
294 # get sources by region
295 res = apdb.getDiaSources(region, None, visit_time)
296 self.assertIs(res, None)
297
298 # get sources by object ID, empty object list
299 res = apdb.getDiaSources(region, [], visit_time)
300 self.assertIs(res, None)
301
302 # get forced sources by object ID, empty object list
303 res = apdb.getDiaForcedSources(region, [], visit_time)
304 self.assertIs(res, None)
305
306 # Database is empty, no images exist.
307 res = apdb.containsVisitDetector(visit=1, detector=1, region=region, visit_time=visit_time)
308 self.assertFalse(res)
309
310 def test_storeObjects(self) -> None:
311 """Store and retrieve DiaObjects."""
312 # don't care about sources.
313 config = self.make_instance()
314 apdb = Apdb.from_config(config)
315
316 region = _make_region()
317 visit_time = self.visit_time
318
319 # make catalog with Objects
320 catalog = makeObjectCatalog(region, 100, visit_time)
321
322 # store catalog
323 apdb.store(visit_time, catalog)
324
325 # read it back and check sizes
326 res = apdb.getDiaObjects(region)
327 self.assert_catalog(res, len(catalog), self.getDiaObjects_table())
328
329 # TODO: test apdb.contains with generic implementation from DM-41671
330
331 def test_storeObjects_empty(self) -> None:
332 """Test calling storeObject when there are no objects: see DM-43270."""
333 config = self.make_instance()
334 apdb = Apdb.from_config(config)
335 region = _make_region()
336 visit_time = self.visit_time
337 # make catalog with no Objects
338 catalog = makeObjectCatalog(region, 0, visit_time)
339
340 with self.assertLogs("lsst.dax.apdb", level="DEBUG") as cm:
341 apdb.store(visit_time, catalog)
342 self.assertIn("No objects", "\n".join(cm.output))
343
344 def test_storeMovingObject(self) -> None:
345 """Store and retrieve DiaObject which changes its position."""
346 # don't care about sources.
347 config = self.make_instance()
348 apdb = Apdb.from_config(config)
349 pixelization = self.pixelization(config)
350
351 lon_deg, lat_deg = 0.0, 0.0
352 lonlat1 = LonLat.fromDegrees(lon_deg - 1.0, lat_deg)
353 lonlat2 = LonLat.fromDegrees(lon_deg + 1.0, lat_deg)
354 uv1 = UnitVector3d(lonlat1)
355 uv2 = UnitVector3d(lonlat2)
356
357 # Check that they fall into different pixels.
358 self.assertNotEqual(pixelization.pixel(uv1), pixelization.pixel(uv2))
359
360 # Store one object at two different positions.
361 visit_time1 = self.visit_time
362 catalog1 = makeObjectCatalog(lonlat1, 1, visit_time1)
363 apdb.store(visit_time1, catalog1)
364
365 visit_time2 = visit_time1 + astropy.time.TimeDelta(120.0, format="sec")
366 catalog1 = makeObjectCatalog(lonlat2, 1, visit_time2)
367 apdb.store(visit_time2, catalog1)
368
369 # Make region covering both points.
370 region = Circle(UnitVector3d(LonLat.fromDegrees(lon_deg, lat_deg)), Angle.fromDegrees(1.1))
371 self.assertTrue(region.contains(uv1))
372 self.assertTrue(region.contains(uv2))
373
374 # Read it back, must return the latest one.
375 res = apdb.getDiaObjects(region)
376 self.assert_catalog(res, 1, self.getDiaObjects_table())
377
378 def test_storeSources(self) -> None:
379 """Store and retrieve DiaSources."""
380 config = self.make_instance()
381 apdb = Apdb.from_config(config)
382
383 region = _make_region()
384 visit_time = self.visit_time
385
386 # have to store Objects first
387 objects = makeObjectCatalog(region, 100, visit_time)
388 oids = list(objects["diaObjectId"])
389 sources = makeSourceCatalog(objects, visit_time)
390
391 # save the objects and sources
392 apdb.store(visit_time, objects, sources)
393
394 # read it back, no ID filtering
395 res = apdb.getDiaSources(region, None, visit_time)
396 self.assert_catalog(res, len(sources), ApdbTables.DiaSource)
397
398 # read it back and filter by ID
399 res = apdb.getDiaSources(region, oids, visit_time)
400 self.assert_catalog(res, len(sources), ApdbTables.DiaSource)
401
402 # read it back to get schema
403 res = apdb.getDiaSources(region, [], visit_time)
404 self.assert_catalog(res, 0, ApdbTables.DiaSource)
405
406 # test if a visit is present
407 # data_factory's ccdVisitId generation corresponds to (1, 1)
408 res = apdb.containsVisitDetector(visit=1, detector=1, region=region, visit_time=visit_time)
409 self.assertTrue(res)
410 # non-existent image
411 res = apdb.containsVisitDetector(visit=2, detector=42, region=region, visit_time=visit_time)
412 self.assertFalse(res)
413
414 def test_storeForcedSources(self) -> None:
415 """Store and retrieve DiaForcedSources."""
416 config = self.make_instance()
417 apdb = Apdb.from_config(config)
418
419 region = _make_region()
420 visit_time = self.visit_time
421
422 # have to store Objects first
423 objects = makeObjectCatalog(region, 100, visit_time)
424 oids = list(objects["diaObjectId"])
425 catalog = makeForcedSourceCatalog(objects, visit_time)
426
427 apdb.store(visit_time, objects, forced_sources=catalog)
428
429 # read it back and check sizes
430 res = apdb.getDiaForcedSources(region, oids, visit_time)
431 self.assert_catalog(res, len(catalog), ApdbTables.DiaForcedSource)
432
433 # read it back to get schema
434 res = apdb.getDiaForcedSources(region, [], visit_time)
435 self.assert_catalog(res, 0, ApdbTables.DiaForcedSource)
436
437 # data_factory's ccdVisitId generation corresponds to (1, 1)
438 res = apdb.containsVisitDetector(visit=1, detector=1, region=region, visit_time=visit_time)
439 self.assertTrue(res)
440 # non-existent image
441 res = apdb.containsVisitDetector(visit=2, detector=42, region=region, visit_time=visit_time)
442 self.assertFalse(res)
443
444 def test_timestamps(self) -> None:
445 """Check that timestamp return type is as expected."""
446 config = self.make_instance()
447 apdb = Apdb.from_config(config)
448
449 region = _make_region()
450 visit_time = self.visit_time
451
452 # have to store Objects first
453 time_before = datetime.datetime.now()
454 # Cassandra has a millisecond precision, so subtract 1ms to allow for
455 # truncated returned values.
456 time_before -= datetime.timedelta(milliseconds=1)
457 objects = makeObjectCatalog(region, 100, visit_time)
458 oids = list(objects["diaObjectId"])
459 catalog = makeForcedSourceCatalog(objects, visit_time)
460 time_after = datetime.datetime.now()
461
462 apdb.store(visit_time, objects, forced_sources=catalog)
463
464 # read it back and check sizes
465 res = apdb.getDiaForcedSources(region, oids, visit_time)
466 assert res is not None
467 self.assert_catalog(res, len(catalog), ApdbTables.DiaForcedSource)
468
469 self.assertIn("time_processed", res.dtypes)
470 dtype = res.dtypes["time_processed"]
471 self.assertEqual(dtype.name, self.timestamp_type_name)
472 # Verify that returned time is sensible.
473 self.assertTrue(all(time_before <= dt <= time_after for dt in res["time_processed"]))
474
475 def test_getChunks(self) -> None:
476 """Store and retrieve replica chunks."""
477 # don't care about sources.
478 config = self.make_instance()
479 apdb = Apdb.from_config(config)
480 apdb_replica = ApdbReplica.from_config(config)
481 visit_time = self.visit_time
482
483 region1 = _make_region((1.0, 1.0, -1.0))
484 region2 = _make_region((-1.0, -1.0, -1.0))
485 nobj = 100
486 objects1 = makeObjectCatalog(region1, nobj, visit_time)
487 objects2 = makeObjectCatalog(region2, nobj, visit_time, start_id=nobj * 2)
488
489 # With the default 10 minutes replica chunk window we should have 4
490 # records.
491 visits = [
492 (astropy.time.Time("2021-01-01T00:01:00", format="isot", scale="tai"), objects1),
493 (astropy.time.Time("2021-01-01T00:02:00", format="isot", scale="tai"), objects2),
494 (astropy.time.Time("2021-01-01T00:11:00", format="isot", scale="tai"), objects1),
495 (astropy.time.Time("2021-01-01T00:12:00", format="isot", scale="tai"), objects2),
496 (astropy.time.Time("2021-01-01T00:45:00", format="isot", scale="tai"), objects1),
497 (astropy.time.Time("2021-01-01T00:46:00", format="isot", scale="tai"), objects2),
498 (astropy.time.Time("2021-03-01T00:01:00", format="isot", scale="tai"), objects1),
499 (astropy.time.Time("2021-03-01T00:02:00", format="isot", scale="tai"), objects2),
500 ]
501
502 start_id = 0
503 for visit_time, objects in visits:
504 sources = makeSourceCatalog(objects, visit_time, start_id=start_id)
505 fsources = makeForcedSourceCatalog(objects, visit_time, visit=start_id)
506 apdb.store(visit_time, objects, sources, fsources)
507 start_id += nobj
508
509 replica_chunks = apdb_replica.getReplicaChunks()
510 if not self.enable_replica:
511 self.assertIsNone(replica_chunks)
512
513 with self.assertRaisesRegex(ValueError, "APDB is not configured for replication"):
514 apdb_replica.getDiaObjectsChunks([])
515
516 else:
517 assert replica_chunks is not None
518 self.assertEqual(len(replica_chunks), 4)
519
520 def _check_chunks(replica_chunks: list[ReplicaChunk], n_records: int | None = None) -> None:
521 if n_records is None:
522 n_records = len(replica_chunks) * nobj
523 res = apdb_replica.getDiaObjectsChunks(chunk.id for chunk in replica_chunks)
524 self.assert_table_data(res, n_records, ApdbTables.DiaObject)
525 res = apdb_replica.getDiaSourcesChunks(chunk.id for chunk in replica_chunks)
526 self.assert_table_data(res, n_records, ApdbTables.DiaSource)
527 res = apdb_replica.getDiaForcedSourcesChunks(chunk.id for chunk in replica_chunks)
528 self.assert_table_data(res, n_records, ApdbTables.DiaForcedSource)
529
530 # read it back and check sizes
531 _check_chunks(replica_chunks, 800)
532 _check_chunks(replica_chunks[1:], 600)
533 _check_chunks(replica_chunks[1:-1], 400)
534 _check_chunks(replica_chunks[2:3], 200)
535 _check_chunks([])
536
537 # try to remove some of those
538 deleted_chunks = replica_chunks[:1]
539 apdb_replica.deleteReplicaChunks(chunk.id for chunk in deleted_chunks)
540
541 # All queries on deleted ids should return empty set.
542 _check_chunks(deleted_chunks, 0)
543
544 replica_chunks = apdb_replica.getReplicaChunks()
545 assert replica_chunks is not None
546 self.assertEqual(len(replica_chunks), 3)
547
548 _check_chunks(replica_chunks, 600)
549
550 def test_storeSSObjects(self) -> None:
551 """Store and retrieve SSObjects."""
552 # don't care about sources.
553 config = self.make_instance()
554 apdb = Apdb.from_config(config)
555
556 # make catalog with SSObjects
557 catalog = makeSSObjectCatalog(100, flags=1)
558
559 # store catalog
560 apdb.storeSSObjects(catalog)
561
562 # read it back and check sizes
563 res = apdb.getSSObjects()
564 self.assert_catalog(res, len(catalog), ApdbTables.SSObject)
565
566 # check that override works, make catalog with SSObjects, ID = 51-150
567 catalog = makeSSObjectCatalog(100, 51, flags=2)
568 apdb.storeSSObjects(catalog)
569 res = apdb.getSSObjects()
570 self.assert_catalog(res, 150, ApdbTables.SSObject)
571 self.assertEqual(len(res[res["flags"] == 1]), 50)
572 self.assertEqual(len(res[res["flags"] == 2]), 100)
573
574 def test_reassignObjects(self) -> None:
575 """Reassign DiaObjects."""
576 # don't care about sources.
577 config = self.make_instance()
578 apdb = Apdb.from_config(config)
579
580 region = _make_region()
581 visit_time = self.visit_time
582 objects = makeObjectCatalog(region, 100, visit_time)
583 oids = list(objects["diaObjectId"])
584 sources = makeSourceCatalog(objects, visit_time)
585 apdb.store(visit_time, objects, sources)
586
587 catalog = makeSSObjectCatalog(100)
588 apdb.storeSSObjects(catalog)
589
590 # read it back and filter by ID
591 res = apdb.getDiaSources(region, oids, visit_time)
592 self.assert_catalog(res, len(sources), ApdbTables.DiaSource)
593
594 apdb.reassignDiaSources({1: 1, 2: 2, 5: 5})
595 res = apdb.getDiaSources(region, oids, visit_time)
596 self.assert_catalog(res, len(sources) - 3, ApdbTables.DiaSource)
597
598 with self.assertRaisesRegex(ValueError, r"do not exist.*\D1000"):
599 apdb.reassignDiaSources(
600 {
601 1000: 1,
602 7: 3,
603 }
604 )
605 self.assert_catalog(res, len(sources) - 3, ApdbTables.DiaSource)
606
607 def test_midpointMjdTai_src(self) -> None:
608 """Test for time filtering of DiaSources."""
609 config = self.make_instance()
610 apdb = Apdb.from_config(config)
611
612 region = _make_region()
613 # 2021-01-01 plus 360 days is 2021-12-27
614 src_time1 = astropy.time.Time("2021-01-01T00:00:00", format="isot", scale="tai")
615 src_time2 = astropy.time.Time("2021-01-01T00:00:02", format="isot", scale="tai")
616 visit_time0 = astropy.time.Time("2021-12-26T23:59:59", format="isot", scale="tai")
617 visit_time1 = astropy.time.Time("2021-12-27T00:00:01", format="isot", scale="tai")
618 visit_time2 = astropy.time.Time("2021-12-27T00:00:03", format="isot", scale="tai")
619
620 objects = makeObjectCatalog(region, 100, visit_time0)
621 oids = list(objects["diaObjectId"])
622 sources = makeSourceCatalog(objects, src_time1, 0)
623 apdb.store(src_time1, objects, sources)
624
625 sources = makeSourceCatalog(objects, src_time2, 100)
626 apdb.store(src_time2, objects, sources)
627
628 # reading at time of last save should read all
629 res = apdb.getDiaSources(region, oids, src_time2)
630 self.assert_catalog(res, 200, ApdbTables.DiaSource)
631
632 # one second before 12 months
633 res = apdb.getDiaSources(region, oids, visit_time0)
634 self.assert_catalog(res, 200, ApdbTables.DiaSource)
635
636 # reading at later time of last save should only read a subset
637 res = apdb.getDiaSources(region, oids, visit_time1)
638 self.assert_catalog(res, 100, ApdbTables.DiaSource)
639
640 # reading at later time of last save should only read a subset
641 res = apdb.getDiaSources(region, oids, visit_time2)
642 self.assert_catalog(res, 0, ApdbTables.DiaSource)
643
644 def test_midpointMjdTai_fsrc(self) -> None:
645 """Test for time filtering of DiaForcedSources."""
646 config = self.make_instance()
647 apdb = Apdb.from_config(config)
648
649 region = _make_region()
650 src_time1 = astropy.time.Time("2021-01-01T00:00:00", format="isot", scale="tai")
651 src_time2 = astropy.time.Time("2021-01-01T00:00:02", format="isot", scale="tai")
652 visit_time0 = astropy.time.Time("2021-12-26T23:59:59", format="isot", scale="tai")
653 visit_time1 = astropy.time.Time("2021-12-27T00:00:01", format="isot", scale="tai")
654 visit_time2 = astropy.time.Time("2021-12-27T00:00:03", format="isot", scale="tai")
655
656 objects = makeObjectCatalog(region, 100, visit_time0)
657 oids = list(objects["diaObjectId"])
658 sources = makeForcedSourceCatalog(objects, src_time1, 1)
659 apdb.store(src_time1, objects, forced_sources=sources)
660
661 sources = makeForcedSourceCatalog(objects, src_time2, 2)
662 apdb.store(src_time2, objects, forced_sources=sources)
663
664 # reading at time of last save should read all
665 res = apdb.getDiaForcedSources(region, oids, src_time2)
666 self.assert_catalog(res, 200, ApdbTables.DiaForcedSource)
667
668 # one second before 12 months
669 res = apdb.getDiaForcedSources(region, oids, visit_time0)
670 self.assert_catalog(res, 200, ApdbTables.DiaForcedSource)
671
672 # reading at later time of last save should only read a subset
673 res = apdb.getDiaForcedSources(region, oids, visit_time1)
674 self.assert_catalog(res, 100, ApdbTables.DiaForcedSource)
675
676 # reading at later time of last save should only read a subset
677 res = apdb.getDiaForcedSources(region, oids, visit_time2)
678 self.assert_catalog(res, 0, ApdbTables.DiaForcedSource)
679
680 def test_metadata(self) -> None:
681 """Simple test for writing/reading metadata table"""
682 config = self.make_instance()
683 apdb = Apdb.from_config(config)
684 metadata = apdb.metadata
685
686 # APDB should write two or three metadata items with version numbers
687 # and a frozen JSON config.
688 self.assertFalse(metadata.empty())
689 expected_rows = 4 if self.enable_replica else 3
690 self.assertEqual(len(list(metadata.items())), expected_rows)
691
692 metadata.set("meta", "data")
693 metadata.set("data", "meta")
694
695 self.assertFalse(metadata.empty())
696 self.assertTrue(set(metadata.items()) >= {("meta", "data"), ("data", "meta")})
697
698 with self.assertRaisesRegex(KeyError, "Metadata key 'meta' already exists"):
699 metadata.set("meta", "data1")
700
701 metadata.set("meta", "data2", force=True)
702 self.assertTrue(set(metadata.items()) >= {("meta", "data2"), ("data", "meta")})
703
704 self.assertTrue(metadata.delete("meta"))
705 self.assertIsNone(metadata.get("meta"))
706 self.assertFalse(metadata.delete("meta"))
707
708 self.assertEqual(metadata.get("data"), "meta")
709 self.assertEqual(metadata.get("meta", "meta"), "meta")
710
711 def test_schemaVersionFromYaml(self) -> None:
712 """Check version number handling for reading schema from YAML."""
713 config = self.make_instance()
714 default_schema = config.schema_file
715 apdb = Apdb.from_config(config)
716 self.assertEqual(apdb._schema.schemaVersion(), VersionTuple(0, 1, 1)) # type: ignore[attr-defined]
717
718 with update_schema_yaml(default_schema, version="") as schema_file:
719 config = self.make_instance(schema_file=schema_file)
720 apdb = Apdb.from_config(config)
721 self.assertEqual(
722 apdb._schema.schemaVersion(), VersionTuple(0, 1, 0) # type: ignore[attr-defined]
723 )
724
725 with update_schema_yaml(default_schema, version="99.0.0") as schema_file:
726 config = self.make_instance(schema_file=schema_file)
727 apdb = Apdb.from_config(config)
728 self.assertEqual(
729 apdb._schema.schemaVersion(), VersionTuple(99, 0, 0) # type: ignore[attr-defined]
730 )
731
732 def test_config_freeze(self) -> None:
733 """Test that some config fields are correctly frozen in database."""
734 config = self.make_instance()
735
736 # `enable_replica` is the only parameter that is frozen in all
737 # implementations.
738 config.enable_replica = not self.enable_replica
739 apdb = Apdb.from_config(config)
740 frozen_config = apdb.config # type: ignore[attr-defined]
741 self.assertEqual(frozen_config.enable_replica, self.enable_replica)
742
class ApdbSchemaUpdateTest(TestCaseMixin, ABC):
    """Base class for unit tests that verify how schema changes work."""

    # Nominal visit time used by the tests.
    visit_time = astropy.time.Time("2021-01-01T00:00:00", format="isot", scale="tai")
748
749 @abstractmethod
750 def make_instance(self, **kwargs: Any) -> ApdbConfig:
751 """Make config class instance used in all tests.
752
753 This method should return configuration that point to the identical
754 database instance on each call (i.e. ``db_url`` must be the same,
755 which also means for sqlite it has to use on-disk storage).
756 """
757 raise NotImplementedError()
758
759 def test_schema_add_replica(self) -> None:
760 """Check that new code can work with old schema without replica
761 tables.
762 """
763 # Make schema without replica tables.
764 config = self.make_instance(enable_replica=False)
765 apdb = Apdb.from_config(config)
766 apdb_replica = ApdbReplica.from_config(config)
767
768 # Make APDB instance configured for replication.
769 config.enable_replica = True
770 apdb = Apdb.from_config(config)
771
772 # Try to insert something, should work OK.
773 region = _make_region()
774 visit_time = self.visit_time
775
776 # have to store Objects first
777 objects = makeObjectCatalog(region, 100, visit_time)
778 sources = makeSourceCatalog(objects, visit_time)
779 fsources = makeForcedSourceCatalog(objects, visit_time)
780 apdb.store(visit_time, objects, sources, fsources)
781
782 # There should be no replica chunks.
783 replica_chunks = apdb_replica.getReplicaChunks()
784 self.assertIsNone(replica_chunks)
785
786 def test_schemaVersionCheck(self) -> None:
787 """Check version number compatibility."""
788 config = self.make_instance()
789 apdb = Apdb.from_config(config)
790
791 self.assertEqual(apdb._schema.schemaVersion(), VersionTuple(0, 1, 1)) # type: ignore[attr-defined]
792
793 # Claim that schema version is now 99.0.0, must raise an exception.
794 with update_schema_yaml(config.schema_file, version="99.0.0") as schema_file:
795 config.schema_file = schema_file
796 with self.assertRaises(IncompatibleVersionError):
797 apdb = Apdb.from_config(config)
ApdbConfig make_instance(self, **Any kwargs)
Definition _apdb.py:750
None assert_catalog(self, Any catalog, int rows, ApdbTables table)
Definition _apdb.py:166
ApdbConfig make_instance(self, **Any kwargs)
Definition _apdb.py:152
Pixelization pixelization(self, ApdbConfig config)
Definition _apdb.py:162
None assert_table_data(self, Any catalog, int rows, ApdbTables table)
Definition _apdb.py:182
ApdbTables getDiaObjects_table(self)
Definition _apdb.py:157
Angle represents an angle in radians.
Definition Angle.h:50
Circle is a circular region on the unit sphere that contains its boundary.
Definition Circle.h:54
UnitVector3d is a unit vector in ℝ³ with components stored in double precision.
Region _make_region(tuple[float, float, float] xyz=(1.0, 1.0, -1.0))
Definition _apdb.py:65
Iterator[str] update_schema_yaml(str schema_file, bool drop_metadata=False, str|None version=None)
Definition _apdb.py:78