apdbSql.py
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
21
22"""Module defining Apdb class and related methods.
23"""
24
25from __future__ import annotations
26
27__all__ = ["ApdbSqlConfig", "ApdbSql"]
28
29import datetime
30import logging
31import urllib.parse
32import warnings
33from collections.abc import Iterable, Mapping, MutableMapping
34from contextlib import closing, suppress
35from typing import TYPE_CHECKING, Any, cast
36
37import astropy.time
38import numpy as np
39import pandas
40import sqlalchemy
41import sqlalchemy.dialects.postgresql
42import sqlalchemy.dialects.sqlite
43from lsst.pex.config import ChoiceField, Field, ListField
44from lsst.sphgeom import HtmPixelization, LonLat, Region, UnitVector3d
45from lsst.utils.db_auth import DbAuth, DbAuthNotFoundError
46from lsst.utils.iteration import chunk_iterable
47from sqlalchemy import func, sql
48from sqlalchemy.pool import NullPool
49
50from .._auth import DB_AUTH_ENVVAR, DB_AUTH_PATH
51from ..apdb import Apdb, ApdbConfig
52from ..apdbConfigFreezer import ApdbConfigFreezer
53from ..apdbReplica import ReplicaChunk
54from ..apdbSchema import ApdbTables
55from ..monitor import MonAgent
56from ..schema_model import Table
57from ..timer import Timer
58from ..versionTuple import IncompatibleVersionError, VersionTuple
59from .apdbMetadataSql import ApdbMetadataSql
60from .apdbSqlReplica import ApdbSqlReplica
61from .apdbSqlSchema import ApdbSqlSchema, ExtraTables
62
63if TYPE_CHECKING:
64 import sqlite3
65
66 from ..apdbMetadata import ApdbMetadata
67
68_LOG = logging.getLogger(__name__)
69
70_MON = MonAgent(__name__)
71
72VERSION = VersionTuple(0, 1, 1)
73"""Version for the code controlling non-replication tables. This needs to be
74updated following compatibility rules when schema produced by this code
75changes.
76"""
77
78
79def _coerce_uint64(df: pandas.DataFrame) -> pandas.DataFrame:
80 """Change the type of uint64 columns to int64, and return copy of data
81 frame.
82 """
83 names = [c[0] for c in df.dtypes.items() if c[1] == np.uint64]
84 return df.astype({name: np.int64 for name in names})
85
86
87def _make_midpointMjdTai_start(visit_time: astropy.time.Time, months: int) -> float:
88 """Calculate starting point for time-based source search.
89
90 Parameters
91 ----------
92 visit_time : `astropy.time.Time`
93 Time of current visit.
94 months : `int`
95 Number of months in the sources history.
96
97 Returns
98 -------
99 time : `float`
100 A ``midpointMjdTai`` starting point, MJD time.
101 """
102 # TODO: Use of MJD must be consistent with the code in ap_association
103 # (see DM-31996)
104 return visit_time.mjd - months * 30
105
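# Worked example of the calculation above: a visit at MJD 60000.0 with
# months=12 gives 60000.0 - 12 * 30 = 59640.0 as the ``midpointMjdTai``
# starting point.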
106
107def _onSqlite3Connect(
108 dbapiConnection: sqlite3.Connection, connectionRecord: sqlalchemy.pool._ConnectionRecord
109) -> None:
110 # Enable foreign keys
111 with closing(dbapiConnection.cursor()) as cursor:
112 cursor.execute("PRAGMA foreign_keys=ON;")
113
114
116 """APDB configuration class for SQL implementation (ApdbSql)."""
117
118 db_url = Field[str](doc="SQLAlchemy database connection URI")
119 isolation_level = ChoiceField[str](
120 doc=(
121 "Transaction isolation level, if unset then backend-default value "
122 "is used, except for SQLite backend where we use READ_UNCOMMITTED. "
123 "Some backends may not support every allowed value."
124 ),
125 allowed={
126 "READ_COMMITTED": "Read committed",
127 "READ_UNCOMMITTED": "Read uncommitted",
128 "REPEATABLE_READ": "Repeatable read",
129 "SERIALIZABLE": "Serializable",
130 },
131 default=None,
132 optional=True,
133 )
134 connection_pool = Field[bool](
135 doc="If False then disable SQLAlchemy connection pool. Do not use connection pool when forking.",
136 default=True,
137 )
138 connection_timeout = Field[float](
139 doc=(
140 "Maximum time to wait time for database lock to be released before exiting. "
141 "Defaults to sqlalchemy defaults if not set."
142 ),
143 default=None,
144 optional=True,
145 )
146 sql_echo = Field[bool](doc="If True then pass SQLAlchemy echo option.", default=False)
147 dia_object_index = ChoiceField[str](
148 doc="Indexing mode for DiaObject table",
149 allowed={
150 "baseline": "Index defined in baseline schema",
151 "pix_id_iov": "(pixelId, objectId, iovStart) PK",
152 "last_object_table": "Separate DiaObjectLast table",
153 },
154 default="baseline",
155 )
156 htm_level = Field[int](doc="HTM indexing level", default=20)
157 htm_max_ranges = Field[int](doc="Max number of ranges in HTM envelope", default=64)
158 htm_index_column = Field[str](
159 default="pixelId", doc="Name of a HTM index column for DiaObject and DiaSource tables"
160 )
161 ra_dec_columns = ListField[str](default=["ra", "dec"], doc="Names of ra/dec columns in DiaObject table")
162 dia_object_columns = ListField[str](
163 doc="List of columns to read from DiaObject, by default read all columns", default=[]
164 )
165 prefix = Field[str](doc="Prefix to add to table names and index names", default="")
166 namespace = Field[str](
167 doc=(
168 "Namespace or schema name for all tables in APDB database. "
169 "Presently only works for PostgreSQL backend. "
170 "If schema with this name does not exist it will be created when "
171 "APDB tables are created."
172 ),
173 default=None,
174 optional=True,
175 )
176 timer = Field[bool](doc="If True then print/log timing information", default=False)
177
178 def validate(self) -> None:
179 super().validate()
180 if len(self.ra_dec_columns) != 2:
181 raise ValueError("ra_dec_columns must have exactly two column names")
182
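# A minimal configuration sketch (illustrative only; the URL and namespace
# below are made-up example values, not defaults):
#
#     config = ApdbSqlConfig()
#     config.db_url = "postgresql://server.example.org/apdb"
#     config.namespace = "apdb_demo"
#     config.dia_object_index = "last_object_table"
#     config.validate()
#
# Fields not set explicitly keep the defaults declared above.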
183
185 """Implementation of APDB interface based on SQL database.
186
187 The implementation is configured via standard ``pex_config`` mechanism
188 using `ApdbSqlConfig` configuration class. For an example of different
189 configurations check ``config/`` folder.
190
191 Parameters
192 ----------
193 config : `ApdbSqlConfig`
194 Configuration object.
195 """
196
197 ConfigClass = ApdbSqlConfig
198
199 metadataSchemaVersionKey = "version:schema"
200 """Name of the metadata key to store schema version number."""
201
202 metadataCodeVersionKey = "version:ApdbSql"
203 """Name of the metadata key to store code version number."""
204
205 metadataReplicaVersionKey = "version:ApdbSqlReplica"
206 """Name of the metadata key to store replica code version number."""
207
208 metadataConfigKey = "config:apdb-sql.json"
209 """Name of the metadata key to store code version number."""
210
211 _frozen_parameters = (
212 "use_insert_id",
213 "dia_object_index",
214 "htm_level",
215 "htm_index_column",
216 "ra_dec_columns",
217 )
218 """Names of the config parameters to be frozen in metadata table."""
219
220 def __init__(self, config: ApdbSqlConfig):
221 self._engine = self._makeEngine(config, create=False)
222
223 sa_metadata = sqlalchemy.MetaData(schema=config.namespace)
224 meta_table_name = ApdbTables.metadata.table_name(prefix=config.prefix)
225 meta_table: sqlalchemy.schema.Table | None = None
226 with suppress(sqlalchemy.exc.NoSuchTableError):
227 meta_table = sqlalchemy.schema.Table(meta_table_name, sa_metadata, autoload_with=self._engine)
228
229 self._metadata = ApdbMetadataSql(self._engine, meta_table)
230
231 # Read frozen config from metadata.
232 config_json = self._metadata.get(self.metadataConfigKey)
233 if config_json is not None:
234 # Update config from metadata.
235 freezer = ApdbConfigFreezer[ApdbSqlConfig](self._frozen_parameters)
236 self.config = freezer.update(config, config_json)
237 else:
238 self.config = config
239 self.config.validate()
240
241 self._schema = ApdbSqlSchema(
242 engine=self._engine,
243 dia_object_index=self.config.dia_object_index,
244 schema_file=self.config.schema_file,
245 schema_name=self.config.schema_name,
246 prefix=self.config.prefix,
247 namespace=self.config.namespace,
248 htm_index_column=self.config.htm_index_column,
249 enable_replica=self.config.use_insert_id,
250 )
251
252 if self._metadata.table_exists():
253 self._versionCheck(self._metadata)
254
255 self.pixelator = HtmPixelization(self.config.htm_level)
256
257 _LOG.debug("APDB Configuration:")
258 _LOG.debug(" dia_object_index: %s", self.config.dia_object_index)
259 _LOG.debug(" read_sources_months: %s", self.config.read_sources_months)
260 _LOG.debug(" read_forced_sources_months: %s", self.config.read_forced_sources_months)
261 _LOG.debug(" dia_object_columns: %s", self.config.dia_object_columns)
262 _LOG.debug(" schema_file: %s", self.config.schema_file)
263 _LOG.debug(" extra_schema_file: %s", self.config.extra_schema_file)
264 _LOG.debug(" schema prefix: %s", self.config.prefix)
265
266 self._timer_args: list[MonAgent | logging.Logger] = [_MON]
267 if self.config.timer:
268 self._timer_args.append(_LOG)
269
270 def _timer(self, name: str, *, tags: Mapping[str, str | int] | None = None) -> Timer:
271 """Create `Timer` instance given its name."""
272 return Timer(name, *self._timer_args, tags=tags)
273
274 @classmethod
275 def _makeEngine(cls, config: ApdbSqlConfig, *, create: bool) -> sqlalchemy.engine.Engine:
276 """Make SQLALchemy engine based on configured parameters.
277
278 Parameters
279 ----------
280 config : `ApdbSqlConfig`
281 Configuration object.
282 create : `bool`
283 Whether to try to create new database file, only relevant for
284 SQLite backend which always creates new files by default.
285 """
286 # engine is reused between multiple processes, make sure that we don't
287 # share connections by disabling pool (by using NullPool class)
288 kw: MutableMapping[str, Any] = dict(echo=config.sql_echo)
289 conn_args: dict[str, Any] = dict()
290 if not config.connection_pool:
291 kw.update(poolclass=NullPool)
292 if config.isolation_level is not None:
293 kw.update(isolation_level=config.isolation_level)
294 elif config.db_url.startswith("sqlite"): # type: ignore
295 # Use READ_UNCOMMITTED as default value for sqlite.
296 kw.update(isolation_level="READ_UNCOMMITTED")
297 if config.connection_timeout is not None:
298 if config.db_url.startswith("sqlite"):
299 conn_args.update(timeout=config.connection_timeout)
300 elif config.db_url.startswith(("postgresql", "mysql")):
301 conn_args.update(connect_timeout=config.connection_timeout)
302 kw.update(connect_args=conn_args)
303 engine = sqlalchemy.create_engine(cls._connection_url(config.db_url, create=create), **kw)
304
305 if engine.dialect.name == "sqlite":
306 # Need to enable foreign keys on every new connection.
307 sqlalchemy.event.listen(engine, "connect", _onSqlite3Connect)
308
309 return engine
310
311 @classmethod
312 def _connection_url(cls, config_url: str, *, create: bool) -> sqlalchemy.engine.URL | str:
313 """Generate a complete URL for database with proper credentials.
314
315 Parameters
316 ----------
317 config_url : `str`
318 Database URL as specified in configuration.
319 create : `bool`
320 Whether to try to create new database file, only relevant for
321 SQLite backend which always creates new files by default.
322
323 Returns
324 -------
325 connection_url : `sqlalchemy.engine.URL` or `str`
326 Connection URL including credentials.
327 """
328 # Allow 3rd party authentication mechanisms by assuming connection
329 # string is correct when we can not recognize (dialect, host, database)
330 # matching keys.
331 components = urllib.parse.urlparse(config_url)
332 if all((components.scheme is not None, components.hostname is not None, components.path is not None)):
333 try:
334 db_auth = DbAuth(DB_AUTH_PATH, DB_AUTH_ENVVAR)
335 config_url = db_auth.getUrl(config_url)
336 except DbAuthNotFoundError:
337 # Credentials file doesn't exist or no matching credentials,
338 # use default auth.
339 pass
340
341 # SQLite has a nasty habit of creating empty databases when they do not
342 # exist, tell it not to do that unless we do need to create it.
343 if not create:
344 config_url = cls._update_sqlite_url(config_url)
345
346 return config_url
347
348 @classmethod
349 def _update_sqlite_url(cls, url_string: str) -> str:
350 """If URL refers to sqlite dialect, update it so that the backend does
351 not try to create database file if it does not exist already.
352
353 Parameters
354 ----------
355 url_string : `str`
356 Connection string.
357
358 Returns
359 -------
360 url_string : `str`
361 Possibly updated connection string.
362 """
363 try:
364 url = sqlalchemy.make_url(url_string)
365 except sqlalchemy.exc.SQLAlchemyError:
366 # If parsing fails it means some special format, likely not
367 # sqlite so we just return it unchanged.
368 return url_string
369
370 if url.get_backend_name() == "sqlite":
371 # Massage url so that database name starts with "file:" and
372 # option string has "mode=rw&uri=true". Database name
373 # should look like a path (:memory: is not supported by
374 # Apdb, but someone could still try to use it).
375 database = url.database
376 if database and not database.startswith((":", "file:")):
377 query = dict(url.query, mode="rw", uri="true")
378 # If ``database`` is an absolute path then original URL should
379 # include four slashes after "sqlite:". Humans are bad at
380 # counting things beyond four and sometimes an extra slash gets
381 # added unintentionally, which causes sqlite to treat initial
382 # element as "authority" and to complain. Strip extra slashes
383 # at the start of the path to avoid that (DM-46077).
384 if database.startswith("//"):
385 warnings.warn(
386 f"Database URL contains extra leading slashes which will be removed: {url}",
387 stacklevel=3,
388 )
389 database = "/" + database.lstrip("/")
390 url = url.set(database=f"file:{database}", query=query)
391 url_string = url.render_as_string()
392
393 return url_string
394
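    # For illustration, assuming a plain file-based URL (example path only):
    # "sqlite:///data/apdb.db" is rewritten to roughly
    # "sqlite:///file:data/apdb.db?mode=rw&uri=true", so the backend opens the
    # file read-write but refuses to create it when it is missing.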
395 def _versionCheck(self, metadata: ApdbMetadataSql) -> None:
396 """Check schema version compatibility."""
397
398 def _get_version(key: str, default: VersionTuple) -> VersionTuple:
399 """Retrieve version number from given metadata key."""
400 if metadata.table_exists():
401 version_str = metadata.get(key)
402 if version_str is None:
403 # Should not happen with existing metadata table.
404 raise RuntimeError(f"Version key {key!r} does not exist in metadata table.")
405 return VersionTuple.fromString(version_str)
406 return default
407
408 # For old databases where metadata table does not exist we assume that
409 # version of both code and schema is 0.1.0.
410 initial_version = VersionTuple(0, 1, 0)
411 db_schema_version = _get_version(self.metadataSchemaVersionKey, initial_version)
412 db_code_version = _get_version(self.metadataCodeVersionKey, initial_version)
413
414 # For now there is no way to make read-only APDB instances, assume that
415 # any access can do updates.
416 if not self._schema.schemaVersion().checkCompatibility(db_schema_version):
418 f"Configured schema version {self._schema.schemaVersion()} "
419 f"is not compatible with database version {db_schema_version}"
420 )
421 if not self.apdbImplementationVersion().checkCompatibility(db_code_version):
423 f"Current code version {self.apdbImplementationVersion()} "
424 f"is not compatible with database version {db_code_version}"
425 )
426
427 # Check replica code version only if replica is enabled.
428 if self._schema.has_replica_chunks:
429 db_replica_version = _get_version(self.metadataReplicaVersionKey, initial_version)
430 code_replica_version = ApdbSqlReplica.apdbReplicaImplementationVersion()
431 if not code_replica_version.checkCompatibility(db_replica_version):
433 f"Current replication code version {code_replica_version} "
434 f"is not compatible with database version {db_replica_version}"
435 )
436
437 @classmethod
438 def apdbImplementationVersion(cls) -> VersionTuple:
439 """Return version number for current APDB implementation.
440
441 Returns
442 -------
443 version : `VersionTuple`
444 Version of the code defined in implementation class.
445 """
446 return VERSION
447
448 @classmethod
449 def init_database(
450 cls,
451 db_url: str,
452 *,
453 schema_file: str | None = None,
454 schema_name: str | None = None,
455 read_sources_months: int | None = None,
456 read_forced_sources_months: int | None = None,
457 use_insert_id: bool = False,
458 connection_timeout: int | None = None,
459 dia_object_index: str | None = None,
460 htm_level: int | None = None,
461 htm_index_column: str | None = None,
462 ra_dec_columns: list[str] | None = None,
463 prefix: str | None = None,
464 namespace: str | None = None,
465 drop: bool = False,
466 ) -> ApdbSqlConfig:
467 """Initialize new APDB instance and make configuration object for it.
468
469 Parameters
470 ----------
471 db_url : `str`
472 SQLAlchemy database URL.
473 schema_file : `str`, optional
474 Location of (YAML) configuration file with APDB schema. If not
475 specified then default location will be used.
476 schema_name : `str`, optional
477 Name of the schema in YAML configuration file. If not specified
478 then default name will be used.
479 read_sources_months : `int`, optional
480 Number of months of history to read from DiaSource.
481 read_forced_sources_months : `int`, optional
482 Number of months of history to read from DiaForcedSource.
483 use_insert_id : `bool`
484 If True, make additional tables used for replication to PPDB.
485 connection_timeout : `int`, optional
486 Database connection timeout in seconds.
487 dia_object_index : `str`, optional
488 Indexing mode for DiaObject table.
489 htm_level : `int`, optional
490 HTM indexing level.
491 htm_index_column : `str`, optional
492 Name of a HTM index column for DiaObject and DiaSource tables.
493 ra_dec_columns : `list` [`str`], optional
494 Names of ra/dec columns in DiaObject table.
495 prefix : `str`, optional
496 Optional prefix for all table names.
497 namespace : `str`, optional
498 Name of the database schema for all APDB tables. If not specified
499 then default schema is used.
500 drop : `bool`, optional
501 If `True` then drop existing tables before re-creating the schema.
502
503 Returns
504 -------
505 config : `ApdbSqlConfig`
506 Resulting configuration object for a created APDB instance.
507 """
508 config = ApdbSqlConfig(db_url=db_url, use_insert_id=use_insert_id)
509 if schema_file is not None:
510 config.schema_file = schema_file
511 if schema_name is not None:
512 config.schema_name = schema_name
513 if read_sources_months is not None:
514 config.read_sources_months = read_sources_months
515 if read_forced_sources_months is not None:
516 config.read_forced_sources_months = read_forced_sources_months
517 if connection_timeout is not None:
518 config.connection_timeout = connection_timeout
519 if dia_object_index is not None:
520 config.dia_object_index = dia_object_index
521 if htm_level is not None:
522 config.htm_level = htm_level
523 if htm_index_column is not None:
524 config.htm_index_column = htm_index_column
525 if ra_dec_columns is not None:
526 config.ra_dec_columns = ra_dec_columns
527 if prefix is not None:
528 config.prefix = prefix
529 if namespace is not None:
530 config.namespace = namespace
531
532 cls._makeSchema(config, drop=drop)
533
534 # SQLite has a nasty habit of creating an empty database by default,
535 # update URL in config file to disable that behavior.
536 config.db_url = cls._update_sqlite_url(config.db_url)
537
538 return config
539
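    # A minimal end-to-end sketch (the file name is an example, not a default):
    #
    #     config = ApdbSql.init_database(db_url="sqlite:///apdb.db")
    #     apdb = ApdbSql(config)
    #     objects = apdb.getDiaObjects(region)
    #
    # where ``region`` is an ``lsst.sphgeom.Region`` covering the area of
    # interest.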
540 def get_replica(self) -> ApdbSqlReplica:
541 """Return `ApdbReplica` instance for this database."""
542 return ApdbSqlReplica(self._schema, self._engine)
543
544 def tableRowCount(self) -> dict[str, int]:
545 """Return dictionary with the table names and row counts.
546
547 Used by ``ap_proto`` to keep track of the size of the database tables.
548 Depending on database technology this could be an expensive operation.
549
550 Returns
551 -------
552 row_counts : `dict`
553 Dict where key is a table name and value is a row count.
554 """
555 res = {}
556 tables = [ApdbTables.DiaObject, ApdbTables.DiaSource, ApdbTables.DiaForcedSource]
557 if self.config.dia_object_index == "last_object_table":
558 tables.append(ApdbTables.DiaObjectLast)
559 with self._engine.begin() as conn:
560 for table in tables:
561 sa_table = self._schema.get_table(table)
562 stmt = sql.select(func.count()).select_from(sa_table)
563 count: int = conn.execute(stmt).scalar_one()
564 res[table.name] = count
565
566 return res
567
568 def tableDef(self, table: ApdbTables) -> Table | None:
569 # docstring is inherited from a base class
570 return self._schema.tableSchemas.get(table)
571
572 @classmethod
573 def _makeSchema(cls, config: ApdbConfig, drop: bool = False) -> None:
574 # docstring is inherited from a base class
575
576 if not isinstance(config, ApdbSqlConfig):
577 raise TypeError(f"Unexpected type of configuration object: {type(config)}")
578
579 engine = cls._makeEngine(config, create=True)
580
581 # Ask schema class to create all tables.
582 schema = ApdbSqlSchema(
583 engine=engine,
584 dia_object_index=config.dia_object_index,
585 schema_file=config.schema_file,
586 schema_name=config.schema_name,
587 prefix=config.prefix,
588 namespace=config.namespace,
589 htm_index_column=config.htm_index_column,
590 enable_replica=config.use_insert_id,
591 )
592 schema.makeSchema(drop=drop)
593
594 # Need metadata table to store a few items in it, if table exists.
595 meta_table: sqlalchemy.schema.Table | None = None
596 with suppress(ValueError):
597 meta_table = schema.get_table(ApdbTables.metadata)
598
599 apdb_meta = ApdbMetadataSql(engine, meta_table)
600 if apdb_meta.table_exists():
601 # Fill version numbers, overwrite if they are already there.
602 apdb_meta.set(cls.metadataSchemaVersionKey, str(schema.schemaVersion()), force=True)
603 apdb_meta.set(cls.metadataCodeVersionKey, str(cls.apdbImplementationVersion()), force=True)
604 if config.use_insert_id:
605 # Only store replica code version if replica is enabled.
606 apdb_meta.set(
607 cls.metadataReplicaVersionKey,
608 str(ApdbSqlReplica.apdbReplicaImplementationVersion()),
609 force=True,
610 )
611
612 # Store frozen part of a configuration in metadata.
613 freezer = ApdbConfigFreezer[ApdbSqlConfig](cls._frozen_parameters)
614 apdb_meta.set(cls.metadataConfigKey, freezer.to_json(config), force=True)
615
616 def getDiaObjects(self, region: Region) -> pandas.DataFrame:
617 # docstring is inherited from a base class
618
619 # decide what columns we need
620 if self.config.dia_object_index == "last_object_table":
621 table_enum = ApdbTables.DiaObjectLast
622 else:
623 table_enum = ApdbTables.DiaObject
624 table = self._schema.get_table(table_enum)
625 if not self.config.dia_object_columns:
626 columns = self._schema.get_apdb_columns(table_enum)
627 else:
628 columns = [table.c[col] for col in self.config.dia_object_columns]
629 query = sql.select(*columns)
630
631 # build selection
632 query = query.where(self._filterRegion(table, region))
633
634 # select latest version of objects
635 if self.config.dia_object_index != "last_object_table":
636 query = query.where(table.c.validityEnd == None) # noqa: E711
637
638 # _LOG.debug("query: %s", query)
639
640 # execute select
641 with self._timer("select_time", tags={"table": "DiaObject"}) as timer:
642 with self._engine.begin() as conn:
643 objects = pandas.read_sql_query(query, conn)
644 timer.add_values(row_count=len(objects))
645 _LOG.debug("found %s DiaObjects", len(objects))
646 return self._fix_result_timestamps(objects)
647
648 def getDiaSources(
649 self, region: Region, object_ids: Iterable[int] | None, visit_time: astropy.time.Time
650 ) -> pandas.DataFrame | None:
651 # docstring is inherited from a base class
652 if self.config.read_sources_months == 0:
653 _LOG.debug("Skip DiaSources fetching")
654 return None
655
656 if object_ids is None:
657 # region-based select
658 return self._getDiaSourcesInRegion(region, visit_time)
659 else:
660 return self._getDiaSourcesByIDs(list(object_ids), visit_time)
661
662 def getDiaForcedSources(
663 self, region: Region, object_ids: Iterable[int] | None, visit_time: astropy.time.Time
664 ) -> pandas.DataFrame | None:
665 # docstring is inherited from a base class
666 if self.config.read_forced_sources_months == 0:
667 _LOG.debug("Skip DiaForceSources fetching")
668 return None
669
670 if object_ids is None:
671 # This implementation does not support region-based selection.
672 raise NotImplementedError("Region-based selection is not supported")
673
674 # TODO: DateTime.MJD must be consistent with code in ap_association,
675 # alternatively we can fill midpointMjdTai ourselves in store()
676 midpointMjdTai_start = _make_midpointMjdTai_start(visit_time, self.config.read_forced_sources_months)
677 _LOG.debug("midpointMjdTai_start = %.6f", midpointMjdTai_start)
678
679 with self._timer("select_time", tags={"table": "DiaForcedSource"}) as timer:
680 sources = self._getSourcesByIDs(
681 ApdbTables.DiaForcedSource, list(object_ids), midpointMjdTai_start
682 )
683 timer.add_values(row_count=len(sources))
684
685 _LOG.debug("found %s DiaForcedSources", len(sources))
686 return sources
687
688 def containsVisitDetector(self, visit: int, detector: int) -> bool:
689 # docstring is inherited from a base class
690 src_table: sqlalchemy.schema.Table = self._schema.get_table(ApdbTables.DiaSource)
691 frcsrc_table: sqlalchemy.schema.Table = self._schema.get_table(ApdbTables.DiaForcedSource)
692 # Query should load only one leaf page of the index
693 query1 = sql.select(src_table.c.visit).filter_by(visit=visit, detector=detector).limit(1)
694
695 with self._engine.begin() as conn:
696 result = conn.execute(query1).scalar_one_or_none()
697 if result is not None:
698 return True
699 else:
700 # Backup query if an image was processed but had no diaSources
701 query2 = sql.select(frcsrc_table.c.visit).filter_by(visit=visit, detector=detector).limit(1)
702 result = conn.execute(query2).scalar_one_or_none()
703 return result is not None
704
705 def getSSObjects(self) -> pandas.DataFrame:
706 # docstring is inherited from a base class
707
708 columns = self._schema.get_apdb_columns(ApdbTables.SSObject)
709 query = sql.select(*columns)
710
711 # execute select
712 with self._timer("SSObject_select_time", tags={"table": "SSObject"}) as timer:
713 with self._engine.begin() as conn:
714 objects = pandas.read_sql_query(query, conn)
715 timer.add_values(row_count=len(objects))
716 _LOG.debug("found %s SSObjects", len(objects))
717 return self._fix_result_timestamps(objects)
718
719 def store(
720 self,
721 visit_time: astropy.time.Time,
722 objects: pandas.DataFrame,
723 sources: pandas.DataFrame | None = None,
724 forced_sources: pandas.DataFrame | None = None,
725 ) -> None:
726 # docstring is inherited from a base class
727 objects = self._fix_input_timestamps(objects)
728 if sources is not None:
729 sources = self._fix_input_timestamps(sources)
730 if forced_sources is not None:
731 forced_sources = self._fix_input_timestamps(forced_sources)
732
733 # We want to run all inserts in one transaction.
734 with self._engine.begin() as connection:
735 replica_chunk: ReplicaChunk | None = None
736 if self._schema.has_replica_chunks:
737 replica_chunk = ReplicaChunk.make_replica_chunk(visit_time, self.config.replica_chunk_seconds)
738 self._storeReplicaChunk(replica_chunk, visit_time, connection)
739
740 # fill pixelId column for DiaObjects
741 objects = self._add_spatial_index(objects)
742 self._storeDiaObjects(objects, visit_time, replica_chunk, connection)
743
744 if sources is not None:
745 # fill pixelId column for DiaSources
746 sources = self._add_spatial_index(sources)
747 self._storeDiaSources(sources, replica_chunk, connection)
748
749 if forced_sources is not None:
750 self._storeDiaForcedSources(forced_sources, replica_chunk, connection)
751
752 def storeSSObjects(self, objects: pandas.DataFrame) -> None:
753 # docstring is inherited from a base class
754 objects = self._fix_input_timestamps(objects)
755
756 idColumn = "ssObjectId"
757 table = self._schema.get_table(ApdbTables.SSObject)
758
759 # everything to be done in single transaction
760 with self._engine.begin() as conn:
761 # Find record IDs that already exist. Some types like np.int64 can
762 # cause issues with sqlalchemy, convert them to int.
763 ids = sorted(int(oid) for oid in objects[idColumn])
764
765 query = sql.select(table.columns[idColumn]).where(table.columns[idColumn].in_(ids))
766 result = conn.execute(query)
767 knownIds = set(row.ssObjectId for row in result)
768
769 filter = objects[idColumn].isin(knownIds)
770 toUpdate = cast(pandas.DataFrame, objects[filter])
771 toInsert = cast(pandas.DataFrame, objects[~filter])
772
773 # insert new records
774 if len(toInsert) > 0:
775 toInsert.to_sql(table.name, conn, if_exists="append", index=False, schema=table.schema)
776
777 # update existing records
778 if len(toUpdate) > 0:
779 whereKey = f"{idColumn}_param"
780 update = table.update().where(table.columns[idColumn] == sql.bindparam(whereKey))
781 toUpdate = toUpdate.rename({idColumn: whereKey}, axis="columns")
782 values = toUpdate.to_dict("records")
783 result = conn.execute(update, values)
784
785 def reassignDiaSources(self, idMap: Mapping[int, int]) -> None:
786 # docstring is inherited from a base class
787
788 table = self._schema.get_table(ApdbTables.DiaSource)
789 query = table.update().where(table.columns["diaSourceId"] == sql.bindparam("srcId"))
790
791 with self._engine.begin() as conn:
792 # Need to make sure that every ID exists in the database, but
793 # executemany may not support rowcount, so iterate and check what
794 # is missing.
795 missing_ids: list[int] = []
796 for key, value in idMap.items():
797 params = dict(srcId=key, diaObjectId=0, ssObjectId=value)
798 result = conn.execute(query, params)
799 if result.rowcount == 0:
800 missing_ids.append(key)
801 if missing_ids:
802 missing = ",".join(str(item) for item in missing_ids)
803 raise ValueError(f"Following DiaSource IDs do not exist in the database: {missing}")
804
805 def dailyJob(self) -> None:
806 # docstring is inherited from a base class
807 pass
808
809 def countUnassociatedObjects(self) -> int:
810 # docstring is inherited from a base class
811
812 # Retrieve the DiaObject table.
813 table: sqlalchemy.schema.Table = self._schema.get_table(ApdbTables.DiaObject)
814
815 # Construct the sql statement.
816 stmt = sql.select(func.count()).select_from(table).where(table.c.nDiaSources == 1)
817 stmt = stmt.where(table.c.validityEnd == None) # noqa: E711
818
819 # Return the count.
820 with self._engine.begin() as conn:
821 count = conn.execute(stmt).scalar_one()
822
823 return count
824
825 @property
826 def metadata(self) -> ApdbMetadata:
827 # docstring is inherited from a base class
828 if self._metadata is None:
829 raise RuntimeError("Database schema was not initialized.")
830 return self._metadata
831
832 def _getDiaSourcesInRegion(self, region: Region, visit_time: astropy.time.Time) -> pandas.DataFrame:
833 """Return catalog of DiaSource instances from given region.
834
835 Parameters
836 ----------
837 region : `lsst.sphgeom.Region`
838 Region to search for DIASources.
839 visit_time : `astropy.time.Time`
840 Time of the current visit.
841
842 Returns
843 -------
844 catalog : `pandas.DataFrame`
845 Catalog containing DiaSource records.
846 """
847 # TODO: DateTime.MJD must be consistent with code in ap_association,
848 # alternatively we can fill midpointMjdTai ourselves in store()
849 midpointMjdTai_start = _make_midpointMjdTai_start(visit_time, self.config.read_sources_months)
850 _LOG.debug("midpointMjdTai_start = %.6f", midpointMjdTai_start)
851
852 table = self._schema.get_table(ApdbTables.DiaSource)
853 columns = self._schema.get_apdb_columns(ApdbTables.DiaSource)
854 query = sql.select(*columns)
855
856 # build selection
857 time_filter = table.columns["midpointMjdTai"] > midpointMjdTai_start
858 where = sql.expression.and_(self._filterRegion(table, region), time_filter)
859 query = query.where(where)
860
861 # execute select
862 with self._timer("DiaSource_select_time", tags={"table": "DiaSource"}) as timer:
863 with self._engine.begin() as conn:
864 sources = pandas.read_sql_query(query, conn)
865 timer.add_values(row_count=len(sources))
866 _LOG.debug("found %s DiaSources", len(sources))
867 return self._fix_result_timestamps(sources)
868
869 def _getDiaSourcesByIDs(self, object_ids: list[int], visit_time: astropy.time.Time) -> pandas.DataFrame:
870 """Return catalog of DiaSource instances given set of DiaObject IDs.
871
872 Parameters
873 ----------
874 object_ids : `list` [`int`]
875 Collection of DiaObject IDs
876 visit_time : `astropy.time.Time`
877 Time of the current visit.
878
879 Returns
880 -------
881 catalog : `pandas.DataFrame`
882 Catalog containing DiaSource records.
883 """
884 # TODO: DateTime.MJD must be consistent with code in ap_association,
885 # alternatively we can fill midpointMjdTai ourselves in store()
886 midpointMjdTai_start = _make_midpointMjdTai_start(visit_time, self.config.read_sources_months)
887 _LOG.debug("midpointMjdTai_start = %.6f", midpointMjdTai_start)
888
889 with self._timer("select_time", tags={"table": "DiaSource"}) as timer:
890 sources = self._getSourcesByIDs(ApdbTables.DiaSource, object_ids, midpointMjdTai_start)
891 timer.add_values(row_count=len(sources))
892
893 _LOG.debug("found %s DiaSources", len(sources))
894 return sources
895
896 def _getSourcesByIDs(
897 self, table_enum: ApdbTables, object_ids: list[int], midpointMjdTai_start: float
898 ) -> pandas.DataFrame:
899 """Return catalog of DiaSource or DiaForcedSource instances given set
900 of DiaObject IDs.
901
902 Parameters
903 ----------
904 table_enum : `ApdbTables`
905 Database table enum.
906 object_ids : `list` [`int`]
907 Collection of DiaObject IDs
908 midpointMjdTai_start : `float`
909 Earliest midpointMjdTai to retrieve.
910
911 Returns
912 -------
913 catalog : `pandas.DataFrame`
914 Catalog containing DiaSource records. An empty catalog is
915 returned when ``object_ids`` is empty or no sources match the
916 selection criteria.
917 """
918 table = self._schema.get_table(table_enum)
919 columns = self._schema.get_apdb_columns(table_enum)
920
921 sources: pandas.DataFrame | None = None
922 if len(object_ids) <= 0:
923 _LOG.debug("ID list is empty, just fetch empty result")
924 query = sql.select(*columns).where(sql.literal(False))
925 with self._engine.begin() as conn:
926 sources = pandas.read_sql_query(query, conn)
927 else:
928 data_frames: list[pandas.DataFrame] = []
929 for ids in chunk_iterable(sorted(object_ids), 1000):
930 query = sql.select(*columns)
931
932 # Some types like np.int64 can cause issues with
933 # sqlalchemy, convert them to int.
934 int_ids = [int(oid) for oid in ids]
935
936 # select by object id
937 query = query.where(
938 sql.expression.and_(
939 table.columns["diaObjectId"].in_(int_ids),
940 table.columns["midpointMjdTai"] > midpointMjdTai_start,
941 )
942 )
943
944 # execute select
945 with self._engine.begin() as conn:
946 data_frames.append(pandas.read_sql_query(query, conn))
947
948 if len(data_frames) == 1:
949 sources = data_frames[0]
950 else:
951 sources = pandas.concat(data_frames)
952 assert sources is not None, "Catalog cannot be None"
953 return self._fix_result_timestamps(sources)
954
955 def _storeReplicaChunk(
956 self,
957 replica_chunk: ReplicaChunk,
958 visit_time: astropy.time.Time,
959 connection: sqlalchemy.engine.Connection,
960 ) -> None:
961 # `visit_time.datetime` returns naive datetime, even though all astropy
962 # times are in UTC. Add UTC timezone to timestamp so that database
963 # can store a correct value.
964 dt = datetime.datetime.fromtimestamp(visit_time.unix_tai, tz=datetime.timezone.utc)
965
966 table = self._schema.get_table(ExtraTables.ApdbReplicaChunks)
967
968 # We need UPSERT which is a dialect-specific construct.
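# The resulting statement is roughly "INSERT ... ON CONFLICT ... DO UPDATE SET
# last_update_time = ..., unique_id = ..." in both dialects, keyed on the
# table primary key.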
969 values = {"last_update_time": dt, "unique_id": replica_chunk.unique_id}
970 row = {"apdb_replica_chunk": replica_chunk.id} | values
971 if connection.dialect.name == "sqlite":
972 insert_sqlite = sqlalchemy.dialects.sqlite.insert(table)
973 insert_sqlite = insert_sqlite.on_conflict_do_update(index_elements=table.primary_key, set_=values)
974 connection.execute(insert_sqlite, row)
975 elif connection.dialect.name == "postgresql":
976 insert_pg = sqlalchemy.dialects.postgresql.dml.insert(table)
977 insert_pg = insert_pg.on_conflict_do_update(constraint=table.primary_key, set_=values)
978 connection.execute(insert_pg, row)
979 else:
980 raise TypeError(f"Unsupported dialect {connection.dialect.name} for upsert.")
981
982 def _storeDiaObjects(
983 self,
984 objs: pandas.DataFrame,
985 visit_time: astropy.time.Time,
986 replica_chunk: ReplicaChunk | None,
987 connection: sqlalchemy.engine.Connection,
988 ) -> None:
989 """Store catalog of DiaObjects from current visit.
990
991 Parameters
992 ----------
993 objs : `pandas.DataFrame`
994 Catalog with DiaObject records.
995 visit_time : `astropy.time.Time`
996 Time of the visit.
997 replica_chunk : `ReplicaChunk`
998 Insert identifier.
999 """
1000 if len(objs) == 0:
1001 _LOG.debug("No objects to write to database.")
1002 return
1003
1004 # Some types like np.int64 can cause issues with sqlalchemy, convert
1005 # them to int.
1006 ids = sorted(int(oid) for oid in objs["diaObjectId"])
1007 _LOG.debug("first object ID: %d", ids[0])
1008
1009 # TODO: Need to verify that we are using correct scale here for
1010 # DATETIME representation (see DM-31996).
1011 dt = visit_time.datetime
1012
1013 # everything to be done in single transaction
1014 if self.config.dia_object_index == "last_object_table":
1015 # Insert and replace all records in LAST table.
1016 table = self._schema.get_table(ApdbTables.DiaObjectLast)
1017
1018 # Drop the previous objects (pandas cannot upsert).
1019 query = table.delete().where(table.columns["diaObjectId"].in_(ids))
1020
1021 with self._timer("delete_time", tags={"table": table.name}) as timer:
1022 res = connection.execute(query)
1023 timer.add_values(row_count=res.rowcount)
1024 _LOG.debug("deleted %s objects", res.rowcount)
1025
1026 # DiaObjectLast is a subset of DiaObject, strip missing columns
1027 last_column_names = [column.name for column in table.columns]
1028 last_objs = objs[last_column_names]
1029 last_objs = _coerce_uint64(last_objs)
1030
1031 if "lastNonForcedSource" in last_objs.columns:
1032 # lastNonForcedSource is defined NOT NULL, fill it with visit
1033 # time just in case.
1034 last_objs.fillna({"lastNonForcedSource": dt}, inplace=True)
1035 else:
1036 extra_column = pandas.Series([dt] * len(objs), name="lastNonForcedSource")
1037 last_objs.set_index(extra_column.index, inplace=True)
1038 last_objs = pandas.concat([last_objs, extra_column], axis="columns")
1039
1040 with self._timer("insert_time", tags={"table": "DiaObjectLast"}) as timer:
1041 last_objs.to_sql(
1042 table.name,
1043 connection,
1044 if_exists="append",
1045 index=False,
1046 schema=table.schema,
1047 )
1048 timer.add_values(row_count=len(last_objs))
1049 else:
1050 # truncate existing validity intervals
1051 table = self._schema.get_table(ApdbTables.DiaObject)
1052
1053 update = (
1054 table.update()
1055 .values(validityEnd=dt)
1056 .where(
1057 sql.expression.and_(
1058 table.columns["diaObjectId"].in_(ids),
1059 table.columns["validityEnd"].is_(None),
1060 )
1061 )
1062 )
1063
1064 with self._timer("truncate_time", tags={"table": table.name}) as timer:
1065 res = connection.execute(update)
1066 timer.add_values(row_count=res.rowcount)
1067 _LOG.debug("truncated %s intervals", res.rowcount)
1068
1069 objs = _coerce_uint64(objs)
1070
1071 # Fill additional columns
1072 extra_columns: list[pandas.Series] = []
1073 if "validityStart" in objs.columns:
1074 objs["validityStart"] = dt
1075 else:
1076 extra_columns.append(pandas.Series([dt] * len(objs), name="validityStart"))
1077 if "validityEnd" in objs.columns:
1078 objs["validityEnd"] = None
1079 else:
1080 extra_columns.append(pandas.Series([None] * len(objs), name="validityEnd"))
1081 if "lastNonForcedSource" in objs.columns:
1082 # lastNonForcedSource is defined NOT NULL, fill it with visit time
1083 # just in case.
1084 objs.fillna({"lastNonForcedSource": dt}, inplace=True)
1085 else:
1086 extra_columns.append(pandas.Series([dt] * len(objs), name="lastNonForcedSource"))
1087 if extra_columns:
1088 objs.set_index(extra_columns[0].index, inplace=True)
1089 objs = pandas.concat([objs] + extra_columns, axis="columns")
1090
1091 # Insert replica data
1092 table = self._schema.get_table(ApdbTables.DiaObject)
1093 replica_data: list[dict] = []
1094 replica_stmt: Any = None
1095 replica_table_name = ""
1096 if replica_chunk is not None:
1097 pk_names = [column.name for column in table.primary_key]
1098 replica_data = objs[pk_names].to_dict("records")
1099 for row in replica_data:
1100 row["apdb_replica_chunk"] = replica_chunk.id
1101 replica_table = self._schema.get_table(ExtraTables.DiaObjectChunks)
1102 replica_table_name = replica_table.name
1103 replica_stmt = replica_table.insert()
1104
1105 # insert new versions
1106 with self._timer("insert_time", tags={"table": table.name}) as timer:
1107 objs.to_sql(table.name, connection, if_exists="append", index=False, schema=table.schema)
1108 timer.add_values(row_count=len(objs))
1109 if replica_stmt is not None:
1110 with self._timer("insert_time", tags={"table": replica_table_name}) as timer:
1111 connection.execute(replica_stmt, replica_data)
1112 timer.add_values(row_count=len(replica_data))
1113
1114 def _storeDiaSources(
1115 self,
1116 sources: pandas.DataFrame,
1117 replica_chunk: ReplicaChunk | None,
1118 connection: sqlalchemy.engine.Connection,
1119 ) -> None:
1120 """Store catalog of DiaSources from current visit.
1121
1122 Parameters
1123 ----------
1124 sources : `pandas.DataFrame`
1125 Catalog containing DiaSource records
1126 """
1127 table = self._schema.get_table(ApdbTables.DiaSource)
1128
1129 # Insert replica data
1130 replica_data: list[dict] = []
1131 replica_stmt: Any = None
1132 replica_table_name = ""
1133 if replica_chunk is not None:
1134 pk_names = [column.name for column in table.primary_key]
1135 replica_data = sources[pk_names].to_dict("records")
1136 for row in replica_data:
1137 row["apdb_replica_chunk"] = replica_chunk.id
1138 replica_table = self._schema.get_table(ExtraTables.DiaSourceChunks)
1139 replica_table_name = replica_table.name
1140 replica_stmt = replica_table.insert()
1141
1142 # everything to be done in single transaction
1143 with self._timer("insert_time", tags={"table": table.name}) as timer:
1144 sources = _coerce_uint64(sources)
1145 sources.to_sql(table.name, connection, if_exists="append", index=False, schema=table.schema)
1146 timer.add_values(row_count=len(sources))
1147 if replica_stmt is not None:
1148 with self._timer("replica_insert_time", tags={"table": replica_table_name}) as timer:
1149 connection.execute(replica_stmt, replica_data)
1150 timer.add_values(row_count=len(replica_data))
1151
1152 def _storeDiaForcedSources(
1153 self,
1154 sources: pandas.DataFrame,
1155 replica_chunk: ReplicaChunk | None,
1156 connection: sqlalchemy.engine.Connection,
1157 ) -> None:
1158 """Store a set of DiaForcedSources from current visit.
1159
1160 Parameters
1161 ----------
1162 sources : `pandas.DataFrame`
1163 Catalog containing DiaForcedSource records
1164 """
1165 table = self._schema.get_table(ApdbTables.DiaForcedSource)
1166
1167 # Insert replica data
1168 replica_data: list[dict] = []
1169 replica_stmt: Any = None
1170 replica_table_name = ""
1171 if replica_chunk is not None:
1172 pk_names = [column.name for column in table.primary_key]
1173 replica_data = sources[pk_names].to_dict("records")
1174 for row in replica_data:
1175 row["apdb_replica_chunk"] = replica_chunk.id
1176 replica_table = self._schema.get_table(ExtraTables.DiaForcedSourceChunks)
1177 replica_table_name = replica_table.name
1178 replica_stmt = replica_table.insert()
1179
1180 # everything to be done in single transaction
1181 with self._timer("insert_time", tags={"table": table.name}) as timer:
1182 sources = _coerce_uint64(sources)
1183 sources.to_sql(table.name, connection, if_exists="append", index=False, schema=table.schema)
1184 timer.add_values(row_count=len(sources))
1185 if replica_stmt is not None:
1186 with self._timer("insert_time", tags={"table": replica_table_name}) as timer:
1187 connection.execute(replica_stmt, replica_data)
1188 timer.add_values(row_count=len(replica_data))
1189
1190 def _htm_indices(self, region: Region) -> list[tuple[int, int]]:
1191 """Generate a set of HTM indices covering specified region.
1192
1193 Parameters
1194 ----------
1195 region : `lsst.sphgeom.Region`
1196 Region that needs to be indexed.
1197
1198 Returns
1199 -------
1200 Sequence of ranges, each range is a tuple (minHtmID, maxHtmID).
1201 """
1202 _LOG.debug("region: %s", region)
1203 indices = self.pixelator.envelope(region, self.config.htm_max_ranges)
1204
1205 return indices.ranges()
1206
1207 def _filterRegion(self, table: sqlalchemy.schema.Table, region: Region) -> sql.ColumnElement:
1208 """Make SQLAlchemy expression for selecting records in a region."""
1209 htm_index_column = table.columns[self.config.htm_index_column]
1210 exprlist = []
1211 pixel_ranges = self._htm_indices(region)
1212 for low, upper in pixel_ranges:
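# Ranges from the pixelization envelope are half-open [low, upper); make the
# upper bound inclusive for the equality/BETWEEN tests below.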
1213 upper -= 1
1214 if low == upper:
1215 exprlist.append(htm_index_column == low)
1216 else:
1217 exprlist.append(sql.expression.between(htm_index_column, low, upper))
1218
1219 return sql.expression.or_(*exprlist)
1220
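    # For illustration, a two-range result produces a clause roughly like
    # "pixelId BETWEEN :low1 AND :upper1 OR pixelId = :pix2", with the column
    # name taken from the ``htm_index_column`` configuration.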
1221 def _add_spatial_index(self, df: pandas.DataFrame) -> pandas.DataFrame:
1222 """Calculate spatial index for each record and add it to a DataFrame.
1223
1224 Parameters
1225 ----------
1226 df : `pandas.DataFrame`
1227 DataFrame which has to contain ra/dec columns, names of these
1228 columns are defined by configuration ``ra_dec_columns`` field.
1229
1230 Returns
1231 -------
1232 df : `pandas.DataFrame`
1233 DataFrame with ``pixelId`` column which contains pixel index
1234 for ra/dec coordinates.
1235
1236 Notes
1237 -----
1238 This overrides any existing column in a DataFrame with the same name
1239 (pixelId). The original DataFrame is not changed; a copy of the
1240 DataFrame is returned.
1241 """
1242 # calculate HTM index for every DiaObject
1243 htm_index = np.zeros(df.shape[0], dtype=np.int64)
1244 ra_col, dec_col = self.config.ra_dec_columns
1245 for i, (ra, dec) in enumerate(zip(df[ra_col], df[dec_col])):
1246 uv3d = UnitVector3d(LonLat.fromDegrees(ra, dec))
1247 idx = self.pixelator.index(uv3d)
1248 htm_index[i] = idx
1249 df = df.copy()
1250 df[self.config.htm_index_column] = htm_index
1251 return df
1252
1253 def _fix_input_timestamps(self, df: pandas.DataFrame) -> pandas.DataFrame:
1254 """Update timestamp columns in input DataFrame to be aware datetime
1255 type in in UTC.
1256
1257 AP pipeline generates naive datetime instances; we want them to be
1258 aware before they go to the database. All naive timestamps are assumed to
1259 be in UTC timezone (they should be TAI).
1260 """
1261 # Find all columns with aware non-UTC timestamps and convert to UTC.
1262 columns = [
1263 column
1264 for column, dtype in df.dtypes.items()
1265 if isinstance(dtype, pandas.DatetimeTZDtype) and dtype.tz is not datetime.timezone.utc
1266 ]
1267 for column in columns:
1268 df[column] = df[column].dt.tz_convert(datetime.timezone.utc)
1269 # Find all columns with naive timestamps and add UTC timezone.
1270 columns = [
1271 column for column, dtype in df.dtypes.items() if pandas.api.types.is_datetime64_dtype(dtype)
1272 ]
1273 for column in columns:
1274 df[column] = df[column].dt.tz_localize(datetime.timezone.utc)
1275 return df
1276
1277 def _fix_result_timestamps(self, df: pandas.DataFrame) -> pandas.DataFrame:
1278 """Update timestamp columns to be naive datetime type in returned
1279 DataFrame.
1280
1281 AP pipeline code expects DataFrames to contain naive datetime columns,
1282 while Postgres queries return timezone-aware type. This method converts
1283 those columns to naive datetime in UTC timezone.
1284 """
1285 # Find all columns with aware timestamps.
1286 columns = [column for column, dtype in df.dtypes.items() if isinstance(dtype, pandas.DatetimeTZDtype)]
1287 for column in columns:
1288 # tz_convert(None) will convert to UTC and drop timezone.
1289 df[column] = df[column].dt.tz_convert(None)
1290 return df