22"""Module defining Apdb class and related methods."""
24from __future__
import annotations
33from contextlib
import closing
34from typing
import TYPE_CHECKING, Any, cast
40import sqlalchemy.dialects.postgresql
41import sqlalchemy.dialects.sqlite
42from sqlalchemy
import func, sql
43from sqlalchemy.pool
import NullPool
45from lsst.sphgeom import HtmPixelization, LonLat, Region, UnitVector3d
46from lsst.utils.db_auth
import DbAuth, DbAuthNotFoundError
47from lsst.utils.iteration
import chunk_iterable
49from ..apdb
import Apdb
50from ..apdbConfigFreezer
import ApdbConfigFreezer
51from ..apdbReplica
import ReplicaChunk
52from ..apdbSchema
import ApdbTables
53from ..config
import ApdbConfig
54from ..monitor
import MonAgent
55from ..schema_model
import Table
56from ..timer
import Timer
57from ..versionTuple
import IncompatibleVersionError, VersionTuple
58from .apdbMetadataSql
import ApdbMetadataSql
59from .apdbSqlAdmin
import ApdbSqlAdmin
60from .apdbSqlReplica
import ApdbSqlReplica
61from .apdbSqlSchema
import ApdbSqlSchema, ExtraTables
62from .config
import ApdbSqlConfig
67 from ..apdbMetadata
import ApdbMetadata
_LOG = logging.getLogger(__name__)

# Monitoring agent used by Timer instances created in ApdbSql._timer().
_MON = MonAgent(__name__)

"""Version for the code controlling non-replication tables. This needs to be
updated following compatibility rules when schema produced by this code
changes.
"""
81 """Change the type of uint64 columns to int64, and return copy of data
84 names = [c[0]
for c
in df.dtypes.items()
if c[1] == np.uint64]
85 return df.astype(dict.fromkeys(names, np.int64))
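# Example (illustrative): a catalog whose "diaObjectId" column arrives as
# uint64 comes back as a copy with that column cast to int64; columns with
# other dtypes are left untouched.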
89 """Calculate starting point for time-based source search.
93 visit_time : `astropy.time.Time`
94 Time of current visit.
96 Number of months in the sources history.
101 A ``midpointMjdTai`` starting point, MJD time.
105 return float(visit_time.tai.mjd) - months * 30
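# Worked example (illustrative): for a visit at MJD 60000.0 and a 12-month
# history window the starting point is 60000.0 - 12 * 30 = 59640.0.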
def _onSqlite3Connect(
    dbapiConnection: sqlite3.Connection, connectionRecord: sqlalchemy.pool._ConnectionRecord
) -> None:
    # Enable foreign key checks, which SQLite leaves disabled by default.
    with closing(dbapiConnection.cursor()) as cursor:
        cursor.execute("PRAGMA foreign_keys=ON;")
117 """Implementation of APDB interface based on SQL database.
119 The implementation is configured via standard ``pex_config`` mechanism
120 using `ApdbSqlConfig` configuration class. For an example of different
121 configurations check ``config/`` folder.
125 config : `ApdbSqlConfig`
126 Configuration object.
    metadataSchemaVersionKey = "version:schema"
    """Name of the metadata key to store schema version number."""

    metadataCodeVersionKey = "version:ApdbSql"
    """Name of the metadata key to store code version number."""

    metadataReplicaVersionKey = "version:ApdbSqlReplica"
    """Name of the metadata key to store replica code version number."""

    metadataConfigKey = "config:apdb-sql.json"
    """Name of the metadata key to store the frozen configuration (JSON)."""

    _frozen_parameters = (
        "pixelization.htm_level",
        "pixelization.htm_index_column",
    )
    """Names of the config parameters to be frozen in metadata table."""
    def __init__(self, config: ApdbSqlConfig):
        self._engine = self._makeEngine(config, create=False)

        sa_metadata = sqlalchemy.MetaData(schema=config.namespace)
        meta_table_name = ApdbTables.metadata.table_name(prefix=config.prefix)
        meta_table = sqlalchemy.schema.Table(meta_table_name, sa_metadata, autoload_with=self._engine)

        # config_json is the frozen configuration previously stored in the
        # metadata table.
        if config_json is not None:
            # Update the config from the frozen parameters.
            self.config = freezer.update(config, config_json)

        self._schema = ApdbSqlSchema(
            engine=self._engine,
            dia_object_index=self.config.dia_object_index,
            schema_file=self.config.schema_file,
            schema_name=self.config.schema_name,
            prefix=self.config.prefix,
            namespace=self.config.namespace,
            htm_index_column=self.config.pixelization.htm_index_column,
            enable_replica=self.config.enable_replica,
        )

        self.pixelator = HtmPixelization(self.config.pixelization.htm_level)

        if _LOG.isEnabledFor(logging.DEBUG):
            _LOG.debug("ApdbSql Configuration: %s", self.config.model_dump())
    def _timer(self, name: str, *, tags: Mapping[str, str | int] | None = None) -> Timer:
        """Create `Timer` instance given its name."""
        return Timer(name, _MON, tags=tags)
    @classmethod
    def _makeEngine(cls, config: ApdbSqlConfig, *, create: bool) -> sqlalchemy.engine.Engine:
        """Make SQLAlchemy engine based on configured parameters.

        Parameters
        ----------
        config : `ApdbSqlConfig`
            Configuration object.
        create : `bool`
            Whether to try to create new database file, only relevant for
            SQLite backend which always creates new files by default.
        """
        kw: MutableMapping[str, Any] = dict(config.connection_config.extra_parameters)
        conn_args: dict[str, Any] = {}
        if not config.connection_config.connection_pool:
            kw.update(poolclass=NullPool)
        if config.connection_config.isolation_level is not None:
            kw.update(isolation_level=config.connection_config.isolation_level)
        elif config.db_url.startswith("sqlite"):
            # Use a less restrictive default isolation level for SQLite.
            kw.update(isolation_level="READ_UNCOMMITTED")
        if config.connection_config.connection_timeout is not None:
            if config.db_url.startswith("sqlite"):
                conn_args.update(timeout=config.connection_config.connection_timeout)
            elif config.db_url.startswith(("postgresql", "mysql")):
                conn_args.update(connect_timeout=int(config.connection_config.connection_timeout))
        kw.update(connect_args=conn_args)
        engine = sqlalchemy.create_engine(cls._connection_url(config.db_url, create=create), **kw)

        if engine.dialect.name == "sqlite":
            # Enable foreign keys on every new SQLite connection.
            sqlalchemy.event.listen(engine, "connect", _onSqlite3Connect)

        return engine
    @classmethod
    def _connection_url(cls, config_url: str, *, create: bool) -> sqlalchemy.engine.URL | str:
        """Generate a complete URL for database with proper credentials.

        Parameters
        ----------
        config_url : `str`
            Database URL as specified in configuration.
        create : `bool`
            Whether to try to create new database file, only relevant for
            SQLite backend which always creates new files by default.

        Returns
        -------
        connection_url : `sqlalchemy.engine.URL` or `str`
            Connection URL including credentials.
        """
        # Try to fill in missing credentials from the db-auth configuration.
        components = urllib.parse.urlparse(config_url)
        if all((components.scheme is not None, components.hostname is not None, components.path is not None)):
            try:
                db_auth = DbAuth()
                config_url = db_auth.getUrl(config_url)
            except DbAuthNotFoundError:
                # Credentials file doesn't exist or has no matching entry;
                # use the URL as given.
                pass
265 """If URL refers to sqlite dialect, update it so that the backend does
266 not try to create database file if it does not exist already.
276 Possibly updated connection string.
279 url = sqlalchemy.make_url(url_string)
280 except sqlalchemy.exc.SQLAlchemyError:
285 if url.get_backend_name() ==
"sqlite":
290 database = url.database
291 if database
and not database.startswith((
":",
"file:")):
292 query = dict(url.query, mode=
"rw", uri=
"true")
299 if database.startswith(
"//"):
301 f
"Database URL contains extra leading slashes which will be removed: {url}",
304 database =
"/" + database.lstrip(
"/")
305 url = url.set(database=f
"file:{database}", query=query)
306 url_string = url.render_as_string()
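    # Example (illustrative): "sqlite:///data/apdb.db" is rewritten to
    # "sqlite:///file:data/apdb.db?mode=rw&uri=true", which opens the file
    # read-write but does not create it when it is missing.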
311 """Check schema version compatibility and return the database schema
315 def _get_version(key: str) -> VersionTuple:
316 """Retrieve version number from given metadata key."""
317 version_str = metadata.get(key)
318 if version_str
is None:
320 raise RuntimeError(f
"Version key {key!r} does not exist in metadata table.")
321 return VersionTuple.fromString(version_str)
328 if not self.
_schema.schemaVersion().checkCompatibility(db_schema_version):
330 f
"Configured schema version {self._schema.schemaVersion()} "
331 f
"is not compatible with database version {db_schema_version}"
335 f
"Current code version {self.apdbImplementationVersion()} "
336 f
"is not compatible with database version {db_code_version}"
340 if self.
_schema.replication_enabled:
342 code_replica_version = ApdbSqlReplica.apdbReplicaImplementationVersion()
343 if not code_replica_version.checkCompatibility(db_replica_version):
345 f
"Current replication code version {code_replica_version} "
346 f
"is not compatible with database version {db_replica_version}"
349 return db_schema_version
353 """Return version number for current APDB implementation.
357 version : `VersionTuple`
358 Version of the code defined in implementation class.
    @classmethod
    def init_database(
        cls,
        db_url: str,
        *,
        schema_file: str | None = None,
        schema_name: str | None = None,
        read_sources_months: int | None = None,
        read_forced_sources_months: int | None = None,
        enable_replica: bool = False,
        connection_timeout: int | None = None,
        dia_object_index: str | None = None,
        htm_level: int | None = None,
        htm_index_column: str | None = None,
        ra_dec_columns: tuple[str, str] | None = None,
        prefix: str | None = None,
        namespace: str | None = None,
        drop: bool = False,
    ) -> ApdbSqlConfig:
381 """Initialize new APDB instance and make configuration object for it.
386 SQLAlchemy database URL.
387 schema_file : `str`, optional
388 Location of (YAML) configuration file with APDB schema. If not
389 specified then default location will be used.
390 schema_name : str | None
391 Name of the schema in YAML configuration file. If not specified
392 then default name will be used.
393 read_sources_months : `int`, optional
394 Number of months of history to read from DiaSource.
395 read_forced_sources_months : `int`, optional
396 Number of months of history to read from DiaForcedSource.
397 enable_replica : `bool`
398 If True, make additional tables used for replication to PPDB.
399 connection_timeout : `int`, optional
400 Database connection timeout in seconds.
401 dia_object_index : `str`, optional
402 Indexing mode for DiaObject table.
403 htm_level : `int`, optional
405 htm_index_column : `str`, optional
406 Name of a HTM index column for DiaObject and DiaSource tables.
407 ra_dec_columns : `tuple` [`str`, `str`], optional
408 Names of ra/dec columns in DiaObject table.
409 prefix : `str`, optional
410 Optional prefix for all table names.
411 namespace : `str`, optional
412 Name of the database schema for all APDB tables. If not specified
413 then default schema is used.
414 drop : `bool`, optional
415 If `True` then drop existing tables before re-creating the schema.
419 config : `ApdbSqlConfig`
420 Resulting configuration object for a created APDB instance.
        config = ApdbSqlConfig(db_url=db_url, enable_replica=enable_replica)
        if schema_file is not None:
            config.schema_file = schema_file
        if schema_name is not None:
            config.schema_name = schema_name
        if read_sources_months is not None:
            config.read_sources_months = read_sources_months
        if read_forced_sources_months is not None:
            config.read_forced_sources_months = read_forced_sources_months
        if connection_timeout is not None:
            config.connection_config.connection_timeout = connection_timeout
        if dia_object_index is not None:
            config.dia_object_index = dia_object_index
        if htm_level is not None:
            config.pixelization.htm_level = htm_level
        if htm_index_column is not None:
            config.pixelization.htm_index_column = htm_index_column
        if ra_dec_columns is not None:
            config.ra_dec_columns = ra_dec_columns
        if prefix is not None:
            config.prefix = prefix
        if namespace is not None:
            config.namespace = namespace

        cls._makeSchema(config, drop=drop)

        return config
455 """Return `ApdbReplica` instance for this database."""
459 """Return dictionary with the table names and row counts.
461 Used by ``ap_proto`` to keep track of the size of the database tables.
462 Depending on database technology this could be expensive operation.
467 Dict where key is a table name and value is a row count.
470 tables = [ApdbTables.DiaObject, ApdbTables.DiaSource, ApdbTables.DiaForcedSource]
471 if self.
config.dia_object_index ==
"last_object_table":
472 tables.append(ApdbTables.DiaObjectLast)
473 with self.
_engine.begin()
as conn:
475 sa_table = self.
_schema.get_table(table)
476 stmt = sql.select(func.count()).select_from(sa_table)
477 count: int = conn.execute(stmt).scalar_one()
478 res[table.name] = count
    def tableDef(self, table: ApdbTables) -> Table | None:
        return self._schema.tableSchemas.get(table)
    @classmethod
    def _makeSchema(cls, config: ApdbConfig, drop: bool = False) -> None:
        if not isinstance(config, ApdbSqlConfig):
            raise TypeError(f"Unexpected type of configuration object: {type(config)}")

        engine = cls._makeEngine(config, create=True)
        schema = ApdbSqlSchema(
            engine=engine,
            dia_object_index=config.dia_object_index,
            schema_file=config.schema_file,
            schema_name=config.schema_name,
            prefix=config.prefix,
            namespace=config.namespace,
            htm_index_column=config.pixelization.htm_index_column,
            enable_replica=config.enable_replica,
        )
        schema.makeSchema(drop=drop)

        # The metadata table is used to record version numbers.
        meta_table = schema.get_table(ApdbTables.metadata)
        apdb_meta = ApdbMetadataSql(engine, meta_table)

        if config.enable_replica:
            # Store replica code version in the metadata table.
            apdb_meta.set(
                cls.metadataReplicaVersionKey,
                str(ApdbSqlReplica.apdbReplicaImplementationVersion()),
            )
    def getDiaObjects(self, region: Region) -> pandas.DataFrame:
        # docstring is inherited from a base class
        if self.config.dia_object_index == "last_object_table":
            table_enum = ApdbTables.DiaObjectLast
        else:
            table_enum = ApdbTables.DiaObject
        table = self._schema.get_table(table_enum)
        if not self.config.dia_object_columns:
            columns = self._schema.get_apdb_columns(table_enum)
        else:
            columns = [table.c[col] for col in self.config.dia_object_columns]
        query = sql.select(*columns)
        if self._schema.has_mjd_timestamps:
            validity_end_column = "validityEndMjdTai"
        else:
            validity_end_column = "validityEnd"

        # Select only objects whose validity interval is still open, unless we
        # read from the last-object table which only keeps latest versions.
        if self.config.dia_object_index != "last_object_table":
            query = query.where(table.columns[validity_end_column] == None)  # noqa: E711

        # execute select
        with self._timer("select_time", tags={"table": "DiaObject"}) as timer:
            with self._engine.begin() as conn:
                objects = pandas.read_sql_query(query, conn)
            timer.add_values(row_count=len(objects))
        _LOG.debug("found %s DiaObjects", len(objects))
        return self._fix_result_timestamps(objects)
    def getDiaSources(
        self,
        region: Region,
        object_ids: Iterable[int] | None,
        visit_time: astropy.time.Time,
        start_time: astropy.time.Time | None = None,
    ) -> pandas.DataFrame | None:
        # docstring is inherited from a base class
        if start_time is None and self.config.read_sources_months == 0:
            _LOG.debug("Skip DiaSources fetching")
            return None

        if start_time is None:
            start_time_mjdTai = _make_midpointMjdTai_start(visit_time, self.config.read_sources_months)
        else:
            start_time_mjdTai = float(start_time.tai.mjd)
        _LOG.debug("start_time_mjdTai = %.6f", start_time_mjdTai)

        if object_ids is None:
            # Region-based select.
            return self._getDiaSourcesInRegion(region, start_time_mjdTai)
        else:
            return self._getDiaSourcesByIDs(list(object_ids), start_time_mjdTai)
    def getDiaForcedSources(
        self,
        region: Region,
        object_ids: Iterable[int] | None,
        visit_time: astropy.time.Time,
        start_time: astropy.time.Time | None = None,
    ) -> pandas.DataFrame | None:
        # docstring is inherited from a base class
        if start_time is None and self.config.read_forced_sources_months == 0:
            _LOG.debug("Skip DiaForcedSources fetching")
            return None

        if object_ids is None:
            # This implementation cannot select forced sources by region.
            raise NotImplementedError("Region-based selection is not supported")

        if start_time is None:
            start_time_mjdTai = _make_midpointMjdTai_start(visit_time, self.config.read_forced_sources_months)
        else:
            start_time_mjdTai = float(start_time.tai.mjd)
        _LOG.debug("start_time_mjdTai = %.6f", start_time_mjdTai)

        with self._timer("select_time", tags={"table": "DiaForcedSource"}) as timer:
            sources = self._getSourcesByIDs(ApdbTables.DiaForcedSource, list(object_ids), start_time_mjdTai)
            timer.add_values(row_count=len(sources))

        _LOG.debug("found %s DiaForcedSources", len(sources))
        return self._fix_result_timestamps(sources)
    def containsVisitDetector(
        self,
        visit: int,
        detector: int,
        region: Region,
        visit_time: astropy.time.Time,
    ) -> bool:
        # docstring is inherited from a base class
        src_table: sqlalchemy.schema.Table = self._schema.get_table(ApdbTables.DiaSource)
        frcsrc_table: sqlalchemy.schema.Table = self._schema.get_table(ApdbTables.DiaForcedSource)
        # LIMIT 1 -- we only need to know whether any matching row exists.
        query1 = sql.select(src_table.c.visit).filter_by(visit=visit, detector=detector).limit(1)

        with self._engine.begin() as conn:
            result = conn.execute(query1).scalar_one_or_none()
            if result is not None:
                return True
            # Check the forced-source table as well.
            query2 = sql.select(frcsrc_table.c.visit).filter_by(visit=visit, detector=detector).limit(1)
            result = conn.execute(query2).scalar_one_or_none()
            return result is not None
    def getSSObjects(self) -> pandas.DataFrame:
        # docstring is inherited from a base class
        columns = self._schema.get_apdb_columns(ApdbTables.SSObject)
        query = sql.select(*columns)

        # execute select
        with self._timer("SSObject_select_time", tags={"table": "SSObject"}) as timer:
            with self._engine.begin() as conn:
                objects = pandas.read_sql_query(query, conn)
            timer.add_values(row_count=len(objects))
        _LOG.debug("found %s SSObjects", len(objects))
        return self._fix_result_timestamps(objects)
    def store(
        self,
        visit_time: astropy.time.Time,
        objects: pandas.DataFrame,
        sources: pandas.DataFrame | None = None,
        forced_sources: pandas.DataFrame | None = None,
    ) -> None:
        # docstring is inherited from a base class
        if sources is not None:
            sources = self._fix_input_timestamps(sources)
        if forced_sources is not None:
            forced_sources = self._fix_input_timestamps(forced_sources)

        # Run all inserts in a single transaction.
        with self._engine.begin() as connection:
            replica_chunk: ReplicaChunk | None = None
            if self._schema.replication_enabled:
                replica_chunk = ReplicaChunk.make_replica_chunk(visit_time, self.config.replica_chunk_seconds)
                self._storeReplicaChunk(replica_chunk, visit_time, connection)

            self._storeDiaObjects(objects, visit_time, replica_chunk, connection)

            if sources is not None:
                self._storeDiaSources(sources, replica_chunk, connection)

            if forced_sources is not None:
                self._storeDiaForcedSources(forced_sources, replica_chunk, connection)
    def storeSSObjects(self, objects: pandas.DataFrame) -> None:
        # docstring is inherited from a base class
        idColumn = "ssObjectId"
        table = self._schema.get_table(ApdbTables.SSObject)

        # Everything is done in a single transaction.
        with self._engine.begin() as conn:
            # Find record IDs that already exist; convert numpy integers to
            # plain int to avoid issues with SQL parameters.
            ids = sorted(int(oid) for oid in objects[idColumn])
            query = sql.select(table.columns[idColumn]).where(table.columns[idColumn].in_(ids))
            result = conn.execute(query)
            knownIds = {row.ssObjectId for row in result}

            filter = objects[idColumn].isin(knownIds)
            toUpdate = cast(pandas.DataFrame, objects[filter])
            toInsert = cast(pandas.DataFrame, objects[~filter])

            # Insert new records.
            if len(toInsert) > 0:
                toInsert.to_sql(table.name, conn, if_exists="append", index=False, schema=table.schema)

            # Update existing records.
            if len(toUpdate) > 0:
                # The bind parameter gets a distinct name so it does not clash
                # with the column of the same name being updated.
                whereKey = f"{idColumn}_param"
                update = table.update().where(table.columns[idColumn] == sql.bindparam(whereKey))
                toUpdate = toUpdate.rename({idColumn: whereKey}, axis="columns")
                values = toUpdate.to_dict("records")
                result = conn.execute(update, values)
    def reassignDiaSources(self, idMap: Mapping[int, int]) -> None:
        # docstring is inherited from a base class
        reassignTime = datetime.datetime.now(tz=datetime.UTC)

        table = self._schema.get_table(ApdbTables.DiaSource)
        query = table.update().where(table.columns["diaSourceId"] == sql.bindparam("srcId"))

        with self._engine.begin() as conn:
            # Update one source at a time and remember IDs that did not match
            # any existing row.
            missing_ids: list[int] = []
            for key, value in idMap.items():
                params = {
                    "srcId": key,
                    "diaObjectId": 0,
                    "ssObjectId": value,
                    "ssObjectReassocTime": reassignTime,
                }
                result = conn.execute(query, params)
                if result.rowcount == 0:
                    missing_ids.append(key)
            if missing_ids:
                missing = ",".join(str(item) for item in missing_ids)
                raise ValueError(f"Following DiaSource IDs do not exist in the database: {missing}")
    def countUnassociatedObjects(self) -> int:
        # docstring is inherited from a base class
        table: sqlalchemy.schema.Table = self._schema.get_table(ApdbTables.DiaObject)

        if self._schema.has_mjd_timestamps:
            validity_end_column = "validityEndMjdTai"
        else:
            validity_end_column = "validityEnd"

        stmt = sql.select(func.count()).select_from(table).where(table.c.nDiaSources == 1)
        stmt = stmt.where(table.columns[validity_end_column] == None)  # noqa: E711

        with self._engine.begin() as conn:
            count = conn.execute(stmt).scalar_one()

        return count
792 """Return catalog of DiaSource instances from given region.
796 region : `lsst.sphgeom.Region`
797 Region to search for DIASources.
798 start_time_mjdTai : `float`
799 Lower bound of time window for the query.
803 catalog : `pandas.DataFrame`
804 Catalog containing DiaSource records.
806 table = self.
_schema.get_table(ApdbTables.DiaSource)
807 columns = self.
_schema.get_apdb_columns(ApdbTables.DiaSource)
808 query = sql.select(*columns)
811 time_filter = table.columns[
"midpointMjdTai"] > start_time_mjdTai
812 where = sql.expression.and_(self.
_filterRegion(table, region), time_filter)
813 query = query.where(where)
816 with self.
_timer(
"DiaSource_select_time", tags={
"table":
"DiaSource"})
as timer:
817 with self.
_engine.begin()
as conn:
818 sources = pandas.read_sql_query(query, conn)
819 timer.add_values(row_counts=len(sources))
820 _LOG.debug(
"found %s DiaSources", len(sources))
824 """Return catalog of DiaSource instances given set of DiaObject IDs.
829 Collection of DiaObject IDs
830 start_time_mjdTai : `float`
831 Lower bound of time window for the query.
835 catalog : `pandas.DataFrame`
836 Catalog containing DiaSource records.
838 with self.
_timer(
"select_time", tags={
"table":
"DiaSource"})
as timer:
839 sources = self.
_getSourcesByIDs(ApdbTables.DiaSource, object_ids, start_time_mjdTai)
840 timer.add_values(row_count=len(sources))
842 _LOG.debug(
"found %s DiaSources", len(sources))
    def _getSourcesByIDs(
        self, table_enum: ApdbTables, object_ids: list[int], midpointMjdTai_start: float
    ) -> pandas.DataFrame:
        """Return catalog of DiaSource or DiaForcedSource instances given set
        of DiaObject IDs.

        Parameters
        ----------
        table_enum : `ApdbTables`
            Source table to query.
        object_ids : `list` [`int`]
            Collection of DiaObject IDs.
        midpointMjdTai_start : `float`
            Earliest midpointMjdTai to retrieve.

        Returns
        -------
        catalog : `pandas.DataFrame`
            Catalog containing DiaSource records. An empty catalog is returned
            when ``object_ids`` is empty.
        """
        table = self._schema.get_table(table_enum)
        columns = self._schema.get_apdb_columns(table_enum)

        sources: pandas.DataFrame | None = None
        if len(object_ids) <= 0:
            _LOG.debug("ID list is empty, just fetch empty result")
            query = sql.select(*columns).where(sql.literal(False))
            with self._engine.begin() as conn:
                sources = pandas.read_sql_query(query, conn)
        else:
            data_frames: list[pandas.DataFrame] = []
            for ids in chunk_iterable(sorted(object_ids), 1000):
                query = sql.select(*columns)

                # Convert IDs to plain ints to avoid issues with numpy types.
                int_ids = [int(oid) for oid in ids]

                query = query.where(
                    sql.expression.and_(
                        table.columns["diaObjectId"].in_(int_ids),
                        table.columns["midpointMjdTai"] > midpointMjdTai_start,
                    )
                )

                # execute select
                with self._engine.begin() as conn:
                    data_frames.append(pandas.read_sql_query(query, conn))

            if len(data_frames) == 1:
                sources = data_frames[0]
            else:
                sources = pandas.concat(data_frames)
        assert sources is not None, "Catalog cannot be None"
        return self._fix_result_timestamps(sources)
    def _storeReplicaChunk(
        self,
        replica_chunk: ReplicaChunk,
        visit_time: astropy.time.Time,
        connection: sqlalchemy.engine.Connection,
    ) -> None:
        # Convert astropy time to a timezone-aware datetime for the timestamp
        # column.
        dt = datetime.datetime.fromtimestamp(visit_time.unix_tai, tz=datetime.UTC)

        table = self._schema.get_table(ExtraTables.ApdbReplicaChunks)

        # We need UPSERT, which is a dialect-specific construct.
        values = {"last_update_time": dt, "unique_id": replica_chunk.unique_id}
        row = {"apdb_replica_chunk": replica_chunk.id} | values
        if connection.dialect.name == "sqlite":
            insert_sqlite = sqlalchemy.dialects.sqlite.insert(table)
            insert_sqlite = insert_sqlite.on_conflict_do_update(index_elements=table.primary_key, set_=values)
            connection.execute(insert_sqlite, row)
        elif connection.dialect.name == "postgresql":
            insert_pg = sqlalchemy.dialects.postgresql.dml.insert(table)
            insert_pg = insert_pg.on_conflict_do_update(constraint=table.primary_key, set_=values)
            connection.execute(insert_pg, row)
        else:
            raise TypeError(f"Unsupported dialect {connection.dialect.name} for upsert.")
    def _storeDiaObjects(
        self,
        objs: pandas.DataFrame,
        visit_time: astropy.time.Time,
        replica_chunk: ReplicaChunk | None,
        connection: sqlalchemy.engine.Connection,
    ) -> None:
        """Store catalog of DiaObjects from current visit.

        Parameters
        ----------
        objs : `pandas.DataFrame`
            Catalog with DiaObject records.
        visit_time : `astropy.time.Time`
            Time of the visit.
        replica_chunk : `ReplicaChunk` or `None`
            Replica chunk to associate new records with, if replication is
            enabled.
        connection : `sqlalchemy.engine.Connection`
            Database connection.
        """
        if len(objs) == 0:
            _LOG.debug("No objects to write to database.")
            return

        # Convert numpy integers to plain int to avoid issues with SQL
        # parameters.
        ids = sorted(int(oid) for oid in objs["diaObjectId"])
        _LOG.debug("first object ID: %d", ids[0])

        if self._schema.has_mjd_timestamps:
            validity_start_column = "validityStartMjdTai"
            validity_end_column = "validityEndMjdTai"
            timestamp = float(visit_time.tai.mjd)
        else:
            validity_start_column = "validityStart"
            validity_end_column = "validityEnd"
            timestamp = visit_time.datetime

        if self.config.dia_object_index == "last_object_table":
            # Replace all records for these objects in the "last" table.
            table = self._schema.get_table(ApdbTables.DiaObjectLast)

            # Delete existing versions first.
            query = table.delete().where(table.columns["diaObjectId"].in_(ids))

            with self._timer("delete_time", tags={"table": table.name}) as timer:
                res = connection.execute(query)
                timer.add_values(row_count=res.rowcount)
            _LOG.debug("deleted %s objects", res.rowcount)

            # Keep only the columns that exist in the "last" table.
            last_column_names = [column.name for column in table.columns]
            last_objs = objs[last_column_names]

            with self._timer("insert_time", tags={"table": "DiaObjectLast"}) as timer:
                last_objs.to_sql(table.name, connection, if_exists="append", index=False, schema=table.schema)
                timer.add_values(row_count=len(last_objs))
        else:
            # Truncate the validity intervals of the existing object versions.
            table = self._schema.get_table(ApdbTables.DiaObject)
            update = (
                table.update()
                .values(**{validity_end_column: timestamp})
                .where(
                    sql.expression.and_(
                        table.columns["diaObjectId"].in_(ids),
                        table.columns[validity_end_column].is_(None),
                    )
                )
            )

            with self._timer("truncate_time", tags={"table": table.name}) as timer:
                res = connection.execute(update)
                timer.add_values(row_count=res.rowcount)
            _LOG.debug("truncated %s intervals", res.rowcount)

        # Add validity start/end columns to the new versions.
        extra_columns: list[pandas.Series] = []
        if validity_start_column in objs.columns:
            objs[validity_start_column] = timestamp
        else:
            extra_columns.append(pandas.Series([timestamp] * len(objs), name=validity_start_column))
        if validity_end_column in objs.columns:
            objs[validity_end_column] = None
        else:
            extra_columns.append(pandas.Series([None] * len(objs), name=validity_end_column))
        if extra_columns:
            objs.set_index(extra_columns[0].index, inplace=True)
            objs = pandas.concat([objs] + extra_columns, axis="columns")

        # Prepare replica-chunk rows if replication is enabled.
        table = self._schema.get_table(ApdbTables.DiaObject)
        replica_data: list[dict] = []
        replica_stmt: Any = None
        replica_table_name = ""
        if replica_chunk is not None:
            pk_names = [column.name for column in table.primary_key]
            replica_data = objs[pk_names].to_dict("records")
            for row in replica_data:
                row["apdb_replica_chunk"] = replica_chunk.id
            replica_table = self._schema.get_table(ExtraTables.DiaObjectChunks)
            replica_table_name = replica_table.name
            replica_stmt = replica_table.insert()

        # Insert new versions into the history table.
        with self._timer("insert_time", tags={"table": table.name}) as timer:
            objs.to_sql(table.name, connection, if_exists="append", index=False, schema=table.schema)
            timer.add_values(row_count=len(objs))
        if replica_stmt is not None:
            with self._timer("insert_time", tags={"table": replica_table_name}) as timer:
                connection.execute(replica_stmt, replica_data)
                timer.add_values(row_count=len(replica_data))
    def _storeDiaSources(
        self,
        sources: pandas.DataFrame,
        replica_chunk: ReplicaChunk | None,
        connection: sqlalchemy.engine.Connection,
    ) -> None:
        """Store catalog of DiaSources from current visit.

        Parameters
        ----------
        sources : `pandas.DataFrame`
            Catalog containing DiaSource records.
        replica_chunk : `ReplicaChunk` or `None`
            Replica chunk to associate new records with, if replication is
            enabled.
        connection : `sqlalchemy.engine.Connection`
            Database connection.
        """
        table = self._schema.get_table(ApdbTables.DiaSource)

        # Prepare replica-chunk rows if replication is enabled.
        replica_data: list[dict] = []
        replica_stmt: Any = None
        replica_table_name = ""
        if replica_chunk is not None:
            pk_names = [column.name for column in table.primary_key]
            replica_data = sources[pk_names].to_dict("records")
            for row in replica_data:
                row["apdb_replica_chunk"] = replica_chunk.id
            replica_table = self._schema.get_table(ExtraTables.DiaSourceChunks)
            replica_table_name = replica_table.name
            replica_stmt = replica_table.insert()

        # Insert the records.
        with self._timer("insert_time", tags={"table": table.name}) as timer:
            sources.to_sql(table.name, connection, if_exists="append", index=False, schema=table.schema)
            timer.add_values(row_count=len(sources))
        if replica_stmt is not None:
            with self._timer("replica_insert_time", tags={"table": replica_table_name}) as timer:
                connection.execute(replica_stmt, replica_data)
                timer.add_values(row_count=len(replica_data))
    def _storeDiaForcedSources(
        self,
        sources: pandas.DataFrame,
        replica_chunk: ReplicaChunk | None,
        connection: sqlalchemy.engine.Connection,
    ) -> None:
        """Store a set of DiaForcedSources from current visit.

        Parameters
        ----------
        sources : `pandas.DataFrame`
            Catalog containing DiaForcedSource records.
        replica_chunk : `ReplicaChunk` or `None`
            Replica chunk to associate new records with, if replication is
            enabled.
        connection : `sqlalchemy.engine.Connection`
            Database connection.
        """
        table = self._schema.get_table(ApdbTables.DiaForcedSource)

        # Prepare replica-chunk rows if replication is enabled.
        replica_data: list[dict] = []
        replica_stmt: Any = None
        replica_table_name = ""
        if replica_chunk is not None:
            pk_names = [column.name for column in table.primary_key]
            replica_data = sources[pk_names].to_dict("records")
            for row in replica_data:
                row["apdb_replica_chunk"] = replica_chunk.id
            replica_table = self._schema.get_table(ExtraTables.DiaForcedSourceChunks)
            replica_table_name = replica_table.name
            replica_stmt = replica_table.insert()

        # Insert the records.
        with self._timer("insert_time", tags={"table": table.name}) as timer:
            sources.to_sql(table.name, connection, if_exists="append", index=False, schema=table.schema)
            timer.add_values(row_count=len(sources))
        if replica_stmt is not None:
            with self._timer("insert_time", tags={"table": replica_table_name}) as timer:
                connection.execute(replica_stmt, replica_data)
                timer.add_values(row_count=len(replica_data))
1130 """Generate a set of HTM indices covering specified region.
1134 region: `sphgeom.Region`
1135 Region that needs to be indexed.
1139 Sequence of ranges, range is a tuple (minHtmID, maxHtmID).
1141 _LOG.debug(
"region: %s", region)
1142 indices = self.
pixelator.envelope(region, self.
config.pixelization.htm_max_ranges)
1144 return indices.ranges()
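    # Illustrative use of the sphgeom API (the level and range budget here are
    # hypothetical values):
    #     HtmPixelization(8).envelope(region, 64).ranges()
    # returns half-open (low, upper) HTM index ranges; _filterRegion() below
    # turns them into a SQL filter.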
    def _filterRegion(self, table: sqlalchemy.schema.Table, region: Region) -> sql.ColumnElement:
        """Make SQLAlchemy expression for selecting records in a region."""
        htm_index_column = table.columns[self.config.pixelization.htm_index_column]
        exprlist = []
        pixel_ranges = self._htm_indices(region)
        for low, upper in pixel_ranges:
            # Ranges are half-open [low, upper); BETWEEN is inclusive.
            upper -= 1
            if low == upper:
                exprlist.append(htm_index_column == low)
            else:
                exprlist.append(sql.expression.between(htm_index_column, low, upper))

        return sql.expression.or_(*exprlist)
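    # The resulting expression has the shape
    #     htm_index = low OR htm_index BETWEEN low AND upper OR ...
    # with one term per range returned by _htm_indices().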
1161 """Calculate spatial index for each record and add it to a DataFrame.
1165 df : `pandas.DataFrame`
1166 DataFrame which has to contain ra/dec columns, names of these
1167 columns are defined by configuration ``ra_dec_columns`` field.
1171 df : `pandas.DataFrame`
1172 DataFrame with ``pixelId`` column which contains pixel index
1173 for ra/dec coordinates.
1177 This overrides any existing column in a DataFrame with the same name
1178 (pixelId). Original DataFrame is not changed, copy of a DataFrame is
1182 htm_index = np.zeros(df.shape[0], dtype=np.int64)
1183 ra_col, dec_col = self.
config.ra_dec_columns
1184 for i, (ra, dec)
in enumerate(zip(df[ra_col], df[dec_col])):
1189 df[self.
config.pixelization.htm_index_column] = htm_index
1193 """Update timestamp columns in input DataFrame to be aware datetime
1196 AP pipeline generates naive datetime instances, we want them to be
1197 aware before they go to database. All naive timestamps are assumed to
1198 be in UTC timezone (they should be TAI).
1203 for column, dtype
in df.dtypes.items()
1204 if isinstance(dtype, pandas.DatetimeTZDtype)
and dtype.tz
is not datetime.UTC
1206 for column
in columns:
1207 df[column] = df[column].dt.tz_convert(datetime.UTC)
1210 column
for column, dtype
in df.dtypes.items()
if pandas.api.types.is_datetime64_dtype(dtype)
1212 for column
in columns:
1213 df[column] = df[column].dt.tz_localize(datetime.UTC)
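    # Illustrative effect: a naive datetime64[ns] column is tagged as UTC
    # (datetime64[ns, UTC]); an aware column in another timezone is converted
    # to UTC without changing the instant it represents.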
1217 """Update timestamp columns to be naive datetime type in returned
1220 AP pipeline code expects DataFrames to contain naive datetime columns,
1221 while Postgres queries return timezone-aware type. This method converts
1222 those columns to naive datetime in UTC timezone.
1225 columns = [column
for column, dtype
in df.dtypes.items()
if isinstance(dtype, pandas.DatetimeTZDtype)]
1226 for column
in columns:
1228 df[column] = df[column].dt.tz_convert(
None)
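    # Illustrative effect: a value read back as 2025-01-01 12:00:00+01:00
    # becomes the naive UTC value 2025-01-01 11:00:00.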