LSST Applications g04e9c324dd+8c5ae1fdc5,g134cb467dc+b203dec576,g18429d2f64+358861cd2c,g199a45376c+0ba108daf9,g1fd858c14a+dd066899e3,g262e1987ae+ebfced1d55,g29ae962dfc+72fd90588e,g2cef7863aa+aef1011c0b,g35bb328faa+8c5ae1fdc5,g3fd5ace14f+b668f15bc5,g4595892280+3897dae354,g47891489e3+abcf9c3559,g4d44eb3520+fb4ddce128,g53246c7159+8c5ae1fdc5,g67b6fd64d1+abcf9c3559,g67fd3c3899+1f72b5a9f7,g74acd417e5+cb6b47f07b,g786e29fd12+668abc6043,g87389fa792+8856018cbb,g89139ef638+abcf9c3559,g8d7436a09f+bcf525d20c,g8ea07a8fe4+9f5ccc88ac,g90f42f885a+6054cc57f1,g97be763408+06f794da49,g9dd6db0277+1f72b5a9f7,ga681d05dcb+7e36ad54cd,gabf8522325+735880ea63,gac2eed3f23+abcf9c3559,gb89ab40317+abcf9c3559,gbf99507273+8c5ae1fdc5,gd8ff7fe66e+1f72b5a9f7,gdab6d2f7ff+cb6b47f07b,gdc713202bf+1f72b5a9f7,gdfd2d52018+8225f2b331,ge365c994fd+375fc21c71,ge410e46f29+abcf9c3559,geaed405ab2+562b3308c0,gf9a733ac38+8c5ae1fdc5,w.2025.35
LSST Data Management Base Package
Loading...
Searching...
No Matches
_apdb.py
Go to the documentation of this file.
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
21
22from __future__ import annotations
23
24__all__ = ["ApdbSchemaUpdateTest", "ApdbTest", "update_schema_yaml"]
25
26import contextlib
27import os
28import tempfile
29from abc import ABC, abstractmethod
30from collections.abc import Iterator
31from tempfile import TemporaryDirectory
32from typing import TYPE_CHECKING, Any
33
34import astropy.time
35import felis.datamodel
36import pandas
37import yaml
38
39from lsst.sphgeom import Angle, Circle, LonLat, Region, UnitVector3d
40
41from .. import (
42 Apdb,
43 ApdbConfig,
44 ApdbReplica,
45 ApdbTableData,
46 ApdbTables,
47 IncompatibleVersionError,
48 ReplicaChunk,
49 VersionTuple,
50)
51from .data_factory import (
52 makeForcedSourceCatalog,
53 makeObjectCatalog,
54 makeSourceCatalog,
55 makeSSObjectCatalog,
56 makeTimestampNow,
57)
58from .utils import TestCaseMixin
59
60if TYPE_CHECKING:
61 from ..pixelization import Pixelization
62
63
def _make_region(xyz: tuple[float, float, float] = (1.0, 1.0, -1.0)) -> Region:
    """Return a small circular region for use in tests.

    Parameters
    ----------
    xyz : `tuple` [`float`, `float`, `float`]
        Cartesian components of the (un-normalized) pointing direction.

    Returns
    -------
    region : `lsst.sphgeom.Region`
        Circle centered on the pointing with a 0.0013 rad field of view.
    """
    fov_rad = 0.0013  # field-of-view opening angle, radians
    center = UnitVector3d(*xyz)
    return Circle(center, Angle(fov_rad / 2))
71
@contextlib.contextmanager
def update_schema_yaml(
    schema_file: str,
    drop_metadata: bool = False,
    version: str | None = None,
) -> Iterator[str]:
    """Update schema definition and return name of the new schema file.

    Parameters
    ----------
    schema_file : `str`
        Path for the existing YAML file with APDB schema.
    drop_metadata : `bool`
        If `True` then remove metadata table from the list of tables.
    version : `str` or `None`
        If non-empty string then set schema version to this string, if empty
        string then remove schema version from config, if `None` - don't change
        the version in config.

    Yields
    ------
    Path for the updated configuration file.
    """
    with open(schema_file) as yaml_stream:
        schemas_list = list(yaml.load_all(yaml_stream, Loader=yaml.SafeLoader))
    # Edit YAML contents.
    for schema in schemas_list:
        # Optionally drop metadata table.
        if drop_metadata:
            schema["tables"] = [table for table in schema["tables"] if table["name"] != "metadata"]
        if version is not None:
            if version == "":
                del schema["version"]
            else:
                schema["version"] = version

    # The temporary file must stay alive while the caller uses it, hence the
    # context-manager structure; cleanup errors are ignored for Windows.
    with TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
        output_path = os.path.join(tmpdir, "schema.yaml")
        with open(output_path, "w") as yaml_stream:
            yaml.dump_all(schemas_list, stream=yaml_stream)
        yield output_path
