LSST Applications g013ef56533+b8d55c8942,g083dd6704c+a047e97985,g199a45376c+0ba108daf9,g1fd858c14a+7a3b874d60,g210f2d0738+7416ca6900,g262e1987ae+1d557ba9a3,g29ae962dfc+519d34895e,g2cef7863aa+aef1011c0b,g30d7c61c20+36d16ea71a,g35bb328faa+8c5ae1fdc5,g3fd5ace14f+cb326ad149,g47891489e3+f459a6810c,g53246c7159+8c5ae1fdc5,g54cd7ddccb+890c8e1e5d,g5a60e81ecd+6240c63dbc,g64539dfbff+7416ca6900,g67b6fd64d1+f459a6810c,g6ebf1fc0d4+8c5ae1fdc5,g74acd417e5+0bae3c876a,g786e29fd12+668abc6043,g87389fa792+8856018cbb,g89139ef638+f459a6810c,g8d7436a09f+dee7680868,g8ea07a8fe4+81eaaadc04,g90f42f885a+34c0557caf,g97be763408+14b8164b5b,g98a1a72a9c+8389601a76,g98df359435+fff771c62d,gb8cb2b794d+6728931916,gbf99507273+8c5ae1fdc5,gc2a301910b+7416ca6900,gca7fc764a6+f459a6810c,gd7ef33dd92+f459a6810c,gdab6d2f7ff+0bae3c876a,ge410e46f29+f459a6810c,ge41e95a9f2+7416ca6900,geaed405ab2+e3b4b2a692,gf9a733ac38+8c5ae1fdc5,w.2025.43
LSST Data Management Base Package
Loading...
Searching...
No Matches
_apdb.py
Go to the documentation of this file.
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
21
22from __future__ import annotations
23
24__all__ = ["ApdbSchemaUpdateTest", "ApdbTest", "update_schema_yaml"]
25
26import contextlib
27import os
28import tempfile
29from abc import ABC, abstractmethod
30from collections.abc import Iterator
31from tempfile import TemporaryDirectory
32from typing import TYPE_CHECKING, Any
33
34import astropy.time
35import felis.datamodel
36import pandas
37import yaml
38
39from lsst.sphgeom import Angle, Circle, LonLat, Region, UnitVector3d
40
41from .. import (
42 Apdb,
43 ApdbConfig,
44 ApdbReassignDiaSourceRecord,
45 ApdbReplica,
46 ApdbTableData,
47 ApdbTables,
48 ApdbUpdateRecord,
49 ApdbWithdrawDiaSourceRecord,
50 IncompatibleVersionError,
51 ReplicaChunk,
52 VersionTuple,
53)
54from .data_factory import (
55 makeForcedSourceCatalog,
56 makeObjectCatalog,
57 makeSourceCatalog,
58 makeSSObjectCatalog,
59 makeTimestampNow,
60)
61from .utils import TestCaseMixin
62
63if TYPE_CHECKING:
64 from ..pixelization import Pixelization
65
66
def _make_region(xyz: tuple[float, float, float] = (1.0, 1.0, -1.0)) -> Region:
    """Make a small circular region to use in tests.

    Parameters
    ----------
    xyz : `tuple` [`float`]
        Cartesian components of the pointing direction.

    Returns
    -------
    region : `lsst.sphgeom.Region`
        Circle centered on the pointing direction.
    """
    # Field-of-view diameter in radians; circle radius is half of it.
    fov_radians = 0.0013
    center = UnitVector3d(*xyz)
    return Circle(center, Angle(fov_radians / 2))
73
74
@contextlib.contextmanager
def update_schema_yaml(
    schema_file: str,
    drop_metadata: bool = False,
    version: str | None = None,
) -> Iterator[str]:
    """Update schema definition and return name of the new schema file.

    Parameters
    ----------
    schema_file : `str`
        Path for the existing YAML file with APDB schema.
    drop_metadata : `bool`
        If `True` then remove metadata table from the list of tables.
    version : `str` or `None`
        If non-empty string then set schema version to this string, if empty
        string then remove schema version from config, if `None` - don't
        change the version in config.

    Yields
    ------
    path : `str`
        Path for the updated schema file; the file lives in a temporary
        directory that is removed when the context exits.
    """
    # NOTE(review): the `def` line was missing from the extracted source and
    # was reconstructed from the module `__all__`; confirm against upstream.
    with open(schema_file) as yaml_stream:
        schemas_list = list(yaml.load_all(yaml_stream, Loader=yaml.SafeLoader))
    # Edit YAML contents in memory before writing the new file.
    for schema in schemas_list:
        # Optionally drop metadata table.
        if drop_metadata:
            schema["tables"] = [table for table in schema["tables"] if table["name"] != "metadata"]
        if version is not None:
            if version == "":
                del schema["version"]
            else:
                schema["version"] = version

    with TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
        output_path = os.path.join(tmpdir, "schema.yaml")
        with open(output_path, "w") as yaml_stream:
            yaml.dump_all(schemas_list, stream=yaml_stream)
        yield output_path
117
# NOTE(review): the class header line was dropped by the doc extraction; the
# base classes are reconstructed from the `TestCaseMixin` import, the use of
# `@abstractmethod` in the body, and the docstring — confirm upstream.
class ApdbTest(TestCaseMixin, ABC):
    """Base class for Apdb tests that can be specialized for concrete
    implementation.

    This can only be used as a mixin class for a unittest.TestCase and it
    calls various assert methods.
    """
