LSST Data Management Base Package
apdbSql.py
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
21
22"""Module defining Apdb class and related methods."""
23
24from __future__ import annotations
25
26__all__ = ["ApdbSql"]
27
28import datetime
29import logging
30import urllib.parse
31import warnings
32from collections.abc import Iterable, Mapping, MutableMapping
33from contextlib import closing
34from typing import TYPE_CHECKING, Any, cast
35
36import astropy.time
37import numpy as np
38import pandas
39import sqlalchemy
40import sqlalchemy.dialects.postgresql
41import sqlalchemy.dialects.sqlite
42from lsst.sphgeom import HtmPixelization, LonLat, Region, UnitVector3d
43from lsst.utils.db_auth import DbAuth, DbAuthNotFoundError
44from lsst.utils.iteration import chunk_iterable
45from sqlalchemy import func, sql
46from sqlalchemy.pool import NullPool
47
48from ..apdb import Apdb
49from ..apdbConfigFreezer import ApdbConfigFreezer
50from ..apdbReplica import ReplicaChunk
51from ..apdbSchema import ApdbTables
52from ..config import ApdbConfig
53from ..monitor import MonAgent
54from ..schema_model import Table
55from ..timer import Timer
56from ..versionTuple import IncompatibleVersionError, VersionTuple
57from .apdbMetadataSql import ApdbMetadataSql
58from .apdbSqlReplica import ApdbSqlReplica
59from .apdbSqlSchema import ApdbSqlSchema, ExtraTables
60from .config import ApdbSqlConfig
61
62if TYPE_CHECKING:
63 import sqlite3
64
65 from ..apdbMetadata import ApdbMetadata
66
67_LOG = logging.getLogger(__name__)
68
69_MON = MonAgent(__name__)
70
71VERSION = VersionTuple(0, 1, 2)
72"""Version for the code controlling non-replication tables. This needs to be
73updated following compatibility rules when schema produced by this code
74changes.
75"""
76
77
78def _coerce_uint64(df: pandas.DataFrame) -> pandas.DataFrame:
79    """Change the type of uint64 columns to int64, and return a copy of the
80    data frame.
81 """
82 names = [c[0] for c in df.dtypes.items() if c[1] == np.uint64]
83 return df.astype({name: np.int64 for name in names})
84
85
86def _make_midpointMjdTai_start(visit_time: astropy.time.Time, months: int) -> float:
87 """Calculate starting point for time-based source search.
88
89 Parameters
90 ----------
91 visit_time : `astropy.time.Time`
92 Time of current visit.
93 months : `int`
94 Number of months in the sources history.
95
96 Returns
97 -------
98 time : `float`
99 A ``midpointMjdTai`` starting point, MJD time.
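        Note that a month is approximated here as 30 days, so ``months=12``
        corresponds to a starting point 360 days before the visit.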
100 """
101 # TODO: Use of MJD must be consistent with the code in ap_association
102 # (see DM-31996)
103 return float(visit_time.mjd - months * 30)
104
105
106def _onSqlite3Connect(
107    dbapiConnection: sqlite3.Connection, connectionRecord: sqlalchemy.pool._ConnectionRecord
108) -> None:
109 # Enable foreign keys
110 with closing(dbapiConnection.cursor()) as cursor:
111 cursor.execute("PRAGMA foreign_keys=ON;")
112
113
114class ApdbSql(Apdb):
115    """Implementation of APDB interface based on SQL database.
116
117 The implementation is configured via standard ``pex_config`` mechanism
118    using `ApdbSqlConfig` configuration class. For examples of different
119    configurations, check the ``config/`` folder.
120
121 Parameters
122 ----------
123 config : `ApdbSqlConfig`
124 Configuration object.
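
    Examples
    --------
    A minimal usage sketch (the database URL below is illustrative only)::

        config = ApdbSql.init_database(db_url="sqlite:///apdb.db")
        apdb = ApdbSql(config)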
125 """
126
127 metadataSchemaVersionKey = "version:schema"
128 """Name of the metadata key to store schema version number."""
129
130 metadataCodeVersionKey = "version:ApdbSql"
131 """Name of the metadata key to store code version number."""
132
133 metadataReplicaVersionKey = "version:ApdbSqlReplica"
134 """Name of the metadata key to store replica code version number."""
135
136 metadataConfigKey = "config:apdb-sql.json"
137    """Name of the metadata key to store frozen part of the configuration."""
138
139 _frozen_parameters = (
140 "enable_replica",
141 "dia_object_index",
142 "pixelization.htm_level",
143 "pixelization.htm_index_column",
144 "ra_dec_columns",
145 )
146 """Names of the config parameters to be frozen in metadata table."""
147
148 def __init__(self, config: ApdbSqlConfig):
149 self._engine = self._makeEngine(config, create=False)
150
151 sa_metadata = sqlalchemy.MetaData(schema=config.namespace)
152 meta_table_name = ApdbTables.metadata.table_name(prefix=config.prefix)
153 meta_table = sqlalchemy.schema.Table(meta_table_name, sa_metadata, autoload_with=self._engine)
154 self._metadata = ApdbMetadataSql(self._engine, meta_table)
155
156 # Read frozen config from metadata.
157 config_json = self._metadata.get(self.metadataConfigKey)
158 if config_json is not None:
159 # Update config from metadata.
160 freezer = ApdbConfigFreezer[ApdbSqlConfig](self._frozen_parameters)
161 self.config = freezer.update(config, config_json)
162 else:
163 self.config = config
164
165        self._schema = ApdbSqlSchema(
166            engine=self._engine,
167 dia_object_index=self.config.dia_object_index,
168 schema_file=self.config.schema_file,
169 schema_name=self.config.schema_name,
170 prefix=self.config.prefix,
171 namespace=self.config.namespace,
172 htm_index_column=self.config.pixelization.htm_index_column,
173 enable_replica=self.config.enable_replica,
174 )
175
176 self._versionCheck(self._metadata)
177
178 self.pixelator = HtmPixelization(self.config.pixelization.htm_level)
179
180 if _LOG.isEnabledFor(logging.DEBUG):
181 _LOG.debug("ApdbSql Configuration: %s", self.config.model_dump())
182
183 def _timer(self, name: str, *, tags: Mapping[str, str | int] | None = None) -> Timer:
184 """Create `Timer` instance given its name."""
185 return Timer(name, _MON, tags=tags)
186
187 @classmethod
188 def _makeEngine(cls, config: ApdbSqlConfig, *, create: bool) -> sqlalchemy.engine.Engine:
189        """Make SQLAlchemy engine based on configured parameters.
190
191 Parameters
192 ----------
193 config : `ApdbSqlConfig`
194 Configuration object.
195 create : `bool`
196 Whether to try to create new database file, only relevant for
197 SQLite backend which always creates new files by default.
198 """
199 # engine is reused between multiple processes, make sure that we don't
200 # share connections by disabling pool (by using NullPool class)
201 kw: MutableMapping[str, Any] = dict(config.connection_config.extra_parameters)
202 conn_args: dict[str, Any] = dict()
203 if not config.connection_config.connection_pool:
204 kw.update(poolclass=NullPool)
205 if config.connection_config.isolation_level is not None:
206 kw.update(isolation_level=config.connection_config.isolation_level)
207 elif config.db_url.startswith("sqlite"):
208 # Use READ_UNCOMMITTED as default value for sqlite.
209 kw.update(isolation_level="READ_UNCOMMITTED")
210 if config.connection_config.connection_timeout is not None:
211 if config.db_url.startswith("sqlite"):
212 conn_args.update(timeout=config.connection_config.connection_timeout)
213 elif config.db_url.startswith(("postgresql", "mysql")):
214 conn_args.update(connect_timeout=int(config.connection_config.connection_timeout))
215 kw.update(connect_args=conn_args)
216 engine = sqlalchemy.create_engine(cls._connection_url(config.db_url, create=create), **kw)
217
218 if engine.dialect.name == "sqlite":
219 # Need to enable foreign keys on every new connection.
220 sqlalchemy.event.listen(engine, "connect", _onSqlite3Connect)
221
222 return engine
223
224 @classmethod
225 def _connection_url(cls, config_url: str, *, create: bool) -> sqlalchemy.engine.URL | str:
226 """Generate a complete URL for database with proper credentials.
227
228 Parameters
229 ----------
230 config_url : `str`
231 Database URL as specified in configuration.
232 create : `bool`
233 Whether to try to create new database file, only relevant for
234 SQLite backend which always creates new files by default.
235
236 Returns
237 -------
238 connection_url : `sqlalchemy.engine.URL` or `str`
239 Connection URL including credentials.
240 """
241 # Allow 3rd party authentication mechanisms by assuming connection
242 # string is correct when we can not recognize (dialect, host, database)
243 # matching keys.
244 components = urllib.parse.urlparse(config_url)
245 if all((components.scheme is not None, components.hostname is not None, components.path is not None)):
246 try:
247 db_auth = DbAuth()
248 config_url = db_auth.getUrl(config_url)
249 except DbAuthNotFoundError:
250 # Credentials file doesn't exist or no matching credentials,
251 # use default auth.
252 pass
253
254        # SQLite has a nasty habit of creating empty databases when they do
255        # not exist; tell it not to do that unless we do need to create it.
256 if not create:
257 config_url = cls._update_sqlite_url(config_url)
258
259 return config_url
260
261 @classmethod
262 def _update_sqlite_url(cls, url_string: str) -> str:
263 """If URL refers to sqlite dialect, update it so that the backend does
264 not try to create database file if it does not exist already.
265
266 Parameters
267 ----------
268 url_string : `str`
269 Connection string.
270
271 Returns
272 -------
273 url_string : `str`
274 Possibly updated connection string.
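
        Notes
        -----
        As a rough illustration (the exact rendering depends on SQLAlchemy), a
        URL such as ``sqlite:////tmp/apdb.db`` becomes
        ``sqlite:///file:/tmp/apdb.db?mode=rw&uri=true``, which makes SQLite
        open the file read-write without creating it.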
275 """
276 try:
277 url = sqlalchemy.make_url(url_string)
278 except sqlalchemy.exc.SQLAlchemyError:
279 # If parsing fails it means some special format, likely not
280 # sqlite so we just return it unchanged.
281 return url_string
282
283 if url.get_backend_name() == "sqlite":
284 # Massage url so that database name starts with "file:" and
285 # option string has "mode=rw&uri=true". Database name
286 # should look like a path (:memory: is not supported by
287 # Apdb, but someone could still try to use it).
288 database = url.database
289 if database and not database.startswith((":", "file:")):
290 query = dict(url.query, mode="rw", uri="true")
291 # If ``database`` is an absolute path then original URL should
292 # include four slashes after "sqlite:". Humans are bad at
293 # counting things beyond four and sometimes an extra slash gets
294 # added unintentionally, which causes sqlite to treat initial
295 # element as "authority" and to complain. Strip extra slashes
296 # at the start of the path to avoid that (DM-46077).
297 if database.startswith("//"):
298 warnings.warn(
299 f"Database URL contains extra leading slashes which will be removed: {url}",
300 stacklevel=3,
301 )
302 database = "/" + database.lstrip("/")
303 url = url.set(database=f"file:{database}", query=query)
304 url_string = url.render_as_string()
305
306 return url_string
307
308 def _versionCheck(self, metadata: ApdbMetadataSql) -> None:
309 """Check schema version compatibility."""
310
311 def _get_version(key: str) -> VersionTuple:
312 """Retrieve version number from given metadata key."""
313 version_str = metadata.get(key)
314 if version_str is None:
315 # Should not happen with existing metadata table.
316 raise RuntimeError(f"Version key {key!r} does not exist in metadata table.")
317 return VersionTuple.fromString(version_str)
318
319 db_schema_version = _get_version(self.metadataSchemaVersionKey)
320 db_code_version = _get_version(self.metadataCodeVersionKey)
321
322 # For now there is no way to make read-only APDB instances, assume that
323 # any access can do updates.
324 if not self._schema.schemaVersion().checkCompatibility(db_schema_version):
325            raise IncompatibleVersionError(
326                f"Configured schema version {self._schema.schemaVersion()} "
327 f"is not compatible with database version {db_schema_version}"
328 )
329 if not self.apdbImplementationVersion().checkCompatibility(db_code_version):
330            raise IncompatibleVersionError(
331                f"Current code version {self.apdbImplementationVersion()} "
332 f"is not compatible with database version {db_code_version}"
333 )
334
335 # Check replica code version only if replica is enabled.
336 if self._schema.replication_enabled:
337 db_replica_version = _get_version(self.metadataReplicaVersionKey)
338 code_replica_version = ApdbSqlReplica.apdbReplicaImplementationVersion()
339 if not code_replica_version.checkCompatibility(db_replica_version):
340                raise IncompatibleVersionError(
341                    f"Current replication code version {code_replica_version} "
342 f"is not compatible with database version {db_replica_version}"
343 )
344
345 @classmethod
346 def apdbImplementationVersion(cls) -> VersionTuple:
347 """Return version number for current APDB implementation.
348
349 Returns
350 -------
351 version : `VersionTuple`
352 Version of the code defined in implementation class.
353 """
354 return VERSION
355
356 @classmethod
357    def init_database(
358        cls,
359 db_url: str,
360 *,
361 schema_file: str | None = None,
362 schema_name: str | None = None,
363 read_sources_months: int | None = None,
364 read_forced_sources_months: int | None = None,
365 enable_replica: bool = False,
366 connection_timeout: int | None = None,
367 dia_object_index: str | None = None,
368 htm_level: int | None = None,
369 htm_index_column: str | None = None,
370 ra_dec_columns: tuple[str, str] | None = None,
371 prefix: str | None = None,
372 namespace: str | None = None,
373 drop: bool = False,
374 ) -> ApdbSqlConfig:
375 """Initialize new APDB instance and make configuration object for it.
376
377 Parameters
378 ----------
379 db_url : `str`
380 SQLAlchemy database URL.
381 schema_file : `str`, optional
382 Location of (YAML) configuration file with APDB schema. If not
383 specified then default location will be used.
384        schema_name : `str`, optional
385 Name of the schema in YAML configuration file. If not specified
386 then default name will be used.
387 read_sources_months : `int`, optional
388 Number of months of history to read from DiaSource.
389 read_forced_sources_months : `int`, optional
390 Number of months of history to read from DiaForcedSource.
391 enable_replica : `bool`
392 If True, make additional tables used for replication to PPDB.
393 connection_timeout : `int`, optional
394 Database connection timeout in seconds.
395 dia_object_index : `str`, optional
396 Indexing mode for DiaObject table.
397 htm_level : `int`, optional
398 HTM indexing level.
399 htm_index_column : `str`, optional
400 Name of a HTM index column for DiaObject and DiaSource tables.
401 ra_dec_columns : `tuple` [`str`, `str`], optional
402 Names of ra/dec columns in DiaObject table.
403 prefix : `str`, optional
404 Optional prefix for all table names.
405 namespace : `str`, optional
406 Name of the database schema for all APDB tables. If not specified
407 then default schema is used.
408 drop : `bool`, optional
409 If `True` then drop existing tables before re-creating the schema.
410
411 Returns
412 -------
413 config : `ApdbSqlConfig`
414 Resulting configuration object for a created APDB instance.
415 """
416 config = ApdbSqlConfig(db_url=db_url, enable_replica=enable_replica)
417 if schema_file is not None:
418 config.schema_file = schema_file
419 if schema_name is not None:
420 config.schema_name = schema_name
421 if read_sources_months is not None:
422 config.read_sources_months = read_sources_months
423 if read_forced_sources_months is not None:
424 config.read_forced_sources_months = read_forced_sources_months
425 if connection_timeout is not None:
426 config.connection_config.connection_timeout = connection_timeout
427 if dia_object_index is not None:
428 config.dia_object_index = dia_object_index
429 if htm_level is not None:
430 config.pixelization.htm_level = htm_level
431 if htm_index_column is not None:
432 config.pixelization.htm_index_column = htm_index_column
433 if ra_dec_columns is not None:
434 config.ra_dec_columns = ra_dec_columns
435 if prefix is not None:
436 config.prefix = prefix
437 if namespace is not None:
438 config.namespace = namespace
439
440 cls._makeSchema(config, drop=drop)
441
442        # SQLite has a nasty habit of creating an empty database by default;
443        # update the URL in the config to disable that behavior.
444 config.db_url = cls._update_sqlite_url(config.db_url)
445
446 return config
447
448 def get_replica(self) -> ApdbSqlReplica:
449 """Return `ApdbReplica` instance for this database."""
450 return ApdbSqlReplica(self._schema, self._engine)
451
452 def tableRowCount(self) -> dict[str, int]:
453 """Return dictionary with the table names and row counts.
454
455 Used by ``ap_proto`` to keep track of the size of the database tables.
456        Depending on database technology this could be an expensive operation.
457
458 Returns
459 -------
460 row_counts : `dict`
461 Dict where key is a table name and value is a row count.
462 """
463 res = {}
464 tables = [ApdbTables.DiaObject, ApdbTables.DiaSource, ApdbTables.DiaForcedSource]
465 if self.config.dia_object_index == "last_object_table":
466 tables.append(ApdbTables.DiaObjectLast)
467 with self._engine.begin() as conn:
468 for table in tables:
469 sa_table = self._schema.get_table(table)
470 stmt = sql.select(func.count()).select_from(sa_table)
471 count: int = conn.execute(stmt).scalar_one()
472 res[table.name] = count
473
474 return res
475
476 def tableDef(self, table: ApdbTables) -> Table | None:
477 # docstring is inherited from a base class
478 return self._schema.tableSchemas.get(table)
479
480 @classmethod
481 def _makeSchema(cls, config: ApdbConfig, drop: bool = False) -> None:
482 # docstring is inherited from a base class
483
484 if not isinstance(config, ApdbSqlConfig):
485 raise TypeError(f"Unexpected type of configuration object: {type(config)}")
486
487 engine = cls._makeEngine(config, create=True)
488
489 # Ask schema class to create all tables.
490 schema = ApdbSqlSchema(
491 engine=engine,
492 dia_object_index=config.dia_object_index,
493 schema_file=config.schema_file,
494 schema_name=config.schema_name,
495 prefix=config.prefix,
496 namespace=config.namespace,
497 htm_index_column=config.pixelization.htm_index_column,
498 enable_replica=config.enable_replica,
499 )
500 schema.makeSchema(drop=drop)
501
502        # Need metadata table to store a few items in it.
503 meta_table = schema.get_table(ApdbTables.metadata)
504 apdb_meta = ApdbMetadataSql(engine, meta_table)
505
506 # Fill version numbers, overwrite if they are already there.
507 apdb_meta.set(cls.metadataSchemaVersionKey, str(schema.schemaVersion()), force=True)
508 apdb_meta.set(cls.metadataCodeVersionKey, str(cls.apdbImplementationVersion()), force=True)
509 if config.enable_replica:
510 # Only store replica code version if replica is enabled.
511 apdb_meta.set(
512                cls.metadataReplicaVersionKey,
513                str(ApdbSqlReplica.apdbReplicaImplementationVersion()),
514 force=True,
515 )
516
517 # Store frozen part of a configuration in metadata.
518 freezer = ApdbConfigFreezer[ApdbSqlConfig](cls._frozen_parameters)
519 apdb_meta.set(cls.metadataConfigKey, freezer.to_json(config), force=True)
520
521 def getDiaObjects(self, region: Region) -> pandas.DataFrame:
522 # docstring is inherited from a base class
523
524 # decide what columns we need
525 if self.config.dia_object_index == "last_object_table":
526 table_enum = ApdbTables.DiaObjectLast
527 else:
528 table_enum = ApdbTables.DiaObject
529 table = self._schema.get_table(table_enum)
530 if not self.config.dia_object_columns:
531 columns = self._schema.get_apdb_columns(table_enum)
532 else:
533 columns = [table.c[col] for col in self.config.dia_object_columns]
534 query = sql.select(*columns)
535
536 # build selection
537 query = query.where(self._filterRegion(table, region))
538
539 # select latest version of objects
540 if self.config.dia_object_index != "last_object_table":
541 query = query.where(table.c.validityEnd == None) # noqa: E711
542
543 # _LOG.debug("query: %s", query)
544
545 # execute select
546 with self._timer("select_time", tags={"table": "DiaObject"}) as timer:
547 with self._engine.begin() as conn:
548 objects = pandas.read_sql_query(query, conn)
549 timer.add_values(row_count=len(objects))
550 _LOG.debug("found %s DiaObjects", len(objects))
551 return self._fix_result_timestamps(objects)
552
553    def getDiaSources(
554        self, region: Region, object_ids: Iterable[int] | None, visit_time: astropy.time.Time
555 ) -> pandas.DataFrame | None:
556 # docstring is inherited from a base class
557 if self.config.read_sources_months == 0:
558 _LOG.debug("Skip DiaSources fetching")
559 return None
560
561 if object_ids is None:
562 # region-based select
563 return self._getDiaSourcesInRegion(region, visit_time)
564 else:
565 return self._getDiaSourcesByIDs(list(object_ids), visit_time)
566
567    def getDiaForcedSources(
568        self, region: Region, object_ids: Iterable[int] | None, visit_time: astropy.time.Time
569 ) -> pandas.DataFrame | None:
570 # docstring is inherited from a base class
571 if self.config.read_forced_sources_months == 0:
572            _LOG.debug("Skip DiaForcedSources fetching")
573 return None
574
575 if object_ids is None:
576 # This implementation does not support region-based selection.
577 raise NotImplementedError("Region-based selection is not supported")
578
579 # TODO: DateTime.MJD must be consistent with code in ap_association,
580 # alternatively we can fill midpointMjdTai ourselves in store()
581 midpointMjdTai_start = _make_midpointMjdTai_start(visit_time, self.config.read_forced_sources_months)
582 _LOG.debug("midpointMjdTai_start = %.6f", midpointMjdTai_start)
583
584 with self._timer("select_time", tags={"table": "DiaForcedSource"}) as timer:
585 sources = self._getSourcesByIDs(
586 ApdbTables.DiaForcedSource, list(object_ids), midpointMjdTai_start
587 )
588 timer.add_values(row_count=len(sources))
589
590 _LOG.debug("found %s DiaForcedSources", len(sources))
591 return sources
592
593    def containsVisitDetector(
594        self,
595 visit: int,
596 detector: int,
597 region: Region,
598 visit_time: astropy.time.Time,
599 ) -> bool:
600 # docstring is inherited from a base class
601 src_table: sqlalchemy.schema.Table = self._schema.get_table(ApdbTables.DiaSource)
602 frcsrc_table: sqlalchemy.schema.Table = self._schema.get_table(ApdbTables.DiaForcedSource)
603 # Query should load only one leaf page of the index
604 query1 = sql.select(src_table.c.visit).filter_by(visit=visit, detector=detector).limit(1)
605
606 with self._engine.begin() as conn:
607 result = conn.execute(query1).scalar_one_or_none()
608 if result is not None:
609 return True
610 else:
611 # Backup query if an image was processed but had no diaSources
612 query2 = sql.select(frcsrc_table.c.visit).filter_by(visit=visit, detector=detector).limit(1)
613 result = conn.execute(query2).scalar_one_or_none()
614 return result is not None
615
616 def getSSObjects(self) -> pandas.DataFrame:
617 # docstring is inherited from a base class
618
619 columns = self._schema.get_apdb_columns(ApdbTables.SSObject)
620 query = sql.select(*columns)
621
622 # execute select
623 with self._timer("SSObject_select_time", tags={"table": "SSObject"}) as timer:
624 with self._engine.begin() as conn:
625 objects = pandas.read_sql_query(query, conn)
626 timer.add_values(row_count=len(objects))
627 _LOG.debug("found %s SSObjects", len(objects))
628 return self._fix_result_timestamps(objects)
629
630 def store(
631 self,
632 visit_time: astropy.time.Time,
633 objects: pandas.DataFrame,
634 sources: pandas.DataFrame | None = None,
635 forced_sources: pandas.DataFrame | None = None,
636 ) -> None:
637 # docstring is inherited from a base class
638 objects = self._fix_input_timestamps(objects)
639 if sources is not None:
640 sources = self._fix_input_timestamps(sources)
641 if forced_sources is not None:
642 forced_sources = self._fix_input_timestamps(forced_sources)
643
644 # We want to run all inserts in one transaction.
645 with self._engine.begin() as connection:
646 replica_chunk: ReplicaChunk | None = None
647 if self._schema.replication_enabled:
648 replica_chunk = ReplicaChunk.make_replica_chunk(visit_time, self.config.replica_chunk_seconds)
649 self._storeReplicaChunk(replica_chunk, visit_time, connection)
650
651 # fill pixelId column for DiaObjects
652 objects = self._add_spatial_index(objects)
653 self._storeDiaObjects(objects, visit_time, replica_chunk, connection)
654
655 if sources is not None:
656 # fill pixelId column for DiaSources
657 sources = self._add_spatial_index(sources)
658 self._storeDiaSources(sources, replica_chunk, connection)
659
660 if forced_sources is not None:
661 self._storeDiaForcedSources(forced_sources, replica_chunk, connection)
662
663 def storeSSObjects(self, objects: pandas.DataFrame) -> None:
664 # docstring is inherited from a base class
665 objects = self._fix_input_timestamps(objects)
666
667 idColumn = "ssObjectId"
668 table = self._schema.get_table(ApdbTables.SSObject)
669
670 # everything to be done in single transaction
671 with self._engine.begin() as conn:
672 # Find record IDs that already exist. Some types like np.int64 can
673 # cause issues with sqlalchemy, convert them to int.
674 ids = sorted(int(oid) for oid in objects[idColumn])
675
676 query = sql.select(table.columns[idColumn], table.columns[idColumn].in_(ids))
677 result = conn.execute(query)
678 knownIds = set(row.ssObjectId for row in result)
679
680 filter = objects[idColumn].isin(knownIds)
681 toUpdate = cast(pandas.DataFrame, objects[filter])
682 toInsert = cast(pandas.DataFrame, objects[~filter])
683
684 # insert new records
685 if len(toInsert) > 0:
686 toInsert.to_sql(table.name, conn, if_exists="append", index=False, schema=table.schema)
687
688 # update existing records
689 if len(toUpdate) > 0:
690 whereKey = f"{idColumn}_param"
691 update = table.update().where(table.columns[idColumn] == sql.bindparam(whereKey))
692 toUpdate = toUpdate.rename({idColumn: whereKey}, axis="columns")
693 values = toUpdate.to_dict("records")
694 result = conn.execute(update, values)
695
696 def reassignDiaSources(self, idMap: Mapping[int, int]) -> None:
697 # docstring is inherited from a base class
698
699 table = self._schema.get_table(ApdbTables.DiaSource)
700 query = table.update().where(table.columns["diaSourceId"] == sql.bindparam("srcId"))
701
702 with self._engine.begin() as conn:
703 # Need to make sure that every ID exists in the database, but
704 # executemany may not support rowcount, so iterate and check what
705 # is missing.
706 missing_ids: list[int] = []
707 for key, value in idMap.items():
708 params = dict(srcId=key, diaObjectId=0, ssObjectId=value)
709 result = conn.execute(query, params)
710 if result.rowcount == 0:
711 missing_ids.append(key)
712 if missing_ids:
713 missing = ",".join(str(item) for item in missing_ids)
714 raise ValueError(f"Following DiaSource IDs do not exist in the database: {missing}")
715
716 def dailyJob(self) -> None:
717 # docstring is inherited from a base class
718 pass
719
720 def countUnassociatedObjects(self) -> int:
721 # docstring is inherited from a base class
722
723 # Retrieve the DiaObject table.
724 table: sqlalchemy.schema.Table = self._schema.get_table(ApdbTables.DiaObject)
725
726 # Construct the sql statement.
727 stmt = sql.select(func.count()).select_from(table).where(table.c.nDiaSources == 1)
728 stmt = stmt.where(table.c.validityEnd == None) # noqa: E711
729
730 # Return the count.
731 with self._engine.begin() as conn:
732 count = conn.execute(stmt).scalar_one()
733
734 return count
735
736 @property
737 def metadata(self) -> ApdbMetadata:
738 # docstring is inherited from a base class
739 return self._metadata
740
741 def _getDiaSourcesInRegion(self, region: Region, visit_time: astropy.time.Time) -> pandas.DataFrame:
742 """Return catalog of DiaSource instances from given region.
743
744 Parameters
745 ----------
746 region : `lsst.sphgeom.Region`
747 Region to search for DIASources.
748 visit_time : `astropy.time.Time`
749 Time of the current visit.
750
751 Returns
752 -------
753 catalog : `pandas.DataFrame`
754 Catalog containing DiaSource records.
755 """
756 # TODO: DateTime.MJD must be consistent with code in ap_association,
757 # alternatively we can fill midpointMjdTai ourselves in store()
758 midpointMjdTai_start = _make_midpointMjdTai_start(visit_time, self.config.read_sources_months)
759 _LOG.debug("midpointMjdTai_start = %.6f", midpointMjdTai_start)
760
761 table = self._schema.get_table(ApdbTables.DiaSource)
762 columns = self._schema.get_apdb_columns(ApdbTables.DiaSource)
763 query = sql.select(*columns)
764
765 # build selection
766 time_filter = table.columns["midpointMjdTai"] > midpointMjdTai_start
767 where = sql.expression.and_(self._filterRegion(table, region), time_filter)
768 query = query.where(where)
769
770 # execute select
771 with self._timer("DiaSource_select_time", tags={"table": "DiaSource"}) as timer:
772 with self._engine.begin() as conn:
773 sources = pandas.read_sql_query(query, conn)
774            timer.add_values(row_count=len(sources))
775 _LOG.debug("found %s DiaSources", len(sources))
776 return self._fix_result_timestamps(sources)
777
778 def _getDiaSourcesByIDs(self, object_ids: list[int], visit_time: astropy.time.Time) -> pandas.DataFrame:
779 """Return catalog of DiaSource instances given set of DiaObject IDs.
780
781 Parameters
782 ----------
783        object_ids : `list` [`int`]
784            Collection of DiaObject IDs.
785 visit_time : `astropy.time.Time`
786 Time of the current visit.
787
788 Returns
789 -------
790 catalog : `pandas.DataFrame`
791            Catalog containing DiaSource records.
792 """
793 # TODO: DateTime.MJD must be consistent with code in ap_association,
794 # alternatively we can fill midpointMjdTai ourselves in store()
795 midpointMjdTai_start = _make_midpointMjdTai_start(visit_time, self.config.read_sources_months)
796 _LOG.debug("midpointMjdTai_start = %.6f", midpointMjdTai_start)
797
798 with self._timer("select_time", tags={"table": "DiaSource"}) as timer:
799 sources = self._getSourcesByIDs(ApdbTables.DiaSource, object_ids, midpointMjdTai_start)
800 timer.add_values(row_count=len(sources))
801
802 _LOG.debug("found %s DiaSources", len(sources))
803 return sources
804
805    def _getSourcesByIDs(
806        self, table_enum: ApdbTables, object_ids: list[int], midpointMjdTai_start: float
807 ) -> pandas.DataFrame:
808 """Return catalog of DiaSource or DiaForcedSource instances given set
809 of DiaObject IDs.
810
811 Parameters
812 ----------
813        table_enum : `ApdbTables`
814            Database table enum for DiaSource or DiaForcedSource table.
815        object_ids : `list` [`int`]
816            Collection of DiaObject IDs.
817 midpointMjdTai_start : `float`
818 Earliest midpointMjdTai to retrieve.
819
820 Returns
821 -------
822        catalog : `pandas.DataFrame`
823            Catalog containing DiaSource or DiaForcedSource records. An
824            empty catalog is returned when ``object_ids`` is empty or no
825            matching records are found.
826 """
827 table = self._schema.get_table(table_enum)
828 columns = self._schema.get_apdb_columns(table_enum)
829
830 sources: pandas.DataFrame | None = None
831 if len(object_ids) <= 0:
832 _LOG.debug("ID list is empty, just fetch empty result")
833 query = sql.select(*columns).where(sql.literal(False))
834 with self._engine.begin() as conn:
835 sources = pandas.read_sql_query(query, conn)
836 else:
837 data_frames: list[pandas.DataFrame] = []
838 for ids in chunk_iterable(sorted(object_ids), 1000):
839 query = sql.select(*columns)
840
841 # Some types like np.int64 can cause issues with
842 # sqlalchemy, convert them to int.
843 int_ids = [int(oid) for oid in ids]
844
845 # select by object id
846 query = query.where(
847 sql.expression.and_(
848 table.columns["diaObjectId"].in_(int_ids),
849 table.columns["midpointMjdTai"] > midpointMjdTai_start,
850 )
851 )
852
853 # execute select
854 with self._engine.begin() as conn:
855 data_frames.append(pandas.read_sql_query(query, conn))
856
857 if len(data_frames) == 1:
858 sources = data_frames[0]
859 else:
860 sources = pandas.concat(data_frames)
861 assert sources is not None, "Catalog cannot be None"
862 return self._fix_result_timestamps(sources)
863
864    def _storeReplicaChunk(
865        self,
866 replica_chunk: ReplicaChunk,
867 visit_time: astropy.time.Time,
868 connection: sqlalchemy.engine.Connection,
869 ) -> None:
870 # `visit_time.datetime` returns naive datetime, even though all astropy
871        # times are in UTC. Add UTC timezone to the timestamp so that the
872        # database can store a correct value.
873 dt = datetime.datetime.fromtimestamp(visit_time.unix_tai, tz=datetime.timezone.utc)
874
875 table = self._schema.get_table(ExtraTables.ApdbReplicaChunks)
876
877 # We need UPSERT which is dialect-specific construct
878 values = {"last_update_time": dt, "unique_id": replica_chunk.unique_id}
879 row = {"apdb_replica_chunk": replica_chunk.id} | values
880 if connection.dialect.name == "sqlite":
881 insert_sqlite = sqlalchemy.dialects.sqlite.insert(table)
882 insert_sqlite = insert_sqlite.on_conflict_do_update(index_elements=table.primary_key, set_=values)
883 connection.execute(insert_sqlite, row)
884 elif connection.dialect.name == "postgresql":
885 insert_pg = sqlalchemy.dialects.postgresql.dml.insert(table)
886 insert_pg = insert_pg.on_conflict_do_update(constraint=table.primary_key, set_=values)
887 connection.execute(insert_pg, row)
888 else:
889 raise TypeError(f"Unsupported dialect {connection.dialect.name} for upsert.")
890
891    def _storeDiaObjects(
892        self,
893 objs: pandas.DataFrame,
894 visit_time: astropy.time.Time,
895 replica_chunk: ReplicaChunk | None,
896 connection: sqlalchemy.engine.Connection,
897 ) -> None:
898 """Store catalog of DiaObjects from current visit.
899
900 Parameters
901 ----------
902 objs : `pandas.DataFrame`
903 Catalog with DiaObject records.
904 visit_time : `astropy.time.Time`
905 Time of the visit.
906 replica_chunk : `ReplicaChunk`
907 Insert identifier.
908 """
909 if len(objs) == 0:
910 _LOG.debug("No objects to write to database.")
911 return
912
913 # Some types like np.int64 can cause issues with sqlalchemy, convert
914 # them to int.
915 ids = sorted(int(oid) for oid in objs["diaObjectId"])
916 _LOG.debug("first object ID: %d", ids[0])
917
918 # TODO: Need to verify that we are using correct scale here for
919 # DATETIME representation (see DM-31996).
920 dt = visit_time.datetime
921
922 # everything to be done in single transaction
923 if self.config.dia_object_index == "last_object_table":
924 # Insert and replace all records in LAST table.
925 table = self._schema.get_table(ApdbTables.DiaObjectLast)
926
927 # Drop the previous objects (pandas cannot upsert).
928 query = table.delete().where(table.columns["diaObjectId"].in_(ids))
929
930 with self._timer("delete_time", tags={"table": table.name}) as timer:
931 res = connection.execute(query)
932 timer.add_values(row_count=res.rowcount)
933 _LOG.debug("deleted %s objects", res.rowcount)
934
935 # DiaObjectLast is a subset of DiaObject, strip missing columns
936 last_column_names = [column.name for column in table.columns]
937 last_objs = objs[last_column_names]
938 last_objs = _coerce_uint64(last_objs)
939
940 if "lastNonForcedSource" in last_objs.columns:
941 # lastNonForcedSource is defined NOT NULL, fill it with visit
942 # time just in case.
943 last_objs.fillna({"lastNonForcedSource": dt}, inplace=True)
944 else:
945 extra_column = pandas.Series([dt] * len(objs), name="lastNonForcedSource")
946 last_objs.set_index(extra_column.index, inplace=True)
947 last_objs = pandas.concat([last_objs, extra_column], axis="columns")
948
949 with self._timer("insert_time", tags={"table": "DiaObjectLast"}) as timer:
950 last_objs.to_sql(
951 table.name,
952 connection,
953 if_exists="append",
954 index=False,
955 schema=table.schema,
956 )
957 timer.add_values(row_count=len(last_objs))
958 else:
959 # truncate existing validity intervals
960 table = self._schema.get_table(ApdbTables.DiaObject)
961
962 update = (
963 table.update()
964 .values(validityEnd=dt)
965 .where(
966 sql.expression.and_(
967 table.columns["diaObjectId"].in_(ids),
968 table.columns["validityEnd"].is_(None),
969 )
970 )
971 )
972
973 with self._timer("truncate_time", tags={"table": table.name}) as timer:
974 res = connection.execute(update)
975 timer.add_values(row_count=res.rowcount)
976 _LOG.debug("truncated %s intervals", res.rowcount)
977
978 objs = _coerce_uint64(objs)
979
980 # Fill additional columns
981 extra_columns: list[pandas.Series] = []
982 if "validityStart" in objs.columns:
983 objs["validityStart"] = dt
984 else:
985 extra_columns.append(pandas.Series([dt] * len(objs), name="validityStart"))
986 if "validityEnd" in objs.columns:
987 objs["validityEnd"] = None
988 else:
989 extra_columns.append(pandas.Series([None] * len(objs), name="validityEnd"))
990 if "lastNonForcedSource" in objs.columns:
991 # lastNonForcedSource is defined NOT NULL, fill it with visit time
992 # just in case.
993 objs.fillna({"lastNonForcedSource": dt}, inplace=True)
994 else:
995 extra_columns.append(pandas.Series([dt] * len(objs), name="lastNonForcedSource"))
996 if extra_columns:
997 objs.set_index(extra_columns[0].index, inplace=True)
998 objs = pandas.concat([objs] + extra_columns, axis="columns")
999
1000 # Insert replica data
1001 table = self._schema.get_table(ApdbTables.DiaObject)
1002 replica_data: list[dict] = []
1003 replica_stmt: Any = None
1004 replica_table_name = ""
1005 if replica_chunk is not None:
1006 pk_names = [column.name for column in table.primary_key]
1007 replica_data = objs[pk_names].to_dict("records")
1008 for row in replica_data:
1009 row["apdb_replica_chunk"] = replica_chunk.id
1010 replica_table = self._schema.get_table(ExtraTables.DiaObjectChunks)
1011 replica_table_name = replica_table.name
1012 replica_stmt = replica_table.insert()
1013
1014 # insert new versions
1015 with self._timer("insert_time", tags={"table": table.name}) as timer:
1016 objs.to_sql(table.name, connection, if_exists="append", index=False, schema=table.schema)
1017 timer.add_values(row_count=len(objs))
1018 if replica_stmt is not None:
1019 with self._timer("insert_time", tags={"table": replica_table_name}) as timer:
1020 connection.execute(replica_stmt, replica_data)
1021 timer.add_values(row_count=len(replica_data))
1022
1023    def _storeDiaSources(
1024        self,
1025 sources: pandas.DataFrame,
1026 replica_chunk: ReplicaChunk | None,
1027 connection: sqlalchemy.engine.Connection,
1028 ) -> None:
1029 """Store catalog of DiaSources from current visit.
1030
1031 Parameters
1032 ----------
1033 sources : `pandas.DataFrame`
1034 Catalog containing DiaSource records
1035 """
1036 table = self._schema.get_table(ApdbTables.DiaSource)
1037
1038 # Insert replica data
1039 replica_data: list[dict] = []
1040 replica_stmt: Any = None
1041 replica_table_name = ""
1042 if replica_chunk is not None:
1043 pk_names = [column.name for column in table.primary_key]
1044 replica_data = sources[pk_names].to_dict("records")
1045 for row in replica_data:
1046 row["apdb_replica_chunk"] = replica_chunk.id
1047 replica_table = self._schema.get_table(ExtraTables.DiaSourceChunks)
1048 replica_table_name = replica_table.name
1049 replica_stmt = replica_table.insert()
1050
1051 # everything to be done in single transaction
1052 with self._timer("insert_time", tags={"table": table.name}) as timer:
1053 sources = _coerce_uint64(sources)
1054 sources.to_sql(table.name, connection, if_exists="append", index=False, schema=table.schema)
1055 timer.add_values(row_count=len(sources))
1056 if replica_stmt is not None:
1057 with self._timer("replica_insert_time", tags={"table": replica_table_name}) as timer:
1058 connection.execute(replica_stmt, replica_data)
1059 timer.add_values(row_count=len(replica_data))
1060
1061    def _storeDiaForcedSources(
1062        self,
1063 sources: pandas.DataFrame,
1064 replica_chunk: ReplicaChunk | None,
1065 connection: sqlalchemy.engine.Connection,
1066 ) -> None:
1067 """Store a set of DiaForcedSources from current visit.
1068
1069 Parameters
1070 ----------
1071 sources : `pandas.DataFrame`
1072 Catalog containing DiaForcedSource records
1073 """
1074 table = self._schema.get_table(ApdbTables.DiaForcedSource)
1075
1076 # Insert replica data
1077 replica_data: list[dict] = []
1078 replica_stmt: Any = None
1079 replica_table_name = ""
1080 if replica_chunk is not None:
1081 pk_names = [column.name for column in table.primary_key]
1082 replica_data = sources[pk_names].to_dict("records")
1083 for row in replica_data:
1084 row["apdb_replica_chunk"] = replica_chunk.id
1085 replica_table = self._schema.get_table(ExtraTables.DiaForcedSourceChunks)
1086 replica_table_name = replica_table.name
1087 replica_stmt = replica_table.insert()
1088
1089 # everything to be done in single transaction
1090 with self._timer("insert_time", tags={"table": table.name}) as timer:
1091 sources = _coerce_uint64(sources)
1092 sources.to_sql(table.name, connection, if_exists="append", index=False, schema=table.schema)
1093 timer.add_values(row_count=len(sources))
1094 if replica_stmt is not None:
1095            with self._timer("insert_time", tags={"table": replica_table_name}) as timer:
1096 connection.execute(replica_stmt, replica_data)
1097 timer.add_values(row_count=len(replica_data))
1098
1099 def _htm_indices(self, region: Region) -> list[tuple[int, int]]:
1100 """Generate a set of HTM indices covering specified region.
1101
1102 Parameters
1103 ----------
1104        region : `lsst.sphgeom.Region`
1105 Region that needs to be indexed.
1106
1107 Returns
1108 -------
1109 Sequence of ranges, range is a tuple (minHtmID, maxHtmID).
1110 """
1111 _LOG.debug("region: %s", region)
1112 indices = self.pixelator.envelope(region, self.config.pixelization.htm_max_ranges)
1113
1114 return indices.ranges()
1115
1116 def _filterRegion(self, table: sqlalchemy.schema.Table, region: Region) -> sql.ColumnElement:
1117 """Make SQLAlchemy expression for selecting records in a region."""
1118 htm_index_column = table.columns[self.config.pixelization.htm_index_column]
1119 exprlist = []
1120 pixel_ranges = self._htm_indices(region)
1121 for low, upper in pixel_ranges:
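            # Ranges from the pixelization envelope are half-open [low, upper);
            # BETWEEN is inclusive, so shift the upper bound down by one.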
1122 upper -= 1
1123 if low == upper:
1124 exprlist.append(htm_index_column == low)
1125 else:
1126 exprlist.append(sql.expression.between(htm_index_column, low, upper))
1127
1128 return sql.expression.or_(*exprlist)
1129
1130 def _add_spatial_index(self, df: pandas.DataFrame) -> pandas.DataFrame:
1131 """Calculate spatial index for each record and add it to a DataFrame.
1132
1133 Parameters
1134 ----------
1135 df : `pandas.DataFrame`
1136            DataFrame which has to contain ra/dec columns; the names of these
1137            columns are defined by the ``ra_dec_columns`` configuration field.
1138
1139 Returns
1140 -------
1141 df : `pandas.DataFrame`
1142 DataFrame with ``pixelId`` column which contains pixel index
1143 for ra/dec coordinates.
1144
1145 Notes
1146 -----
1147 This overrides any existing column in a DataFrame with the same name
1148        (pixelId). The original DataFrame is not changed; a copy of the
1149        DataFrame is returned.
1150 """
1151 # calculate HTM index for every DiaObject
1152 htm_index = np.zeros(df.shape[0], dtype=np.int64)
1153 ra_col, dec_col = self.config.ra_dec_columns
1154 for i, (ra, dec) in enumerate(zip(df[ra_col], df[dec_col])):
1155 uv3d = UnitVector3d(LonLat.fromDegrees(ra, dec))
1156 idx = self.pixelator.index(uv3d)
1157 htm_index[i] = idx
1158 df = df.copy()
1159 df[self.config.pixelization.htm_index_column] = htm_index
1160 return df
1161
1162 def _fix_input_timestamps(self, df: pandas.DataFrame) -> pandas.DataFrame:
1163        """Update timestamp columns in the input DataFrame to be timezone-aware
1164        datetime type in UTC.
1165
1166        The AP pipeline generates naive datetime instances; we want them to be
1167        aware before they go to the database. All naive timestamps are assumed
1168        to be in UTC timezone (they should be TAI).
1169 """
1170 # Find all columns with aware non-UTC timestamps and convert to UTC.
1171 columns = [
1172 column
1173 for column, dtype in df.dtypes.items()
1174 if isinstance(dtype, pandas.DatetimeTZDtype) and dtype.tz is not datetime.timezone.utc
1175 ]
1176 for column in columns:
1177 df[column] = df[column].dt.tz_convert(datetime.timezone.utc)
1178 # Find all columns with naive timestamps and add UTC timezone.
1179 columns = [
1180 column for column, dtype in df.dtypes.items() if pandas.api.types.is_datetime64_dtype(dtype)
1181 ]
1182 for column in columns:
1183 df[column] = df[column].dt.tz_localize(datetime.timezone.utc)
1184 return df
1185
1186 def _fix_result_timestamps(self, df: pandas.DataFrame) -> pandas.DataFrame:
1187 """Update timestamp columns to be naive datetime type in returned
1188 DataFrame.
1189
1190 AP pipeline code expects DataFrames to contain naive datetime columns,
1191 while Postgres queries return timezone-aware type. This method converts
1192 those columns to naive datetime in UTC timezone.
1193 """
1194 # Find all columns with aware timestamps.
1195 columns = [column for column, dtype in df.dtypes.items() if isinstance(dtype, pandas.DatetimeTZDtype)]
1196 for column in columns:
1197 # tz_convert(None) will convert to UTC and drop timezone.
1198 df[column] = df[column].dt.tz_convert(None)
1199 return df