114
116 """Base class for Apdb tests that can be specialized for concrete
117 implementation.
118
119 This can only be used as a mixin class for a unittest.TestCase and it
120 calls various assert methods.
121 """
122
123 visit_time = astropy.time.Time("2021-01-01T00:00:00", format="isot", scale="tai")
124
125 fsrc_requires_id_list = False
126 """Should be set to True if getDiaForcedSources requires object IDs"""
127
128 enable_replica: bool = False
129 """Set to true when support for replication is configured"""
130
131 use_mjd: bool = True
132 """If True then timestamp columns are MJD TAI."""
133
134 extra_chunk_columns = 1
135 """Number of additional columns in chunk tables."""
136
137 meta_row_count = 3
138 """Initial row count in metadata table."""
139
140 # number of columns as defined in tests/config/schema.yaml
141 table_column_count = {
142 ApdbTables.DiaObject: 7,
143 ApdbTables.DiaObjectLast: 4,
144 ApdbTables.DiaSource: 12,
145 ApdbTables.DiaForcedSource: 8,
146 ApdbTables.SSObject: 3,
147 }
148
    @abstractmethod
    def make_instance(self, **kwargs: Any) -> ApdbConfig:
        """Make database instance and return configuration for it.

        Parameters
        ----------
        **kwargs : `~typing.Any`
            Implementation-specific configuration overrides.
        """
        raise NotImplementedError()
153
    @abstractmethod
    def getDiaObjects_table(self) -> ApdbTables:
        """Return type of table returned from getDiaObjects method.

        Implementations may serve object queries from either the DiaObject
        or DiaObjectLast table, which have different column counts.
        """
        raise NotImplementedError()
158
    @abstractmethod
    def pixelization(self, config: ApdbConfig) -> Pixelization:
        """Return pixelization used by implementation.

        Parameters
        ----------
        config : `ApdbConfig`
            Configuration returned by `make_instance`.
        """
        raise NotImplementedError()
163
164 def assert_catalog(self, catalog: Any, rows: int, table: ApdbTables) -> None:
165 """Validate catalog type and size
166
167 Parameters
168 ----------
169 catalog : `object`
170 Expected type of this is ``pandas.DataFrame``.
171 rows : `int`
172 Expected number of rows in a catalog.
173 table : `ApdbTables`
174 APDB table type.
175 """
176 self.assertIsInstance(catalog, pandas.DataFrame)
177 self.assertEqual(catalog.shape[0], rows)
178 self.assertEqual(catalog.shape[1], self.table_column_count[table])
179
    def assert_table_data(self, catalog: Any, rows: int, table: ApdbTables) -> None:
        """Validate catalog type and size

        Parameters
        ----------
        catalog : `object`
            Expected type of this is `ApdbTableData`.
        rows : `int`
            Expected number of rows in a catalog.
        table : `ApdbTables`
            APDB table type.
        """
        self.assertIsInstance(catalog, ApdbTableData)
        n_rows = sum(1 for row in catalog.rows())
        self.assertEqual(n_rows, rows)
        # Chunk tables carry ``extra_chunk_columns`` extra columns (e.g. the
        # replica chunk id) on top of the regular table schema.
        self.assertEqual(
            len(catalog.column_names()), self.table_column_count[table] + self.extra_chunk_columns
        )
201
    def assert_column_types(self, catalog: Any, types: dict[str, felis.datamodel.DataType]) -> None:
        """Check that catalog columns have the expected felis data types.

        Parameters
        ----------
        catalog : `object`
            Object providing a ``column_defs()`` iterable of (name, type)
            pairs; expected type is `ApdbTableData`.
        types : `dict` [`str`, `felis.datamodel.DataType`]
            Expected type for each checked column; columns not listed here
            are not checked.
        """
        column_defs = dict(catalog.column_defs())
        for column, datatype in types.items():
            self.assertEqual(column_defs[column], datatype)
206
    def make_region(self, xyz: tuple[float, float, float] = (1.0, 1.0, -1.0)) -> Region:
        """Make a region to use in tests; delegates to module-level
        `_make_region`."""
        return _make_region(xyz)