125
126 visit_time = astropy.time.Time("2021-01-01T00:00:00", format="isot", scale="tai")
127
128 fsrc_requires_id_list = False
129 """Should be set to True if getDiaForcedSources requires object IDs"""
130
131 enable_replica: bool = False
132 """Set to true when support for replication is configured"""
133
134 use_mjd: bool = True
135 """If True then timestamp columns are MJD TAI."""
136
137 extra_chunk_columns = 1
138 """Number of additional columns in chunk tables."""
139
140 meta_row_count = 3
141 """Initial row count in metadata table."""
142
143 # number of columns as defined in tests/config/schema.yaml
144 table_column_count = {
145 ApdbTables.DiaObject: 7,
146 ApdbTables.DiaObjectLast: 5,
147 ApdbTables.DiaSource: 12,
148 ApdbTables.DiaForcedSource: 8,
149 ApdbTables.SSObject: 3,
150 }
151
152 @abstractmethod
153 def make_instance(self, **kwargs: Any) -> ApdbConfig:
154 """Make database instance and return configuration for it."""
155 raise NotImplementedError()
156
157 @abstractmethod
158 def getDiaObjects_table(self) -> ApdbTables:
159 """Return type of table returned from getDiaObjects method."""
160 raise NotImplementedError()
161
162 @abstractmethod
163 def pixelization(self, config: ApdbConfig) -> Pixelization:
164 """Return pixelization used by implementation."""
165 raise NotImplementedError()
166
167 def assert_catalog(self, catalog: Any, rows: int, table: ApdbTables) -> None:
168 """Validate catalog type and size
169
170 Parameters
171 ----------
172 catalog : `object`
173 Expected type of this is ``pandas.DataFrame``.
174 rows : `int`
175 Expected number of rows in a catalog.
176 table : `ApdbTables`
177 APDB table type.
178 """
179 self.assertIsInstance(catalog, pandas.DataFrame)
180 self.assertEqual(catalog.shape[0], rows)
181 self.assertEqual(catalog.shape[1], self.table_column_count[table])
182
183 def assert_table_data(self, catalog: Any, rows: int, table: ApdbTables) -> None:
184 """Validate catalog type and size
185
186 Parameters
187 ----------
188 catalog : `object`
189 Expected type of this is `ApdbTableData`.
190 rows : `int`
191 Expected number of rows in a catalog.
192 table : `ApdbTables`
193 APDB table type.
194 extra_columns : `int`
195 Count of additional columns expected in ``catalog``.
196 """
197 self.assertIsInstance(catalog, ApdbTableData)
198 n_rows = sum(1 for row in catalog.rows())
199 self.assertEqual(n_rows, rows)
200 # One extra column for replica chunk id
201 self.assertEqual(
202 len(catalog.column_names()), self.table_column_count[table] + self.extra_chunk_columns
203 )
204
205 def assert_column_types(self, catalog: Any, types: dict[str, felis.datamodel.DataType]) -> None:
206 column_defs = dict(catalog.column_defs())
207 for column, datatype in types.items():
208 self.assertEqual(column_defs[column], datatype)
209
210 def make_region(self, xyz: tuple[float, float, float] = (1.0, 1.0, -1.0)) -> Region:
211 """Make a region to use in tests"""
212 return _make_region(xyz)
213
214 def test_makeSchema(self) -> None:
215 """Test for making APDB schema."""
216 config = self.make_instance()
217 apdb = Apdb.from_config(config)
218
219 self.assertIsNotNone(apdb.tableDef(ApdbTables.DiaObject))
220 self.assertIsNotNone(apdb.tableDef(ApdbTables.DiaObjectLast))
221 self.assertIsNotNone(apdb.tableDef(ApdbTables.DiaSource))
222 self.assertIsNotNone(apdb.tableDef(ApdbTables.DiaForcedSource))
223 self.assertIsNotNone(apdb.tableDef(ApdbTables.metadata))
224 self.assertIsNotNone(apdb.tableDef(ApdbTables.SSObject))
225 self.assertIsNotNone(apdb.tableDef(ApdbTables.SSSource))
226 self.assertIsNotNone(apdb.tableDef(ApdbTables.DiaObject_To_Object_Match))
227
228 # Test from_uri factory method with the same config.
229 with tempfile.NamedTemporaryFile() as tmpfile:
230 config.save(tmpfile.name)
231 apdb = Apdb.from_uri(tmpfile.name)
232
233 self.assertIsNotNone(apdb.tableDef(ApdbTables.DiaObject))
234 self.assertIsNotNone(apdb.tableDef(ApdbTables.DiaObjectLast))
235 self.assertIsNotNone(apdb.tableDef(ApdbTables.DiaSource))
236 self.assertIsNotNone(apdb.tableDef(ApdbTables.DiaForcedSource))
237 self.assertIsNotNone(apdb.tableDef(ApdbTables.metadata))
238 self.assertIsNotNone(apdb.tableDef(ApdbTables.SSObject))
239 self.assertIsNotNone(apdb.tableDef(ApdbTables.SSSource))
240 self.assertIsNotNone(apdb.tableDef(ApdbTables.DiaObject_To_Object_Match))
241
242 def test_empty_gets(self) -> None:
243 """Test for getting data from empty database.
244
245 All get() methods should return empty results, only useful for
246 checking that code is not broken.
247 """
248 # use non-zero months for Forced/Source fetching
249 config = self.make_instance()
250 apdb = Apdb.from_config(config)
251
252 region = self.make_region()
253 visit_time = self.visit_time
254
255 res: pandas.DataFrame | None
256
257 # get objects by region
258 res = apdb.getDiaObjects(region)
259 self.assert_catalog(res, 0, self.getDiaObjects_table())
260
261 # get sources by region
262 res = apdb.getDiaSources(region, None, visit_time)
263 self.assert_catalog(res, 0, ApdbTables.DiaSource)
264
265 res = apdb.getDiaSources(region, [], visit_time)
266 self.assert_catalog(res, 0, ApdbTables.DiaSource)
267
268 # get sources by object ID, non-empty object list
269 res = apdb.getDiaSources(region, [1, 2, 3], visit_time)
270 self.assert_catalog(res, 0, ApdbTables.DiaSource)
271
272 # get forced sources by object ID, empty object list
273 res = apdb.getDiaForcedSources(region, [], visit_time)
274 self.assert_catalog(res, 0, ApdbTables.DiaForcedSource)
275
276 # get sources by object ID, non-empty object list
277 res = apdb.getDiaForcedSources(region, [1, 2, 3], visit_time)
278 self.assert_catalog(res, 0, ApdbTables.DiaForcedSource)
279
280 # data_factory's ccdVisitId generation corresponds to (1, 1)
281 res = apdb.containsVisitDetector(visit=1, detector=1, region=region, visit_time=visit_time)
282 self.assertFalse(res)
283
284 # get sources by region
285 if self.fsrc_requires_id_list:
286 with self.assertRaises(NotImplementedError):
287 apdb.getDiaForcedSources(region, None, visit_time)
288 else:
289 res = apdb.getDiaForcedSources(region, None, visit_time)
290 self.assert_catalog(res, 0, ApdbTables.DiaForcedSource)
291
292 def test_empty_gets_0months(self) -> None:
293 """Test for getting data from empty database.
294
295 All get() methods should return empty DataFrame or None.
296 """
297 # set read_sources_months to 0 so that Forced/Sources are None
298 config = self.make_instance(read_sources_months=0, read_forced_sources_months=0)
299 apdb = Apdb.from_config(config)
300
301 region = self.make_region()
302 visit_time = self.visit_time
303
304 res: pandas.DataFrame | None
305
306 # get objects by region
307 res = apdb.getDiaObjects(region)
308 self.assert_catalog(res, 0, self.getDiaObjects_table())
309
310 # get sources by region
311 res = apdb.getDiaSources(region, None, visit_time)
312 self.assertIs(res, None)
313
314 # get sources by object ID, empty object list
315 res = apdb.getDiaSources(region, [], visit_time)
316 self.assertIs(res, None)
317
318 # get forced sources by object ID, empty object list
319 res = apdb.getDiaForcedSources(region, [], visit_time)
320 self.assertIs(res, None)
321
322 # Database is empty, no images exist.
323 res = apdb.containsVisitDetector(visit=1, detector=1, region=region, visit_time=visit_time)
324 self.assertFalse(res)
325
326 def test_storeObjects(self) -> None:
327 """Store and retrieve DiaObjects."""
328 # don't care about sources.
329 config = self.make_instance()
330 apdb = Apdb.from_config(config)
331
332 region = self.make_region()
333 visit_time = self.visit_time
334
335 # make catalog with Objects
336 catalog = makeObjectCatalog(region, 100, visit_time)
337
338 # store catalog
339 apdb.store(visit_time, catalog)
340
341 # read it back and check sizes
342 res = apdb.getDiaObjects(region)
343 self.assert_catalog(res, len(catalog), self.getDiaObjects_table())
344
345 # TODO: test apdb.contains with generic implementation from DM-41671
346
347 def test_storeObjects_empty(self) -> None:
348 """Test calling storeObject when there are no objects: see DM-43270."""
349 config = self.make_instance()
350 apdb = Apdb.from_config(config)
351 region = self.make_region()
352 visit_time = self.visit_time
353 # make catalog with no Objects
354 catalog = makeObjectCatalog(region, 0, visit_time)
355
356 with self.assertLogs("lsst.dax.apdb", level="DEBUG") as cm:
357 apdb.store(visit_time, catalog)
358 self.assertIn("No objects", "\n".join(cm.output))
359
360 def test_storeMovingObject(self) -> None:
361 """Store and retrieve DiaObject which changes its position."""
362 # don't care about sources.
363 config = self.make_instance()
364 apdb = Apdb.from_config(config)
365 pixelization = self.pixelization(config)
366
367 lon_deg, lat_deg = 0.0, 0.0
368 lonlat1 = LonLat.fromDegrees(lon_deg - 1.0, lat_deg)
369 lonlat2 = LonLat.fromDegrees(lon_deg + 1.0, lat_deg)
370 uv1 = UnitVector3d(lonlat1)
371 uv2 = UnitVector3d(lonlat2)
372
373 # Check that they fall into different pixels.
374 self.assertNotEqual(pixelization.pixel(uv1), pixelization.pixel(uv2))
375
376 # Store one object at two different positions.
377 visit_time1 = self.visit_time
378 catalog1 = makeObjectCatalog(lonlat1, 1, visit_time1)
379 apdb.store(visit_time1, catalog1)
380
381 visit_time2 = visit_time1 + astropy.time.TimeDelta(120.0, format="sec")
382 catalog1 = makeObjectCatalog(lonlat2, 1, visit_time2)
383 apdb.store(visit_time2, catalog1)
384
385 # Make region covering both points.
386 region = Circle(UnitVector3d(LonLat.fromDegrees(lon_deg, lat_deg)), Angle.fromDegrees(1.1))
387 self.assertTrue(region.contains(uv1))
388 self.assertTrue(region.contains(uv2))
389
390 # Read it back, must return the latest one.
391 res = apdb.getDiaObjects(region)
392 self.assert_catalog(res, 1, self.getDiaObjects_table())
393
394 def test_storeSources(self) -> None:
395 """Store and retrieve DiaSources."""
396 config = self.make_instance()
397 apdb = Apdb.from_config(config)
398
399 region = self.make_region()
400 visit_time = self.visit_time
401
402 # have to store Objects first
403 objects = makeObjectCatalog(region, 100, visit_time)
404 oids = list(objects["diaObjectId"])
405 sources = makeSourceCatalog(objects, visit_time, use_mjd=self.use_mjd)
406
407 # save the objects and sources
408 apdb.store(visit_time, objects, sources)
409
410 # read it back, no ID filtering
411 res = apdb.getDiaSources(region, None, visit_time)
412 self.assert_catalog(res, len(sources), ApdbTables.DiaSource)
413
414 # read it back and filter by ID
415 res = apdb.getDiaSources(region, oids, visit_time)
416 self.assert_catalog(res, len(sources), ApdbTables.DiaSource)
417
418 # read it back to get schema
419 res = apdb.getDiaSources(region, [], visit_time)
420 self.assert_catalog(res, 0, ApdbTables.DiaSource)
421
422 # test if a visit is present
423 # data_factory's ccdVisitId generation corresponds to (1, 1)
424 res = apdb.containsVisitDetector(visit=1, detector=1, region=region, visit_time=visit_time)
425 self.assertTrue(res)
426 # non-existent image
427 res = apdb.containsVisitDetector(visit=2, detector=42, region=region, visit_time=visit_time)
428 self.assertFalse(res)
429
430 def test_storeForcedSources(self) -> None:
431 """Store and retrieve DiaForcedSources."""
432 config = self.make_instance()
433 apdb = Apdb.from_config(config)
434
435 region = self.make_region()
436 visit_time = self.visit_time
437
438 # have to store Objects first
439 objects = makeObjectCatalog(region, 100, visit_time)
440 oids = list(objects["diaObjectId"])
441 catalog = makeForcedSourceCatalog(objects, visit_time, use_mjd=self.use_mjd)
442
443 apdb.store(visit_time, objects, forced_sources=catalog)
444
445 # read it back and check sizes
446 res = apdb.getDiaForcedSources(region, oids, visit_time)
447 self.assert_catalog(res, len(catalog), ApdbTables.DiaForcedSource)
448
449 # read it back to get schema
450 res = apdb.getDiaForcedSources(region, [], visit_time)
451 self.assert_catalog(res, 0, ApdbTables.DiaForcedSource)
452
453 # data_factory's ccdVisitId generation corresponds to (1, 1)
454 res = apdb.containsVisitDetector(visit=1, detector=1, region=region, visit_time=visit_time)
455 self.assertTrue(res)
456 # non-existent image
457 res = apdb.containsVisitDetector(visit=2, detector=42, region=region, visit_time=visit_time)
458 self.assertFalse(res)
459
460 def test_timestamps(self) -> None:
461 """Check that timestamp return type is as expected."""
462 config = self.make_instance()
463 apdb = Apdb.from_config(config)
464
465 region = self.make_region()
466 visit_time = self.visit_time
467
468 # Cassandra has a millisecond precision, so subtract 1ms to allow for
469 # truncated returned values.
470 time_before = makeTimestampNow(self.use_mjd, -1)
471 objects = makeObjectCatalog(region, 100, visit_time)
472 oids = list(objects["diaObjectId"])
473 catalog = makeForcedSourceCatalog(objects, visit_time, use_mjd=self.use_mjd)
474 time_after = makeTimestampNow(self.use_mjd)
475
476 apdb.store(visit_time, objects, forced_sources=catalog)
477
478 # read it back and check sizes
479 res = apdb.getDiaForcedSources(region, oids, visit_time)
480 assert res is not None
481 self.assert_catalog(res, len(catalog), ApdbTables.DiaForcedSource)
482
483 time_processed_column = "timeProcessedMjdTai" if self.use_mjd else "time_processed"
484 self.assertIn(time_processed_column, res.dtypes)
485 dtype = res.dtypes[time_processed_column]
486 timestamp_type_name = "float64" if self.use_mjd else "datetime64[ns]"
487 self.assertEqual(dtype.name, timestamp_type_name)
488 # Verify that returned time is sensible.
489 self.assertTrue(all(time_before <= dt <= time_after for dt in res[time_processed_column]))
490
491 def test_getChunks(self) -> None:
492 """Store and retrieve replica chunks."""
493 # don't care about sources.
494 config = self.make_instance()
495 apdb = Apdb.from_config(config)
496 apdb_replica = ApdbReplica.from_config(config)
497 visit_time = self.visit_time
498
499 region1 = self.make_region((1.0, 1.0, -1.0))
500 region2 = self.make_region((-1.0, -1.0, -1.0))
501 nobj = 100
502 objects1 = makeObjectCatalog(region1, nobj, visit_time)
503 objects2 = makeObjectCatalog(region2, nobj, visit_time, start_id=nobj * 2)
504
505 # With the default 10 minutes replica chunk window we should have 4
506 # records.
507 visits = [
508 (astropy.time.Time("2021-01-01T00:01:00", format="isot", scale="tai"), objects1),
509 (astropy.time.Time("2021-01-01T00:02:00", format="isot", scale="tai"), objects2),
510 (astropy.time.Time("2021-01-01T00:11:00", format="isot", scale="tai"), objects1),
511 (astropy.time.Time("2021-01-01T00:12:00", format="isot", scale="tai"), objects2),
512 (astropy.time.Time("2021-01-01T00:45:00", format="isot", scale="tai"), objects1),
513 (astropy.time.Time("2021-01-01T00:46:00", format="isot", scale="tai"), objects2),
514 (astropy.time.Time("2021-03-01T00:01:00", format="isot", scale="tai"), objects1),
515 (astropy.time.Time("2021-03-01T00:02:00", format="isot", scale="tai"), objects2),
516 ]
517
518 start_id = 0
519 for visit_time, objects in visits:
520 sources = makeSourceCatalog(objects, visit_time, start_id=start_id, use_mjd=self.use_mjd)
521 fsources = makeForcedSourceCatalog(objects, visit_time, visit=start_id, use_mjd=self.use_mjd)
522 apdb.store(visit_time, objects, sources, fsources)
523 start_id += nobj
524
525 replica_chunks = apdb_replica.getReplicaChunks()
526 if not self.enable_replica:
527 self.assertIsNone(replica_chunks)
528
529 with self.assertRaisesRegex(ValueError, "APDB is not configured for replication"):
530 apdb_replica.getTableDataChunks(ApdbTables.DiaObject, [])
531
532 else:
533 assert replica_chunks is not None
534 self.assertEqual(len(replica_chunks), 4)
535
536 with self.assertRaisesRegex(ValueError, "does not support replica chunks"):
537 apdb_replica.getTableDataChunks(ApdbTables.SSObject, [])
538
539 def _check_chunks(replica_chunks: list[ReplicaChunk], n_records: int | None = None) -> None:
540 if n_records is None:
541 n_records = len(replica_chunks) * nobj
542 res = apdb_replica.getTableDataChunks(
543 ApdbTables.DiaObject, (chunk.id for chunk in replica_chunks)
544 )
545 self.assert_table_data(res, n_records, ApdbTables.DiaObject)
546 validityStartColumn = "validityStartMjdTai" if self.use_mjd else "validityStart"
547 validityStartType = (
548 felis.datamodel.DataType.double if self.use_mjd else felis.datamodel.DataType.timestamp
549 )
551 res,
552 {
553 "apdb_replica_chunk": felis.datamodel.DataType.long,
554 "diaObjectId": felis.datamodel.DataType.long,
555 validityStartColumn: validityStartType,
556 "ra": felis.datamodel.DataType.double,
557 "dec": felis.datamodel.DataType.double,
558 "parallax": felis.datamodel.DataType.float,
559 "nDiaSources": felis.datamodel.DataType.int,
560 },
561 )
562
563 res = apdb_replica.getTableDataChunks(
564 ApdbTables.DiaSource, (chunk.id for chunk in replica_chunks)
565 )
566 self.assert_table_data(res, n_records, ApdbTables.DiaSource)
568 res,
569 {
570 "apdb_replica_chunk": felis.datamodel.DataType.long,
571 "diaSourceId": felis.datamodel.DataType.long,
572 "visit": felis.datamodel.DataType.long,
573 "detector": felis.datamodel.DataType.short,
574 },
575 )
576
577 res = apdb_replica.getTableDataChunks(
578 ApdbTables.DiaForcedSource, (chunk.id for chunk in replica_chunks)
579 )
580 self.assert_table_data(res, n_records, ApdbTables.DiaForcedSource)
582 res,
583 {
584 "apdb_replica_chunk": felis.datamodel.DataType.long,
585 "diaObjectId": felis.datamodel.DataType.long,
586 "visit": felis.datamodel.DataType.long,
587 "detector": felis.datamodel.DataType.short,
588 },
589 )
590
591 # read it back and check sizes
592 _check_chunks(replica_chunks, 800)
593 _check_chunks(replica_chunks[1:], 600)
594 _check_chunks(replica_chunks[1:-1], 400)
595 _check_chunks(replica_chunks[2:3], 200)
596 _check_chunks([])
597
598 # try to remove some of those
599 deleted_chunks = replica_chunks[:1]
600 apdb_replica.deleteReplicaChunks(chunk.id for chunk in deleted_chunks)
601
602 # All queries on deleted ids should return empty set.
603 _check_chunks(deleted_chunks, 0)
604
605 replica_chunks = apdb_replica.getReplicaChunks()
606 assert replica_chunks is not None
607 self.assertEqual(len(replica_chunks), 3)
608
609 _check_chunks(replica_chunks, 600)
610
611 def test_storeSSObjects(self) -> None:
612 """Store and retrieve SSObjects."""
613 # don't care about sources.
614 config = self.make_instance()
615 apdb = Apdb.from_config(config)
616
617 # make catalog with SSObjects
618 catalog = makeSSObjectCatalog(100, flags=1)
619
620 # store catalog
621 apdb.storeSSObjects(catalog)
622
623 # read it back and check sizes
624 res = apdb.getSSObjects()
625 self.assert_catalog(res, len(catalog), ApdbTables.SSObject)
626
627 # check that override works, make catalog with SSObjects, ID = 51-150
628 catalog = makeSSObjectCatalog(100, 51, flags=2)
629 apdb.storeSSObjects(catalog)
630 res = apdb.getSSObjects()
631 self.assert_catalog(res, 150, ApdbTables.SSObject)
632 self.assertEqual(len(res[res["flags"] == 1]), 50)
633 self.assertEqual(len(res[res["flags"] == 2]), 100)
634
635 def test_reassignObjects(self) -> None:
636 """Reassign DiaObjects."""
637 # don't care about sources.
638 config = self.make_instance()
639 apdb = Apdb.from_config(config)
640
641 region = self.make_region()
642 visit_time = self.visit_time
643 objects = makeObjectCatalog(region, 100, visit_time)
644 oids = list(objects["diaObjectId"])
645 sources = makeSourceCatalog(objects, visit_time, use_mjd=self.use_mjd)
646 apdb.store(visit_time, objects, sources)
647
648 catalog = makeSSObjectCatalog(100)
649 apdb.storeSSObjects(catalog)
650
651 # read it back and filter by ID
652 res = apdb.getDiaSources(region, oids, visit_time)
653 self.assert_catalog(res, len(sources), ApdbTables.DiaSource)
654
655 apdb.reassignDiaSources({1: 1, 2: 2, 5: 5})
656 res = apdb.getDiaSources(region, oids, visit_time)
657 self.assert_catalog(res, len(sources) - 3, ApdbTables.DiaSource)
658
659 with self.assertRaisesRegex(ValueError, r"do not exist.*\D1000"):
660 apdb.reassignDiaSources(
661 {
662 1000: 1,
663 7: 3,
664 }
665 )
666 self.assert_catalog(res, len(sources) - 3, ApdbTables.DiaSource)
667
668 def test_storeUpdateRecord(self) -> None:
669 """Test _storeUpdateRecord() method."""
670 config = self.make_instance()
671 apdb = Apdb.from_config(config)
672
673 # Times are totally arbitrary.
674 update_time_ns1 = 2_000_000_000_000_000_000
675 update_time_ns2 = 2_000_000_001_000_000_000
676 records = [
678 update_time_ns=update_time_ns1,
679 update_order=0,
680 diaSourceId=1,
681 diaObjectId=321,
682 ssObjectId=1,
683 ssObjectReassocTimeMjdTai=60000.0,
684 ra=45.0,
685 dec=-45.0,
686 ),
688 update_time_ns=update_time_ns1,
689 update_order=1,
690 diaSourceId=123456,
691 diaObjectId=321,
692 timeWithdrawnMjdTai=61000.0,
693 ra=45.0,
694 dec=-45.0,
695 ),
697 update_time_ns=update_time_ns1,
698 update_order=3,
699 diaSourceId=2,
700 diaObjectId=3,
701 ssObjectId=3,
702 ssObjectReassocTimeMjdTai=60000.0,
703 ra=45.0,
704 dec=-45.0,
705 ),
707 update_time_ns=update_time_ns2,
708 update_order=0,
709 diaSourceId=123456,
710 diaObjectId=321,
711 timeWithdrawnMjdTai=61000.0,
712 ra=45.0,
713 dec=-45.0,
714 ),
715 ]
716
717 update_time = astropy.time.Time("2021-01-01T00:00:00", format="isot", scale="tai")
718 chunk = ReplicaChunk.make_replica_chunk(update_time, 600)
719
720 if not self.enable_replica:
721 with self.assertRaises(TypeError):
722 self.store_update_records(apdb, records, chunk)
723 else:
724 self.store_update_records(apdb, records, chunk)
725
726 apdb_replica = ApdbReplica.from_config(config)
727 records_returned = apdb_replica.getUpdateRecordChunks([chunk.id])
728
729 # Input records are ordered, output will be ordered too.
730 self.assertEqual(records_returned, records)
731
732 @abstractmethod
733 def store_update_records(self, apdb: Apdb, records: list[ApdbUpdateRecord], chunk: ReplicaChunk) -> None:
734 """Store update records in database, must be overriden in subclass."""
735 raise NotImplementedError()
736
737 def test_midpointMjdTai_src(self) -> None:
738 """Test for time filtering of DiaSources."""
739 config = self.make_instance()
740 apdb = Apdb.from_config(config)
741
742 region = self.make_region()
743 # 2021-01-01 plus 360 days is 2021-12-27
744 src_time1 = astropy.time.Time("2021-01-01T00:00:00", format="isot", scale="tai")
745 src_time2 = astropy.time.Time("2021-01-01T00:00:02", format="isot", scale="tai")
746 visit_time0 = astropy.time.Time("2021-12-26T23:59:59", format="isot", scale="tai")
747 visit_time1 = astropy.time.Time("2021-12-27T00:00:01", format="isot", scale="tai")
748 visit_time2 = astropy.time.Time("2021-12-27T00:00:03", format="isot", scale="tai")
749 one_sec = astropy.time.TimeDelta(1.0, format="sec")
750
751 objects = makeObjectCatalog(region, 100, visit_time0)
752 oids = list(objects["diaObjectId"])
753 sources = makeSourceCatalog(objects, src_time1, 0, use_mjd=self.use_mjd)
754 apdb.store(src_time1, objects, sources)
755
756 sources = makeSourceCatalog(objects, src_time2, 100, use_mjd=self.use_mjd)
757 apdb.store(src_time2, objects, sources)
758
759 # reading at time of last save should read all
760 res = apdb.getDiaSources(region, oids, src_time2)
761 self.assert_catalog(res, 200, ApdbTables.DiaSource)
762
763 # one second before 12 months
764 res = apdb.getDiaSources(region, oids, visit_time0)
765 self.assert_catalog(res, 200, ApdbTables.DiaSource)
766
767 # reading at later time of last save should only read a subset
768 res = apdb.getDiaSources(region, oids, visit_time1)
769 self.assert_catalog(res, 100, ApdbTables.DiaSource)
770
771 # reading at later time of last save should only read a subset
772 res = apdb.getDiaSources(region, oids, visit_time2)
773 self.assert_catalog(res, 0, ApdbTables.DiaSource)
774
775 # Use explicit start time argument instead of 12 month window, visit
776 # time does not matter in this case, set it to before all data.
777 res = apdb.getDiaSources(region, oids, src_time1 - one_sec, src_time1 - one_sec)
778 self.assert_catalog(res, 200, ApdbTables.DiaSource)
779
780 res = apdb.getDiaSources(region, oids, src_time1 - one_sec, src_time2 - one_sec)
781 self.assert_catalog(res, 100, ApdbTables.DiaSource)
782
783 res = apdb.getDiaSources(region, oids, src_time1 - one_sec, src_time2 + one_sec)
784 self.assert_catalog(res, 0, ApdbTables.DiaSource)
785
786 def test_midpointMjdTai_fsrc(self) -> None:
787 """Test for time filtering of DiaForcedSources."""
788 config = self.make_instance()
789 apdb = Apdb.from_config(config)
790
791 region = self.make_region()
792 src_time1 = astropy.time.Time("2021-01-01T00:00:00", format="isot", scale="tai")
793 src_time2 = astropy.time.Time("2021-01-01T00:00:02", format="isot", scale="tai")
794 visit_time0 = astropy.time.Time("2021-12-26T23:59:59", format="isot", scale="tai")
795 visit_time1 = astropy.time.Time("2021-12-27T00:00:01", format="isot", scale="tai")
796 visit_time2 = astropy.time.Time("2021-12-27T00:00:03", format="isot", scale="tai")
797 one_sec = astropy.time.TimeDelta(1.0, format="sec")
798
799 objects = makeObjectCatalog(region, 100, visit_time0)
800 oids = list(objects["diaObjectId"])
801 sources = makeForcedSourceCatalog(objects, src_time1, 1, use_mjd=self.use_mjd)
802 apdb.store(src_time1, objects, forced_sources=sources)
803
804 sources = makeForcedSourceCatalog(objects, src_time2, 2, use_mjd=self.use_mjd)
805 apdb.store(src_time2, objects, forced_sources=sources)
806
807 # reading at time of last save should read all
808 res = apdb.getDiaForcedSources(region, oids, src_time2)
809 self.assert_catalog(res, 200, ApdbTables.DiaForcedSource)
810
811 # one second before 12 months
812 res = apdb.getDiaForcedSources(region, oids, visit_time0)
813 self.assert_catalog(res, 200, ApdbTables.DiaForcedSource)
814
815 # reading at later time of last save should only read a subset
816 res = apdb.getDiaForcedSources(region, oids, visit_time1)
817 self.assert_catalog(res, 100, ApdbTables.DiaForcedSource)
818
819 # reading at later time of last save should only read a subset
820 res = apdb.getDiaForcedSources(region, oids, visit_time2)
821 self.assert_catalog(res, 0, ApdbTables.DiaForcedSource)
822
823 # Use explicit start time argument instead of 12 month window, visit
824 # time does not matter in this case, set it to before all data.
825 res = apdb.getDiaForcedSources(region, oids, src_time1 - one_sec, src_time1 - one_sec)
826 self.assert_catalog(res, 200, ApdbTables.DiaForcedSource)
827
828 res = apdb.getDiaForcedSources(region, oids, src_time1 - one_sec, src_time2 - one_sec)
829 self.assert_catalog(res, 100, ApdbTables.DiaForcedSource)
830
831 res = apdb.getDiaForcedSources(region, oids, src_time1 - one_sec, src_time2 + one_sec)
832 self.assert_catalog(res, 0, ApdbTables.DiaForcedSource)
833
def test_metadata(self) -> None:
    """Exercise set/get/delete round-trips on the APDB metadata table."""
    apdb = Apdb.from_config(self.make_instance())
    meta = apdb.metadata

    # A freshly-created APDB already stores two or three items: version
    # numbers and a frozen JSON config, so the table must not be empty.
    self.assertFalse(meta.empty())
    self.assertEqual(len(list(meta.items())), self.meta_row_count)

    meta.set("meta", "data")
    meta.set("data", "meta")

    self.assertFalse(meta.empty())
    self.assertTrue({("meta", "data"), ("data", "meta")} <= set(meta.items()))

    # Re-setting an existing key without force must raise.
    with self.assertRaisesRegex(KeyError, "Metadata key 'meta' already exists"):
        meta.set("meta", "data1")

    # With force=True the existing value is overwritten.
    meta.set("meta", "data2", force=True)
    self.assertTrue({("meta", "data2"), ("data", "meta")} <= set(meta.items()))

    # First delete succeeds, second delete of the same key returns False.
    self.assertTrue(meta.delete("meta"))
    self.assertIsNone(meta.get("meta"))
    self.assertFalse(meta.delete("meta"))

    # get() falls back to the supplied default for a missing key.
    self.assertEqual(meta.get("data"), "meta")
    self.assertEqual(meta.get("meta", "meta"), "meta")
