apdbSql.py
# This file is part of dax_apdb.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Module defining Apdb class and related methods.
"""

from __future__ import annotations

__all__ = ["ApdbSqlConfig", "ApdbSql"]

import logging
from collections.abc import Iterable, Mapping, MutableMapping
from contextlib import closing, suppress
from typing import TYPE_CHECKING, Any, cast

import astropy.time
import numpy as np
import pandas
import sqlalchemy
import sqlalchemy.dialects.postgresql
import sqlalchemy.dialects.sqlite
from lsst.pex.config import ChoiceField, Field, ListField
from lsst.sphgeom import HtmPixelization, LonLat, Region, UnitVector3d
from lsst.utils.iteration import chunk_iterable
from sqlalchemy import func, sql
from sqlalchemy.pool import NullPool

from ..apdb import Apdb, ApdbConfig
from ..apdbConfigFreezer import ApdbConfigFreezer
from ..apdbReplica import ReplicaChunk
from ..apdbSchema import ApdbTables
from ..monitor import MonAgent
from ..schema_model import Table
from ..timer import Timer
from ..versionTuple import IncompatibleVersionError, VersionTuple
from .apdbMetadataSql import ApdbMetadataSql
from .apdbSqlReplica import ApdbSqlReplica
from .apdbSqlSchema import ApdbSqlSchema, ExtraTables

if TYPE_CHECKING:
    import sqlite3

    from ..apdbMetadata import ApdbMetadata

_LOG = logging.getLogger(__name__)

_MON = MonAgent(__name__)

VERSION = VersionTuple(0, 1, 0)
"""Version for the code controlling non-replication tables. This needs to be
updated following compatibility rules when schema produced by this code
changes.
"""


def _coerce_uint64(df: pandas.DataFrame) -> pandas.DataFrame:
    """Change the type of uint64 columns to int64, and return a copy of the
    data frame.
    """
    names = [c[0] for c in df.dtypes.items() if c[1] == np.uint64]
    return df.astype({name: np.int64 for name in names})

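# A minimal sketch of what _coerce_uint64 does, with hypothetical values;
# uint64 columns are coerced because SQLAlchemy/DBAPI drivers generally
# expect signed 64-bit integers:
#
#     >>> df = pandas.DataFrame({"diaObjectId": np.array([1, 2], dtype=np.uint64)})
#     >>> _coerce_uint64(df)["diaObjectId"].dtype
#     dtype('int64')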

def _make_midpointMjdTai_start(visit_time: astropy.time.Time, months: int) -> float:
    """Calculate starting point for time-based source search.

    Parameters
    ----------
    visit_time : `astropy.time.Time`
        Time of current visit.
    months : `int`
        Number of months in the sources history.

    Returns
    -------
    time : `float`
        A ``midpointMjdTai`` starting point, MJD time.
    """
    # TODO: Use of MJD must be consistent with the code in ap_association
    # (see DM-31996)
    return visit_time.mjd - months * 30

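# For example, a visit at MJD 60000.0 (hypothetical) with a 12-month history
# window yields 60000.0 - 12 * 30 = 59640.0, a month being approximated as
# 30 days here:
#
#     >>> _make_midpointMjdTai_start(astropy.time.Time(60000.0, format="mjd"), 12)
#     59640.0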

def _onSqlite3Connect(
    dbapiConnection: sqlite3.Connection, connectionRecord: sqlalchemy.pool._ConnectionRecord
) -> None:
    # Enable foreign keys
    with closing(dbapiConnection.cursor()) as cursor:
        cursor.execute("PRAGMA foreign_keys=ON;")


class ApdbSqlConfig(ApdbConfig):
    """APDB configuration class for SQL implementation (ApdbSql)."""

    db_url = Field[str](doc="SQLAlchemy database connection URI")
    isolation_level = ChoiceField[str](
        doc=(
            "Transaction isolation level, if unset then backend-default value "
            "is used, except for SQLite backend where we use READ_UNCOMMITTED. "
            "Some backends may not support every allowed value."
        ),
        allowed={
            "READ_COMMITTED": "Read committed",
            "READ_UNCOMMITTED": "Read uncommitted",
            "REPEATABLE_READ": "Repeatable read",
            "SERIALIZABLE": "Serializable",
        },
        default=None,
        optional=True,
    )
    connection_pool = Field[bool](
        doc="If False then disable SQLAlchemy connection pool. Do not use connection pool when forking.",
        default=True,
    )
    connection_timeout = Field[float](
        doc=(
            "Maximum time to wait for database lock to be released before exiting. "
            "Defaults to the SQLAlchemy default if not set."
        ),
        default=None,
        optional=True,
    )
    sql_echo = Field[bool](doc="If True then pass SQLAlchemy echo option.", default=False)
    dia_object_index = ChoiceField[str](
        doc="Indexing mode for DiaObject table",
        allowed={
            "baseline": "Index defined in baseline schema",
            "pix_id_iov": "(pixelId, objectId, iovStart) PK",
            "last_object_table": "Separate DiaObjectLast table",
        },
        default="baseline",
    )
    htm_level = Field[int](doc="HTM indexing level", default=20)
    htm_max_ranges = Field[int](doc="Max number of ranges in HTM envelope", default=64)
    htm_index_column = Field[str](
        default="pixelId", doc="Name of a HTM index column for DiaObject and DiaSource tables"
    )
    ra_dec_columns = ListField[str](default=["ra", "dec"], doc="Names of ra/dec columns in DiaObject table")
    dia_object_columns = ListField[str](
        doc="List of columns to read from DiaObject, by default read all columns", default=[]
    )
    prefix = Field[str](doc="Prefix to add to table names and index names", default="")
    namespace = Field[str](
        doc=(
            "Namespace or schema name for all tables in APDB database. "
            "Presently only works for PostgreSQL backend. "
            "If schema with this name does not exist it will be created when "
            "APDB tables are created."
        ),
        default=None,
        optional=True,
    )
    timer = Field[bool](doc="If True then print/log timing information", default=False)

    def validate(self) -> None:
        super().validate()
        if len(self.ra_dec_columns) != 2:
            raise ValueError("ra_dec_columns must have exactly two column names")

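# A minimal configuration sketch (the SQLite URL is hypothetical); any field
# not set explicitly keeps the default declared above:
#
#     config = ApdbSqlConfig()
#     config.db_url = "sqlite:///apdb.db"
#     config.dia_object_index = "last_object_table"
#     config.validate()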

class ApdbSql(Apdb):
    """Implementation of APDB interface based on SQL database.

    The implementation is configured via standard ``pex_config`` mechanism
    using `ApdbSqlConfig` configuration class. For an example of different
    configurations check ``config/`` folder.

    Parameters
    ----------
    config : `ApdbSqlConfig`
        Configuration object.
    """

    ConfigClass = ApdbSqlConfig

    metadataSchemaVersionKey = "version:schema"
    """Name of the metadata key to store schema version number."""

    metadataCodeVersionKey = "version:ApdbSql"
    """Name of the metadata key to store code version number."""

    metadataReplicaVersionKey = "version:ApdbSqlReplica"
    """Name of the metadata key to store replica code version number."""

    metadataConfigKey = "config:apdb-sql.json"
    """Name of the metadata key to store the frozen part of the
    configuration (JSON)."""

    _frozen_parameters = (
        "use_insert_id",
        "dia_object_index",
        "htm_level",
        "htm_index_column",
        "ra_dec_columns",
    )
    """Names of the config parameters to be frozen in metadata table."""

    def __init__(self, config: ApdbSqlConfig):
        self._engine = self._makeEngine(config)

        sa_metadata = sqlalchemy.MetaData(schema=config.namespace)
        meta_table_name = ApdbTables.metadata.table_name(prefix=config.prefix)
        meta_table: sqlalchemy.schema.Table | None = None
        with suppress(sqlalchemy.exc.NoSuchTableError):
            meta_table = sqlalchemy.schema.Table(meta_table_name, sa_metadata, autoload_with=self._engine)

        self._metadata = ApdbMetadataSql(self._engine, meta_table)

        # Read frozen config from metadata.
        config_json = self._metadata.get(self.metadataConfigKey)
        if config_json is not None:
            # Update config from metadata.
            freezer = ApdbConfigFreezer[ApdbSqlConfig](self._frozen_parameters)
            self.config = freezer.update(config, config_json)
        else:
            self.config = config
        self.config.validate()

        self._schema = ApdbSqlSchema(
            engine=self._engine,
            dia_object_index=self.config.dia_object_index,
            schema_file=self.config.schema_file,
            schema_name=self.config.schema_name,
            prefix=self.config.prefix,
            namespace=self.config.namespace,
            htm_index_column=self.config.htm_index_column,
            enable_replica=self.config.use_insert_id,
        )

        if self._metadata.table_exists():
            self._versionCheck(self._metadata)

        self.pixelator = HtmPixelization(self.config.htm_level)

        _LOG.debug("APDB Configuration:")
        _LOG.debug("    dia_object_index: %s", self.config.dia_object_index)
        _LOG.debug("    read_sources_months: %s", self.config.read_sources_months)
        _LOG.debug("    read_forced_sources_months: %s", self.config.read_forced_sources_months)
        _LOG.debug("    dia_object_columns: %s", self.config.dia_object_columns)
        _LOG.debug("    schema_file: %s", self.config.schema_file)
        _LOG.debug("    extra_schema_file: %s", self.config.extra_schema_file)
        _LOG.debug("    schema prefix: %s", self.config.prefix)

        self._timer_args: list[MonAgent | logging.Logger] = [_MON]
        if self.config.timer:
            self._timer_args.append(_LOG)

    def _timer(self, name: str, *, tags: Mapping[str, str | int] | None = None) -> Timer:
        """Create `Timer` instance given its name."""
        return Timer(name, *self._timer_args, tags=tags)

    @classmethod
    def _makeEngine(cls, config: ApdbSqlConfig) -> sqlalchemy.engine.Engine:
        """Make SQLAlchemy engine based on configured parameters.

        Parameters
        ----------
        config : `ApdbSqlConfig`
            Configuration object.
        """
        # engine is reused between multiple processes, make sure that we don't
        # share connections by disabling pool (by using NullPool class)
        kw: MutableMapping[str, Any] = dict(echo=config.sql_echo)
        conn_args: dict[str, Any] = dict()
        if not config.connection_pool:
            kw.update(poolclass=NullPool)
        if config.isolation_level is not None:
            kw.update(isolation_level=config.isolation_level)
        elif config.db_url.startswith("sqlite"):  # type: ignore
            # Use READ_UNCOMMITTED as default value for sqlite.
            kw.update(isolation_level="READ_UNCOMMITTED")
        if config.connection_timeout is not None:
            if config.db_url.startswith("sqlite"):
                conn_args.update(timeout=config.connection_timeout)
            elif config.db_url.startswith(("postgresql", "mysql")):
                conn_args.update(connect_timeout=config.connection_timeout)
        kw.update(connect_args=conn_args)
        engine = sqlalchemy.create_engine(config.db_url, **kw)

        if engine.dialect.name == "sqlite":
            # Need to enable foreign keys on every new connection.
            sqlalchemy.event.listen(engine, "connect", _onSqlite3Connect)

        return engine

    def _versionCheck(self, metadata: ApdbMetadataSql) -> None:
        """Check schema version compatibility."""

        def _get_version(key: str, default: VersionTuple) -> VersionTuple:
            """Retrieve version number from given metadata key."""
            if metadata.table_exists():
                version_str = metadata.get(key)
                if version_str is None:
                    # Should not happen with existing metadata table.
                    raise RuntimeError(f"Version key {key!r} does not exist in metadata table.")
                return VersionTuple.fromString(version_str)
            return default

        # For old databases where metadata table does not exist we assume that
        # version of both code and schema is 0.1.0.
        initial_version = VersionTuple(0, 1, 0)
        db_schema_version = _get_version(self.metadataSchemaVersionKey, initial_version)
        db_code_version = _get_version(self.metadataCodeVersionKey, initial_version)

        # For now there is no way to make read-only APDB instances, assume that
        # any access can do updates.
        if not self._schema.schemaVersion().checkCompatibility(db_schema_version):
            raise IncompatibleVersionError(
                f"Configured schema version {self._schema.schemaVersion()} "
                f"is not compatible with database version {db_schema_version}"
            )
        if not self.apdbImplementationVersion().checkCompatibility(db_code_version):
            raise IncompatibleVersionError(
                f"Current code version {self.apdbImplementationVersion()} "
                f"is not compatible with database version {db_code_version}"
            )

        # Check replica code version only if replica is enabled.
        if self._schema.has_replica_chunks:
            db_replica_version = _get_version(self.metadataReplicaVersionKey, initial_version)
            code_replica_version = ApdbSqlReplica.apdbReplicaImplementationVersion()
            if not code_replica_version.checkCompatibility(db_replica_version):
                raise IncompatibleVersionError(
                    f"Current replication code version {code_replica_version} "
                    f"is not compatible with database version {db_replica_version}"
                )

    @classmethod
    def apdbImplementationVersion(cls) -> VersionTuple:
        """Return version number for current APDB implementation.

        Returns
        -------
        version : `VersionTuple`
            Version of the code defined in implementation class.
        """
        return VERSION

    @classmethod
    def init_database(
        cls,
        db_url: str,
        *,
        schema_file: str | None = None,
        schema_name: str | None = None,
        read_sources_months: int | None = None,
        read_forced_sources_months: int | None = None,
        use_insert_id: bool = False,
        connection_timeout: int | None = None,
        dia_object_index: str | None = None,
        htm_level: int | None = None,
        htm_index_column: str | None = None,
        ra_dec_columns: list[str] | None = None,
        prefix: str | None = None,
        namespace: str | None = None,
        drop: bool = False,
    ) -> ApdbSqlConfig:
        """Initialize new APDB instance and make configuration object for it.

        Parameters
        ----------
        db_url : `str`
            SQLAlchemy database URL.
        schema_file : `str`, optional
            Location of (YAML) configuration file with APDB schema. If not
            specified then default location will be used.
        schema_name : `str`, optional
            Name of the schema in YAML configuration file. If not specified
            then default name will be used.
        read_sources_months : `int`, optional
            Number of months of history to read from DiaSource.
        read_forced_sources_months : `int`, optional
            Number of months of history to read from DiaForcedSource.
        use_insert_id : `bool`
            If True, make additional tables used for replication to PPDB.
        connection_timeout : `int`, optional
            Database connection timeout in seconds.
        dia_object_index : `str`, optional
            Indexing mode for DiaObject table.
        htm_level : `int`, optional
            HTM indexing level.
        htm_index_column : `str`, optional
            Name of a HTM index column for DiaObject and DiaSource tables.
        ra_dec_columns : `list` [`str`], optional
            Names of ra/dec columns in DiaObject table.
        prefix : `str`, optional
            Optional prefix for all table names.
        namespace : `str`, optional
            Name of the database schema for all APDB tables. If not specified
            then default schema is used.
        drop : `bool`, optional
            If `True` then drop existing tables before re-creating the schema.

        Returns
        -------
        config : `ApdbSqlConfig`
            Resulting configuration object for a created APDB instance.
        """
        config = ApdbSqlConfig(db_url=db_url, use_insert_id=use_insert_id)
        if schema_file is not None:
            config.schema_file = schema_file
        if schema_name is not None:
            config.schema_name = schema_name
        if read_sources_months is not None:
            config.read_sources_months = read_sources_months
        if read_forced_sources_months is not None:
            config.read_forced_sources_months = read_forced_sources_months
        if connection_timeout is not None:
            config.connection_timeout = connection_timeout
        if dia_object_index is not None:
            config.dia_object_index = dia_object_index
        if htm_level is not None:
            config.htm_level = htm_level
        if htm_index_column is not None:
            config.htm_index_column = htm_index_column
        if ra_dec_columns is not None:
            config.ra_dec_columns = ra_dec_columns
        if prefix is not None:
            config.prefix = prefix
        if namespace is not None:
            config.namespace = namespace

        cls._makeSchema(config, drop=drop)

        return config

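    # A minimal end-to-end sketch (the database URL is hypothetical): create
    # the schema in a fresh SQLite file, then construct ApdbSql from the
    # returned configuration, whose frozen parameters are already stored in
    # the metadata table:
    #
    #     config = ApdbSql.init_database("sqlite:///apdb.db")
    #     apdb = ApdbSql(config)
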
    def get_replica(self) -> ApdbSqlReplica:
        """Return `ApdbReplica` instance for this database."""
        return ApdbSqlReplica(self._schema, self._engine)

    def tableRowCount(self) -> dict[str, int]:
        """Return dictionary with the table names and row counts.

        Used by ``ap_proto`` to keep track of the size of the database tables.
        Depending on database technology this could be an expensive operation.

        Returns
        -------
        row_counts : `dict`
            Dict where key is a table name and value is a row count.
        """
        res = {}
        tables = [ApdbTables.DiaObject, ApdbTables.DiaSource, ApdbTables.DiaForcedSource]
        if self.config.dia_object_index == "last_object_table":
            tables.append(ApdbTables.DiaObjectLast)
        with self._engine.begin() as conn:
            for table in tables:
                sa_table = self._schema.get_table(table)
                stmt = sql.select(func.count()).select_from(sa_table)
                count: int = conn.execute(stmt).scalar_one()
                res[table.name] = count

        return res

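    # The returned mapping is keyed by table name, e.g. (hypothetical counts):
    #
    #     {"DiaObject": 12345, "DiaSource": 67890, "DiaForcedSource": 234567}
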
    def tableDef(self, table: ApdbTables) -> Table | None:
        # docstring is inherited from a base class
        return self._schema.tableSchemas.get(table)

    @classmethod
    def _makeSchema(cls, config: ApdbConfig, drop: bool = False) -> None:
        # docstring is inherited from a base class

        if not isinstance(config, ApdbSqlConfig):
            raise TypeError(f"Unexpected type of configuration object: {type(config)}")

        engine = cls._makeEngine(config)

        # Ask schema class to create all tables.
        schema = ApdbSqlSchema(
            engine=engine,
            dia_object_index=config.dia_object_index,
            schema_file=config.schema_file,
            schema_name=config.schema_name,
            prefix=config.prefix,
            namespace=config.namespace,
            htm_index_column=config.htm_index_column,
            enable_replica=config.use_insert_id,
        )
        schema.makeSchema(drop=drop)

        # Need metadata table to store a few items in it, if table exists.
        meta_table: sqlalchemy.schema.Table | None = None
        with suppress(ValueError):
            meta_table = schema.get_table(ApdbTables.metadata)

        apdb_meta = ApdbMetadataSql(engine, meta_table)
        if apdb_meta.table_exists():
            # Fill version numbers, overwrite if they are already there.
            apdb_meta.set(cls.metadataSchemaVersionKey, str(schema.schemaVersion()), force=True)
            apdb_meta.set(cls.metadataCodeVersionKey, str(cls.apdbImplementationVersion()), force=True)
            if config.use_insert_id:
                # Only store replica code version if replica is enabled.
                apdb_meta.set(
                    cls.metadataReplicaVersionKey,
                    str(ApdbSqlReplica.apdbReplicaImplementationVersion()),
                    force=True,
                )

            # Store frozen part of a configuration in metadata.
            freezer = ApdbConfigFreezer[ApdbSqlConfig](cls._frozen_parameters)
            apdb_meta.set(cls.metadataConfigKey, freezer.to_json(config), force=True)

    def getDiaObjects(self, region: Region) -> pandas.DataFrame:
        # docstring is inherited from a base class

        # decide what columns we need
        if self.config.dia_object_index == "last_object_table":
            table_enum = ApdbTables.DiaObjectLast
        else:
            table_enum = ApdbTables.DiaObject
        table = self._schema.get_table(table_enum)
        if not self.config.dia_object_columns:
            columns = self._schema.get_apdb_columns(table_enum)
        else:
            columns = [table.c[col] for col in self.config.dia_object_columns]
        query = sql.select(*columns)

        # build selection
        query = query.where(self._filterRegion(table, region))

        # select latest version of objects
        if self.config.dia_object_index != "last_object_table":
            query = query.where(table.c.validityEnd == None)  # noqa: E711

        # _LOG.debug("query: %s", query)

        # execute select
        with self._timer("select_time", tags={"table": "DiaObject"}):
            with self._engine.begin() as conn:
                objects = pandas.read_sql_query(query, conn)
        _LOG.debug("found %s DiaObjects", len(objects))
        return objects

    def getDiaSources(
        self, region: Region, object_ids: Iterable[int] | None, visit_time: astropy.time.Time
    ) -> pandas.DataFrame | None:
        # docstring is inherited from a base class
        if self.config.read_sources_months == 0:
            _LOG.debug("Skip DiaSources fetching")
            return None

        if object_ids is None:
            # region-based select
            return self._getDiaSourcesInRegion(region, visit_time)
        else:
            return self._getDiaSourcesByIDs(list(object_ids), visit_time)

    def getDiaForcedSources(
        self, region: Region, object_ids: Iterable[int] | None, visit_time: astropy.time.Time
    ) -> pandas.DataFrame | None:
        # docstring is inherited from a base class
        if self.config.read_forced_sources_months == 0:
            _LOG.debug("Skip DiaForcedSources fetching")
            return None

        if object_ids is None:
            # This implementation does not support region-based selection.
            raise NotImplementedError("Region-based selection is not supported")

        # TODO: DateTime.MJD must be consistent with code in ap_association,
        # alternatively we can fill midpointMjdTai ourselves in store()
        midpointMjdTai_start = _make_midpointMjdTai_start(visit_time, self.config.read_forced_sources_months)
        _LOG.debug("midpointMjdTai_start = %.6f", midpointMjdTai_start)

        with self._timer("select_time", tags={"table": "DiaForcedSource"}):
            sources = self._getSourcesByIDs(
                ApdbTables.DiaForcedSource, list(object_ids), midpointMjdTai_start
            )

        _LOG.debug("found %s DiaForcedSources", len(sources))
        return sources

    def containsVisitDetector(self, visit: int, detector: int) -> bool:
        # docstring is inherited from a base class
        src_table: sqlalchemy.schema.Table = self._schema.get_table(ApdbTables.DiaSource)
        frcsrc_table: sqlalchemy.schema.Table = self._schema.get_table(ApdbTables.DiaForcedSource)
        # Query should load only one leaf page of the index
        query1 = sql.select(src_table.c.visit).filter_by(visit=visit, detector=detector).limit(1)

        with self._engine.begin() as conn:
            result = conn.execute(query1).scalar_one_or_none()
            if result is not None:
                return True
            else:
                # Backup query if an image was processed but had no diaSources
                query2 = sql.select(frcsrc_table.c.visit).filter_by(visit=visit, detector=detector).limit(1)
                result = conn.execute(query2).scalar_one_or_none()
                return result is not None

    def getSSObjects(self) -> pandas.DataFrame:
        # docstring is inherited from a base class

        columns = self._schema.get_apdb_columns(ApdbTables.SSObject)
        query = sql.select(*columns)

        # execute select
        with self._timer("SSObject_select_time", tags={"table": "SSObject"}):
            with self._engine.begin() as conn:
                objects = pandas.read_sql_query(query, conn)
        _LOG.debug("found %s SSObjects", len(objects))
        return objects

    def store(
        self,
        visit_time: astropy.time.Time,
        objects: pandas.DataFrame,
        sources: pandas.DataFrame | None = None,
        forced_sources: pandas.DataFrame | None = None,
    ) -> None:
        # docstring is inherited from a base class

        # We want to run all inserts in one transaction.
        with self._engine.begin() as connection:
            replica_chunk: ReplicaChunk | None = None
            if self._schema.has_replica_chunks:
                replica_chunk = ReplicaChunk.make_replica_chunk(visit_time, self.config.replica_chunk_seconds)
                self._storeReplicaChunk(replica_chunk, visit_time, connection)

            # fill pixelId column for DiaObjects
            objects = self._add_obj_htm_index(objects)
            self._storeDiaObjects(objects, visit_time, replica_chunk, connection)

            if sources is not None:
                # copy pixelId column from DiaObjects to DiaSources
                sources = self._add_src_htm_index(sources, objects)
                self._storeDiaSources(sources, replica_chunk, connection)

            if forced_sources is not None:
                self._storeDiaForcedSources(forced_sources, replica_chunk, connection)

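    # A minimal calling sketch (columns shown are a hypothetical subset; real
    # catalogs carry the full APDB schema). All tables are written in a single
    # transaction, and DiaSource rows inherit pixelId from the matching
    # DiaObject rows:
    #
    #     objects = pandas.DataFrame({"diaObjectId": [1], "ra": [10.0], "dec": [-5.0]})
    #     sources = pandas.DataFrame({"diaSourceId": [100], "diaObjectId": [1]})
    #     apdb.store(astropy.time.Time.now(), objects, sources)
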
    def storeSSObjects(self, objects: pandas.DataFrame) -> None:
        # docstring is inherited from a base class

        idColumn = "ssObjectId"
        table = self._schema.get_table(ApdbTables.SSObject)

        # everything to be done in single transaction
        with self._engine.begin() as conn:
            # Find record IDs that already exist. Some types like np.int64 can
            # cause issues with sqlalchemy, convert them to int.
            ids = sorted(int(oid) for oid in objects[idColumn])

            query = sql.select(table.columns[idColumn], table.columns[idColumn].in_(ids))
            result = conn.execute(query)
            knownIds = set(row.ssObjectId for row in result)

            filter = objects[idColumn].isin(knownIds)
            toUpdate = cast(pandas.DataFrame, objects[filter])
            toInsert = cast(pandas.DataFrame, objects[~filter])

            # insert new records
            if len(toInsert) > 0:
                toInsert.to_sql(table.name, conn, if_exists="append", index=False, schema=table.schema)

            # update existing records
            if len(toUpdate) > 0:
                whereKey = f"{idColumn}_param"
                update = table.update().where(table.columns[idColumn] == sql.bindparam(whereKey))
                toUpdate = toUpdate.rename({idColumn: whereKey}, axis="columns")
                values = toUpdate.to_dict("records")
                result = conn.execute(update, values)

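    # The update branch above relies on renaming the ID column to
    # "ssObjectId_param": each row dict then binds the WHERE parameter while
    # the remaining keys become SET columns, rendering roughly as (sketch,
    # not verbatim SQL):
    #
    #     UPDATE "SSObject" SET ... WHERE "ssObjectId" = :ssObjectId_param
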
    def reassignDiaSources(self, idMap: Mapping[int, int]) -> None:
        # docstring is inherited from a base class

        table = self._schema.get_table(ApdbTables.DiaSource)
        query = table.update().where(table.columns["diaSourceId"] == sql.bindparam("srcId"))

        with self._engine.begin() as conn:
            # Need to make sure that every ID exists in the database, but
            # executemany may not support rowcount, so iterate and check what
            # is missing.
            missing_ids: list[int] = []
            for key, value in idMap.items():
                params = dict(srcId=key, diaObjectId=0, ssObjectId=value)
                result = conn.execute(query, params)
                if result.rowcount == 0:
                    missing_ids.append(key)
            if missing_ids:
                missing = ",".join(str(item) for item in missing_ids)
                raise ValueError(f"Following DiaSource IDs do not exist in the database: {missing}")

    def dailyJob(self) -> None:
        # docstring is inherited from a base class
        pass

    def countUnassociatedObjects(self) -> int:
        # docstring is inherited from a base class

        # Retrieve the DiaObject table.
        table: sqlalchemy.schema.Table = self._schema.get_table(ApdbTables.DiaObject)

        # Construct the sql statement.
        stmt = sql.select(func.count()).select_from(table).where(table.c.nDiaSources == 1)
        stmt = stmt.where(table.c.validityEnd == None)  # noqa: E711

        # Return the count.
        with self._engine.begin() as conn:
            count = conn.execute(stmt).scalar_one()

        return count

    @property
    def metadata(self) -> ApdbMetadata:
        # docstring is inherited from a base class
        if self._metadata is None:
            raise RuntimeError("Database schema was not initialized.")
        return self._metadata

    def _getDiaSourcesInRegion(self, region: Region, visit_time: astropy.time.Time) -> pandas.DataFrame:
        """Return catalog of DiaSource instances from given region.

        Parameters
        ----------
        region : `lsst.sphgeom.Region`
            Region to search for DIASources.
        visit_time : `astropy.time.Time`
            Time of the current visit.

        Returns
        -------
        catalog : `pandas.DataFrame`
            Catalog containing DiaSource records.
        """
        # TODO: DateTime.MJD must be consistent with code in ap_association,
        # alternatively we can fill midpointMjdTai ourselves in store()
        midpointMjdTai_start = _make_midpointMjdTai_start(visit_time, self.config.read_sources_months)
        _LOG.debug("midpointMjdTai_start = %.6f", midpointMjdTai_start)

        table = self._schema.get_table(ApdbTables.DiaSource)
        columns = self._schema.get_apdb_columns(ApdbTables.DiaSource)
        query = sql.select(*columns)

        # build selection
        time_filter = table.columns["midpointMjdTai"] > midpointMjdTai_start
        where = sql.expression.and_(self._filterRegion(table, region), time_filter)
        query = query.where(where)

        # execute select
        with self._timer("DiaSource_select_time", tags={"table": "DiaSource"}):
            with self._engine.begin() as conn:
                sources = pandas.read_sql_query(query, conn)
        _LOG.debug("found %s DiaSources", len(sources))
        return sources

    def _getDiaSourcesByIDs(self, object_ids: list[int], visit_time: astropy.time.Time) -> pandas.DataFrame:
        """Return catalog of DiaSource instances given set of DiaObject IDs.

        Parameters
        ----------
        object_ids : `list` [`int`]
            Collection of DiaObject IDs.
        visit_time : `astropy.time.Time`
            Time of the current visit.

        Returns
        -------
        catalog : `pandas.DataFrame`
            Catalog containing DiaSource records.
        """
        # TODO: DateTime.MJD must be consistent with code in ap_association,
        # alternatively we can fill midpointMjdTai ourselves in store()
        midpointMjdTai_start = _make_midpointMjdTai_start(visit_time, self.config.read_sources_months)
        _LOG.debug("midpointMjdTai_start = %.6f", midpointMjdTai_start)

        with self._timer("select_time", tags={"table": "DiaSource"}):
            sources = self._getSourcesByIDs(ApdbTables.DiaSource, object_ids, midpointMjdTai_start)

        _LOG.debug("found %s DiaSources", len(sources))
        return sources

    def _getSourcesByIDs(
        self, table_enum: ApdbTables, object_ids: list[int], midpointMjdTai_start: float
    ) -> pandas.DataFrame:
        """Return catalog of DiaSource or DiaForcedSource instances given set
        of DiaObject IDs.

        Parameters
        ----------
        table_enum : `ApdbTables`
            Database table to query.
        object_ids : `list` [`int`]
            Collection of DiaObject IDs.
        midpointMjdTai_start : `float`
            Earliest midpointMjdTai to retrieve.

        Returns
        -------
        catalog : `pandas.DataFrame`
            Catalog containing DiaSource records. An empty catalog is
            returned when ``object_ids`` is empty.
        """
        table = self._schema.get_table(table_enum)
        columns = self._schema.get_apdb_columns(table_enum)

        sources: pandas.DataFrame | None = None
        if len(object_ids) <= 0:
            _LOG.debug("ID list is empty, just fetch empty result")
            query = sql.select(*columns).where(sql.literal(False))
            with self._engine.begin() as conn:
                sources = pandas.read_sql_query(query, conn)
        else:
            data_frames: list[pandas.DataFrame] = []
            for ids in chunk_iterable(sorted(object_ids), 1000):
                query = sql.select(*columns)

                # Some types like np.int64 can cause issues with
                # sqlalchemy, convert them to int.
                int_ids = [int(oid) for oid in ids]

                # select by object id
                query = query.where(
                    sql.expression.and_(
                        table.columns["diaObjectId"].in_(int_ids),
                        table.columns["midpointMjdTai"] > midpointMjdTai_start,
                    )
                )

                # execute select
                with self._engine.begin() as conn:
                    data_frames.append(pandas.read_sql_query(query, conn))

            if len(data_frames) == 1:
                sources = data_frames[0]
            else:
                sources = pandas.concat(data_frames)
        assert sources is not None, "Catalog cannot be None"
        return sources

    def _storeReplicaChunk(
        self,
        replica_chunk: ReplicaChunk,
        visit_time: astropy.time.Time,
        connection: sqlalchemy.engine.Connection,
    ) -> None:
        dt = visit_time.datetime

        table = self._schema.get_table(ExtraTables.ApdbReplicaChunks)

        # We need UPSERT, which is a dialect-specific construct.
        values = {"last_update_time": dt, "unique_id": replica_chunk.unique_id}
        row = {"apdb_replica_chunk": replica_chunk.id} | values
        if connection.dialect.name == "sqlite":
            insert_sqlite = sqlalchemy.dialects.sqlite.insert(table)
            insert_sqlite = insert_sqlite.on_conflict_do_update(index_elements=table.primary_key, set_=values)
            connection.execute(insert_sqlite, row)
        elif connection.dialect.name == "postgresql":
            insert_pg = sqlalchemy.dialects.postgresql.dml.insert(table)
            insert_pg = insert_pg.on_conflict_do_update(constraint=table.primary_key, set_=values)
            connection.execute(insert_pg, row)
        else:
            raise TypeError(f"Unsupported dialect {connection.dialect.name} for upsert.")

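    # On SQLite the statement above renders roughly as (sketch, not verbatim
    # SQL):
    #
    #     INSERT INTO "ApdbReplicaChunks" (apdb_replica_chunk, last_update_time, unique_id)
    #     VALUES (?, ?, ?)
    #     ON CONFLICT (apdb_replica_chunk)
    #     DO UPDATE SET last_update_time = ?, unique_id = ?
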
    def _storeDiaObjects(
        self,
        objs: pandas.DataFrame,
        visit_time: astropy.time.Time,
        replica_chunk: ReplicaChunk | None,
        connection: sqlalchemy.engine.Connection,
    ) -> None:
        """Store catalog of DiaObjects from current visit.

        Parameters
        ----------
        objs : `pandas.DataFrame`
            Catalog with DiaObject records.
        visit_time : `astropy.time.Time`
            Time of the visit.
        replica_chunk : `ReplicaChunk` or `None`
            Insert identifier.
        """
        if len(objs) == 0:
            _LOG.debug("No objects to write to database.")
            return

        # Some types like np.int64 can cause issues with sqlalchemy, convert
        # them to int.
        ids = sorted(int(oid) for oid in objs["diaObjectId"])
        _LOG.debug("first object ID: %d", ids[0])

        # TODO: Need to verify that we are using correct scale here for
        # DATETIME representation (see DM-31996).
        dt = visit_time.datetime

        # everything to be done in single transaction
        if self.config.dia_object_index == "last_object_table":
            # Insert and replace all records in LAST table.
            table = self._schema.get_table(ApdbTables.DiaObjectLast)

            # Drop the previous objects (pandas cannot upsert).
            query = table.delete().where(table.columns["diaObjectId"].in_(ids))

            with self._timer("delete_time", tags={"table": table.name}):
                res = connection.execute(query)
            _LOG.debug("deleted %s objects", res.rowcount)

            # DiaObjectLast is a subset of DiaObject, strip missing columns
            last_column_names = [column.name for column in table.columns]
            last_objs = objs[last_column_names]
            last_objs = _coerce_uint64(last_objs)

            if "lastNonForcedSource" in last_objs.columns:
                # lastNonForcedSource is defined NOT NULL, fill it with visit
                # time just in case.
                last_objs["lastNonForcedSource"].fillna(dt, inplace=True)
            else:
                extra_column = pandas.Series([dt] * len(objs), name="lastNonForcedSource")
                last_objs.set_index(extra_column.index, inplace=True)
                last_objs = pandas.concat([last_objs, extra_column], axis="columns")

            with self._timer("insert_time", tags={"table": "DiaObjectLast"}):
                last_objs.to_sql(
                    table.name,
                    connection,
                    if_exists="append",
                    index=False,
                    schema=table.schema,
                )
        else:
            # truncate existing validity intervals
            table = self._schema.get_table(ApdbTables.DiaObject)

            update = (
                table.update()
                .values(validityEnd=dt)
                .where(
                    sql.expression.and_(
                        table.columns["diaObjectId"].in_(ids),
                        table.columns["validityEnd"].is_(None),
                    )
                )
            )

            with self._timer("truncate_time", tags={"table": table.name}):
                res = connection.execute(update)
            _LOG.debug("truncated %s intervals", res.rowcount)

        objs = _coerce_uint64(objs)

        # Fill additional columns
        extra_columns: list[pandas.Series] = []
        if "validityStart" in objs.columns:
            objs["validityStart"] = dt
        else:
            extra_columns.append(pandas.Series([dt] * len(objs), name="validityStart"))
        if "validityEnd" in objs.columns:
            objs["validityEnd"] = None
        else:
            extra_columns.append(pandas.Series([None] * len(objs), name="validityEnd"))
        if "lastNonForcedSource" in objs.columns:
            # lastNonForcedSource is defined NOT NULL, fill it with visit time
            # just in case.
            objs["lastNonForcedSource"].fillna(dt, inplace=True)
        else:
            extra_columns.append(pandas.Series([dt] * len(objs), name="lastNonForcedSource"))
        if extra_columns:
            objs.set_index(extra_columns[0].index, inplace=True)
            objs = pandas.concat([objs] + extra_columns, axis="columns")

        # Insert replica data
        table = self._schema.get_table(ApdbTables.DiaObject)
        replica_data: list[dict] = []
        replica_stmt: Any = None
        replica_table_name = ""
        if replica_chunk is not None:
            pk_names = [column.name for column in table.primary_key]
            replica_data = objs[pk_names].to_dict("records")
            for row in replica_data:
                row["apdb_replica_chunk"] = replica_chunk.id
            replica_table = self._schema.get_table(ExtraTables.DiaObjectChunks)
            replica_table_name = replica_table.name
            replica_stmt = replica_table.insert()

        # insert new versions
        with self._timer("insert_time", tags={"table": table.name}):
            objs.to_sql(table.name, connection, if_exists="append", index=False, schema=table.schema)
        if replica_stmt is not None:
            with self._timer("insert_time", tags={"table": replica_table_name}):
                connection.execute(replica_stmt, replica_data)

    def _storeDiaSources(
        self,
        sources: pandas.DataFrame,
        replica_chunk: ReplicaChunk | None,
        connection: sqlalchemy.engine.Connection,
    ) -> None:
        """Store catalog of DiaSources from current visit.

        Parameters
        ----------
        sources : `pandas.DataFrame`
            Catalog containing DiaSource records.
        """
        table = self._schema.get_table(ApdbTables.DiaSource)

        # Insert replica data
        replica_data: list[dict] = []
        replica_stmt: Any = None
        replica_table_name = ""
        if replica_chunk is not None:
            pk_names = [column.name for column in table.primary_key]
            replica_data = sources[pk_names].to_dict("records")
            for row in replica_data:
                row["apdb_replica_chunk"] = replica_chunk.id
            replica_table = self._schema.get_table(ExtraTables.DiaSourceChunks)
            replica_table_name = replica_table.name
            replica_stmt = replica_table.insert()

        # everything to be done in single transaction
        with self._timer("insert_time", tags={"table": table.name}):
            sources = _coerce_uint64(sources)
            sources.to_sql(table.name, connection, if_exists="append", index=False, schema=table.schema)
        if replica_stmt is not None:
            with self._timer("replica_insert_time", tags={"table": replica_table_name}):
                connection.execute(replica_stmt, replica_data)

    def _storeDiaForcedSources(
        self,
        sources: pandas.DataFrame,
        replica_chunk: ReplicaChunk | None,
        connection: sqlalchemy.engine.Connection,
    ) -> None:
        """Store a set of DiaForcedSources from current visit.

        Parameters
        ----------
        sources : `pandas.DataFrame`
            Catalog containing DiaForcedSource records.
        """
        table = self._schema.get_table(ApdbTables.DiaForcedSource)

        # Insert replica data
        replica_data: list[dict] = []
        replica_stmt: Any = None
        replica_table_name = ""
        if replica_chunk is not None:
            pk_names = [column.name for column in table.primary_key]
            replica_data = sources[pk_names].to_dict("records")
            for row in replica_data:
                row["apdb_replica_chunk"] = replica_chunk.id
            replica_table = self._schema.get_table(ExtraTables.DiaForcedSourceChunks)
            replica_table_name = replica_table.name
            replica_stmt = replica_table.insert()

        # everything to be done in single transaction
        with self._timer("insert_time", tags={"table": table.name}):
            sources = _coerce_uint64(sources)
            sources.to_sql(table.name, connection, if_exists="append", index=False, schema=table.schema)
        if replica_stmt is not None:
            with self._timer("insert_time", tags={"table": replica_table_name}):
                connection.execute(replica_stmt, replica_data)

    def _htm_indices(self, region: Region) -> list[tuple[int, int]]:
        """Generate a set of HTM indices covering specified region.

        Parameters
        ----------
        region : `sphgeom.Region`
            Region that needs to be indexed.

        Returns
        -------
        ranges : `list` [`tuple`]
            Sequence of ranges, each range is a tuple (minHtmID, maxHtmID).
        """
        _LOG.debug("region: %s", region)
        indices = self.pixelator.envelope(region, self.config.htm_max_ranges)

        return indices.ranges()

    def _filterRegion(self, table: sqlalchemy.schema.Table, region: Region) -> sql.ColumnElement:
        """Make SQLAlchemy expression for selecting records in a region."""
        htm_index_column = table.columns[self.config.htm_index_column]
        exprlist = []
        pixel_ranges = self._htm_indices(region)
        for low, upper in pixel_ranges:
            upper -= 1
            if low == upper:
                exprlist.append(htm_index_column == low)
            else:
                exprlist.append(sql.expression.between(htm_index_column, low, upper))

        return sql.expression.or_(*exprlist)

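    # A sketch of the resulting WHERE clause for a region covered by two HTM
    # ranges (pixel values are hypothetical). Envelope ranges are half-open,
    # hence the ``upper -= 1`` above, since SQL BETWEEN is inclusive:
    #
    #     "pixelId" = 12345 OR "pixelId" BETWEEN 67890 AND 67899
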
    def _add_obj_htm_index(self, df: pandas.DataFrame) -> pandas.DataFrame:
        """Calculate HTM index for each record and add it to a DataFrame.

        Notes
        -----
        This overrides any existing column in a DataFrame with the same name
        (pixelId). Original DataFrame is not changed, copy of a DataFrame is
        returned.
        """
        # calculate HTM index for every DiaObject
        htm_index = np.zeros(df.shape[0], dtype=np.int64)
        ra_col, dec_col = self.config.ra_dec_columns
        for i, (ra, dec) in enumerate(zip(df[ra_col], df[dec_col])):
            uv3d = UnitVector3d(LonLat.fromDegrees(ra, dec))
            idx = self.pixelator.index(uv3d)
            htm_index[i] = idx
        df = df.copy()
        df[self.config.htm_index_column] = htm_index
        return df

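    # For a single position (hypothetical coordinates) the per-row computation
    # above reduces to:
    #
    #     uv3d = UnitVector3d(LonLat.fromDegrees(45.0, -30.0))
    #     pixel_id = HtmPixelization(20).index(uv3d)
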
    def _add_src_htm_index(self, sources: pandas.DataFrame, objs: pandas.DataFrame) -> pandas.DataFrame:
        """Add pixelId column to DiaSource catalog.

        Notes
        -----
        This method copies pixelId value from a matching DiaObject record.
        DiaObject catalog needs to have a pixelId column filled by
        ``_add_obj_htm_index`` method and DiaSource records need to be
        associated to DiaObjects via ``diaObjectId`` column.

        This overrides any existing column in a DataFrame with the same name
        (pixelId). Original DataFrame is not changed, copy of a DataFrame is
        returned.
        """
        pixel_id_map: dict[int, int] = {
            diaObjectId: pixelId
            for diaObjectId, pixelId in zip(objs["diaObjectId"], objs[self.config.htm_index_column])
        }
        # DiaSources associated with SolarSystemObjects do not have an
        # associated DiaObject hence we skip them and set their htmIndex
        # value to 0.
        pixel_id_map[0] = 0
        htm_index = np.zeros(sources.shape[0], dtype=np.int64)
        for i, diaObjId in enumerate(sources["diaObjectId"]):
            htm_index[i] = pixel_id_map[diaObjId]
        sources = sources.copy()
        sources[self.config.htm_index_column] = htm_index
        return sources