210
211 def test_makeSchema(self) -> None:
212 """Test for making APDB schema."""
213 config = self.make_instance()
214 apdb = Apdb.from_config(config)
215
216 self.assertIsNotNone(apdb.tableDef(ApdbTables.DiaObject))
217 self.assertIsNotNone(apdb.tableDef(ApdbTables.DiaObjectLast))
218 self.assertIsNotNone(apdb.tableDef(ApdbTables.DiaSource))
219 self.assertIsNotNone(apdb.tableDef(ApdbTables.DiaForcedSource))
220 self.assertIsNotNone(apdb.tableDef(ApdbTables.metadata))
221 self.assertIsNotNone(apdb.tableDef(ApdbTables.SSObject))
222 self.assertIsNotNone(apdb.tableDef(ApdbTables.SSSource))
223 self.assertIsNotNone(apdb.tableDef(ApdbTables.DiaObject_To_Object_Match))
224
225 # Test from_uri factory method with the same config.
226 with tempfile.NamedTemporaryFile() as tmpfile:
227 config.save(tmpfile.name)
228 apdb = Apdb.from_uri(tmpfile.name)
229
230 self.assertIsNotNone(apdb.tableDef(ApdbTables.DiaObject))
231 self.assertIsNotNone(apdb.tableDef(ApdbTables.DiaObjectLast))
232 self.assertIsNotNone(apdb.tableDef(ApdbTables.DiaSource))
233 self.assertIsNotNone(apdb.tableDef(ApdbTables.DiaForcedSource))
234 self.assertIsNotNone(apdb.tableDef(ApdbTables.metadata))
235 self.assertIsNotNone(apdb.tableDef(ApdbTables.SSObject))
236 self.assertIsNotNone(apdb.tableDef(ApdbTables.SSSource))
237 self.assertIsNotNone(apdb.tableDef(ApdbTables.DiaObject_To_Object_Match))
238
    def test_empty_gets(self) -> None:
        """Test for getting data from empty database.

        All get() methods should return empty results, only useful for
        checking that code is not broken.
        """
        # use non-zero months for Forced/Source fetching
        config = self.make_instance()
        apdb = Apdb.from_config(config)

        region = self.make_region()
        visit_time = self.visit_time

        res: pandas.DataFrame | None

        # get objects by region
        res = apdb.getDiaObjects(region)
        self.assert_catalog(res, 0, self.getDiaObjects_table())

        # get sources by region (no ID filtering)
        res = apdb.getDiaSources(region, None, visit_time)
        self.assert_catalog(res, 0, ApdbTables.DiaSource)

        # get sources by object ID, empty object list
        res = apdb.getDiaSources(region, [], visit_time)
        self.assert_catalog(res, 0, ApdbTables.DiaSource)

        # get sources by object ID, non-empty object list
        res = apdb.getDiaSources(region, [1, 2, 3], visit_time)
        self.assert_catalog(res, 0, ApdbTables.DiaSource)

        # get forced sources by object ID, empty object list
        res = apdb.getDiaForcedSources(region, [], visit_time)
        self.assert_catalog(res, 0, ApdbTables.DiaForcedSource)

        # get forced sources by object ID, non-empty object list
        res = apdb.getDiaForcedSources(region, [1, 2, 3], visit_time)
        self.assert_catalog(res, 0, ApdbTables.DiaForcedSource)

        # data_factory's ccdVisitId generation corresponds to (1, 1)
        res = apdb.containsVisitDetector(visit=1, detector=1, region=region, visit_time=visit_time)
        self.assertFalse(res)

        # get forced sources by region; some implementations require an
        # explicit ID list and raise NotImplementedError instead.
        if self.fsrc_requires_id_list:
            with self.assertRaises(NotImplementedError):
                apdb.getDiaForcedSources(region, None, visit_time)
        else:
            res = apdb.getDiaForcedSources(region, None, visit_time)
            self.assert_catalog(res, 0, ApdbTables.DiaForcedSource)
288
289 def test_empty_gets_0months(self) -> None:
290 """Test for getting data from empty database.
291
292 All get() methods should return empty DataFrame or None.
293 """
294 # set read_sources_months to 0 so that Forced/Sources are None
295 config = self.make_instance(read_sources_months=0, read_forced_sources_months=0)
296 apdb = Apdb.from_config(config)
297
298 region = self.make_region()
299 visit_time = self.visit_time
300
301 res: pandas.DataFrame | None
302
303 # get objects by region
304 res = apdb.getDiaObjects(region)
305 self.assert_catalog(res, 0, self.getDiaObjects_table())
306
307 # get sources by region
308 res = apdb.getDiaSources(region, None, visit_time)
309 self.assertIs(res, None)
310
311 # get sources by object ID, empty object list
312 res = apdb.getDiaSources(region, [], visit_time)
313 self.assertIs(res, None)
314
315 # get forced sources by object ID, empty object list
316 res = apdb.getDiaForcedSources(region, [], visit_time)
317 self.assertIs(res, None)
318
319 # Database is empty, no images exist.
320 res = apdb.containsVisitDetector(visit=1, detector=1, region=region, visit_time=visit_time)
321 self.assertFalse(res)
322
323 def test_storeObjects(self) -> None:
324 """Store and retrieve DiaObjects."""
325 # don't care about sources.
326 config = self.make_instance()
327 apdb = Apdb.from_config(config)
328
329 region = self.make_region()
330 visit_time = self.visit_time
331
332 # make catalog with Objects
333 catalog = makeObjectCatalog(region, 100, visit_time)
334
335 # store catalog
336 apdb.store(visit_time, catalog)
337
338 # read it back and check sizes
339 res = apdb.getDiaObjects(region)
340 self.assert_catalog(res, len(catalog), self.getDiaObjects_table())
341
342 # TODO: test apdb.contains with generic implementation from DM-41671
343
    def test_storeObjects_empty(self) -> None:
        """Test calling storeObject when there are no objects: see DM-43270."""
        config = self.make_instance()
        apdb = Apdb.from_config(config)
        region = self.make_region()
        visit_time = self.visit_time
        # make catalog with no Objects
        catalog = makeObjectCatalog(region, 0, visit_time)

        # Storing an empty catalog must not fail; it should only emit a
        # debug-level log message mentioning "No objects".
        with self.assertLogs("lsst.dax.apdb", level="DEBUG") as cm:
            apdb.store(visit_time, catalog)
        self.assertIn("No objects", "\n".join(cm.output))