863
def test_schemaVersionFromYaml(self) -> None:
    """Check version number handling for reading schema from YAML."""
    config = self.make_instance()
    default_schema = config.schema_file

    # The default YAML file declares schema version 0.1.1.
    apdb = Apdb.from_config(config)
    self.assertEqual(apdb._schema.schemaVersion(), VersionTuple(0, 1, 1))  # type: ignore[attr-defined]

    # An empty version string falls back to 0.1.0; an explicit version
    # string is taken verbatim.
    cases = (
        ("", VersionTuple(0, 1, 0)),
        ("99.0.0", VersionTuple(99, 0, 0)),
    )
    for yaml_version, expected in cases:
        with update_schema_yaml(default_schema, version=yaml_version) as schema_file:
            config = self.make_instance(schema_file=schema_file)
            apdb = Apdb.from_config(config)
            self.assertEqual(
                apdb._schema.schemaVersion(),  # type: ignore[attr-defined]
                expected,
            )
886
def test_config_freeze(self) -> None:
    """Test that some config fields are correctly frozen in database."""
    config = self.make_instance()

    # `enable_replica` is the one parameter frozen by every
    # implementation: flipping it on the in-memory config must not be
    # reflected in the config read back from the database.
    config.enable_replica = not self.enable_replica
    frozen_config = Apdb.from_config(config).getConfig()
    self.assertEqual(frozen_config.enable_replica, self.enable_replica)
