LSST Data Management Base Package
apdbSql.py
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
21
22"""Module defining Apdb class and related methods."""
23
24from __future__ import annotations
25
26__all__ = ["ApdbSql"]
27
28import datetime
29import logging
30import urllib.parse
31import warnings
32from collections.abc import Iterable, Mapping, MutableMapping
33from contextlib import closing
34from typing import TYPE_CHECKING, Any, cast
35
36import astropy.time
37import numpy as np
38import pandas
39import sqlalchemy
40import sqlalchemy.dialects.postgresql
41import sqlalchemy.dialects.sqlite
42from sqlalchemy import func, sql
43from sqlalchemy.pool import NullPool
44
45from lsst.sphgeom import HtmPixelization, LonLat, Region, UnitVector3d
46from lsst.utils.db_auth import DbAuth, DbAuthNotFoundError
47from lsst.utils.iteration import chunk_iterable
48
49from ..apdb import Apdb
50from ..apdbConfigFreezer import ApdbConfigFreezer
51from ..apdbReplica import ReplicaChunk
52from ..apdbSchema import ApdbTables
53from ..config import ApdbConfig
54from ..monitor import MonAgent
55from ..schema_model import Table
56from ..timer import Timer
57from ..versionTuple import IncompatibleVersionError, VersionTuple
58from .apdbMetadataSql import ApdbMetadataSql
59from .apdbSqlAdmin import ApdbSqlAdmin
60from .apdbSqlReplica import ApdbSqlReplica
61from .apdbSqlSchema import ApdbSqlSchema, ExtraTables
62from .config import ApdbSqlConfig
63
64if TYPE_CHECKING:
65 import sqlite3
66
67 from ..apdbMetadata import ApdbMetadata
68
69_LOG = logging.getLogger(__name__)
70
71_MON = MonAgent(__name__)
72
73VERSION = VersionTuple(1, 1, 0)
74"""Version for the code controlling non-replication tables. This needs to be
75updated following compatibility rules when the schema produced by this code
76changes.
77"""
78
79
80def _coerce_uint64(df: pandas.DataFrame) -> pandas.DataFrame:
81    """Change the type of uint64 columns to int64 and return a copy of the
82    data frame.
83 """
84 names = [c[0] for c in df.dtypes.items() if c[1] == np.uint64]
85 return df.astype(dict.fromkeys(names, np.int64))
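
# A small illustration of the coercion above (an editorial sketch, not part of
# the module API): only uint64 columns are rewritten, other dtypes are left
# untouched.
#
#     >>> import numpy as np
#     >>> import pandas
#     >>> df = pandas.DataFrame({"a": np.array([1, 2], dtype=np.uint64), "b": [0.5, 1.5]})
#     >>> out = _coerce_uint64(df)
#     >>> (out.dtypes["a"], out.dtypes["b"]) == (np.dtype("int64"), np.dtype("float64"))
#     True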
86
87
88def _make_midpointMjdTai_start(visit_time: astropy.time.Time, months: int) -> float:
89 """Calculate starting point for time-based source search.
90
91 Parameters
92 ----------
93 visit_time : `astropy.time.Time`
94 Time of current visit.
95 months : `int`
96 Number of months in the sources history.
97
98 Returns
99 -------
100 time : `float`
101 A ``midpointMjdTai`` starting point, MJD time.
102 """
103 # TODO: Use of MJD must be consistent with the code in ap_association
104 # (see DM-31996)
105 return float(visit_time.tai.mjd) - months * 30
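
# A quick check of the window arithmetic above (an editorial sketch): a "month"
# is treated as exactly 30 days, so a 12-month history window for a visit at
# MJD 60000.0 (TAI) starts at 60000.0 - 12 * 30 = 59640.0.
#
#     >>> import astropy.time
#     >>> visit = astropy.time.Time(60000.0, format="mjd", scale="tai")
#     >>> _make_midpointMjdTai_start(visit, 12)
#     59640.0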
106
107
108def _onSqlite3Connect(
109    dbapiConnection: sqlite3.Connection, connectionRecord: sqlalchemy.pool._ConnectionRecord
110) -> None:
111 # Enable foreign keys
112 with closing(dbapiConnection.cursor()) as cursor:
113 cursor.execute("PRAGMA foreign_keys=ON;")
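
# SQLite enforces foreign keys only when the pragma is set on each connection,
# which is why the hook above is attached to the "connect" event in _makeEngine
# below. A stand-alone illustration of the same pragma (an editorial sketch,
# independent of SQLAlchemy):
#
#     >>> import sqlite3
#     >>> conn = sqlite3.connect(":memory:")
#     >>> conn.execute("PRAGMA foreign_keys=ON;").fetchall()
#     []
#     >>> conn.execute("PRAGMA foreign_keys;").fetchall()
#     [(1,)]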
114
115
116class ApdbSql(Apdb):
117    """Implementation of APDB interface based on SQL database.
118
119    The implementation is configured via the standard ``pex_config`` mechanism
120    using the `ApdbSqlConfig` configuration class. For examples of different
121    configurations check the ``config/`` folder.
122
123 Parameters
124 ----------
125 config : `ApdbSqlConfig`
126 Configuration object.
127 """
128
129 metadataSchemaVersionKey = "version:schema"
130 """Name of the metadata key to store schema version number."""
131
132 metadataCodeVersionKey = "version:ApdbSql"
133 """Name of the metadata key to store code version number."""
134
135 metadataReplicaVersionKey = "version:ApdbSqlReplica"
136 """Name of the metadata key to store replica code version number."""
137
138 metadataConfigKey = "config:apdb-sql.json"
139    """Name of the metadata key to store the frozen configuration."""
140
141 _frozen_parameters = (
142 "enable_replica",
143 "dia_object_index",
144 "pixelization.htm_level",
145 "pixelization.htm_index_column",
146 "ra_dec_columns",
147 )
148 """Names of the config parameters to be frozen in metadata table."""
149
150 def __init__(self, config: ApdbSqlConfig):
151 self._engine = self._makeEngine(config, create=False)
152
153 sa_metadata = sqlalchemy.MetaData(schema=config.namespace)
154 meta_table_name = ApdbTables.metadata.table_name(prefix=config.prefix)
155 meta_table = sqlalchemy.schema.Table(meta_table_name, sa_metadata, autoload_with=self._engine)
156 self._metadata = ApdbMetadataSql(self._engine, meta_table)
157
158 # Read frozen config from metadata.
159 config_json = self._metadata.get(self.metadataConfigKey)
160 if config_json is not None:
161 # Update config from metadata.
162 freezer = ApdbConfigFreezer[ApdbSqlConfig](self._frozen_parameters)
163 self.config = freezer.update(config, config_json)
164 else:
165 self.config = config
166
167        self._schema = ApdbSqlSchema(
168            engine=self._engine,
169 dia_object_index=self.config.dia_object_index,
170 schema_file=self.config.schema_file,
171 schema_name=self.config.schema_name,
172 prefix=self.config.prefix,
173 namespace=self.config.namespace,
174 htm_index_column=self.config.pixelization.htm_index_column,
175 enable_replica=self.config.enable_replica,
176 )
177
178        self._db_schema_version = self._versionCheck(self._metadata)
179
180 self.pixelator = HtmPixelization(self.config.pixelization.htm_level)
181
182 if _LOG.isEnabledFor(logging.DEBUG):
183 _LOG.debug("ApdbSql Configuration: %s", self.config.model_dump())
184
185 def _timer(self, name: str, *, tags: Mapping[str, str | int] | None = None) -> Timer:
186 """Create `Timer` instance given its name."""
187 return Timer(name, _MON, tags=tags)
188
189 @classmethod
190 def _makeEngine(cls, config: ApdbSqlConfig, *, create: bool) -> sqlalchemy.engine.Engine:
191        """Make SQLAlchemy engine based on configured parameters.
192
193 Parameters
194 ----------
195 config : `ApdbSqlConfig`
196 Configuration object.
197 create : `bool`
198            Whether to try to create a new database file; only relevant for
199            the SQLite backend, which creates new files by default.
200 """
201 # engine is reused between multiple processes, make sure that we don't
202 # share connections by disabling pool (by using NullPool class)
203 kw: MutableMapping[str, Any] = dict(config.connection_config.extra_parameters)
204 conn_args: dict[str, Any] = {}
205 if not config.connection_config.connection_pool:
206 kw.update(poolclass=NullPool)
207 if config.connection_config.isolation_level is not None:
208 kw.update(isolation_level=config.connection_config.isolation_level)
209 elif config.db_url.startswith("sqlite"):
210 # Use READ_UNCOMMITTED as default value for sqlite.
211 kw.update(isolation_level="READ_UNCOMMITTED")
212 if config.connection_config.connection_timeout is not None:
213 if config.db_url.startswith("sqlite"):
214 conn_args.update(timeout=config.connection_config.connection_timeout)
215 elif config.db_url.startswith(("postgresql", "mysql")):
216 conn_args.update(connect_timeout=int(config.connection_config.connection_timeout))
217 kw.update(connect_args=conn_args)
218 engine = sqlalchemy.create_engine(cls._connection_url(config.db_url, create=create), **kw)
219
220 if engine.dialect.name == "sqlite":
221 # Need to enable foreign keys on every new connection.
222 sqlalchemy.event.listen(engine, "connect", _onSqlite3Connect)
223
224 return engine
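
    # For reference (an editorial sketch, not the exact call made above): with
    # connection pooling disabled and a connection timeout configured, the
    # create_engine() call for a hypothetical PostgreSQL URL boils down to
    # something like:
    #
    #     >>> import sqlalchemy
    #     >>> from sqlalchemy.pool import NullPool
    #     >>> engine = sqlalchemy.create_engine(
    #     ...     "postgresql://user@db.example.org/apdb",
    #     ...     poolclass=NullPool,
    #     ...     connect_args={"connect_timeout": 60},
    #     ... )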
225
226 @classmethod
227 def _connection_url(cls, config_url: str, *, create: bool) -> sqlalchemy.engine.URL | str:
228 """Generate a complete URL for database with proper credentials.
229
230 Parameters
231 ----------
232 config_url : `str`
233 Database URL as specified in configuration.
234 create : `bool`
235            Whether to try to create a new database file; only relevant for
236            the SQLite backend, which creates new files by default.
237
238 Returns
239 -------
240 connection_url : `sqlalchemy.engine.URL` or `str`
241 Connection URL including credentials.
242 """
243        # Allow third-party authentication mechanisms by assuming the
244        # connection string is correct when we cannot recognize matching
245        # (dialect, host, database) keys.
246 components = urllib.parse.urlparse(config_url)
247 if all((components.scheme is not None, components.hostname is not None, components.path is not None)):
248 try:
249 db_auth = DbAuth()
250 config_url = db_auth.getUrl(config_url)
251 except DbAuthNotFoundError:
252 # Credentials file doesn't exist or no matching credentials,
253 # use default auth.
254 pass
255
256        # SQLite has a nasty habit of creating empty databases when they do
257        # not exist; tell it not to do that unless we actually need to create it.
258 if not create:
259 config_url = cls._update_sqlite_url(config_url)
260
261 return config_url
262
263 @classmethod
264 def _update_sqlite_url(cls, url_string: str) -> str:
265 """If URL refers to sqlite dialect, update it so that the backend does
266 not try to create database file if it does not exist already.
267
268 Parameters
269 ----------
270 url_string : `str`
271 Connection string.
272
273 Returns
274 -------
275 url_string : `str`
276 Possibly updated connection string.
277 """
278 try:
279 url = sqlalchemy.make_url(url_string)
280 except sqlalchemy.exc.SQLAlchemyError:
281 # If parsing fails it means some special format, likely not
282 # sqlite so we just return it unchanged.
283 return url_string
284
285 if url.get_backend_name() == "sqlite":
286 # Massage url so that database name starts with "file:" and
287 # option string has "mode=rw&uri=true". Database name
288 # should look like a path (:memory: is not supported by
289 # Apdb, but someone could still try to use it).
290 database = url.database
291 if database and not database.startswith((":", "file:")):
292 query = dict(url.query, mode="rw", uri="true")
293 # If ``database`` is an absolute path then original URL should
294 # include four slashes after "sqlite:". Humans are bad at
295 # counting things beyond four and sometimes an extra slash gets
296 # added unintentionally, which causes sqlite to treat initial
297 # element as "authority" and to complain. Strip extra slashes
298 # at the start of the path to avoid that (DM-46077).
299 if database.startswith("//"):
300 warnings.warn(
301 f"Database URL contains extra leading slashes which will be removed: {url}",
302 stacklevel=3,
303 )
304 database = "/" + database.lstrip("/")
305 url = url.set(database=f"file:{database}", query=query)
306 url_string = url.render_as_string()
307
308 return url_string
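
    # An example of the rewrite performed above (an editorial sketch): a plain
    # file-based SQLite URL gains a "file:" prefix plus "mode=rw&uri=true"
    # options, so sqlite refuses to silently create a missing database file;
    # non-sqlite URLs pass through unchanged.
    #
    #     >>> url = ApdbSql._update_sqlite_url("sqlite:////tmp/apdb-example.db")
    #     >>> url.startswith("sqlite:///file:/tmp/apdb-example.db?")
    #     True
    #     >>> ApdbSql._update_sqlite_url("postgresql://db.example.org/apdb")
    #     'postgresql://db.example.org/apdb'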
309
310 def _versionCheck(self, metadata: ApdbMetadataSql) -> VersionTuple:
311 """Check schema version compatibility and return the database schema
312 version.
313 """
314
315 def _get_version(key: str) -> VersionTuple:
316 """Retrieve version number from given metadata key."""
317 version_str = metadata.get(key)
318 if version_str is None:
319 # Should not happen with existing metadata table.
320 raise RuntimeError(f"Version key {key!r} does not exist in metadata table.")
321 return VersionTuple.fromString(version_str)
322
323 db_schema_version = _get_version(self.metadataSchemaVersionKey)
324 db_code_version = _get_version(self.metadataCodeVersionKey)
325
326 # For now there is no way to make read-only APDB instances, assume that
327 # any access can do updates.
328 if not self._schema.schemaVersion().checkCompatibility(db_schema_version):
329            raise IncompatibleVersionError(
330                f"Configured schema version {self._schema.schemaVersion()} "
331 f"is not compatible with database version {db_schema_version}"
332 )
333 if not self.apdbImplementationVersion().checkCompatibility(db_code_version):
334            raise IncompatibleVersionError(
335                f"Current code version {self.apdbImplementationVersion()} "
336 f"is not compatible with database version {db_code_version}"
337 )
338
339 # Check replica code version only if replica is enabled.
340 if self._schema.replication_enabled:
341 db_replica_version = _get_version(self.metadataReplicaVersionKey)
342 code_replica_version = ApdbSqlReplica.apdbReplicaImplementationVersion()
343 if not code_replica_version.checkCompatibility(db_replica_version):
344                raise IncompatibleVersionError(
345                    f"Current replication code version {code_replica_version} "
346 f"is not compatible with database version {db_replica_version}"
347 )
348
349 return db_schema_version
350
351 @classmethod
352 def apdbImplementationVersion(cls) -> VersionTuple:
353 """Return version number for current APDB implementation.
354
355 Returns
356 -------
357 version : `VersionTuple`
358 Version of the code defined in implementation class.
359 """
360 return VERSION
361
362 @classmethod
363    def init_database(
364        cls,
365 db_url: str,
366 *,
367 schema_file: str | None = None,
368 schema_name: str | None = None,
369 read_sources_months: int | None = None,
370 read_forced_sources_months: int | None = None,
371 enable_replica: bool = False,
372 connection_timeout: int | None = None,
373 dia_object_index: str | None = None,
374 htm_level: int | None = None,
375 htm_index_column: str | None = None,
376 ra_dec_columns: tuple[str, str] | None = None,
377 prefix: str | None = None,
378 namespace: str | None = None,
379 drop: bool = False,
380 ) -> ApdbSqlConfig:
381 """Initialize new APDB instance and make configuration object for it.
382
383 Parameters
384 ----------
385 db_url : `str`
386 SQLAlchemy database URL.
387 schema_file : `str`, optional
388 Location of (YAML) configuration file with APDB schema. If not
389 specified then default location will be used.
390        schema_name : `str`, optional
391 Name of the schema in YAML configuration file. If not specified
392 then default name will be used.
393 read_sources_months : `int`, optional
394 Number of months of history to read from DiaSource.
395 read_forced_sources_months : `int`, optional
396 Number of months of history to read from DiaForcedSource.
397 enable_replica : `bool`
398 If True, make additional tables used for replication to PPDB.
399 connection_timeout : `int`, optional
400 Database connection timeout in seconds.
401 dia_object_index : `str`, optional
402 Indexing mode for DiaObject table.
403 htm_level : `int`, optional
404 HTM indexing level.
405 htm_index_column : `str`, optional
406            Name of an HTM index column for DiaObject and DiaSource tables.
407 ra_dec_columns : `tuple` [`str`, `str`], optional
408 Names of ra/dec columns in DiaObject table.
409 prefix : `str`, optional
410 Optional prefix for all table names.
411 namespace : `str`, optional
412 Name of the database schema for all APDB tables. If not specified
413 then default schema is used.
414 drop : `bool`, optional
415 If `True` then drop existing tables before re-creating the schema.
416
417 Returns
418 -------
419 config : `ApdbSqlConfig`
420 Resulting configuration object for a created APDB instance.
421 """
422 config = ApdbSqlConfig(db_url=db_url, enable_replica=enable_replica)
423 if schema_file is not None:
424 config.schema_file = schema_file
425 if schema_name is not None:
426 config.schema_name = schema_name
427 if read_sources_months is not None:
428 config.read_sources_months = read_sources_months
429 if read_forced_sources_months is not None:
430 config.read_forced_sources_months = read_forced_sources_months
431 if connection_timeout is not None:
432 config.connection_config.connection_timeout = connection_timeout
433 if dia_object_index is not None:
434 config.dia_object_index = dia_object_index
435 if htm_level is not None:
436 config.pixelization.htm_level = htm_level
437 if htm_index_column is not None:
438 config.pixelization.htm_index_column = htm_index_column
439 if ra_dec_columns is not None:
440 config.ra_dec_columns = ra_dec_columns
441 if prefix is not None:
442 config.prefix = prefix
443 if namespace is not None:
444 config.namespace = namespace
445
446 cls._makeSchema(config, drop=drop)
447
448        # SQLite has a nasty habit of creating an empty database by default;
449        # update the URL in the config to disable that behavior.
450 config.db_url = cls._update_sqlite_url(config.db_url)
451
452 return config
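
    # Typical bootstrap sequence (an editorial sketch; the SQLite path is a
    # made-up example): create the schema once with init_database(), then open
    # the same database by constructing ApdbSql from the returned configuration.
    #
    #     >>> config = ApdbSql.init_database(db_url="sqlite:////tmp/apdb-example.db")
    #     >>> apdb = ApdbSql(config)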
453
454 def get_replica(self) -> ApdbSqlReplica:
455 """Return `ApdbReplica` instance for this database."""
456 return ApdbSqlReplica(self._schema, self._engine, self._db_schema_version)
457
458 def tableRowCount(self) -> dict[str, int]:
459 """Return dictionary with the table names and row counts.
460
461 Used by ``ap_proto`` to keep track of the size of the database tables.
462        Depending on database technology this could be an expensive operation.
463
464 Returns
465 -------
466 row_counts : `dict`
467 Dict where key is a table name and value is a row count.
468 """
469 res = {}
470 tables = [ApdbTables.DiaObject, ApdbTables.DiaSource, ApdbTables.DiaForcedSource]
471 if self.config.dia_object_index == "last_object_table":
472 tables.append(ApdbTables.DiaObjectLast)
473 with self._engine.begin() as conn:
474 for table in tables:
475 sa_table = self._schema.get_table(table)
476 stmt = sql.select(func.count()).select_from(sa_table)
477 count: int = conn.execute(stmt).scalar_one()
478 res[table.name] = count
479
480 return res
481
482 def getConfig(self) -> ApdbSqlConfig:
483 # docstring is inherited from a base class
484 return self.config
485
486 def tableDef(self, table: ApdbTables) -> Table | None:
487 # docstring is inherited from a base class
488 return self._schema.tableSchemas.get(table)
489
490 @classmethod
491 def _makeSchema(cls, config: ApdbConfig, drop: bool = False) -> None:
492 # docstring is inherited from a base class
493
494 if not isinstance(config, ApdbSqlConfig):
495 raise TypeError(f"Unexpected type of configuration object: {type(config)}")
496
497 engine = cls._makeEngine(config, create=True)
498
499 # Ask schema class to create all tables.
500 schema = ApdbSqlSchema(
501 engine=engine,
502 dia_object_index=config.dia_object_index,
503 schema_file=config.schema_file,
504 schema_name=config.schema_name,
505 prefix=config.prefix,
506 namespace=config.namespace,
507 htm_index_column=config.pixelization.htm_index_column,
508 enable_replica=config.enable_replica,
509 )
510 schema.makeSchema(drop=drop)
511
512        # Need the metadata table to store a few items in it.
513 meta_table = schema.get_table(ApdbTables.metadata)
514 apdb_meta = ApdbMetadataSql(engine, meta_table)
515
516 # Fill version numbers, overwrite if they are already there.
517 apdb_meta.set(cls.metadataSchemaVersionKey, str(schema.schemaVersion()), force=True)
518 apdb_meta.set(cls.metadataCodeVersionKey, str(cls.apdbImplementationVersion()), force=True)
519 if config.enable_replica:
520 # Only store replica code version if replica is enabled.
521 apdb_meta.set(
522                cls.metadataReplicaVersionKey,
523                str(ApdbSqlReplica.apdbReplicaImplementationVersion()),
524 force=True,
525 )
526
527 # Store frozen part of a configuration in metadata.
528 freezer = ApdbConfigFreezer[ApdbSqlConfig](cls._frozen_parameters)
529 apdb_meta.set(cls.metadataConfigKey, freezer.to_json(config), force=True)
530
531 def getDiaObjects(self, region: Region) -> pandas.DataFrame:
532 # docstring is inherited from a base class
533
534 # decide what columns we need
535 if self.config.dia_object_index == "last_object_table":
536 table_enum = ApdbTables.DiaObjectLast
537 else:
538 table_enum = ApdbTables.DiaObject
539 table = self._schema.get_table(table_enum)
540 if not self.config.dia_object_columns:
541 columns = self._schema.get_apdb_columns(table_enum)
542 else:
543 columns = [table.c[col] for col in self.config.dia_object_columns]
544 query = sql.select(*columns)
545
546 # build selection
547 query = query.where(self._filterRegion(table, region))
548
549 if self._schema.has_mjd_timestamps:
550 validity_end_column = "validityEndMjdTai"
551 else:
552 validity_end_column = "validityEnd"
553
554 # select latest version of objects
555 if self.config.dia_object_index != "last_object_table":
556 query = query.where(table.columns[validity_end_column] == None) # noqa: E711
557
558 # _LOG.debug("query: %s", query)
559
560 # execute select
561 with self._timer("select_time", tags={"table": "DiaObject"}) as timer:
562 with self._engine.begin() as conn:
563 objects = pandas.read_sql_query(query, conn)
564 timer.add_values(row_count=len(objects))
565 _LOG.debug("found %s DiaObjects", len(objects))
566 return self._fix_result_timestamps(objects)
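
    # Usage sketch (editorial, assumes an initialized ``apdb`` instance):
    # DiaObjects are selected by an arbitrary sphgeom region, for example a
    # small circle around a point (the coordinates are arbitrary examples).
    #
    #     >>> from lsst.sphgeom import Angle, Circle, LonLat, UnitVector3d
    #     >>> center = UnitVector3d(LonLat.fromDegrees(10.0, -5.0))
    #     >>> region = Circle(center, Angle.fromDegrees(0.1))
    #     >>> objects = apdb.getDiaObjects(region)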
567
568    def getDiaSources(
569        self,
570 region: Region,
571 object_ids: Iterable[int] | None,
572 visit_time: astropy.time.Time,
573 start_time: astropy.time.Time | None = None,
574 ) -> pandas.DataFrame | None:
575 # docstring is inherited from a base class
576 if start_time is None and self.config.read_sources_months == 0:
577 _LOG.debug("Skip DiaSources fetching")
578 return None
579
580 if start_time is None:
581 start_time_mjdTai = _make_midpointMjdTai_start(visit_time, self.config.read_sources_months)
582 else:
583 start_time_mjdTai = float(start_time.tai.mjd)
584 _LOG.debug("start_time_mjdTai = %.6f", start_time_mjdTai)
585
586 if object_ids is None:
587 # region-based select
588 return self._getDiaSourcesInRegion(region, start_time_mjdTai)
589 else:
590 return self._getDiaSourcesByIDs(list(object_ids), start_time_mjdTai)
591
592    def getDiaForcedSources(
593        self,
594 region: Region,
595 object_ids: Iterable[int] | None,
596 visit_time: astropy.time.Time,
597 start_time: astropy.time.Time | None = None,
598 ) -> pandas.DataFrame | None:
599 # docstring is inherited from a base class
600 if start_time is None and self.config.read_forced_sources_months == 0:
601            _LOG.debug("Skip DiaForcedSources fetching")
602 return None
603
604 if object_ids is None:
605 # This implementation does not support region-based selection. In
606 # the past DiaForcedSource schema did not have ra/dec columns (it
607 # had x/y columns). ra/dec were added at some point, so we could
608            # add a pixelId column to this table if/when needed.
609 raise NotImplementedError("Region-based selection is not supported")
610
611 # TODO: DateTime.MJD must be consistent with code in ap_association,
612 # alternatively we can fill midpointMjdTai ourselves in store()
613 if start_time is None:
614 start_time_mjdTai = _make_midpointMjdTai_start(visit_time, self.config.read_forced_sources_months)
615 else:
616 start_time_mjdTai = float(start_time.tai.mjd)
617 _LOG.debug("start_time_mjdTai = %.6f", start_time_mjdTai)
618
619 with self._timer("select_time", tags={"table": "DiaForcedSource"}) as timer:
620 sources = self._getSourcesByIDs(ApdbTables.DiaForcedSource, list(object_ids), start_time_mjdTai)
621 timer.add_values(row_count=len(sources))
622
623 _LOG.debug("found %s DiaForcedSources", len(sources))
624 return sources
625
626    def containsVisitDetector(
627        self,
628 visit: int,
629 detector: int,
630 region: Region,
631 visit_time: astropy.time.Time,
632 ) -> bool:
633 # docstring is inherited from a base class
634 src_table: sqlalchemy.schema.Table = self._schema.get_table(ApdbTables.DiaSource)
635 frcsrc_table: sqlalchemy.schema.Table = self._schema.get_table(ApdbTables.DiaForcedSource)
636 # Query should load only one leaf page of the index
637 query1 = sql.select(src_table.c.visit).filter_by(visit=visit, detector=detector).limit(1)
638
639 with self._engine.begin() as conn:
640 result = conn.execute(query1).scalar_one_or_none()
641 if result is not None:
642 return True
643 else:
644 # Backup query if an image was processed but had no diaSources
645 query2 = sql.select(frcsrc_table.c.visit).filter_by(visit=visit, detector=detector).limit(1)
646 result = conn.execute(query2).scalar_one_or_none()
647 return result is not None
648
649 def getSSObjects(self) -> pandas.DataFrame:
650 # docstring is inherited from a base class
651
652 columns = self._schema.get_apdb_columns(ApdbTables.SSObject)
653 query = sql.select(*columns)
654
655 # execute select
656 with self._timer("SSObject_select_time", tags={"table": "SSObject"}) as timer:
657 with self._engine.begin() as conn:
658 objects = pandas.read_sql_query(query, conn)
659 timer.add_values(row_count=len(objects))
660 _LOG.debug("found %s SSObjects", len(objects))
661 return self._fix_result_timestamps(objects)
662
663 def store(
664 self,
665 visit_time: astropy.time.Time,
666 objects: pandas.DataFrame,
667 sources: pandas.DataFrame | None = None,
668 forced_sources: pandas.DataFrame | None = None,
669 ) -> None:
670 # docstring is inherited from a base class
671 objects = self._fix_input_timestamps(objects)
672 if sources is not None:
673 sources = self._fix_input_timestamps(sources)
674 if forced_sources is not None:
675 forced_sources = self._fix_input_timestamps(forced_sources)
676
677 # We want to run all inserts in one transaction.
678 with self._engine.begin() as connection:
679 replica_chunk: ReplicaChunk | None = None
680 if self._schema.replication_enabled:
681 replica_chunk = ReplicaChunk.make_replica_chunk(visit_time, self.config.replica_chunk_seconds)
682 self._storeReplicaChunk(replica_chunk, visit_time, connection)
683
684 # fill pixelId column for DiaObjects
685 objects = self._add_spatial_index(objects)
686 self._storeDiaObjects(objects, visit_time, replica_chunk, connection)
687
688 if sources is not None:
689 # fill pixelId column for DiaSources
690 sources = self._add_spatial_index(sources)
691 self._storeDiaSources(sources, replica_chunk, connection)
692
693 if forced_sources is not None:
694 self._storeDiaForcedSources(forced_sources, replica_chunk, connection)
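
    # Usage sketch (editorial, assumes an initialized ``apdb`` instance and the
    # default ra/dec column names): catalogs are plain pandas DataFrames whose
    # columns follow the APDB schema. Only a few columns are shown here; a real
    # catalog must contain every column required by the schema.
    #
    #     >>> import astropy.time
    #     >>> import pandas
    #     >>> visit_time = astropy.time.Time("2025-01-01T00:00:00", scale="tai")
    #     >>> objects = pandas.DataFrame(
    #     ...     {"diaObjectId": [1, 2], "ra": [10.0, 10.1], "dec": [-5.0, -5.1]}
    #     ... )
    #     >>> apdb.store(visit_time, objects)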
695
696 def storeSSObjects(self, objects: pandas.DataFrame) -> None:
697 # docstring is inherited from a base class
698 objects = self._fix_input_timestamps(objects)
699
700 idColumn = "ssObjectId"
701 table = self._schema.get_table(ApdbTables.SSObject)
702
703 # everything to be done in single transaction
704 with self._engine.begin() as conn:
705 # Find record IDs that already exist. Some types like np.int64 can
706 # cause issues with sqlalchemy, convert them to int.
707 ids = sorted(int(oid) for oid in objects[idColumn])
708
709 query = sql.select(table.columns[idColumn], table.columns[idColumn].in_(ids))
710 result = conn.execute(query)
711 knownIds = {row.ssObjectId for row in result}
712
713 filter = objects[idColumn].isin(knownIds)
714 toUpdate = cast(pandas.DataFrame, objects[filter])
715 toInsert = cast(pandas.DataFrame, objects[~filter])
716
717 # insert new records
718 if len(toInsert) > 0:
719 toInsert.to_sql(table.name, conn, if_exists="append", index=False, schema=table.schema)
720
721 # update existing records
722 if len(toUpdate) > 0:
723 whereKey = f"{idColumn}_param"
724 update = table.update().where(table.columns[idColumn] == sql.bindparam(whereKey))
725 toUpdate = toUpdate.rename({idColumn: whereKey}, axis="columns")
726 values = toUpdate.to_dict("records")
727 result = conn.execute(update, values)
728
729 def reassignDiaSources(self, idMap: Mapping[int, int]) -> None:
730 # docstring is inherited from a base class
731
732 reassignTime = datetime.datetime.now(tz=datetime.UTC)
733
734 table = self._schema.get_table(ApdbTables.DiaSource)
735 query = table.update().where(table.columns["diaSourceId"] == sql.bindparam("srcId"))
736
737 with self._engine.begin() as conn:
738 # Need to make sure that every ID exists in the database, but
739 # executemany may not support rowcount, so iterate and check what
740 # is missing.
741 missing_ids: list[int] = []
742 for key, value in idMap.items():
743 params = {
744 "srcId": key,
745 "diaObjectId": 0,
746 "ssObjectId": value,
747 "ssObjectReassocTime": reassignTime,
748 }
749 result = conn.execute(query, params)
750 if result.rowcount == 0:
751 missing_ids.append(key)
752 if missing_ids:
753 missing = ",".join(str(item) for item in missing_ids)
754                raise ValueError(f"The following DiaSource IDs do not exist in the database: {missing}")
755
756 def dailyJob(self) -> None:
757 # docstring is inherited from a base class
758 pass
759
760 def countUnassociatedObjects(self) -> int:
761 # docstring is inherited from a base class
762
763 # Retrieve the DiaObject table.
764 table: sqlalchemy.schema.Table = self._schema.get_table(ApdbTables.DiaObject)
765
766 if self._schema.has_mjd_timestamps:
767 validity_end_column = "validityEndMjdTai"
768 else:
769 validity_end_column = "validityEnd"
770
771 # Construct the sql statement.
772 stmt = sql.select(func.count()).select_from(table).where(table.c.nDiaSources == 1)
773 stmt = stmt.where(table.columns[validity_end_column] == None) # noqa: E711
774
775 # Return the count.
776 with self._engine.begin() as conn:
777 count = conn.execute(stmt).scalar_one()
778
779 return count
780
781 @property
782 def metadata(self) -> ApdbMetadata:
783 # docstring is inherited from a base class
784 return self._metadata
785
786 @property
787 def admin(self) -> ApdbSqlAdmin:
788 # docstring is inherited from a base class
789 return ApdbSqlAdmin(self.pixelator)
790
791 def _getDiaSourcesInRegion(self, region: Region, start_time_mjdTai: float) -> pandas.DataFrame:
792 """Return catalog of DiaSource instances from given region.
793
794 Parameters
795 ----------
796 region : `lsst.sphgeom.Region`
797 Region to search for DIASources.
798 start_time_mjdTai : `float`
799 Lower bound of time window for the query.
800
801 Returns
802 -------
803 catalog : `pandas.DataFrame`
804 Catalog containing DiaSource records.
805 """
806 table = self._schema.get_table(ApdbTables.DiaSource)
807 columns = self._schema.get_apdb_columns(ApdbTables.DiaSource)
808 query = sql.select(*columns)
809
810 # build selection
811 time_filter = table.columns["midpointMjdTai"] > start_time_mjdTai
812 where = sql.expression.and_(self._filterRegion(table, region), time_filter)
813 query = query.where(where)
814
815 # execute select
816 with self._timer("DiaSource_select_time", tags={"table": "DiaSource"}) as timer:
817 with self._engine.begin() as conn:
818 sources = pandas.read_sql_query(query, conn)
819            timer.add_values(row_count=len(sources))
820 _LOG.debug("found %s DiaSources", len(sources))
821 return self._fix_result_timestamps(sources)
822
823 def _getDiaSourcesByIDs(self, object_ids: list[int], start_time_mjdTai: float) -> pandas.DataFrame:
824 """Return catalog of DiaSource instances given set of DiaObject IDs.
825
826 Parameters
827 ----------
828        object_ids : `list` [`int`]
829            Collection of DiaObject IDs.
830 start_time_mjdTai : `float`
831 Lower bound of time window for the query.
832
833 Returns
834 -------
835 catalog : `pandas.DataFrame`
836 Catalog containing DiaSource records.
837 """
838 with self._timer("select_time", tags={"table": "DiaSource"}) as timer:
839 sources = self._getSourcesByIDs(ApdbTables.DiaSource, object_ids, start_time_mjdTai)
840 timer.add_values(row_count=len(sources))
841
842 _LOG.debug("found %s DiaSources", len(sources))
843 return sources
844
845    def _getSourcesByIDs(
846        self, table_enum: ApdbTables, object_ids: list[int], midpointMjdTai_start: float
847 ) -> pandas.DataFrame:
848 """Return catalog of DiaSource or DiaForcedSource instances given set
849 of DiaObject IDs.
850
851 Parameters
852 ----------
853        table_enum : `ApdbTables`
854            Enum member identifying the source table to query.
855        object_ids : `list` [`int`]
856            Collection of DiaObject IDs.
857 midpointMjdTai_start : `float`
858 Earliest midpointMjdTai to retrieve.
859
860 Returns
861 -------
862 catalog : `pandas.DataFrame`
863            Catalog containing DiaSource or DiaForcedSource records. An empty
864            catalog is returned when ``object_ids`` is empty.
866 """
867 table = self._schema.get_table(table_enum)
868 columns = self._schema.get_apdb_columns(table_enum)
869
870 sources: pandas.DataFrame | None = None
871 if len(object_ids) <= 0:
872 _LOG.debug("ID list is empty, just fetch empty result")
873 query = sql.select(*columns).where(sql.literal(False))
874 with self._engine.begin() as conn:
875 sources = pandas.read_sql_query(query, conn)
876 else:
877 data_frames: list[pandas.DataFrame] = []
878 for ids in chunk_iterable(sorted(object_ids), 1000):
879 query = sql.select(*columns)
880
881 # Some types like np.int64 can cause issues with
882 # sqlalchemy, convert them to int.
883 int_ids = [int(oid) for oid in ids]
884
885 # select by object id
886 query = query.where(
887 sql.expression.and_(
888 table.columns["diaObjectId"].in_(int_ids),
889 table.columns["midpointMjdTai"] > midpointMjdTai_start,
890 )
891 )
892
893 # execute select
894 with self._engine.begin() as conn:
895 data_frames.append(pandas.read_sql_query(query, conn))
896
897 if len(data_frames) == 1:
898 sources = data_frames[0]
899 else:
900 sources = pandas.concat(data_frames)
901 assert sources is not None, "Catalog cannot be None"
902 return self._fix_result_timestamps(sources)
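
    # The ID list is split into chunks of at most 1000 above so that the
    # generated IN() lists stay reasonably small. chunk_iterable itself behaves
    # like this (an editorial sketch):
    #
    #     >>> from lsst.utils.iteration import chunk_iterable
    #     >>> [list(chunk) for chunk in chunk_iterable(range(5), 2)]
    #     [[0, 1], [2, 3], [4]]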
903
904    def _storeReplicaChunk(
905        self,
906 replica_chunk: ReplicaChunk,
907 visit_time: astropy.time.Time,
908 connection: sqlalchemy.engine.Connection,
909 ) -> None:
910 # `visit_time.datetime` returns naive datetime, even though all astropy
911        # times are in UTC. Add UTC timezone to the timestamp so that the database
912 # can store a correct value.
913 dt = datetime.datetime.fromtimestamp(visit_time.unix_tai, tz=datetime.UTC)
914
915 table = self._schema.get_table(ExtraTables.ApdbReplicaChunks)
916
917 # We need UPSERT which is dialect-specific construct
918 values = {"last_update_time": dt, "unique_id": replica_chunk.unique_id}
919 row = {"apdb_replica_chunk": replica_chunk.id} | values
920 if connection.dialect.name == "sqlite":
921 insert_sqlite = sqlalchemy.dialects.sqlite.insert(table)
922 insert_sqlite = insert_sqlite.on_conflict_do_update(index_elements=table.primary_key, set_=values)
923 connection.execute(insert_sqlite, row)
924 elif connection.dialect.name == "postgresql":
925 insert_pg = sqlalchemy.dialects.postgresql.dml.insert(table)
926 insert_pg = insert_pg.on_conflict_do_update(constraint=table.primary_key, set_=values)
927 connection.execute(insert_pg, row)
928 else:
929 raise TypeError(f"Unsupported dialect {connection.dialect.name} for upsert.")
930
931    def _storeDiaObjects(
932        self,
933 objs: pandas.DataFrame,
934 visit_time: astropy.time.Time,
935 replica_chunk: ReplicaChunk | None,
936 connection: sqlalchemy.engine.Connection,
937 ) -> None:
938 """Store catalog of DiaObjects from current visit.
939
940 Parameters
941 ----------
942 objs : `pandas.DataFrame`
943 Catalog with DiaObject records.
944 visit_time : `astropy.time.Time`
945 Time of the visit.
946        replica_chunk : `ReplicaChunk` or `None`
947            Replica chunk identifier, or `None` if replication is disabled.
948 """
949 if len(objs) == 0:
950 _LOG.debug("No objects to write to database.")
951 return
952
953 # Some types like np.int64 can cause issues with sqlalchemy, convert
954 # them to int.
955 ids = sorted(int(oid) for oid in objs["diaObjectId"])
956 _LOG.debug("first object ID: %d", ids[0])
957
958 if self._schema.has_mjd_timestamps:
959 validity_start_column = "validityStartMjdTai"
960 validity_end_column = "validityEndMjdTai"
961 timestamp = float(visit_time.tai.mjd)
962 else:
963 validity_start_column = "validityStart"
964 validity_end_column = "validityEnd"
965 timestamp = visit_time.datetime
966
967 # everything to be done in single transaction
968 if self.config.dia_object_index == "last_object_table":
969 # Insert and replace all records in LAST table.
970 table = self._schema.get_table(ApdbTables.DiaObjectLast)
971
972 # Drop the previous objects (pandas cannot upsert).
973 query = table.delete().where(table.columns["diaObjectId"].in_(ids))
974
975 with self._timer("delete_time", tags={"table": table.name}) as timer:
976 res = connection.execute(query)
977 timer.add_values(row_count=res.rowcount)
978 _LOG.debug("deleted %s objects", res.rowcount)
979
980 # DiaObjectLast is a subset of DiaObject, strip missing columns
981 last_column_names = [column.name for column in table.columns]
982 last_objs = objs[last_column_names]
983 last_objs = _coerce_uint64(last_objs)
984
985 with self._timer("insert_time", tags={"table": "DiaObjectLast"}) as timer:
986 last_objs.to_sql(
987 table.name,
988 connection,
989 if_exists="append",
990 index=False,
991 schema=table.schema,
992 )
993 timer.add_values(row_count=len(last_objs))
994 else:
995 # truncate existing validity intervals
996 table = self._schema.get_table(ApdbTables.DiaObject)
997
998 update = (
999 table.update()
1000 .values(**{validity_end_column: timestamp})
1001 .where(
1002 sql.expression.and_(
1003 table.columns["diaObjectId"].in_(ids),
1004 table.columns[validity_end_column].is_(None),
1005 )
1006 )
1007 )
1008
1009 with self._timer("truncate_time", tags={"table": table.name}) as timer:
1010 res = connection.execute(update)
1011 timer.add_values(row_count=res.rowcount)
1012 _LOG.debug("truncated %s intervals", res.rowcount)
1013
1014 objs = _coerce_uint64(objs)
1015
1016 # Fill additional columns
1017 extra_columns: list[pandas.Series] = []
1018 if validity_start_column in objs.columns:
1019 objs[validity_start_column] = timestamp
1020 else:
1021 extra_columns.append(pandas.Series([timestamp] * len(objs), name=validity_start_column))
1022 if validity_end_column in objs.columns:
1023 objs[validity_end_column] = None
1024 else:
1025 extra_columns.append(pandas.Series([None] * len(objs), name=validity_end_column))
1026 if extra_columns:
1027 objs.set_index(extra_columns[0].index, inplace=True)
1028 objs = pandas.concat([objs] + extra_columns, axis="columns")
1029
1030 # Insert replica data
1031 table = self._schema.get_table(ApdbTables.DiaObject)
1032 replica_data: list[dict] = []
1033 replica_stmt: Any = None
1034 replica_table_name = ""
1035 if replica_chunk is not None:
1036 pk_names = [column.name for column in table.primary_key]
1037 replica_data = objs[pk_names].to_dict("records")
1038 for row in replica_data:
1039 row["apdb_replica_chunk"] = replica_chunk.id
1040 replica_table = self._schema.get_table(ExtraTables.DiaObjectChunks)
1041 replica_table_name = replica_table.name
1042 replica_stmt = replica_table.insert()
1043
1044 # insert new versions
1045 with self._timer("insert_time", tags={"table": table.name}) as timer:
1046 objs.to_sql(table.name, connection, if_exists="append", index=False, schema=table.schema)
1047 timer.add_values(row_count=len(objs))
1048 if replica_stmt is not None:
1049 with self._timer("insert_time", tags={"table": replica_table_name}) as timer:
1050 connection.execute(replica_stmt, replica_data)
1051 timer.add_values(row_count=len(replica_data))
1052
1053    def _storeDiaSources(
1054        self,
1055 sources: pandas.DataFrame,
1056 replica_chunk: ReplicaChunk | None,
1057 connection: sqlalchemy.engine.Connection,
1058 ) -> None:
1059 """Store catalog of DiaSources from current visit.
1060
1061 Parameters
1062 ----------
1063 sources : `pandas.DataFrame`
1064 Catalog containing DiaSource records
1065 """
1066 table = self._schema.get_table(ApdbTables.DiaSource)
1067
1068 # Insert replica data
1069 replica_data: list[dict] = []
1070 replica_stmt: Any = None
1071 replica_table_name = ""
1072 if replica_chunk is not None:
1073 pk_names = [column.name for column in table.primary_key]
1074 replica_data = sources[pk_names].to_dict("records")
1075 for row in replica_data:
1076 row["apdb_replica_chunk"] = replica_chunk.id
1077 replica_table = self._schema.get_table(ExtraTables.DiaSourceChunks)
1078 replica_table_name = replica_table.name
1079 replica_stmt = replica_table.insert()
1080
1081 # everything to be done in single transaction
1082 with self._timer("insert_time", tags={"table": table.name}) as timer:
1083 sources = _coerce_uint64(sources)
1084 sources.to_sql(table.name, connection, if_exists="append", index=False, schema=table.schema)
1085 timer.add_values(row_count=len(sources))
1086 if replica_stmt is not None:
1087 with self._timer("replica_insert_time", tags={"table": replica_table_name}) as timer:
1088 connection.execute(replica_stmt, replica_data)
1089 timer.add_values(row_count=len(replica_data))
1090
1091    def _storeDiaForcedSources(
1092        self,
1093 sources: pandas.DataFrame,
1094 replica_chunk: ReplicaChunk | None,
1095 connection: sqlalchemy.engine.Connection,
1096 ) -> None:
1097 """Store a set of DiaForcedSources from current visit.
1098
1099 Parameters
1100 ----------
1101 sources : `pandas.DataFrame`
1102 Catalog containing DiaForcedSource records
1103 """
1104 table = self._schema.get_table(ApdbTables.DiaForcedSource)
1105
1106 # Insert replica data
1107 replica_data: list[dict] = []
1108 replica_stmt: Any = None
1109 replica_table_name = ""
1110 if replica_chunk is not None:
1111 pk_names = [column.name for column in table.primary_key]
1112 replica_data = sources[pk_names].to_dict("records")
1113 for row in replica_data:
1114 row["apdb_replica_chunk"] = replica_chunk.id
1115 replica_table = self._schema.get_table(ExtraTables.DiaForcedSourceChunks)
1116 replica_table_name = replica_table.name
1117 replica_stmt = replica_table.insert()
1118
1119 # everything to be done in single transaction
1120 with self._timer("insert_time", tags={"table": table.name}) as timer:
1121 sources = _coerce_uint64(sources)
1122 sources.to_sql(table.name, connection, if_exists="append", index=False, schema=table.schema)
1123 timer.add_values(row_count=len(sources))
1124 if replica_stmt is not None:
1125            with self._timer("insert_time", tags={"table": replica_table_name}) as timer:
1126 connection.execute(replica_stmt, replica_data)
1127 timer.add_values(row_count=len(replica_data))
1128
1129 def _htm_indices(self, region: Region) -> list[tuple[int, int]]:
1130 """Generate a set of HTM indices covering specified region.
1131
1132 Parameters
1133 ----------
1134        region : `lsst.sphgeom.Region`
1135 Region that needs to be indexed.
1136
1137 Returns
1138 -------
1139        Sequence of ranges; each range is a tuple ``(minHtmID, maxHtmID)``.
1140 """
1141 _LOG.debug("region: %s", region)
1142 indices = self.pixelator.envelope(region, self.config.pixelization.htm_max_ranges)
1143
1144 return indices.ranges()
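
    # Illustration of the pixelization (an editorial sketch with arbitrary
    # numbers): the envelope of a small region is returned as a short list of
    # (begin, end) index ranges, where ``end`` is exclusive; _filterRegion below
    # turns these ranges into equality/BETWEEN clauses.
    #
    #     >>> from lsst.sphgeom import Angle, Circle, HtmPixelization, LonLat, UnitVector3d
    #     >>> pix = HtmPixelization(20)
    #     >>> center = UnitVector3d(LonLat.fromDegrees(45.0, 45.0))
    #     >>> ranges = pix.envelope(Circle(center, Angle.fromDegrees(0.01)), 64).ranges()
    #     >>> all(len(r) == 2 for r in ranges)
    #     True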
1145
1146 def _filterRegion(self, table: sqlalchemy.schema.Table, region: Region) -> sql.ColumnElement:
1147 """Make SQLAlchemy expression for selecting records in a region."""
1148 htm_index_column = table.columns[self.config.pixelization.htm_index_column]
1149 exprlist = []
1150 pixel_ranges = self._htm_indices(region)
1151 for low, upper in pixel_ranges:
1152 upper -= 1
1153 if low == upper:
1154 exprlist.append(htm_index_column == low)
1155 else:
1156 exprlist.append(sql.expression.between(htm_index_column, low, upper))
1157
1158 return sql.expression.or_(*exprlist)
1159
1160 def _add_spatial_index(self, df: pandas.DataFrame) -> pandas.DataFrame:
1161 """Calculate spatial index for each record and add it to a DataFrame.
1162
1163 Parameters
1164 ----------
1165 df : `pandas.DataFrame`
1166 DataFrame which has to contain ra/dec columns, names of these
1167 columns are defined by configuration ``ra_dec_columns`` field.
1168
1169 Returns
1170 -------
1171 df : `pandas.DataFrame`
1172 DataFrame with ``pixelId`` column which contains pixel index
1173 for ra/dec coordinates.
1174
1175 Notes
1176 -----
1177        This overrides any existing column in the DataFrame with the same name
1178        (``pixelId``). The original DataFrame is not changed; a copy is
1179        returned.
1180 """
1181 # calculate HTM index for every DiaObject
1182 htm_index = np.zeros(df.shape[0], dtype=np.int64)
1183 ra_col, dec_col = self.config.ra_dec_columns
1184 for i, (ra, dec) in enumerate(zip(df[ra_col], df[dec_col])):
1185 uv3d = UnitVector3d(LonLat.fromDegrees(ra, dec))
1186 idx = self.pixelator.index(uv3d)
1187 htm_index[i] = idx
1188 df = df.copy()
1189 df[self.config.pixelization.htm_index_column] = htm_index
1190 return df
1191
1192 def _fix_input_timestamps(self, df: pandas.DataFrame) -> pandas.DataFrame:
1193        """Update timestamp columns in the input DataFrame to timezone-aware
1194        datetime type in UTC.
1195
1196        The AP pipeline generates naive datetime instances; we want them to be
1197        aware before they go to the database. All naive timestamps are assumed
1198        to be in UTC timezone (they should be TAI).
1199 """
1200 # Find all columns with aware non-UTC timestamps and convert to UTC.
1201 columns = [
1202 column
1203 for column, dtype in df.dtypes.items()
1204 if isinstance(dtype, pandas.DatetimeTZDtype) and dtype.tz is not datetime.UTC
1205 ]
1206 for column in columns:
1207 df[column] = df[column].dt.tz_convert(datetime.UTC)
1208 # Find all columns with naive timestamps and add UTC timezone.
1209 columns = [
1210 column for column, dtype in df.dtypes.items() if pandas.api.types.is_datetime64_dtype(dtype)
1211 ]
1212 for column in columns:
1213 df[column] = df[column].dt.tz_localize(datetime.UTC)
1214 return df
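
    # A minimal sketch of the conversion performed above: a naive timestamp
    # column becomes timezone-aware in UTC.
    #
    #     >>> import datetime
    #     >>> import pandas
    #     >>> df = pandas.DataFrame({"t": pandas.to_datetime(["2025-01-01T00:00:00"])})
    #     >>> str(df["t"].dt.tz_localize(datetime.UTC).dtype)
    #     'datetime64[ns, UTC]'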
1215
1216 def _fix_result_timestamps(self, df: pandas.DataFrame) -> pandas.DataFrame:
1217 """Update timestamp columns to be naive datetime type in returned
1218 DataFrame.
1219
1220        AP pipeline code expects DataFrames to contain naive datetime columns,
1221        while Postgres queries return a timezone-aware type. This method
1222        converts those columns to naive datetime in UTC timezone.
1223 """
1224 # Find all columns with aware timestamps.
1225 columns = [column for column, dtype in df.dtypes.items() if isinstance(dtype, pandas.DatetimeTZDtype)]
1226 for column in columns:
1227 # tz_convert(None) will convert to UTC and drop timezone.
1228 df[column] = df[column].dt.tz_convert(None)
1229 return df
__init__(self, ApdbSqlConfig config)
Definition apdbSql.py:150
ApdbSqlConfig init_database(cls, str db_url, *, str|None schema_file=None, str|None schema_name=None, int|None read_sources_months=None, int|None read_forced_sources_months=None, bool enable_replica=False, int|None connection_timeout=None, str|None dia_object_index=None, int|None htm_level=None, str|None htm_index_column=None, tuple[str, str]|None ra_dec_columns=None, str|None prefix=None, str|None namespace=None, bool drop=False)
Definition apdbSql.py:380
pandas.DataFrame getSSObjects(self)
Definition apdbSql.py:649
pandas.DataFrame|None getDiaForcedSources(self, Region region, Iterable[int]|None object_ids, astropy.time.Time visit_time, astropy.time.Time|None start_time=None)
Definition apdbSql.py:598
pandas.DataFrame _fix_result_timestamps(self, pandas.DataFrame df)
Definition apdbSql.py:1216
None _storeDiaForcedSources(self, pandas.DataFrame sources, ReplicaChunk|None replica_chunk, sqlalchemy.engine.Connection connection)
Definition apdbSql.py:1096
pandas.DataFrame getDiaObjects(self, Region region)
Definition apdbSql.py:531
str _update_sqlite_url(cls, str url_string)
Definition apdbSql.py:264
VersionTuple apdbImplementationVersion(cls)
Definition apdbSql.py:352
VersionTuple _versionCheck(self, ApdbMetadataSql metadata)
Definition apdbSql.py:310
None _storeReplicaChunk(self, ReplicaChunk replica_chunk, astropy.time.Time visit_time, sqlalchemy.engine.Connection connection)
Definition apdbSql.py:909
pandas.DataFrame _add_spatial_index(self, pandas.DataFrame df)
Definition apdbSql.py:1160
sqlalchemy.engine.Engine _makeEngine(cls, ApdbSqlConfig config, *, bool create)
Definition apdbSql.py:190
ApdbSqlConfig getConfig(self)
Definition apdbSql.py:482
sqlalchemy.engine.Engine _engine
Definition apdbSql.py:151
pandas.DataFrame _fix_input_timestamps(self, pandas.DataFrame df)
Definition apdbSql.py:1192
sqlalchemy.engine.URL|str _connection_url(cls, str config_url, *, bool create)
Definition apdbSql.py:227
ApdbMetadata metadata(self)
Definition apdbSql.py:782
None _makeSchema(cls, ApdbConfig config, bool drop=False)
Definition apdbSql.py:491
None reassignDiaSources(self, Mapping[int, int] idMap)
Definition apdbSql.py:729
list[tuple[int, int]] _htm_indices(self, Region region)
Definition apdbSql.py:1129
bool containsVisitDetector(self, int visit, int detector, Region region, astropy.time.Time visit_time)
Definition apdbSql.py:632
dict[str, int] tableRowCount(self)
Definition apdbSql.py:458
pandas.DataFrame _getSourcesByIDs(self, ApdbTables table_enum, list[int] object_ids, float midpointMjdTai_start)
Definition apdbSql.py:847
pandas.DataFrame _getDiaSourcesByIDs(self, list[int] object_ids, float start_time_mjdTai)
Definition apdbSql.py:823
ApdbSqlReplica get_replica(self)
Definition apdbSql.py:454
None store(self, astropy.time.Time visit_time, pandas.DataFrame objects, pandas.DataFrame|None sources=None, pandas.DataFrame|None forced_sources=None)
Definition apdbSql.py:669
pandas.DataFrame|None getDiaSources(self, Region region, Iterable[int]|None object_ids, astropy.time.Time visit_time, astropy.time.Time|None start_time=None)
Definition apdbSql.py:574
None _storeDiaObjects(self, pandas.DataFrame objs, astropy.time.Time visit_time, ReplicaChunk|None replica_chunk, sqlalchemy.engine.Connection connection)
Definition apdbSql.py:937
Timer _timer(self, str name, *, Mapping[str, str|int]|None tags=None)
Definition apdbSql.py:185
Table|None tableDef(self, ApdbTables table)
Definition apdbSql.py:486
pandas.DataFrame _getDiaSourcesInRegion(self, Region region, float start_time_mjdTai)
Definition apdbSql.py:791
None storeSSObjects(self, pandas.DataFrame objects)
Definition apdbSql.py:696
None _storeDiaSources(self, pandas.DataFrame sources, ReplicaChunk|None replica_chunk, sqlalchemy.engine.Connection connection)
Definition apdbSql.py:1058
sql.ColumnElement _filterRegion(self, sqlalchemy.schema.Table table, Region region)
Definition apdbSql.py:1146
float _make_midpointMjdTai_start(astropy.time.Time visit_time, int months)
Definition apdbSql.py:88
None _onSqlite3Connect(sqlite3.Connection dbapiConnection, sqlalchemy.pool._ConnectionRecord connectionRecord)
Definition apdbSql.py:110
pandas.DataFrame _coerce_uint64(pandas.DataFrame df)
Definition apdbSql.py:80