356
    def test_storeMovingObject(self) -> None:
        """Store and retrieve DiaObject which changes its position."""
        # don't care about sources.
        config = self.make_instance()
        apdb = Apdb.from_config(config)
        pixelization = self.pixelization(config)

        # Two positions one degree either side of (0, 0).
        lon_deg, lat_deg = 0.0, 0.0
        lonlat1 = LonLat.fromDegrees(lon_deg - 1.0, lat_deg)
        lonlat2 = LonLat.fromDegrees(lon_deg + 1.0, lat_deg)
        uv1 = UnitVector3d(lonlat1)
        uv2 = UnitVector3d(lonlat2)

        # Check that they fall into different pixels.
        self.assertNotEqual(pixelization.pixel(uv1), pixelization.pixel(uv2))

        # Store one object at two different positions.
        visit_time1 = self.visit_time
        catalog1 = makeObjectCatalog(lonlat1, 1, visit_time1)
        apdb.store(visit_time1, catalog1)

        # Second store two minutes later at the other position.
        visit_time2 = visit_time1 + astropy.time.TimeDelta(120.0, format="sec")
        catalog1 = makeObjectCatalog(lonlat2, 1, visit_time2)
        apdb.store(visit_time2, catalog1)

        # Make region covering both points.
        region = Circle(UnitVector3d(LonLat.fromDegrees(lon_deg, lat_deg)), Angle.fromDegrees(1.1))
        self.assertTrue(region.contains(uv1))
        self.assertTrue(region.contains(uv2))

        # Read it back, must return the latest one.
        res = apdb.getDiaObjects(region)
        self.assert_catalog(res, 1, self.getDiaObjects_table())
390
    def test_storeSources(self) -> None:
        """Store and retrieve DiaSources."""
        config = self.make_instance()
        apdb = Apdb.from_config(config)

        region = self.make_region()
        visit_time = self.visit_time

        # have to store Objects first
        objects = makeObjectCatalog(region, 100, visit_time)
        oids = list(objects["diaObjectId"])
        sources = makeSourceCatalog(objects, visit_time, use_mjd=self.use_mjd)

        # save the objects and sources
        apdb.store(visit_time, objects, sources)

        # read it back, no ID filtering
        res = apdb.getDiaSources(region, None, visit_time)
        self.assert_catalog(res, len(sources), ApdbTables.DiaSource)

        # read it back and filter by ID
        res = apdb.getDiaSources(region, oids, visit_time)
        self.assert_catalog(res, len(sources), ApdbTables.DiaSource)

        # read it back to get schema (empty ID list yields empty catalog)
        res = apdb.getDiaSources(region, [], visit_time)
        self.assert_catalog(res, 0, ApdbTables.DiaSource)

        # test if a visit is present
        # data_factory's ccdVisitId generation corresponds to (1, 1)
        res = apdb.containsVisitDetector(visit=1, detector=1, region=region, visit_time=visit_time)
        self.assertTrue(res)
        # non-existent image
        res = apdb.containsVisitDetector(visit=2, detector=42, region=region, visit_time=visit_time)
        self.assertFalse(res)
426
    def test_storeForcedSources(self) -> None:
        """Store and retrieve DiaForcedSources."""
        config = self.make_instance()
        apdb = Apdb.from_config(config)

        region = self.make_region()
        visit_time = self.visit_time

        # have to store Objects first
        objects = makeObjectCatalog(region, 100, visit_time)
        oids = list(objects["diaObjectId"])
        catalog = makeForcedSourceCatalog(objects, visit_time, use_mjd=self.use_mjd)

        apdb.store(visit_time, objects, forced_sources=catalog)

        # read it back and check sizes
        res = apdb.getDiaForcedSources(region, oids, visit_time)
        self.assert_catalog(res, len(catalog), ApdbTables.DiaForcedSource)

        # read it back to get schema (empty ID list yields empty catalog)
        res = apdb.getDiaForcedSources(region, [], visit_time)
        self.assert_catalog(res, 0, ApdbTables.DiaForcedSource)

        # data_factory's ccdVisitId generation corresponds to (1, 1)
        res = apdb.containsVisitDetector(visit=1, detector=1, region=region, visit_time=visit_time)
        self.assertTrue(res)
        # non-existent image
        res = apdb.containsVisitDetector(visit=2, detector=42, region=region, visit_time=visit_time)
        self.assertFalse(res)