897
898
900 """Base class for unit tests that verify how schema changes work."""
901
# Fixed TAI timestamp used as the visit time by all tests in this class.
902 visit_time = astropy.time.Time("2021-01-01T00:00:00", format="isot", scale="tai")
903
@abstractmethod
def make_instance(self, **kwargs: Any) -> ApdbConfig:
    """Return the config instance used by every test in this class.

    Each invocation has to describe the very same database instance
    (``db_url`` must not change between calls; for sqlite that means
    on-disk storage rather than in-memory).
    """
    raise NotImplementedError()
913
def make_region(self, xyz: tuple[float, float, float] = (1.0, 1.0, -1.0)) -> Region:
    """Return a test region centered on the given direction vector."""
    # Delegate to the module-level helper so every test builds regions
    # the same way.
    return _make_region(xyz)
917
def test_schema_add_replica(self) -> None:
    """Check that new code can work with old schema without replica
    tables.
    """
    # Build the database without any replica tables.
    config = self.make_instance(enable_replica=False)
    apdb = Apdb.from_config(config)
    apdb_replica = ApdbReplica.from_config(config)

    # Re-open the same database with replication switched on.
    config.enable_replica = True
    apdb = Apdb.from_config(config)

    region = self.make_region()
    visit_time = self.visit_time

    # Objects have to be stored before their sources; the store must
    # succeed even though the replica tables are missing.
    objects = makeObjectCatalog(region, 100, visit_time)
    sources = makeSourceCatalog(objects, visit_time)
    fsources = makeForcedSourceCatalog(objects, visit_time)
    apdb.store(visit_time, objects, sources, fsources)

    # Nothing could have been replicated, so no chunks may exist.
    self.assertIsNone(apdb_replica.getReplicaChunks())
