apdbSql.py
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
21
22"""Module defining Apdb class and related methods."""
23
24from __future__ import annotations
25
26__all__ = ["ApdbSql"]
27
28import datetime
29import logging
30import urllib.parse
31import warnings
32from collections.abc import Iterable, Mapping, MutableMapping
33from contextlib import closing, suppress
34from typing import TYPE_CHECKING, Any, cast
35
36import astropy.time
37import numpy as np
38import pandas
39import sqlalchemy
40import sqlalchemy.dialects.postgresql
41import sqlalchemy.dialects.sqlite
42from lsst.sphgeom import HtmPixelization, LonLat, Region, UnitVector3d
43from lsst.utils.db_auth import DbAuth, DbAuthNotFoundError
44from lsst.utils.iteration import chunk_iterable
45from sqlalchemy import func, sql
46from sqlalchemy.pool import NullPool
47
48from .._auth import DB_AUTH_ENVVAR, DB_AUTH_PATH
49from ..apdb import Apdb
50from ..apdbConfigFreezer import ApdbConfigFreezer
51from ..apdbReplica import ReplicaChunk
52from ..apdbSchema import ApdbTables
53from ..config import ApdbConfig
54from ..monitor import MonAgent
55from ..schema_model import Table
56from ..timer import Timer
57from ..versionTuple import IncompatibleVersionError, VersionTuple
58from .apdbMetadataSql import ApdbMetadataSql
59from .apdbSqlReplica import ApdbSqlReplica
60from .apdbSqlSchema import ApdbSqlSchema, ExtraTables
61from .config import ApdbSqlConfig
62
63if TYPE_CHECKING:
64 import sqlite3
65
66 from ..apdbMetadata import ApdbMetadata
67
68_LOG = logging.getLogger(__name__)
69
70_MON = MonAgent(__name__)
71
72VERSION = VersionTuple(0, 1, 1)
73"""Version for the code controlling non-replication tables. This needs to be
74updated following compatibility rules when schema produced by this code
75changes.
76"""
77
78
79def _coerce_uint64(df: pandas.DataFrame) -> pandas.DataFrame:
80 """Change the type of uint64 columns to int64, and return copy of data
81 frame.
82 """
83 names = [c[0] for c in df.dtypes.items() if c[1] == np.uint64]
84 return df.astype({name: np.int64 for name in names})
85
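# Illustrative sketch (not part of the module API; the column name is
# hypothetical): uint64 columns are coerced before a DataFrame is handed to
# SQLAlchemy, since SQL backends generally lack an unsigned 64-bit type.
#
#     df = pandas.DataFrame({"diaObjectId": np.array([1, 2, 3], dtype=np.uint64)})
#     df = _coerce_uint64(df)
#     assert df["diaObjectId"].dtype == np.int64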
86
87def _make_midpointMjdTai_start(visit_time: astropy.time.Time, months: int) -> float:
88 """Calculate starting point for time-based source search.
89
90 Parameters
91 ----------
92 visit_time : `astropy.time.Time`
93 Time of current visit.
94 months : `int`
95 Number of months in the sources history.
96
97 Returns
98 -------
99 time : `float`
100 A ``midpointMjdTai`` starting point, MJD time.
101 """
102 # TODO: Use of MJD must be consistent with the code in ap_association
103 # (see DM-31996)
104 return float(visit_time.mjd - months * 30)
105
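# Worked example (hypothetical values): with a 12-month history window the
# search starts 12 * 30 = 360 days before the visit, so for a visit at
# MJD 60000.0 the returned starting point is 59640.0.
#
#     t = astropy.time.Time(60000.0, format="mjd", scale="tai")
#     assert _make_midpointMjdTai_start(t, 12) == 59640.0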
106
107def _onSqlite3Connect(
108 dbapiConnection: sqlite3.Connection, connectionRecord: sqlalchemy.pool._ConnectionRecord
109) -> None:
110 # Enable foreign keys
111 with closing(dbapiConnection.cursor()) as cursor:
112 cursor.execute("PRAGMA foreign_keys=ON;")
113
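# Sketch of how the listener above takes effect (assumes an in-memory SQLite
# engine): once registered via sqlalchemy.event.listen, every new DBAPI
# connection gets foreign-key enforcement enabled.
#
#     engine = sqlalchemy.create_engine("sqlite://")
#     sqlalchemy.event.listen(engine, "connect", _onSqlite3Connect)
#     with engine.connect() as conn:
#         assert conn.exec_driver_sql("PRAGMA foreign_keys").scalar() == 1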
114
116 """Implementation of APDB interface based on SQL database.
117
118 The implementation is configured via the standard ``pex_config`` mechanism
119 using the `ApdbSqlConfig` configuration class. For examples of different
120 configurations check the ``config/`` folder.
121
122 Parameters
123 ----------
124 config : `ApdbSqlConfig`
125 Configuration object.
126 """
127
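    # Typical usage (illustrative sketch; the SQLite file path is hypothetical
    # and ``region`` is an ``lsst.sphgeom.Region`` prepared by the caller):
    #
    #     config = ApdbSql.init_database(db_url="sqlite:///apdb.db")
    #     apdb = ApdbSql(config)
    #     objects = apdb.getDiaObjects(region)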
128 metadataSchemaVersionKey = "version:schema"
129 """Name of the metadata key to store schema version number."""
130
131 metadataCodeVersionKey = "version:ApdbSql"
132 """Name of the metadata key to store code version number."""
133
134 metadataReplicaVersionKey = "version:ApdbSqlReplica"
135 """Name of the metadata key to store replica code version number."""
136
137 metadataConfigKey = "config:apdb-sql.json"
138 """Name of the metadata key to store code version number."""
139
140 _frozen_parameters = (
141 "enable_replica",
142 "dia_object_index",
143 "pixelization.htm_level",
144 "pixelization.htm_index_column",
145 "ra_dec_columns",
146 )
147 """Names of the config parameters to be frozen in metadata table."""
148
149 def __init__(self, config: ApdbSqlConfig):
150 self._engine = self._makeEngine(config, create=False)
151
152 sa_metadata = sqlalchemy.MetaData(schema=config.namespace)
153 meta_table_name = ApdbTables.metadata.table_name(prefix=config.prefix)
154 meta_table: sqlalchemy.schema.Table | None = None
155 with suppress(sqlalchemy.exc.NoSuchTableError):
156 meta_table = sqlalchemy.schema.Table(meta_table_name, sa_metadata, autoload_with=self._engine)
157
158 self._metadata = ApdbMetadataSql(self._engine, meta_table)
159
160 # Read frozen config from metadata.
161 config_json = self._metadata.get(self.metadataConfigKey)
162 if config_json is not None:
163 # Update config from metadata.
164 freezer = ApdbConfigFreezer[ApdbSqlConfig](self._frozen_parameters)
165 self.config = freezer.update(config, config_json)
166 else:
167 self.config = config
168
169 self._schema = ApdbSqlSchema(
170 engine=self._engine,
171 dia_object_index=self.config.dia_object_index,
172 schema_file=self.config.schema_file,
173 schema_name=self.config.schema_name,
174 prefix=self.config.prefix,
175 namespace=self.config.namespace,
176 htm_index_column=self.config.pixelization.htm_index_column,
177 enable_replica=self.config.enable_replica,
178 )
179
180 if self._metadata.table_exists():
181 self._versionCheck(self._metadata)
182
183 self.pixelator = HtmPixelization(self.config.pixelization.htm_level)
184
185 if _LOG.isEnabledFor(logging.DEBUG):
186 _LOG.debug("ApdbSql Configuration: %s", self.config.model_dump())
187
188 def _timer(self, name: str, *, tags: Mapping[str, str | int] | None = None) -> Timer:
189 """Create `Timer` instance given its name."""
190 return Timer(name, _MON, tags=tags)
191
192 @classmethod
193 def _makeEngine(cls, config: ApdbSqlConfig, *, create: bool) -> sqlalchemy.engine.Engine:
194 """Make SQLALchemy engine based on configured parameters.
195
196 Parameters
197 ----------
198 config : `ApdbSqlConfig`
199 Configuration object.
200 create : `bool`
201 Whether to try to create a new database file; only relevant for the
202 SQLite backend, which always creates new files by default.
203 """
204 # The engine is reused between multiple processes; make sure that we
205 # don't share connections by disabling the pool (using the NullPool class).
206 kw: MutableMapping[str, Any] = dict(config.connection_config.extra_parameters)
207 conn_args: dict[str, Any] = dict()
208 if not config.connection_config.connection_pool:
209 kw.update(poolclass=NullPool)
210 if config.connection_config.isolation_level is not None:
211 kw.update(isolation_level=config.connection_config.isolation_level)
212 elif config.db_url.startswith("sqlite"):
213 # Use READ_UNCOMMITTED as default value for sqlite.
214 kw.update(isolation_level="READ_UNCOMMITTED")
215 if config.connection_config.connection_timeout is not None:
216 if config.db_url.startswith("sqlite"):
217 conn_args.update(timeout=config.connection_config.connection_timeout)
218 elif config.db_url.startswith(("postgresql", "mysql")):
219 conn_args.update(connect_timeout=int(config.connection_config.connection_timeout))
220 kw.update(connect_args=conn_args)
221 engine = sqlalchemy.create_engine(cls._connection_url(config.db_url, create=create), **kw)
222
223 if engine.dialect.name == "sqlite":
224 # Need to enable foreign keys on every new connection.
225 sqlalchemy.event.listen(engine, "connect", _onSqlite3Connect)
226
227 return engine
228
229 @classmethod
230 def _connection_url(cls, config_url: str, *, create: bool) -> sqlalchemy.engine.URL | str:
231 """Generate a complete URL for database with proper credentials.
232
233 Parameters
234 ----------
235 config_url : `str`
236 Database URL as specified in configuration.
237 create : `bool`
238 Whether to try to create a new database file; only relevant for the
239 SQLite backend, which always creates new files by default.
240
241 Returns
242 -------
243 connection_url : `sqlalchemy.engine.URL` or `str`
244 Connection URL including credentials.
245 """
246 # Allow 3rd party authentication mechanisms by assuming connection
247 # string is correct when we cannot recognize (dialect, host, database)
248 # matching keys.
249 components = urllib.parse.urlparse(config_url)
250 if all((components.scheme is not None, components.hostname is not None, components.path is not None)):
251 try:
252 db_auth = DbAuth(DB_AUTH_PATH, DB_AUTH_ENVVAR)
253 config_url = db_auth.getUrl(config_url)
254 except DbAuthNotFoundError:
255 # Credentials file doesn't exist or no matching credentials,
256 # use default auth.
257 pass
258
259 # SQLite has a nasty habit of creating empty databases when they do not
260 # exist; tell it not to do that unless we actually need to create it.
261 if not create:
262 config_url = cls._update_sqlite_url(config_url)
263
264 return config_url
265
266 @classmethod
267 def _update_sqlite_url(cls, url_string: str) -> str:
268 """If URL refers to sqlite dialect, update it so that the backend does
269 not try to create database file if it does not exist already.
270
271 Parameters
272 ----------
273 url_string : `str`
274 Connection string.
275
276 Returns
277 -------
278 url_string : `str`
279 Possibly updated connection string.
280 """
281 try:
282 url = sqlalchemy.make_url(url_string)
283 except sqlalchemy.exc.SQLAlchemyError:
284 # If parsing fails it means some special format, likely not
285 # sqlite so we just return it unchanged.
286 return url_string
287
288 if url.get_backend_name() == "sqlite":
289 # Massage url so that database name starts with "file:" and
290 # option string has "mode=rw&uri=true". Database name
291 # should look like a path (:memory: is not supported by
292 # Apdb, but someone could still try to use it).
293 database = url.database
294 if database and not database.startswith((":", "file:")):
295 query = dict(url.query, mode="rw", uri="true")
296 # If ``database`` is an absolute path then original URL should
297 # include four slashes after "sqlite:". Humans are bad at
298 # counting things beyond four and sometimes an extra slash gets
299 # added unintentionally, which causes sqlite to treat initial
300 # element as "authority" and to complain. Strip extra slashes
301 # at the start of the path to avoid that (DM-46077).
302 if database.startswith("//"):
303 warnings.warn(
304 f"Database URL contains extra leading slashes which will be removed: {url}",
305 stacklevel=3,
306 )
307 database = "/" + database.lstrip("/")
308 url = url.set(database=f"file:{database}", query=query)
309 url_string = url.render_as_string()
310
311 return url_string
312
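    # Example of the rewrite performed above (hypothetical file name): a plain
    # relative-path URL gains the "file:" prefix and mode/uri options so that
    # SQLite refuses to create a missing database file.
    #
    #     ApdbSql._update_sqlite_url("sqlite:///apdb.db")
    #     # -> "sqlite:///file:apdb.db?mode=rw&uri=true"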
313 def _versionCheck(self, metadata: ApdbMetadataSql) -> None:
314 """Check schema version compatibility."""
315
316 def _get_version(key: str, default: VersionTuple) -> VersionTuple:
317 """Retrieve version number from given metadata key."""
318 if metadata.table_exists():
319 version_str = metadata.get(key)
320 if version_str is None:
321 # Should not happen with existing metadata table.
322 raise RuntimeError(f"Version key {key!r} does not exist in metadata table.")
323 return VersionTuple.fromString(version_str)
324 return default
325
326 # For old databases where metadata table does not exist we assume that
327 # version of both code and schema is 0.1.0.
328 initial_version = VersionTuple(0, 1, 0)
329 db_schema_version = _get_version(self.metadataSchemaVersionKey, initial_version)
330 db_code_version = _get_version(self.metadataCodeVersionKey, initial_version)
331
332 # For now there is no way to make read-only APDB instances, assume that
333 # any access can do updates.
334 if not self._schema.schemaVersion().checkCompatibility(db_schema_version):
336 f"Configured schema version {self._schema.schemaVersion()} "
337 f"is not compatible with database version {db_schema_version}"
338 )
339 if not self.apdbImplementationVersion().checkCompatibility(db_code_version):
341 f"Current code version {self.apdbImplementationVersion()} "
342 f"is not compatible with database version {db_code_version}"
343 )
344
345 # Check replica code version only if replica is enabled.
346 if self._schema.has_replica_chunks:
347 db_replica_version = _get_version(self.metadataReplicaVersionKey, initial_version)
348 code_replica_version = ApdbSqlReplica.apdbReplicaImplementationVersion()
349 if not code_replica_version.checkCompatibility(db_replica_version):
351 f"Current replication code version {code_replica_version} "
352 f"is not compatible with database version {db_replica_version}"
353 )
354
355 @classmethod
356 def apdbImplementationVersion(cls) -> VersionTuple:
357 """Return version number for current APDB implementation.
358
359 Returns
360 -------
361 version : `VersionTuple`
362 Version of the code defined in implementation class.
363 """
364 return VERSION
365
366 @classmethod
367 def init_database(
368 cls,
369 db_url: str,
370 *,
371 schema_file: str | None = None,
372 schema_name: str | None = None,
373 read_sources_months: int | None = None,
374 read_forced_sources_months: int | None = None,
375 enable_replica: bool = False,
376 connection_timeout: int | None = None,
377 dia_object_index: str | None = None,
378 htm_level: int | None = None,
379 htm_index_column: str | None = None,
380 ra_dec_columns: tuple[str, str] | None = None,
381 prefix: str | None = None,
382 namespace: str | None = None,
383 drop: bool = False,
384 ) -> ApdbSqlConfig:
385 """Initialize new APDB instance and make configuration object for it.
386
387 Parameters
388 ----------
389 db_url : `str`
390 SQLAlchemy database URL.
391 schema_file : `str`, optional
392 Location of (YAML) configuration file with APDB schema. If not
393 specified then default location will be used.
394 schema_name : `str`, optional
395 Name of the schema in YAML configuration file. If not specified
396 then default name will be used.
397 read_sources_months : `int`, optional
398 Number of months of history to read from DiaSource.
399 read_forced_sources_months : `int`, optional
400 Number of months of history to read from DiaForcedSource.
401 enable_replica : `bool`
402 If True, make additional tables used for replication to PPDB.
403 connection_timeout : `int`, optional
404 Database connection timeout in seconds.
405 dia_object_index : `str`, optional
406 Indexing mode for DiaObject table.
407 htm_level : `int`, optional
408 HTM indexing level.
409 htm_index_column : `str`, optional
410 Name of a HTM index column for DiaObject and DiaSource tables.
411 ra_dec_columns : `tuple` [`str`, `str`], optional
412 Names of ra/dec columns in DiaObject table.
413 prefix : `str`, optional
414 Optional prefix for all table names.
415 namespace : `str`, optional
416 Name of the database schema for all APDB tables. If not specified
417 then default schema is used.
418 drop : `bool`, optional
419 If `True` then drop existing tables before re-creating the schema.
420
421 Returns
422 -------
423 config : `ApdbSqlConfig`
424 Resulting configuration object for a created APDB instance.
425 """
426 config = ApdbSqlConfig(db_url=db_url, enable_replica=enable_replica)
427 if schema_file is not None:
428 config.schema_file = schema_file
429 if schema_name is not None:
430 config.schema_name = schema_name
431 if read_sources_months is not None:
432 config.read_sources_months = read_sources_months
433 if read_forced_sources_months is not None:
434 config.read_forced_sources_months = read_forced_sources_months
435 if connection_timeout is not None:
436 config.connection_config.connection_timeout = connection_timeout
437 if dia_object_index is not None:
438 config.dia_object_index = dia_object_index
439 if htm_level is not None:
440 config.pixelization.htm_level = htm_level
441 if htm_index_column is not None:
442 config.pixelization.htm_index_column = htm_index_column
443 if ra_dec_columns is not None:
444 config.ra_dec_columns = ra_dec_columns
445 if prefix is not None:
446 config.prefix = prefix
447 if namespace is not None:
448 config.namespace = namespace
449
450 cls._makeSchema(config, drop=drop)
451
452 # SQLite has a nasty habit of creating an empty database by default,
453 # update URL in config file to disable that behavior.
454 config.db_url = cls._update_sqlite_url(config.db_url)
455
456 return config
457
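    # Illustrative call (server name, schema name and options are hypothetical)
    # creating a Postgres-backed APDB with replication tables enabled:
    #
    #     config = ApdbSql.init_database(
    #         db_url="postgresql://server/apdb",
    #         namespace="apdb_schema",
    #         enable_replica=True,
    #     )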
458 def get_replica(self) -> ApdbSqlReplica:
459 """Return `ApdbReplica` instance for this database."""
460 return ApdbSqlReplica(self._schema, self._engine)
461
462 def tableRowCount(self) -> dict[str, int]:
463 """Return dictionary with the table names and row counts.
464
465 Used by ``ap_proto`` to keep track of the size of the database tables.
466 Depending on database technology this could be an expensive operation.
467
468 Returns
469 -------
470 row_counts : `dict`
471 Dict where key is a table name and value is a row count.
472 """
473 res = {}
474 tables = [ApdbTables.DiaObject, ApdbTables.DiaSource, ApdbTables.DiaForcedSource]
475 if self.config.dia_object_index == "last_object_table":
476 tables.append(ApdbTables.DiaObjectLast)
477 with self._engine.begin() as conn:
478 for table in tables:
479 sa_table = self._schema.get_table(table)
480 stmt = sql.select(func.count()).select_from(sa_table)
481 count: int = conn.execute(stmt).scalar_one()
482 res[table.name] = count
483
484 return res
485
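    # Example of the returned mapping (row counts are hypothetical):
    #
    #     {"DiaObject": 123456, "DiaSource": 654321, "DiaForcedSource": 234567}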
486 def tableDef(self, table: ApdbTables) -> Table | None:
487 # docstring is inherited from a base class
488 return self._schema.tableSchemas.get(table)
489
490 @classmethod
491 def _makeSchema(cls, config: ApdbConfig, drop: bool = False) -> None:
492 # docstring is inherited from a base class
493
494 if not isinstance(config, ApdbSqlConfig):
495 raise TypeError(f"Unexpected type of configuration object: {type(config)}")
496
497 engine = cls._makeEngine(config, create=True)
498
499 # Ask schema class to create all tables.
500 schema = ApdbSqlSchema(
501 engine=engine,
502 dia_object_index=config.dia_object_index,
503 schema_file=config.schema_file,
504 schema_name=config.schema_name,
505 prefix=config.prefix,
506 namespace=config.namespace,
507 htm_index_column=config.pixelization.htm_index_column,
508 enable_replica=config.enable_replica,
509 )
510 schema.makeSchema(drop=drop)
511
512 # Need metadata table to store a few items in it, if the table exists.
513 meta_table: sqlalchemy.schema.Table | None = None
514 with suppress(ValueError):
515 meta_table = schema.get_table(ApdbTables.metadata)
516
517 apdb_meta = ApdbMetadataSql(engine, meta_table)
518 if apdb_meta.table_exists():
519 # Fill version numbers, overwrite if they are already there.
520 apdb_meta.set(cls.metadataSchemaVersionKey, str(schema.schemaVersion()), force=True)
521 apdb_meta.set(cls.metadataCodeVersionKey, str(cls.apdbImplementationVersion()), force=True)
522 if config.enable_replica:
523 # Only store replica code version if replica is enabled.
524 apdb_meta.set(
525 cls.metadataReplicaVersionKey,
526 str(ApdbSqlReplica.apdbReplicaImplementationVersion()),
527 force=True,
528 )
529
530 # Store frozen part of a configuration in metadata.
531 freezer = ApdbConfigFreezer[ApdbSqlConfig](cls._frozen_parameters)
532 apdb_meta.set(cls.metadataConfigKey, freezer.to_json(config), force=True)
533
534 def getDiaObjects(self, region: Region) -> pandas.DataFrame:
535 # docstring is inherited from a base class
536
537 # decide what columns we need
538 if self.config.dia_object_index == "last_object_table":
539 table_enum = ApdbTables.DiaObjectLast
540 else:
541 table_enum = ApdbTables.DiaObject
542 table = self._schema.get_table(table_enum)
543 if not self.config.dia_object_columns:
544 columns = self._schema.get_apdb_columns(table_enum)
545 else:
546 columns = [table.c[col] for col in self.config.dia_object_columns]
547 query = sql.select(*columns)
548
549 # build selection
550 query = query.where(self._filterRegion(table, region))
551
552 # select latest version of objects
553 if self.config.dia_object_index != "last_object_table":
554 query = query.where(table.c.validityEnd == None) # noqa: E711
555
556 # _LOG.debug("query: %s", query)
557
558 # execute select
559 with self._timer("select_time", tags={"table": "DiaObject"}) as timer:
560 with self._engine.begin() as conn:
561 objects = pandas.read_sql_query(query, conn)
562 timer.add_values(row_count=len(objects))
563 _LOG.debug("found %s DiaObjects", len(objects))
564 return self._fix_result_timestamps(objects)
565
566 def getDiaSources(
567 self, region: Region, object_ids: Iterable[int] | None, visit_time: astropy.time.Time
568 ) -> pandas.DataFrame | None:
569 # docstring is inherited from a base class
570 if self.config.read_sources_months == 0:
571 _LOG.debug("Skip DiaSources fetching")
572 return None
573
574 if object_ids is None:
575 # region-based select
576 return self._getDiaSourcesInRegion(region, visit_time)
577 else:
578 return self._getDiaSourcesByIDs(list(object_ids), visit_time)
579
580 def getDiaForcedSources(
581 self, region: Region, object_ids: Iterable[int] | None, visit_time: astropy.time.Time
582 ) -> pandas.DataFrame | None:
583 # docstring is inherited from a base class
584 if self.config.read_forced_sources_months == 0:
585 _LOG.debug("Skip DiaForceSources fetching")
586 return None
587
588 if object_ids is None:
589 # This implementation does not support region-based selection.
590 raise NotImplementedError("Region-based selection is not supported")
591
592 # TODO: DateTime.MJD must be consistent with code in ap_association,
593 # alternatively we can fill midpointMjdTai ourselves in store()
594 midpointMjdTai_start = _make_midpointMjdTai_start(visit_time, self.config.read_forced_sources_months)
595 _LOG.debug("midpointMjdTai_start = %.6f", midpointMjdTai_start)
596
597 with self._timer("select_time", tags={"table": "DiaForcedSource"}) as timer:
598 sources = self._getSourcesByIDs(
599 ApdbTables.DiaForcedSource, list(object_ids), midpointMjdTai_start
600 )
601 timer.add_values(row_count=len(sources))
602
603 _LOG.debug("found %s DiaForcedSources", len(sources))
604 return sources
605
606 def containsVisitDetector(self, visit: int, detector: int) -> bool:
607 # docstring is inherited from a base class
608 src_table: sqlalchemy.schema.Table = self._schema.get_table(ApdbTables.DiaSource)
609 frcsrc_table: sqlalchemy.schema.Table = self._schema.get_table(ApdbTables.DiaForcedSource)
610 # Query should load only one leaf page of the index
611 query1 = sql.select(src_table.c.visit).filter_by(visit=visit, detector=detector).limit(1)
612
613 with self._engine.begin() as conn:
614 result = conn.execute(query1).scalar_one_or_none()
615 if result is not None:
616 return True
617 else:
618 # Backup query if an image was processed but had no diaSources
619 query2 = sql.select(frcsrc_table.c.visit).filter_by(visit=visit, detector=detector).limit(1)
620 result = conn.execute(query2).scalar_one_or_none()
621 return result is not None
622
623 def getSSObjects(self) -> pandas.DataFrame:
624 # docstring is inherited from a base class
625
626 columns = self._schema.get_apdb_columns(ApdbTables.SSObject)
627 query = sql.select(*columns)
628
629 # execute select
630 with self._timer("SSObject_select_time", tags={"table": "SSObject"}) as timer:
631 with self._engine.begin() as conn:
632 objects = pandas.read_sql_query(query, conn)
633 timer.add_values(row_count=len(objects))
634 _LOG.debug("found %s SSObjects", len(objects))
635 return self._fix_result_timestamps(objects)
636
637 def store(
638 self,
639 visit_time: astropy.time.Time,
640 objects: pandas.DataFrame,
641 sources: pandas.DataFrame | None = None,
642 forced_sources: pandas.DataFrame | None = None,
643 ) -> None:
644 # docstring is inherited from a base class
645 objects = self._fix_input_timestamps(objects)
646 if sources is not None:
647 sources = self._fix_input_timestamps(sources)
648 if forced_sources is not None:
649 forced_sources = self._fix_input_timestamps(forced_sources)
650
651 # We want to run all inserts in one transaction.
652 with self._engine.begin() as connection:
653 replica_chunk: ReplicaChunk | None = None
654 if self._schema.has_replica_chunks:
655 replica_chunk = ReplicaChunk.make_replica_chunk(visit_time, self.config.replica_chunk_seconds)
656 self._storeReplicaChunk(replica_chunk, visit_time, connection)
657
658 # fill pixelId column for DiaObjects
659 objects = self._add_spatial_index(objects)
660 self._storeDiaObjects(objects, visit_time, replica_chunk, connection)
661
662 if sources is not None:
663 # fill pixelId column for DiaSources
664 sources = self._add_spatial_index(sources)
665 self._storeDiaSources(sources, replica_chunk, connection)
666
667 if forced_sources is not None:
668 self._storeDiaForcedSources(forced_sources, replica_chunk, connection)
669
670 def storeSSObjects(self, objects: pandas.DataFrame) -> None:
671 # docstring is inherited from a base class
672 objects = self._fix_input_timestamps(objects)
673
674 idColumn = "ssObjectId"
675 table = self._schema.get_table(ApdbTables.SSObject)
676
677 # everything to be done in single transaction
678 with self._engine.begin() as conn:
679 # Find record IDs that already exist. Some types like np.int64 can
680 # cause issues with sqlalchemy, convert them to int.
681 ids = sorted(int(oid) for oid in objects[idColumn])
682
683 query = sql.select(table.columns[idColumn]).where(table.columns[idColumn].in_(ids))
684 result = conn.execute(query)
685 knownIds = set(row.ssObjectId for row in result)
686
687 filter = objects[idColumn].isin(knownIds)
688 toUpdate = cast(pandas.DataFrame, objects[filter])
689 toInsert = cast(pandas.DataFrame, objects[~filter])
690
691 # insert new records
692 if len(toInsert) > 0:
693 toInsert.to_sql(table.name, conn, if_exists="append", index=False, schema=table.schema)
694
695 # update existing records
696 if len(toUpdate) > 0:
697 whereKey = f"{idColumn}_param"
698 update = table.update().where(table.columns[idColumn] == sql.bindparam(whereKey))
699 toUpdate = toUpdate.rename({idColumn: whereKey}, axis="columns")
700 values = toUpdate.to_dict("records")
701 result = conn.execute(update, values)
702
703 def reassignDiaSources(self, idMap: Mapping[int, int]) -> None:
704 # docstring is inherited from a base class
705
706 table = self._schema.get_table(ApdbTables.DiaSource)
707 query = table.update().where(table.columns["diaSourceId"] == sql.bindparam("srcId"))
708
709 with self._engine.begin() as conn:
710 # Need to make sure that every ID exists in the database, but
711 # executemany may not support rowcount, so iterate and check what
712 # is missing.
713 missing_ids: list[int] = []
714 for key, value in idMap.items():
715 params = dict(srcId=key, diaObjectId=0, ssObjectId=value)
716 result = conn.execute(query, params)
717 if result.rowcount == 0:
718 missing_ids.append(key)
719 if missing_ids:
720 missing = ",".join(str(item) for item in missing_ids)
721 raise ValueError(f"Following DiaSource IDs do not exist in the database: {missing}")
722
723 def dailyJob(self) -> None:
724 # docstring is inherited from a base class
725 pass
726
727 def countUnassociatedObjects(self) -> int:
728 # docstring is inherited from a base class
729
730 # Retrieve the DiaObject table.
731 table: sqlalchemy.schema.Table = self._schema.get_table(ApdbTables.DiaObject)
732
733 # Construct the sql statement.
734 stmt = sql.select(func.count()).select_from(table).where(table.c.nDiaSources == 1)
735 stmt = stmt.where(table.c.validityEnd == None) # noqa: E711
736
737 # Return the count.
738 with self._engine.begin() as conn:
739 count = conn.execute(stmt).scalar_one()
740
741 return count
742
743 @property
744 def metadata(self) -> ApdbMetadata:
745 # docstring is inherited from a base class
746 if self._metadata is None:
747 raise RuntimeError("Database schema was not initialized.")
748 return self._metadata
749
750 def _getDiaSourcesInRegion(self, region: Region, visit_time: astropy.time.Time) -> pandas.DataFrame:
751 """Return catalog of DiaSource instances from given region.
752
753 Parameters
754 ----------
755 region : `lsst.sphgeom.Region`
756 Region to search for DIASources.
757 visit_time : `astropy.time.Time`
758 Time of the current visit.
759
760 Returns
761 -------
762 catalog : `pandas.DataFrame`
763 Catalog containing DiaSource records.
764 """
765 # TODO: DateTime.MJD must be consistent with code in ap_association,
766 # alternatively we can fill midpointMjdTai ourselves in store()
767 midpointMjdTai_start = _make_midpointMjdTai_start(visit_time, self.config.read_sources_months)
768 _LOG.debug("midpointMjdTai_start = %.6f", midpointMjdTai_start)
769
770 table = self._schema.get_table(ApdbTables.DiaSource)
771 columns = self._schema.get_apdb_columns(ApdbTables.DiaSource)
772 query = sql.select(*columns)
773
774 # build selection
775 time_filter = table.columns["midpointMjdTai"] > midpointMjdTai_start
776 where = sql.expression.and_(self._filterRegion(table, region), time_filter)
777 query = query.where(where)
778
779 # execute select
780 with self._timer("DiaSource_select_time", tags={"table": "DiaSource"}) as timer:
781 with self._engine.begin() as conn:
782 sources = pandas.read_sql_query(query, conn)
783 timer.add_values(row_count=len(sources))
784 _LOG.debug("found %s DiaSources", len(sources))
785 return self._fix_result_timestamps(sources)
786
787 def _getDiaSourcesByIDs(self, object_ids: list[int], visit_time: astropy.time.Time) -> pandas.DataFrame:
788 """Return catalog of DiaSource instances given set of DiaObject IDs.
789
790 Parameters
791 ----------
792 object_ids : `list` [`int`]
793 Collection of DiaObject IDs.
794 visit_time : `astropy.time.Time`
795 Time of the current visit.
796
797 Returns
798 -------
799 catalog : `pandas.DataFrame`
800 Catalog containing DiaSource records.
801 """
802 # TODO: DateTime.MJD must be consistent with code in ap_association,
803 # alternatively we can fill midpointMjdTai ourselves in store()
804 midpointMjdTai_start = _make_midpointMjdTai_start(visit_time, self.config.read_sources_months)
805 _LOG.debug("midpointMjdTai_start = %.6f", midpointMjdTai_start)
806
807 with self._timer("select_time", tags={"table": "DiaSource"}) as timer:
808 sources = self._getSourcesByIDs(ApdbTables.DiaSource, object_ids, midpointMjdTai_start)
809 timer.add_values(row_count=len(sources))
810
811 _LOG.debug("found %s DiaSources", len(sources))
812 return sources
813
814 def _getSourcesByIDs(
815 self, table_enum: ApdbTables, object_ids: list[int], midpointMjdTai_start: float
816 ) -> pandas.DataFrame:
817 """Return catalog of DiaSource or DiaForcedSource instances given set
818 of DiaObject IDs.
819
820 Parameters
821 ----------
822 table_enum : `ApdbTables`
823 APDB table to query, either DiaSource or DiaForcedSource.
824 object_ids : `list` [`int`]
825 Collection of DiaObject IDs.
826 midpointMjdTai_start : `float`
827 Earliest midpointMjdTai to retrieve.
828
829 Returns
830 -------
831 catalog : `pandas.DataFrame`
832 Catalog containing DiaSource or DiaForcedSource records. An empty
833 catalog is returned when ``object_ids`` is empty or when no matching
834 records exist.
835 """
836 table = self._schema.get_table(table_enum)
837 columns = self._schema.get_apdb_columns(table_enum)
838
839 sources: pandas.DataFrame | None = None
840 if len(object_ids) <= 0:
841 _LOG.debug("ID list is empty, just fetch empty result")
842 query = sql.select(*columns).where(sql.literal(False))
843 with self._engine.begin() as conn:
844 sources = pandas.read_sql_query(query, conn)
845 else:
846 data_frames: list[pandas.DataFrame] = []
847 for ids in chunk_iterable(sorted(object_ids), 1000):
848 query = sql.select(*columns)
849
850 # Some types like np.int64 can cause issues with
851 # sqlalchemy, convert them to int.
852 int_ids = [int(oid) for oid in ids]
853
854 # select by object id
855 query = query.where(
856 sql.expression.and_(
857 table.columns["diaObjectId"].in_(int_ids),
858 table.columns["midpointMjdTai"] > midpointMjdTai_start,
859 )
860 )
861
862 # execute select
863 with self._engine.begin() as conn:
864 data_frames.append(pandas.read_sql_query(query, conn))
865
866 if len(data_frames) == 1:
867 sources = data_frames[0]
868 else:
869 sources = pandas.concat(data_frames)
870 assert sources is not None, "Catalog cannot be None"
871 return self._fix_result_timestamps(sources)
872
873 def _storeReplicaChunk(
874 self,
875 replica_chunk: ReplicaChunk,
876 visit_time: astropy.time.Time,
877 connection: sqlalchemy.engine.Connection,
878 ) -> None:
879 # `visit_time.datetime` returns naive datetime, even though all astropy
880 # times are in UTC. Add UTC timezone to the timestamp so that the
881 # database can store a correct value.
882 dt = datetime.datetime.fromtimestamp(visit_time.unix_tai, tz=datetime.timezone.utc)
883
884 table = self._schema.get_table(ExtraTables.ApdbReplicaChunks)
885
886 # We need UPSERT which is a dialect-specific construct.
887 values = {"last_update_time": dt, "unique_id": replica_chunk.unique_id}
888 row = {"apdb_replica_chunk": replica_chunk.id} | values
889 if connection.dialect.name == "sqlite":
890 insert_sqlite = sqlalchemy.dialects.sqlite.insert(table)
891 insert_sqlite = insert_sqlite.on_conflict_do_update(index_elements=table.primary_key, set_=values)
892 connection.execute(insert_sqlite, row)
893 elif connection.dialect.name == "postgresql":
894 insert_pg = sqlalchemy.dialects.postgresql.dml.insert(table)
895 insert_pg = insert_pg.on_conflict_do_update(constraint=table.primary_key, set_=values)
896 connection.execute(insert_pg, row)
897 else:
898 raise TypeError(f"Unsupported dialect {connection.dialect.name} for upsert.")
899
900 def _storeDiaObjects(
901 self,
902 objs: pandas.DataFrame,
903 visit_time: astropy.time.Time,
904 replica_chunk: ReplicaChunk | None,
905 connection: sqlalchemy.engine.Connection,
906 ) -> None:
907 """Store catalog of DiaObjects from current visit.
908
909 Parameters
910 ----------
911 objs : `pandas.DataFrame`
912 Catalog with DiaObject records.
913 visit_time : `astropy.time.Time`
914 Time of the visit.
915 replica_chunk : `ReplicaChunk` or `None`
916 Insert identifier, or `None` if replication is not enabled.
917 """
918 if len(objs) == 0:
919 _LOG.debug("No objects to write to database.")
920 return
921
922 # Some types like np.int64 can cause issues with sqlalchemy, convert
923 # them to int.
924 ids = sorted(int(oid) for oid in objs["diaObjectId"])
925 _LOG.debug("first object ID: %d", ids[0])
926
927 # TODO: Need to verify that we are using correct scale here for
928 # DATETIME representation (see DM-31996).
929 dt = visit_time.datetime
930
931 # everything to be done in single transaction
932 if self.config.dia_object_index == "last_object_table":
933 # Insert and replace all records in LAST table.
934 table = self._schema.get_table(ApdbTables.DiaObjectLast)
935
936 # Drop the previous objects (pandas cannot upsert).
937 query = table.delete().where(table.columns["diaObjectId"].in_(ids))
938
939 with self._timer("delete_time", tags={"table": table.name}) as timer:
940 res = connection.execute(query)
941 timer.add_values(row_count=res.rowcount)
942 _LOG.debug("deleted %s objects", res.rowcount)
943
944 # DiaObjectLast is a subset of DiaObject, strip missing columns
945 last_column_names = [column.name for column in table.columns]
946 last_objs = objs[last_column_names]
947 last_objs = _coerce_uint64(last_objs)
948
949 if "lastNonForcedSource" in last_objs.columns:
950 # lastNonForcedSource is defined NOT NULL, fill it with visit
951 # time just in case.
952 last_objs.fillna({"lastNonForcedSource": dt}, inplace=True)
953 else:
954 extra_column = pandas.Series([dt] * len(objs), name="lastNonForcedSource")
955 last_objs.set_index(extra_column.index, inplace=True)
956 last_objs = pandas.concat([last_objs, extra_column], axis="columns")
957
958 with self._timer("insert_time", tags={"table": "DiaObjectLast"}) as timer:
959 last_objs.to_sql(
960 table.name,
961 connection,
962 if_exists="append",
963 index=False,
964 schema=table.schema,
965 )
966 timer.add_values(row_count=len(last_objs))
967 else:
968 # truncate existing validity intervals
969 table = self._schema.get_table(ApdbTables.DiaObject)
970
971 update = (
972 table.update()
973 .values(validityEnd=dt)
974 .where(
975 sql.expression.and_(
976 table.columns["diaObjectId"].in_(ids),
977 table.columns["validityEnd"].is_(None),
978 )
979 )
980 )
981
982 with self._timer("truncate_time", tags={"table": table.name}) as timer:
983 res = connection.execute(update)
984 timer.add_values(row_count=res.rowcount)
985 _LOG.debug("truncated %s intervals", res.rowcount)
986
987 objs = _coerce_uint64(objs)
988
989 # Fill additional columns
990 extra_columns: list[pandas.Series] = []
991 if "validityStart" in objs.columns:
992 objs["validityStart"] = dt
993 else:
994 extra_columns.append(pandas.Series([dt] * len(objs), name="validityStart"))
995 if "validityEnd" in objs.columns:
996 objs["validityEnd"] = None
997 else:
998 extra_columns.append(pandas.Series([None] * len(objs), name="validityEnd"))
999 if "lastNonForcedSource" in objs.columns:
1000 # lastNonForcedSource is defined NOT NULL, fill it with visit time
1001 # just in case.
1002 objs.fillna({"lastNonForcedSource": dt}, inplace=True)
1003 else:
1004 extra_columns.append(pandas.Series([dt] * len(objs), name="lastNonForcedSource"))
1005 if extra_columns:
1006 objs.set_index(extra_columns[0].index, inplace=True)
1007 objs = pandas.concat([objs] + extra_columns, axis="columns")
1008
1009 # Insert replica data
1010 table = self._schema.get_table(ApdbTables.DiaObject)
1011 replica_data: list[dict] = []
1012 replica_stmt: Any = None
1013 replica_table_name = ""
1014 if replica_chunk is not None:
1015 pk_names = [column.name for column in table.primary_key]
1016 replica_data = objs[pk_names].to_dict("records")
1017 for row in replica_data:
1018 row["apdb_replica_chunk"] = replica_chunk.id
1019 replica_table = self._schema.get_table(ExtraTables.DiaObjectChunks)
1020 replica_table_name = replica_table.name
1021 replica_stmt = replica_table.insert()
1022
1023 # insert new versions
1024 with self._timer("insert_time", tags={"table": table.name}) as timer:
1025 objs.to_sql(table.name, connection, if_exists="append", index=False, schema=table.schema)
1026 timer.add_values(row_count=len(objs))
1027 if replica_stmt is not None:
1028 with self._timer("insert_time", tags={"table": replica_table_name}) as timer:
1029 connection.execute(replica_stmt, replica_data)
1030 timer.add_values(row_count=len(replica_data))
1031
1032 def _storeDiaSources(
1033 self,
1034 sources: pandas.DataFrame,
1035 replica_chunk: ReplicaChunk | None,
1036 connection: sqlalchemy.engine.Connection,
1037 ) -> None:
1038 """Store catalog of DiaSources from current visit.
1039
1040 Parameters
1041 ----------
1042 sources : `pandas.DataFrame`
1043 Catalog containing DiaSource records
1044 """
1045 table = self._schema.get_table(ApdbTables.DiaSource)
1046
1047 # Insert replica data
1048 replica_data: list[dict] = []
1049 replica_stmt: Any = None
1050 replica_table_name = ""
1051 if replica_chunk is not None:
1052 pk_names = [column.name for column in table.primary_key]
1053 replica_data = sources[pk_names].to_dict("records")
1054 for row in replica_data:
1055 row["apdb_replica_chunk"] = replica_chunk.id
1056 replica_table = self._schema.get_table(ExtraTables.DiaSourceChunks)
1057 replica_table_name = replica_table.name
1058 replica_stmt = replica_table.insert()
1059
1060 # everything to be done in single transaction
1061 with self._timer("insert_time", tags={"table": table.name}) as timer:
1062 sources = _coerce_uint64(sources)
1063 sources.to_sql(table.name, connection, if_exists="append", index=False, schema=table.schema)
1064 timer.add_values(row_count=len(sources))
1065 if replica_stmt is not None:
1066 with self._timer("replica_insert_time", tags={"table": replica_table_name}) as timer:
1067 connection.execute(replica_stmt, replica_data)
1068 timer.add_values(row_count=len(replica_data))
1069
1070 def _storeDiaForcedSources(
1071 self,
1072 sources: pandas.DataFrame,
1073 replica_chunk: ReplicaChunk | None,
1074 connection: sqlalchemy.engine.Connection,
1075 ) -> None:
1076 """Store a set of DiaForcedSources from current visit.
1077
1078 Parameters
1079 ----------
1080 sources : `pandas.DataFrame`
1081 Catalog containing DiaForcedSource records
1082 """
1083 table = self._schema.get_table(ApdbTables.DiaForcedSource)
1084
1085 # Insert replica data
1086 replica_data: list[dict] = []
1087 replica_stmt: Any = None
1088 replica_table_name = ""
1089 if replica_chunk is not None:
1090 pk_names = [column.name for column in table.primary_key]
1091 replica_data = sources[pk_names].to_dict("records")
1092 for row in replica_data:
1093 row["apdb_replica_chunk"] = replica_chunk.id
1094 replica_table = self._schema.get_table(ExtraTables.DiaForcedSourceChunks)
1095 replica_table_name = replica_table.name
1096 replica_stmt = replica_table.insert()
1097
1098 # everything to be done in single transaction
1099 with self._timer("insert_time", tags={"table": table.name}) as timer:
1100 sources = _coerce_uint64(sources)
1101 sources.to_sql(table.name, connection, if_exists="append", index=False, schema=table.schema)
1102 timer.add_values(row_count=len(sources))
1103 if replica_stmt is not None:
1104 with self._timer("insert_time", tags={"table": replica_table_name}) as timer:
1105 connection.execute(replica_stmt, replica_data)
1106 timer.add_values(row_count=len(replica_data))
1107
1108 def _htm_indices(self, region: Region) -> list[tuple[int, int]]:
1109 """Generate a set of HTM indices covering specified region.
1110
1111 Parameters
1112 ----------
1113 region : `lsst.sphgeom.Region`
1114 Region that needs to be indexed.
1115
1116 Returns
1117 -------
1118 Sequence of ranges; each range is a tuple ``(minHtmID, maxHtmID)``.
1119 """
1120 _LOG.debug("region: %s", region)
1121 indices = self.pixelator.envelope(region, self.config.pixelization.htm_max_ranges)
1122
1123 return indices.ranges()
1124
1125 def _filterRegion(self, table: sqlalchemy.schema.Table, region: Region) -> sql.ColumnElement:
1126 """Make SQLAlchemy expression for selecting records in a region."""
1127 htm_index_column = table.columns[self.config.pixelization.htm_index_column]
1128 exprlist = []
1129 pixel_ranges = self._htm_indices(region)
1130 for low, upper in pixel_ranges:
1131 upper -= 1
1132 if low == upper:
1133 exprlist.append(htm_index_column == low)
1134 else:
1135 exprlist.append(sql.expression.between(htm_index_column, low, upper))
1136
1137 return sql.expression.or_(*exprlist)
1138
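    # Sketch of the region filter (hypothetical values): a small circular
    # region yields a handful of HTM ranges which become equality or BETWEEN
    # clauses on the pixel index column, combined with OR.
    #
    #     from lsst.sphgeom import Angle, Circle
    #     center = UnitVector3d(LonLat.fromDegrees(30.0, -10.0))
    #     region = Circle(center, Angle.fromDegrees(0.1))
    #     ranges = self._htm_indices(region)   # e.g. [(min1, max1), (min2, max2)]
    #     clause = self._filterRegion(table, region)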
1139 def _add_spatial_index(self, df: pandas.DataFrame) -> pandas.DataFrame:
1140 """Calculate spatial index for each record and add it to a DataFrame.
1141
1142 Parameters
1143 ----------
1144 df : `pandas.DataFrame`
1145 DataFrame which has to contain ra/dec columns; the names of these
1146 columns are defined by the ``ra_dec_columns`` configuration field.
1147
1148 Returns
1149 -------
1150 df : `pandas.DataFrame`
1151 DataFrame with ``pixelId`` column which contains pixel index
1152 for ra/dec coordinates.
1153
1154 Notes
1155 -----
1156 This overrides any existing column in a DataFrame with the same name
1157 (pixelId). The original DataFrame is not changed; a copy of the
1158 DataFrame is returned.
1159 """
1160 # calculate HTM index for every DiaObject
1161 htm_index = np.zeros(df.shape[0], dtype=np.int64)
1162 ra_col, dec_col = self.config.ra_dec_columns
1163 for i, (ra, dec) in enumerate(zip(df[ra_col], df[dec_col])):
1164 uv3d = UnitVector3d(LonLat.fromDegrees(ra, dec))
1165 idx = self.pixelator.index(uv3d)
1166 htm_index[i] = idx
1167 df = df.copy()
1168 df[self.config.pixelization.htm_index_column] = htm_index
1169 return df
1170
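    # Sketch, assuming the default "ra"/"dec" column names and one hypothetical
    # coordinate: the configured pixel index column ("pixelId" by default) is
    # computed from the ra/dec values and added to a copy of the catalog.
    #
    #     cat = pandas.DataFrame({"ra": [30.0], "dec": [-10.0]})
    #     cat = self._add_spatial_index(cat)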
1171 def _fix_input_timestamps(self, df: pandas.DataFrame) -> pandas.DataFrame:
1172 """Update timestamp columns in input DataFrame to be aware datetime
1173 type in in UTC.
1174
1175 The AP pipeline generates naive datetime instances; we want them to be
1176 aware before they go to the database. All naive timestamps are assumed
1177 to be in UTC timezone (they should be TAI).
1178 """
1179 # Find all columns with aware non-UTC timestamps and convert to UTC.
1180 columns = [
1181 column
1182 for column, dtype in df.dtypes.items()
1183 if isinstance(dtype, pandas.DatetimeTZDtype) and dtype.tz is not datetime.timezone.utc
1184 ]
1185 for column in columns:
1186 df[column] = df[column].dt.tz_convert(datetime.timezone.utc)
1187 # Find all columns with naive timestamps and add UTC timezone.
1188 columns = [
1189 column for column, dtype in df.dtypes.items() if pandas.api.types.is_datetime64_dtype(dtype)
1190 ]
1191 for column in columns:
1192 df[column] = df[column].dt.tz_localize(datetime.timezone.utc)
1193 return df
1194
1195 def _fix_result_timestamps(self, df: pandas.DataFrame) -> pandas.DataFrame:
1196 """Update timestamp columns to be naive datetime type in returned
1197 DataFrame.
1198
1199 AP pipeline code expects DataFrames to contain naive datetime columns,
1200 while Postgres queries return a timezone-aware type. This method converts
1201 those columns to naive datetime in UTC timezone.
1202 """
1203 # Find all columns with aware timestamps.
1204 columns = [column for column, dtype in df.dtypes.items() if isinstance(dtype, pandas.DatetimeTZDtype)]
1205 for column in columns:
1206 # tz_convert(None) will convert to UTC and drop timezone.
1207 df[column] = df[column].dt.tz_convert(None)
1208 return df
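
# End-to-end sketch (file path and inputs are illustrative; ``region`` is an
# ``lsst.sphgeom.Region`` and the catalogs are prepared by the AP pipeline):
#
#     config = ApdbSql.init_database(db_url="sqlite:///apdb.db", htm_level=20)
#     apdb = ApdbSql(config)
#     apdb.store(visit_time, objects, sources, forced_sources)
#     known = apdb.getDiaObjects(region)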