456
    def test_timestamps(self) -> None:
        """Check that timestamp return type is as expected."""
        config = self.make_instance()
        apdb = Apdb.from_config(config)

        region = self.make_region()
        visit_time = self.visit_time

        # Cassandra has a millisecond precision, so subtract 1ms to allow for
        # truncated returned values.
        time_before = makeTimestampNow(self.use_mjd, -1)
        objects = makeObjectCatalog(region, 100, visit_time)
        oids = list(objects["diaObjectId"])
        catalog = makeForcedSourceCatalog(objects, visit_time, use_mjd=self.use_mjd)
        time_after = makeTimestampNow(self.use_mjd)

        apdb.store(visit_time, objects, forced_sources=catalog)

        # read it back and check sizes
        res = apdb.getDiaForcedSources(region, oids, visit_time)
        assert res is not None
        self.assert_catalog(res, len(catalog), ApdbTables.DiaForcedSource)

        # Column name and dtype depend on whether timestamps are stored as
        # MJD TAI floats or as datetime values.
        time_processed_column = "timeProcessedMjdTai" if self.use_mjd else "time_processed"
        self.assertIn(time_processed_column, res.dtypes)
        dtype = res.dtypes[time_processed_column]
        timestamp_type_name = "float64" if self.use_mjd else "datetime64[ns]"
        self.assertEqual(dtype.name, timestamp_type_name)
        # Verify that returned time is sensible.
        self.assertTrue(all(time_before <= dt <= time_after for dt in res[time_processed_column]))