944
def test_schemaVersionCheck(self) -> None:
    """Check version number compatibility."""
    config = self.make_instance()
    apdb = Apdb.from_config(config)
    self.assertEqual(apdb._schema.schemaVersion(), VersionTuple(0, 1, 1))  # type: ignore[attr-defined]

    # Pretend the YAML schema is version 99.0.0 — reading it must raise
    # IncompatibleVersionError.
    with update_schema_yaml(config.schema_file, version="99.0.0") as schema_file:
        config.schema_file = schema_file
        with self.assertRaises(IncompatibleVersionError):
            apdb = Apdb.from_config(config)
            # The version check only happens on the first real
            # connection attempt.
            apdb.metadata.items()
ApdbConfig make_instance(self, **Any kwargs)
Definition _apdb.py:905
Region make_region(self, tuple[float, float, float] xyz=(1.0, 1.0, -1.0))
Definition _apdb.py:914
None assert_catalog(self, Any catalog, int rows, ApdbTables table)
Definition _apdb.py:167
ApdbConfig make_instance(self, **Any kwargs)
Definition _apdb.py:153
Pixelization pixelization(self, ApdbConfig config)
Definition _apdb.py:163
Region make_region(self, tuple[float, float, float] xyz=(1.0, 1.0, -1.0))
Definition _apdb.py:210
None assert_column_types(self, Any catalog, dict[str, felis.datamodel.DataType] types)
Definition _apdb.py:205
None assert_table_data(self, Any catalog, int rows, ApdbTables table)
Definition _apdb.py:183
None store_update_records(self, Apdb apdb, list[ApdbUpdateRecord] records, ReplicaChunk chunk)
Definition _apdb.py:733
ApdbTables getDiaObjects_table(self)
Definition _apdb.py:158
Angle represents an angle in radians.
Definition Angle.h:50
Circle is a circular region on the unit sphere that contains its boundary.
Definition Circle.h:54
UnitVector3d is a unit vector in ℝ³ with components stored in double precision.
Region _make_region(tuple[float, float, float] xyz=(1.0, 1.0, -1.0))
Definition _apdb.py:67
Iterator[str] update_schema_yaml(str schema_file, bool drop_metadata=False, str|None version=None)
Definition _apdb.py:80