487
488 def test_getChunks(self) -> None:
489 """Store and retrieve replica chunks."""
490 # don't care about sources.
491 config = self.make_instance()
492 apdb = Apdb.from_config(config)
493 apdb_replica = ApdbReplica.from_config(config)
494 visit_time = self.visit_time
495
496 region1 = self.make_region((1.0, 1.0, -1.0))
497 region2 = self.make_region((-1.0, -1.0, -1.0))
498 nobj = 100
499 objects1 = makeObjectCatalog(region1, nobj, visit_time)
500 objects2 = makeObjectCatalog(region2, nobj, visit_time, start_id=nobj * 2)
501
502 # With the default 10 minutes replica chunk window we should have 4
503 # records.
504 visits = [
505 (astropy.time.Time("2021-01-01T00:01:00", format="isot", scale="tai"), objects1),
506 (astropy.time.Time("2021-01-01T00:02:00", format="isot", scale="tai"), objects2),
507 (astropy.time.Time("2021-01-01T00:11:00", format="isot", scale="tai"), objects1),
508 (astropy.time.Time("2021-01-01T00:12:00", format="isot", scale="tai"), objects2),
509 (astropy.time.Time("2021-01-01T00:45:00", format="isot", scale="tai"), objects1),
510 (astropy.time.Time("2021-01-01T00:46:00", format="isot", scale="tai"), objects2),
511 (astropy.time.Time("2021-03-01T00:01:00", format="isot", scale="tai"), objects1),
512 (astropy.time.Time("2021-03-01T00:02:00", format="isot", scale="tai"), objects2),
513 ]
514
515 start_id = 0
516 for visit_time, objects in visits:
517 sources = makeSourceCatalog(objects, visit_time, start_id=start_id, use_mjd=self.use_mjd)
518 fsources = makeForcedSourceCatalog(objects, visit_time, visit=start_id, use_mjd=self.use_mjd)
519 apdb.store(visit_time, objects, sources, fsources)
520 start_id += nobj
521
522 replica_chunks = apdb_replica.getReplicaChunks()
523 if not self.enable_replica:
524 self.assertIsNone(replica_chunks)
525
526 with self.assertRaisesRegex(ValueError, "APDB is not configured for replication"):
527 apdb_replica.getTableDataChunks(ApdbTables.DiaObject, [])
528
529 else:
530 assert replica_chunks is not None
531 self.assertEqual(len(replica_chunks), 4)
532
533 with self.assertRaisesRegex(ValueError, "does not support replica chunks"):
534 apdb_replica.getTableDataChunks(ApdbTables.SSObject, [])
535
536 def _check_chunks(replica_chunks: list[ReplicaChunk], n_records: int | None = None) -> None:
537 if n_records is None:
538 n_records = len(replica_chunks) * nobj
539 res = apdb_replica.getTableDataChunks(
540 ApdbTables.DiaObject, (chunk.id for chunk in replica_chunks)
541 )
542 self.assert_table_data(res, n_records, ApdbTables.DiaObject)
543 validityStartColumn = "validityStartMjdTai" if self.use_mjd else "validityStart"
544 validityStartType = (
545 felis.datamodel.DataType.double if self.use_mjd else felis.datamodel.DataType.timestamp
546 )
548 res,
549 {
550 "apdb_replica_chunk": felis.datamodel.DataType.long,
551 "diaObjectId": felis.datamodel.DataType.long,
552 validityStartColumn: validityStartType,
553 "ra": felis.datamodel.DataType.double,
554 "dec": felis.datamodel.DataType.double,
555 "parallax": felis.datamodel.DataType.float,
556 "nDiaSources": felis.datamodel.DataType.int,
557 },
558 )
559
560 res = apdb_replica.getTableDataChunks(
561 ApdbTables.DiaSource, (chunk.id for chunk in replica_chunks)
562 )
563 self.assert_table_data(res, n_records, ApdbTables.DiaSource)
565 res,
566 {
567 "apdb_replica_chunk": felis.datamodel.DataType.long,
568 "diaSourceId": felis.datamodel.DataType.long,
569 "visit": felis.datamodel.DataType.long,
570 "detector": felis.datamodel.DataType.short,
571 },
572 )
573
574 res = apdb_replica.getTableDataChunks(
575 ApdbTables.DiaForcedSource, (chunk.id for chunk in replica_chunks)
576 )
577 self.assert_table_data(res, n_records, ApdbTables.DiaForcedSource)
579 res,
580 {
581 "apdb_replica_chunk": felis.datamodel.DataType.long,
582 "diaObjectId": felis.datamodel.DataType.long,
583 "visit": felis.datamodel.DataType.long,
584 "detector": felis.datamodel.DataType.short,
585 },
586 )
587
588 # read it back and check sizes
589 _check_chunks(replica_chunks, 800)
590 _check_chunks(replica_chunks[1:], 600)
591 _check_chunks(replica_chunks[1:-1], 400)
592 _check_chunks(replica_chunks[2:3], 200)
593 _check_chunks([])
594
595 # try to remove some of those
596 deleted_chunks = replica_chunks[:1]
597 apdb_replica.deleteReplicaChunks(chunk.id for chunk in deleted_chunks)
598
599 # All queries on deleted ids should return empty set.
600 _check_chunks(deleted_chunks, 0)
601
602 replica_chunks = apdb_replica.getReplicaChunks()
603 assert replica_chunks is not None
604 self.assertEqual(len(replica_chunks), 3)
605
606 _check_chunks(replica_chunks, 600)
607
608 def test_storeSSObjects(self) -> None:
609 """Store and retrieve SSObjects."""
610 # don't care about sources.
611 config = self.make_instance()
612 apdb = Apdb.from_config(config)
613
614 # make catalog with SSObjects
615 catalog = makeSSObjectCatalog(100, flags=1)
616
617 # store catalog
618 apdb.storeSSObjects(catalog)
619
620 # read it back and check sizes
621 res = apdb.getSSObjects()
622 self.assert_catalog(res, len(catalog), ApdbTables.SSObject)
623
624 # check that override works, make catalog with SSObjects, ID = 51-150
625 catalog = makeSSObjectCatalog(100, 51, flags=2)
626 apdb.storeSSObjects(catalog)
627 res = apdb.getSSObjects()
628 self.assert_catalog(res, 150, ApdbTables.SSObject)
629 self.assertEqual(len(res[res["flags"] == 1]), 50)
630 self.assertEqual(len(res[res["flags"] == 2]), 100)
631
    def test_reassignObjects(self) -> None:
        """Reassign DiaObjects."""
        # don't care about sources.
        config = self.make_instance()
        apdb = Apdb.from_config(config)

        region = self.make_region()
        visit_time = self.visit_time
        objects = makeObjectCatalog(region, 100, visit_time)
        oids = list(objects["diaObjectId"])
        sources = makeSourceCatalog(objects, visit_time, use_mjd=self.use_mjd)
        apdb.store(visit_time, objects, sources)

        catalog = makeSSObjectCatalog(100)
        apdb.storeSSObjects(catalog)

        # read it back and filter by ID
        res = apdb.getDiaSources(region, oids, visit_time)
        self.assert_catalog(res, len(sources), ApdbTables.DiaSource)

        # Reassigning three sources to SSObjects removes them from the
        # DiaSource query results.
        apdb.reassignDiaSources({1: 1, 2: 2, 5: 5})
        res = apdb.getDiaSources(region, oids, visit_time)
        self.assert_catalog(res, len(sources) - 3, ApdbTables.DiaSource)

        # Reassigning a non-existing source ID must raise.
        with self.assertRaisesRegex(ValueError, r"do not exist.*\D1000"):
            apdb.reassignDiaSources(
                {
                    1000: 1,
                    7: 3,
                }
            )
        # NOTE(review): ``res`` is not re-queried after the failed reassign,
        # so this re-checks the previous result — presumably intended to
        # assert that the failure changed nothing; confirm.
        self.assert_catalog(res, len(sources) - 3, ApdbTables.DiaSource)
664
    def test_midpointMjdTai_src(self) -> None:
        """Test for time filtering of DiaSources."""
        config = self.make_instance()
        apdb = Apdb.from_config(config)

        region = self.make_region()
        # 2021-01-01 plus 360 days is 2021-12-27
        src_time1 = astropy.time.Time("2021-01-01T00:00:00", format="isot", scale="tai")
        src_time2 = astropy.time.Time("2021-01-01T00:00:02", format="isot", scale="tai")
        visit_time0 = astropy.time.Time("2021-12-26T23:59:59", format="isot", scale="tai")
        visit_time1 = astropy.time.Time("2021-12-27T00:00:01", format="isot", scale="tai")
        visit_time2 = astropy.time.Time("2021-12-27T00:00:03", format="isot", scale="tai")

        # Store two batches of 100 sources two seconds apart.
        objects = makeObjectCatalog(region, 100, visit_time0)
        oids = list(objects["diaObjectId"])
        sources = makeSourceCatalog(objects, src_time1, 0, use_mjd=self.use_mjd)
        apdb.store(src_time1, objects, sources)

        sources = makeSourceCatalog(objects, src_time2, 100, use_mjd=self.use_mjd)
        apdb.store(src_time2, objects, sources)

        # reading at time of last save should read all
        res = apdb.getDiaSources(region, oids, src_time2)
        self.assert_catalog(res, 200, ApdbTables.DiaSource)

        # one second before 12 months
        res = apdb.getDiaSources(region, oids, visit_time0)
        self.assert_catalog(res, 200, ApdbTables.DiaSource)

        # reading at later time of last save should only read a subset
        res = apdb.getDiaSources(region, oids, visit_time1)
        self.assert_catalog(res, 100, ApdbTables.DiaSource)

        # reading at later time of last save should only read a subset
        res = apdb.getDiaSources(region, oids, visit_time2)
        self.assert_catalog(res, 0, ApdbTables.DiaSource)
701
    def test_midpointMjdTai_fsrc(self) -> None:
        """Test for time filtering of DiaForcedSources."""
        config = self.make_instance()
        apdb = Apdb.from_config(config)

        region = self.make_region()
        # 2021-01-01 plus 360 days is 2021-12-27
        src_time1 = astropy.time.Time("2021-01-01T00:00:00", format="isot", scale="tai")
        src_time2 = astropy.time.Time("2021-01-01T00:00:02", format="isot", scale="tai")
        visit_time0 = astropy.time.Time("2021-12-26T23:59:59", format="isot", scale="tai")
        visit_time1 = astropy.time.Time("2021-12-27T00:00:01", format="isot", scale="tai")
        visit_time2 = astropy.time.Time("2021-12-27T00:00:03", format="isot", scale="tai")

        # Store two batches of 100 forced sources two seconds apart.
        objects = makeObjectCatalog(region, 100, visit_time0)
        oids = list(objects["diaObjectId"])
        sources = makeForcedSourceCatalog(objects, src_time1, 1, use_mjd=self.use_mjd)
        apdb.store(src_time1, objects, forced_sources=sources)

        sources = makeForcedSourceCatalog(objects, src_time2, 2, use_mjd=self.use_mjd)
        apdb.store(src_time2, objects, forced_sources=sources)

        # reading at time of last save should read all
        res = apdb.getDiaForcedSources(region, oids, src_time2)
        self.assert_catalog(res, 200, ApdbTables.DiaForcedSource)

        # one second before 12 months
        res = apdb.getDiaForcedSources(region, oids, visit_time0)
        self.assert_catalog(res, 200, ApdbTables.DiaForcedSource)

        # reading at later time of last save should only read a subset
        res = apdb.getDiaForcedSources(region, oids, visit_time1)
        self.assert_catalog(res, 100, ApdbTables.DiaForcedSource)

        # reading at later time of last save should only read a subset
        res = apdb.getDiaForcedSources(region, oids, visit_time2)
        self.assert_catalog(res, 0, ApdbTables.DiaForcedSource)
737
    def test_metadata(self) -> None:
        """Simple test for writing/reading metadata table"""
        config = self.make_instance()
        apdb = Apdb.from_config(config)
        metadata = apdb.metadata

        # APDB should write two or three metadata items with version numbers
        # and a frozen JSON config.
        self.assertFalse(metadata.empty())
        self.assertEqual(len(list(metadata.items())), self.meta_row_count)

        metadata.set("meta", "data")
        metadata.set("data", "meta")

        self.assertFalse(metadata.empty())
        self.assertTrue(set(metadata.items()) >= {("meta", "data"), ("data", "meta")})

        # Setting an existing key without force is an error.
        with self.assertRaisesRegex(KeyError, "Metadata key 'meta' already exists"):
            metadata.set("meta", "data1")

        # With force=True the existing value is overwritten.
        metadata.set("meta", "data2", force=True)
        self.assertTrue(set(metadata.items()) >= {("meta", "data2"), ("data", "meta")})

        # delete() returns True only when the key existed.
        self.assertTrue(metadata.delete("meta"))
        self.assertIsNone(metadata.get("meta"))
        self.assertFalse(metadata.delete("meta"))

        # get() returns the supplied default for a missing key.
        self.assertEqual(metadata.get("data"), "meta")
        self.assertEqual(metadata.get("meta", "meta"), "meta")
767
    def test_schemaVersionFromYaml(self) -> None:
        """Check version number handling for reading schema from YAML."""
        config = self.make_instance()
        default_schema = config.schema_file
        apdb = Apdb.from_config(config)
        self.assertEqual(apdb._schema.schemaVersion(), VersionTuple(0, 1, 1))  # type: ignore[attr-defined]

        # With the version removed from the YAML file the schema version
        # defaults to 0.1.0.
        with update_schema_yaml(default_schema, version="") as schema_file:
            config = self.make_instance(schema_file=schema_file)
            apdb = Apdb.from_config(config)
            self.assertEqual(
                apdb._schema.schemaVersion(),  # type: ignore[attr-defined]
                VersionTuple(0, 1, 0),
            )

        # An explicit version string in the YAML file is used as-is.
        with update_schema_yaml(default_schema, version="99.0.0") as schema_file:
            config = self.make_instance(schema_file=schema_file)
            apdb = Apdb.from_config(config)
            self.assertEqual(
                apdb._schema.schemaVersion(),  # type: ignore[attr-defined]
                VersionTuple(99, 0, 0),
            )
790
    def test_config_freeze(self) -> None:
        """Test that some config fields are correctly frozen in database."""
        config = self.make_instance()

        # `enable_replica` is the only parameter that is frozen in all
        # implementations.
        config.enable_replica = not self.enable_replica
        apdb = Apdb.from_config(config)
        # The value frozen at database-creation time must win over the
        # flipped value in the passed config.
        frozen_config = apdb.getConfig()
        self.assertEqual(frozen_config.enable_replica, self.enable_replica)
801
802
804 """Base class for unit tests that verify how schema changes work."""
805
806 visit_time = astropy.time.Time("2021-01-01T00:00:00", format="isot", scale="tai")
807
    @abstractmethod
    def make_instance(self, **kwargs: Any) -> ApdbConfig:
        """Make config class instance used in all tests.

        This method should return configuration that point to the identical
        database instance on each call (i.e. ``db_url`` must be the same,
        which also means for sqlite it has to use on-disk storage).

        Parameters
        ----------
        **kwargs : `~typing.Any`
            Implementation-specific configuration overrides.
        """
        raise NotImplementedError()
817
    def make_region(self, xyz: tuple[float, float, float] = (1.0, 1.0, -1.0)) -> Region:
        """Make a region to use in tests; delegates to module-level
        `_make_region`."""
        return _make_region(xyz)
821
    def test_schema_add_replica(self) -> None:
        """Check that new code can work with old schema without replica
        tables.
        """
        # Make schema without replica tables.
        config = self.make_instance(enable_replica=False)
        apdb = Apdb.from_config(config)
        apdb_replica = ApdbReplica.from_config(config)

        # Make APDB instance configured for replication against the same
        # database (same db_url), even though replica tables do not exist.
        config.enable_replica = True
        apdb = Apdb.from_config(config)

        # Try to insert something, should work OK.
        region = self.make_region()
        visit_time = self.visit_time

        # have to store Objects first
        objects = makeObjectCatalog(region, 100, visit_time)
        sources = makeSourceCatalog(objects, visit_time)
        fsources = makeForcedSourceCatalog(objects, visit_time)
        apdb.store(visit_time, objects, sources, fsources)

        # There should be no replica chunks.
        replica_chunks = apdb_replica.getReplicaChunks()
        self.assertIsNone(replica_chunks)
848
    def test_schemaVersionCheck(self) -> None:
        """Check version number compatibility."""
        config = self.make_instance()
        apdb = Apdb.from_config(config)

        self.assertEqual(apdb._schema.schemaVersion(), VersionTuple(0, 1, 1))  # type: ignore[attr-defined]

        # Claim that schema version is now 99.0.0, must raise an exception.
        with update_schema_yaml(config.schema_file, version="99.0.0") as schema_file:
            config.schema_file = schema_file
            with self.assertRaises(IncompatibleVersionError):
                apdb = Apdb.from_config(config)
                # Version is checked only when we try to do connect.
                apdb.metadata.items()
ApdbConfig make_instance(self, **Any kwargs)
Definition _apdb.py:809
Region make_region(self, tuple[float, float, float] xyz=(1.0, 1.0, -1.0))
Definition _apdb.py:818
None assert_catalog(self, Any catalog, int rows, ApdbTables table)
Definition _apdb.py:164
ApdbConfig make_instance(self, **Any kwargs)
Definition _apdb.py:150
Pixelization pixelization(self, ApdbConfig config)
Definition _apdb.py:160
Region make_region(self, tuple[float, float, float] xyz=(1.0, 1.0, -1.0))
Definition _apdb.py:207
None assert_column_types(self, Any catalog, dict[str, felis.datamodel.DataType] types)
Definition _apdb.py:202
None assert_table_data(self, Any catalog, int rows, ApdbTables table)
Definition _apdb.py:180
ApdbTables getDiaObjects_table(self)
Definition _apdb.py:155
Angle represents an angle in radians.
Definition Angle.h:50
Circle is a circular region on the unit sphere that contains its boundary.
Definition Circle.h:54
UnitVector3d is a unit vector in ℝ³ with components stored in double precision.
Region _make_region(tuple[float, float, float] xyz=(1.0, 1.0, -1.0))
Definition _apdb.py:64
Iterator[str] update_schema_yaml(str schema_file, bool drop_metadata=False, str|None version=None)
Definition _apdb.py:77