LSST Applications 27.0.0
LSST Data Management Base Package
lsst.dax.apdb.sql.apdbSql.ApdbSql Class Reference
Inheritance diagram for lsst.dax.apdb.sql.apdbSql.ApdbSql:
lsst.dax.apdb.apdb.Apdb

Public Member Functions

 __init__ (self, ApdbSqlConfig config)
 
VersionTuple apdbImplementationVersion (cls)
 
ApdbSqlConfig init_database (cls, str db_url, *, str|None schema_file=None, str|None schema_name=None, int|None read_sources_months=None, int|None read_forced_sources_months=None, bool use_insert_id=False, int|None connection_timeout=None, str|None dia_object_index=None, int|None htm_level=None, str|None htm_index_column=None, list[str]|None ra_dec_columns=None, str|None prefix=None, str|None namespace=None, bool drop=False)
 
VersionTuple apdbSchemaVersion (self)
 
ApdbSqlReplica get_replica (self)
 
dict[str, int] tableRowCount (self)
 
Table|None tableDef (self, ApdbTables table)
 
pandas.DataFrame getDiaObjects (self, Region region)
 
pandas.DataFrame|None getDiaSources (self, Region region, Iterable[int]|None object_ids, astropy.time.Time visit_time)
 
pandas.DataFrame|None getDiaForcedSources (self, Region region, Iterable[int]|None object_ids, astropy.time.Time visit_time)
 
bool containsVisitDetector (self, int visit, int detector)
 
bool containsCcdVisit (self, int ccdVisitId)
 
pandas.DataFrame getSSObjects (self)
 
None store (self, astropy.time.Time visit_time, pandas.DataFrame objects, pandas.DataFrame|None sources=None, pandas.DataFrame|None forced_sources=None)
 
None storeSSObjects (self, pandas.DataFrame objects)
 
None reassignDiaSources (self, Mapping[int, int] idMap)
 
None dailyJob (self)
 
int countUnassociatedObjects (self)
 
ApdbMetadata metadata (self)
 

Public Attributes

 config
 
 pixelator
 
 metadataSchemaVersionKey
 
 metadataCodeVersionKey
 
 metadataReplicaVersionKey
 
 metadataConfigKey
 

Static Public Attributes

 ConfigClass = ApdbSqlConfig
 
str metadataSchemaVersionKey = "version:schema"
 
str metadataCodeVersionKey = "version:ApdbSql"
 
str metadataReplicaVersionKey = "version:ApdbSqlReplica"
 
str metadataConfigKey = "config:apdb-sql.json"
 

Protected Member Functions

sqlalchemy.engine.Engine _makeEngine (cls, ApdbSqlConfig config)
 
None _versionCheck (self, ApdbMetadataSql metadata)
 
None _makeSchema (cls, ApdbConfig config, bool drop=False)
 
pandas.DataFrame _getDiaSourcesInRegion (self, Region region, astropy.time.Time visit_time)
 
pandas.DataFrame _getDiaSourcesByIDs (self, list[int] object_ids, astropy.time.Time visit_time)
 
pandas.DataFrame _getSourcesByIDs (self, ApdbTables table_enum, list[int] object_ids, float midpointMjdTai_start)
 
None _storeReplicaChunk (self, ReplicaChunk replica_chunk, astropy.time.Time visit_time, sqlalchemy.engine.Connection connection)
 
None _storeDiaObjects (self, pandas.DataFrame objs, astropy.time.Time visit_time, ReplicaChunk|None replica_chunk, sqlalchemy.engine.Connection connection)
 
None _storeDiaSources (self, pandas.DataFrame sources, ReplicaChunk|None replica_chunk, sqlalchemy.engine.Connection connection)
 
None _storeDiaForcedSources (self, pandas.DataFrame sources, ReplicaChunk|None replica_chunk, sqlalchemy.engine.Connection connection)
 
list[tuple[int, int]] _htm_indices (self, Region region)
 
sql.ColumnElement _filterRegion (self, sqlalchemy.schema.Table table, Region region)
 
pandas.DataFrame _add_obj_htm_index (self, pandas.DataFrame df)
 
pandas.DataFrame _add_src_htm_index (self, pandas.DataFrame sources, pandas.DataFrame objs)
 

Protected Attributes

 _engine
 
 _metadata
 
 _schema
 

Static Protected Attributes

tuple _frozen_parameters
 

Detailed Description

Implementation of APDB interface based on SQL database.

The implementation is configured via the standard ``pex_config`` mechanism
using the `ApdbSqlConfig` configuration class. For examples of different
configurations, check the ``config/`` folder.

Parameters
----------
config : `ApdbSqlConfig`
    Configuration object.

Definition at line 176 of file apdbSql.py.
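
A minimal usage sketch (the SQLite URL is illustrative; `init_database` and
the constructor are documented below):

from lsst.dax.apdb.sql.apdbSql import ApdbSql

# Create a new APDB and obtain its frozen configuration, then
# construct the interface from that configuration.
config = ApdbSql.init_database(db_url="sqlite:///apdb.db")
apdb = ApdbSql(config)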

Constructor & Destructor Documentation

◆ __init__()

lsst.dax.apdb.sql.apdbSql.ApdbSql.__init__ ( self,
ApdbSqlConfig config )

Definition at line 212 of file apdbSql.py.

212 def __init__(self, config: ApdbSqlConfig):
213 self._engine = self._makeEngine(config)
214
215 sa_metadata = sqlalchemy.MetaData(schema=config.namespace)
216 meta_table_name = ApdbTables.metadata.table_name(prefix=config.prefix)
217 meta_table: sqlalchemy.schema.Table | None = None
218 with suppress(sqlalchemy.exc.NoSuchTableError):
219 meta_table = sqlalchemy.schema.Table(meta_table_name, sa_metadata, autoload_with=self._engine)
220
221 self._metadata = ApdbMetadataSql(self._engine, meta_table)
222
223 # Read frozen config from metadata.
224 config_json = self._metadata.get(self.metadataConfigKey)
225 if config_json is not None:
226 # Update config from metadata.
227 freezer = ApdbConfigFreezer[ApdbSqlConfig](self._frozen_parameters)
228 self.config = freezer.update(config, config_json)
229 else:
230 self.config = config
231 self.config.validate()
232
233 self._schema = ApdbSqlSchema(
234 engine=self._engine,
235 dia_object_index=self.config.dia_object_index,
236 schema_file=self.config.schema_file,
237 schema_name=self.config.schema_name,
238 prefix=self.config.prefix,
239 namespace=self.config.namespace,
240 htm_index_column=self.config.htm_index_column,
241 enable_replica=self.config.use_insert_id,
242 )
243
244 if self._metadata.table_exists():
245 self._versionCheck(self._metadata)
246
247 self.pixelator = HtmPixelization(self.config.htm_level)
248
249 _LOG.debug("APDB Configuration:")
250 _LOG.debug(" dia_object_index: %s", self.config.dia_object_index)
251 _LOG.debug(" read_sources_months: %s", self.config.read_sources_months)
252 _LOG.debug(" read_forced_sources_months: %s", self.config.read_forced_sources_months)
253 _LOG.debug(" dia_object_columns: %s", self.config.dia_object_columns)
254 _LOG.debug(" schema_file: %s", self.config.schema_file)
255 _LOG.debug(" extra_schema_file: %s", self.config.extra_schema_file)
256 _LOG.debug(" schema prefix: %s", self.config.prefix)
257

Member Function Documentation

◆ _add_obj_htm_index()

pandas.DataFrame lsst.dax.apdb.sql.apdbSql.ApdbSql._add_obj_htm_index ( self,
pandas.DataFrame df )
protected
Calculate HTM index for each record and add it to a DataFrame.

Notes
-----
This overrides any existing column with the same name (pixelId) in
the DataFrame. The original DataFrame is not changed; a copy is
returned.

Definition at line 1103 of file apdbSql.py.

1103 def _add_obj_htm_index(self, df: pandas.DataFrame) -> pandas.DataFrame:
1104 """Calculate HTM index for each record and add it to a DataFrame.
1105
1106 Notes
1107 -----
1108 This overrides any existing column in a DataFrame with the same name
1109 (pixelId). Original DataFrame is not changed, copy of a DataFrame is
1110 returned.
1111 """
1112 # calculate HTM index for every DiaObject
1113 htm_index = np.zeros(df.shape[0], dtype=np.int64)
1114 ra_col, dec_col = self.config.ra_dec_columns
1115 for i, (ra, dec) in enumerate(zip(df[ra_col], df[dec_col])):
1116 uv3d = UnitVector3d(LonLat.fromDegrees(ra, dec))
1117 idx = self.pixelator.index(uv3d)
1118 htm_index[i] = idx
1119 df = df.copy()
1120 df[self.config.htm_index_column] = htm_index
1121 return df
1122

◆ _add_src_htm_index()

pandas.DataFrame lsst.dax.apdb.sql.apdbSql.ApdbSql._add_src_htm_index ( self,
pandas.DataFrame sources,
pandas.DataFrame objs )
protected
Add pixelId column to DiaSource catalog.

Notes
-----
This method copies the pixelId value from the matching DiaObject
record. The DiaObject catalog needs to have its pixelId column filled
by the ``_add_obj_htm_index`` method, and DiaSource records need to be
associated to DiaObjects via the ``diaObjectId`` column.

This overrides any existing column with the same name (pixelId) in
the DataFrame. The original DataFrame is not changed; a copy is
returned.

Definition at line 1123 of file apdbSql.py.

1123 def _add_src_htm_index(self, sources: pandas.DataFrame, objs: pandas.DataFrame) -> pandas.DataFrame:
1124 """Add pixelId column to DiaSource catalog.
1125
1126 Notes
1127 -----
1128 This method copies pixelId value from a matching DiaObject record.
1129 DiaObject catalog needs to have a pixelId column filled by
1130 ``_add_obj_htm_index`` method and DiaSource records need to be
1131 associated to DiaObjects via ``diaObjectId`` column.
1132
1133 This overrides any existing column in a DataFrame with the same name
1134 (pixelId). Original DataFrame is not changed, copy of a DataFrame is
1135 returned.
1136 """
1137 pixel_id_map: dict[int, int] = {
1138 diaObjectId: pixelId
1139 for diaObjectId, pixelId in zip(objs["diaObjectId"], objs[self.config.htm_index_column])
1140 }
1141 # DiaSources associated with SolarSystemObjects do not have an
1142 # associated DiaObject hence we skip them and set their htmIndex
1143 # value to 0.
1144 pixel_id_map[0] = 0
1145 htm_index = np.zeros(sources.shape[0], dtype=np.int64)
1146 for i, diaObjId in enumerate(sources["diaObjectId"]):
1147 htm_index[i] = pixel_id_map[diaObjId]
1148 sources = sources.copy()
1149 sources[self.config.htm_index_column] = htm_index
1150 return sources

◆ _filterRegion()

sql.ColumnElement lsst.dax.apdb.sql.apdbSql.ApdbSql._filterRegion ( self,
sqlalchemy.schema.Table table,
Region region )
protected
Make SQLAlchemy expression for selecting records in a region.

Definition at line 1089 of file apdbSql.py.

1089 def _filterRegion(self, table: sqlalchemy.schema.Table, region: Region) -> sql.ColumnElement:
1090 """Make SQLAlchemy expression for selecting records in a region."""
1091 htm_index_column = table.columns[self.config.htm_index_column]
1092 exprlist = []
1093 pixel_ranges = self._htm_indices(region)
1094 for low, upper in pixel_ranges:
1095 upper -= 1
1096 if low == upper:
1097 exprlist.append(htm_index_column == low)
1098 else:
1099 exprlist.append(sql.expression.between(htm_index_column, low, upper))
1100
1101 return sql.expression.or_(*exprlist)
1102

◆ _getDiaSourcesByIDs()

pandas.DataFrame lsst.dax.apdb.sql.apdbSql.ApdbSql._getDiaSourcesByIDs ( self,
list[int] object_ids,
astropy.time.Time visit_time )
protected
Return catalog of DiaSource instances given a set of DiaObject IDs.

Parameters
----------
object_ids :
    Collection of DiaObject IDs.
visit_time : `astropy.time.Time`
    Time of the current visit.

Returns
-------
catalog : `pandas.DataFrame`
    Catalog containing DiaSource records.

Definition at line 771 of file apdbSql.py.

771 def _getDiaSourcesByIDs(self, object_ids: list[int], visit_time: astropy.time.Time) -> pandas.DataFrame:
772 """Return catalog of DiaSource instances given set of DiaObject IDs.
773
774 Parameters
775 ----------
776 object_ids :
777 Collection of DiaObject IDs
778 visit_time : `astropy.time.Time`
779 Time of the current visit.
780
781 Returns
782 -------
783 catalog : `pandas.DataFrame`
784 Catalog contaning DiaSource records.
785 """
786 # TODO: DateTime.MJD must be consistent with code in ap_association,
787 # alternatively we can fill midpointMjdTai ourselves in store()
788 midpointMjdTai_start = _make_midpointMjdTai_start(visit_time, self.config.read_sources_months)
789 _LOG.debug("midpointMjdTai_start = %.6f", midpointMjdTai_start)
790
791 with Timer("DiaSource select", self.config.timer):
792 sources = self._getSourcesByIDs(ApdbTables.DiaSource, object_ids, midpointMjdTai_start)
793
794 _LOG.debug("found %s DiaSources", len(sources))
795 return sources
796

◆ _getDiaSourcesInRegion()

pandas.DataFrame lsst.dax.apdb.sql.apdbSql.ApdbSql._getDiaSourcesInRegion ( self,
Region region,
astropy.time.Time visit_time )
protected
Return catalog of DiaSource instances from the given region.

Parameters
----------
region : `lsst.sphgeom.Region`
    Region to search for DIASources.
visit_time : `astropy.time.Time`
    Time of the current visit.

Returns
-------
catalog : `pandas.DataFrame`
    Catalog containing DiaSource records.

Definition at line 735 of file apdbSql.py.

735 def _getDiaSourcesInRegion(self, region: Region, visit_time: astropy.time.Time) -> pandas.DataFrame:
736 """Return catalog of DiaSource instances from given region.
737
738 Parameters
739 ----------
740 region : `lsst.sphgeom.Region`
741 Region to search for DIASources.
742 visit_time : `astropy.time.Time`
743 Time of the current visit.
744
745 Returns
746 -------
747 catalog : `pandas.DataFrame`
748 Catalog containing DiaSource records.
749 """
750 # TODO: DateTime.MJD must be consistent with code in ap_association,
751 # alternatively we can fill midpointMjdTai ourselves in store()
752 midpointMjdTai_start = _make_midpointMjdTai_start(visit_time, self.config.read_sources_months)
753 _LOG.debug("midpointMjdTai_start = %.6f", midpointMjdTai_start)
754
755 table = self._schema.get_table(ApdbTables.DiaSource)
756 columns = self._schema.get_apdb_columns(ApdbTables.DiaSource)
757 query = sql.select(*columns)
758
759 # build selection
760 time_filter = table.columns["midpointMjdTai"] > midpointMjdTai_start
761 where = sql.expression.and_(self._filterRegion(table, region), time_filter)
762 query = query.where(where)
763
764 # execute select
765 with Timer("DiaSource select", self.config.timer):
766 with self._engine.begin() as conn:
767 sources = pandas.read_sql_query(query, conn)
768 _LOG.debug("found %s DiaSources", len(sources))
769 return sources
770

◆ _getSourcesByIDs()

pandas.DataFrame lsst.dax.apdb.sql.apdbSql.ApdbSql._getSourcesByIDs ( self,
ApdbTables table_enum,
list[int] object_ids,
float midpointMjdTai_start )
protected
Return catalog of DiaSource or DiaForcedSource instances given a set
of DiaObject IDs.

Parameters
----------
table_enum : `ApdbTables`
    APDB table to query.
object_ids :
    Collection of DiaObject IDs.
midpointMjdTai_start : `float`
    Earliest midpointMjdTai to retrieve.

Returns
-------
catalog : `pandas.DataFrame`
    Catalog containing DiaSource records. An empty catalog with the
    correct schema is returned when ``object_ids`` is empty.

Definition at line 797 of file apdbSql.py.

797 def _getSourcesByIDs(
798     self, table_enum: ApdbTables, object_ids: list[int], midpointMjdTai_start: float
799 ) -> pandas.DataFrame:
800 """Return catalog of DiaSource or DiaForcedSource instances given set
801 of DiaObject IDs.
802
803 Parameters
804 ----------
805 table : `sqlalchemy.schema.Table`
806 Database table.
807 object_ids :
808 Collection of DiaObject IDs
809 midpointMjdTai_start : `float`
810 Earliest midpointMjdTai to retrieve.
811
812 Returns
813 -------
814 catalog : `pandas.DataFrame`
815 Catalog contaning DiaSource records. `None` is returned if
816 ``read_sources_months`` configuration parameter is set to 0 or
817 when ``object_ids`` is empty.
818 """
819 table = self._schema.get_table(table_enum)
820 columns = self._schema.get_apdb_columns(table_enum)
821
822 sources: pandas.DataFrame | None = None
823 if len(object_ids) <= 0:
824 _LOG.debug("ID list is empty, just fetch empty result")
825 query = sql.select(*columns).where(sql.literal(False))
826 with self._engine.begin() as conn:
827 sources = pandas.read_sql_query(query, conn)
828 else:
829 data_frames: list[pandas.DataFrame] = []
830 for ids in chunk_iterable(sorted(object_ids), 1000):
831 query = sql.select(*columns)
832
833 # Some types like np.int64 can cause issues with
834 # sqlalchemy, convert them to int.
835 int_ids = [int(oid) for oid in ids]
836
837 # select by object id
838 query = query.where(
839 sql.expression.and_(
840 table.columns["diaObjectId"].in_(int_ids),
841 table.columns["midpointMjdTai"] > midpointMjdTai_start,
842 )
843 )
844
845 # execute select
846 with self._engine.begin() as conn:
847 data_frames.append(pandas.read_sql_query(query, conn))
848
849 if len(data_frames) == 1:
850 sources = data_frames[0]
851 else:
852 sources = pandas.concat(data_frames)
853 assert sources is not None, "Catalog cannot be None"
854 return sources
855

◆ _htm_indices()

list[tuple[int, int]] lsst.dax.apdb.sql.apdbSql.ApdbSql._htm_indices ( self,
Region region )
protected
Generate a set of HTM indices covering the specified region.

Parameters
----------
region : `sphgeom.Region`
    Region that needs to be indexed.

Returns
-------
Sequence of ranges; each range is a tuple (minHtmID, maxHtmID).

Definition at line 1072 of file apdbSql.py.

1072 def _htm_indices(self, region: Region) -> list[tuple[int, int]]:
1073 """Generate a set of HTM indices covering specified region.
1074
1075 Parameters
1076 ----------
1077 region: `sphgeom.Region`
1078 Region that needs to be indexed.
1079
1080 Returns
1081 -------
1082 Sequence of ranges, range is a tuple (minHtmID, maxHtmID).
1083 """
1084 _LOG.debug("region: %s", region)
1085 indices = self.pixelator.envelope(region, self.config.htm_max_ranges)
1086
1087 return indices.ranges()
1088
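
For reference, a hedged sketch of the same `lsst.sphgeom` machinery used
here (the level and circle parameters are arbitrary). Note that the returned
ranges are half-open, which is why `_filterRegion` subtracts one from the
upper bound:

from lsst.sphgeom import Angle, Circle, HtmPixelization, LonLat, UnitVector3d

# Cover a small circular region with at most 64 HTM index ranges.
pixelator = HtmPixelization(20)
center = UnitVector3d(LonLat.fromDegrees(35.0, -4.0))
region = Circle(center, Angle.fromDegrees(0.1))
ranges = pixelator.envelope(region, 64).ranges()  # [(begin, end), ...], end exclusive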

◆ _makeEngine()

sqlalchemy.engine.Engine lsst.dax.apdb.sql.apdbSql.ApdbSql._makeEngine ( cls,
ApdbSqlConfig config )
protected
Make SQLAlchemy engine based on configured parameters.

Parameters
----------
config : `ApdbSqlConfig`
    Configuration object.

Definition at line 259 of file apdbSql.py.

259 def _makeEngine(cls, config: ApdbSqlConfig) -> sqlalchemy.engine.Engine:
260 """Make SQLALchemy engine based on configured parameters.
261
262 Parameters
263 ----------
264 config : `ApdbSqlConfig`
265 Configuration object.
266 """
267 # engine is reused between multiple processes, make sure that we don't
268 # share connections by disabling pool (by using NullPool class)
269 kw: MutableMapping[str, Any] = dict(echo=config.sql_echo)
270 conn_args: dict[str, Any] = dict()
271 if not config.connection_pool:
272 kw.update(poolclass=NullPool)
273 if config.isolation_level is not None:
274 kw.update(isolation_level=config.isolation_level)
275 elif config.db_url.startswith("sqlite"): # type: ignore
276 # Use READ_UNCOMMITTED as default value for sqlite.
277 kw.update(isolation_level="READ_UNCOMMITTED")
278 if config.connection_timeout is not None:
279 if config.db_url.startswith("sqlite"):
280 conn_args.update(timeout=config.connection_timeout)
281 elif config.db_url.startswith(("postgresql", "mysql")):
282 conn_args.update(connect_timeout=config.connection_timeout)
283 kw.update(connect_args=conn_args)
284 engine = sqlalchemy.create_engine(config.db_url, **kw)
285
286 if engine.dialect.name == "sqlite":
287 # Need to enable foreign keys on every new connection.
288 sqlalchemy.event.listen(engine, "connect", _onSqlite3Connect)
289
290 return engine
291

◆ _makeSchema()

None lsst.dax.apdb.sql.apdbSql.ApdbSql._makeSchema ( cls,
ApdbConfig config,
bool drop = False )
protected

Definition at line 464 of file apdbSql.py.

464 def _makeSchema(cls, config: ApdbConfig, drop: bool = False) -> None:
465 # docstring is inherited from a base class
466
467 if not isinstance(config, ApdbSqlConfig):
468 raise TypeError(f"Unexpected type of configuration object: {type(config)}")
469
470 engine = cls._makeEngine(config)
471
472 # Ask schema class to create all tables.
473 schema = ApdbSqlSchema(
474 engine=engine,
475 dia_object_index=config.dia_object_index,
476 schema_file=config.schema_file,
477 schema_name=config.schema_name,
478 prefix=config.prefix,
479 namespace=config.namespace,
480 htm_index_column=config.htm_index_column,
481 enable_replica=config.use_insert_id,
482 )
483 schema.makeSchema(drop=drop)
484
485 # Need metadata table to store few items in it, if table exists.
486 meta_table: sqlalchemy.schema.Table | None = None
487 with suppress(ValueError):
488 meta_table = schema.get_table(ApdbTables.metadata)
489
490 apdb_meta = ApdbMetadataSql(engine, meta_table)
491 if apdb_meta.table_exists():
492 # Fill version numbers, overwrite if they are already there.
493 apdb_meta.set(cls.metadataSchemaVersionKey, str(schema.schemaVersion()), force=True)
494 apdb_meta.set(cls.metadataCodeVersionKey, str(cls.apdbImplementationVersion()), force=True)
495 if config.use_insert_id:
496 # Only store replica code version if replica is enabled.
497 apdb_meta.set(
498 cls.metadataReplicaVersionKey,
499 str(ApdbSqlReplica.apdbReplicaImplementationVersion()),
500 force=True,
501 )
502
503 # Store frozen part of a configuration in metadata.
504 freezer = ApdbConfigFreezer[ApdbSqlConfig](cls._frozen_parameters)
505 apdb_meta.set(cls.metadataConfigKey, freezer.to_json(config), force=True)
506

◆ _storeDiaForcedSources()

None lsst.dax.apdb.sql.apdbSql.ApdbSql._storeDiaForcedSources ( self,
pandas.DataFrame sources,
ReplicaChunk | None replica_chunk,
sqlalchemy.engine.Connection connection )
protected
Store a set of DiaForcedSources from the current visit.

Parameters
----------
sources : `pandas.DataFrame`
    Catalog containing DiaForcedSource records.
replica_chunk : `ReplicaChunk` or `None`
    Replica chunk identifier, `None` when replication is disabled.
connection : `sqlalchemy.engine.Connection`
    Database connection.

Definition at line 1039 of file apdbSql.py.

1039 def _storeDiaForcedSources(
1040     self,
1041     sources: pandas.DataFrame,
1042     replica_chunk: ReplicaChunk | None,
1043     connection: sqlalchemy.engine.Connection,
1044 ) -> None:
1045 """Store a set of DiaForcedSources from current visit.
1046
1047 Parameters
1048 ----------
1049 sources : `pandas.DataFrame`
1050 Catalog containing DiaForcedSource records
1051 """
1052 table = self._schema.get_table(ApdbTables.DiaForcedSource)
1053
1054 # Insert replica data
1055 replica_data: list[dict] = []
1056 replica_stmt: Any = None
1057 if replica_chunk is not None:
1058 pk_names = [column.name for column in table.primary_key]
1059 replica_data = sources[pk_names].to_dict("records")
1060 for row in replica_data:
1061 row["apdb_replica_chunk"] = replica_chunk.id
1062 replica_table = self._schema.get_table(ExtraTables.DiaForcedSourceChunks)
1063 replica_stmt = replica_table.insert()
1064
1065 # everything to be done in single transaction
1066 with Timer("DiaForcedSource insert", self.config.timer):
1067 sources = _coerce_uint64(sources)
1068 sources.to_sql(table.name, connection, if_exists="append", index=False, schema=table.schema)
1069 if replica_stmt is not None:
1070 connection.execute(replica_stmt, replica_data)
1071

◆ _storeDiaObjects()

None lsst.dax.apdb.sql.apdbSql.ApdbSql._storeDiaObjects ( self,
pandas.DataFrame objs,
astropy.time.Time visit_time,
ReplicaChunk | None replica_chunk,
sqlalchemy.engine.Connection connection )
protected
Store catalog of DiaObjects from the current visit.

Parameters
----------
objs : `pandas.DataFrame`
    Catalog with DiaObject records.
visit_time : `astropy.time.Time`
    Time of the visit.
replica_chunk : `ReplicaChunk` or `None`
    Replica chunk identifier, `None` when replication is disabled.
connection : `sqlalchemy.engine.Connection`
    Database connection.

Definition at line 880 of file apdbSql.py.

880 def _storeDiaObjects(
881     self,
882     objs: pandas.DataFrame,
883     visit_time: astropy.time.Time,
884     replica_chunk: ReplicaChunk | None,
885     connection: sqlalchemy.engine.Connection,
886 ) -> None:
887 """Store catalog of DiaObjects from current visit.
888
889 Parameters
890 ----------
891 objs : `pandas.DataFrame`
892 Catalog with DiaObject records.
893 visit_time : `astropy.time.Time`
894 Time of the visit.
895 replica_chunk : `ReplicaChunk`
896 Insert identifier.
897 """
898 if len(objs) == 0:
899 _LOG.debug("No objects to write to database.")
900 return
901
902 # Some types like np.int64 can cause issues with sqlalchemy, convert
903 # them to int.
904 ids = sorted(int(oid) for oid in objs["diaObjectId"])
905 _LOG.debug("first object ID: %d", ids[0])
906
907 # TODO: Need to verify that we are using correct scale here for
908 # DATETIME representation (see DM-31996).
909 dt = visit_time.datetime
910
911 # everything to be done in single transaction
912 if self.config.dia_object_index == "last_object_table":
913 # Insert and replace all records in LAST table.
914 table = self._schema.get_table(ApdbTables.DiaObjectLast)
915
916 # Drop the previous objects (pandas cannot upsert).
917 query = table.delete().where(table.columns["diaObjectId"].in_(ids))
918
919 with Timer(table.name + " delete", self.config.timer):
920 res = connection.execute(query)
921 _LOG.debug("deleted %s objects", res.rowcount)
922
923 # DiaObjectLast is a subset of DiaObject, strip missing columns
924 last_column_names = [column.name for column in table.columns]
925 last_objs = objs[last_column_names]
926 last_objs = _coerce_uint64(last_objs)
927
928 if "lastNonForcedSource" in last_objs.columns:
929 # lastNonForcedSource is defined NOT NULL, fill it with visit
930 # time just in case.
931 last_objs["lastNonForcedSource"].fillna(dt, inplace=True)
932 else:
933 extra_column = pandas.Series([dt] * len(objs), name="lastNonForcedSource")
934 last_objs.set_index(extra_column.index, inplace=True)
935 last_objs = pandas.concat([last_objs, extra_column], axis="columns")
936
937 with Timer("DiaObjectLast insert", self.config.timer):
938 last_objs.to_sql(
939 table.name,
940 connection,
941 if_exists="append",
942 index=False,
943 schema=table.schema,
944 )
945 else:
946 # truncate existing validity intervals
947 table = self._schema.get_table(ApdbTables.DiaObject)
948
949 update = (
950 table.update()
951 .values(validityEnd=dt)
952 .where(
953 sql.expression.and_(
954 table.columns["diaObjectId"].in_(ids),
955 table.columns["validityEnd"].is_(None),
956 )
957 )
958 )
959
960 # _LOG.debug("query: %s", query)
961
962 with Timer(table.name + " truncate", self.config.timer):
963 res = connection.execute(update)
964 _LOG.debug("truncated %s intervals", res.rowcount)
965
966 objs = _coerce_uint64(objs)
967
968 # Fill additional columns
969 extra_columns: list[pandas.Series] = []
970 if "validityStart" in objs.columns:
971 objs["validityStart"] = dt
972 else:
973 extra_columns.append(pandas.Series([dt] * len(objs), name="validityStart"))
974 if "validityEnd" in objs.columns:
975 objs["validityEnd"] = None
976 else:
977 extra_columns.append(pandas.Series([None] * len(objs), name="validityEnd"))
978 if "lastNonForcedSource" in objs.columns:
979 # lastNonForcedSource is defined NOT NULL, fill it with visit time
980 # just in case.
981 objs["lastNonForcedSource"].fillna(dt, inplace=True)
982 else:
983 extra_columns.append(pandas.Series([dt] * len(objs), name="lastNonForcedSource"))
984 if extra_columns:
985 objs.set_index(extra_columns[0].index, inplace=True)
986 objs = pandas.concat([objs] + extra_columns, axis="columns")
987
988 # Insert replica data
989 table = self._schema.get_table(ApdbTables.DiaObject)
990 replica_data: list[dict] = []
991 replica_stmt: Any = None
992 if replica_chunk is not None:
993 pk_names = [column.name for column in table.primary_key]
994 replica_data = objs[pk_names].to_dict("records")
995 for row in replica_data:
996 row["apdb_replica_chunk"] = replica_chunk.id
997 replica_table = self._schema.get_table(ExtraTables.DiaObjectChunks)
998 replica_stmt = replica_table.insert()
999
1000 # insert new versions
1001 with Timer("DiaObject insert", self.config.timer):
1002 objs.to_sql(table.name, connection, if_exists="append", index=False, schema=table.schema)
1003 if replica_stmt is not None:
1004 connection.execute(replica_stmt, replica_data)
1005

◆ _storeDiaSources()

None lsst.dax.apdb.sql.apdbSql.ApdbSql._storeDiaSources ( self,
pandas.DataFrame sources,
ReplicaChunk | None replica_chunk,
sqlalchemy.engine.Connection connection )
protected
Store catalog of DiaSources from the current visit.

Parameters
----------
sources : `pandas.DataFrame`
    Catalog containing DiaSource records.
replica_chunk : `ReplicaChunk` or `None`
    Replica chunk identifier, `None` when replication is disabled.
connection : `sqlalchemy.engine.Connection`
    Database connection.

Definition at line 1006 of file apdbSql.py.

1006 def _storeDiaSources(
1007     self,
1008     sources: pandas.DataFrame,
1009     replica_chunk: ReplicaChunk | None,
1010     connection: sqlalchemy.engine.Connection,
1011 ) -> None:
1012 """Store catalog of DiaSources from current visit.
1013
1014 Parameters
1015 ----------
1016 sources : `pandas.DataFrame`
1017 Catalog containing DiaSource records
1018 """
1019 table = self._schema.get_table(ApdbTables.DiaSource)
1020
1021 # Insert replica data
1022 replica_data: list[dict] = []
1023 replica_stmt: Any = None
1024 if replica_chunk is not None:
1025 pk_names = [column.name for column in table.primary_key]
1026 replica_data = sources[pk_names].to_dict("records")
1027 for row in replica_data:
1028 row["apdb_replica_chunk"] = replica_chunk.id
1029 replica_table = self._schema.get_table(ExtraTables.DiaSourceChunks)
1030 replica_stmt = replica_table.insert()
1031
1032 # everything to be done in single transaction
1033 with Timer("DiaSource insert", self.config.timer):
1034 sources = _coerce_uint64(sources)
1035 sources.to_sql(table.name, connection, if_exists="append", index=False, schema=table.schema)
1036 if replica_stmt is not None:
1037 connection.execute(replica_stmt, replica_data)
1038

◆ _storeReplicaChunk()

None lsst.dax.apdb.sql.apdbSql.ApdbSql._storeReplicaChunk ( self,
ReplicaChunk replica_chunk,
astropy.time.Time visit_time,
sqlalchemy.engine.Connection connection )
protected

Definition at line 856 of file apdbSql.py.

856 def _storeReplicaChunk(
857     self,
858     replica_chunk: ReplicaChunk,
859     visit_time: astropy.time.Time,
860     connection: sqlalchemy.engine.Connection,
861 ) -> None:
862 dt = visit_time.datetime
863
864 table = self._schema.get_table(ExtraTables.ApdbReplicaChunks)
865
866 # We need UPSERT which is dialect-specific construct
867 values = {"last_update_time": dt, "unique_id": replica_chunk.unique_id}
868 row = {"apdb_replica_chunk": replica_chunk.id} | values
869 if connection.dialect.name == "sqlite":
870 insert_sqlite = sqlalchemy.dialects.sqlite.insert(table)
871 insert_sqlite = insert_sqlite.on_conflict_do_update(index_elements=table.primary_key, set_=values)
872 connection.execute(insert_sqlite, row)
873 elif connection.dialect.name == "postgresql":
874 insert_pg = sqlalchemy.dialects.postgresql.dml.insert(table)
875 insert_pg = insert_pg.on_conflict_do_update(constraint=table.primary_key, set_=values)
876 connection.execute(insert_pg, row)
877 else:
878 raise TypeError(f"Unsupported dialect {connection.dialect.name} for upsert.")
879

◆ _versionCheck()

None lsst.dax.apdb.sql.apdbSql.ApdbSql._versionCheck ( self,
ApdbMetadataSql metadata )
protected
Check schema version compatibility.

Definition at line 292 of file apdbSql.py.

292 def _versionCheck(self, metadata: ApdbMetadataSql) -> None:
293 """Check schema version compatibility."""
294
295 def _get_version(key: str, default: VersionTuple) -> VersionTuple:
296 """Retrieve version number from given metadata key."""
297 if metadata.table_exists():
298 version_str = metadata.get(key)
299 if version_str is None:
300 # Should not happen with existing metadata table.
301 raise RuntimeError(f"Version key {key!r} does not exist in metadata table.")
302 return VersionTuple.fromString(version_str)
303 return default
304
305 # For old databases where metadata table does not exist we assume that
306 # version of both code and schema is 0.1.0.
307 initial_version = VersionTuple(0, 1, 0)
308 db_schema_version = _get_version(self.metadataSchemaVersionKey, initial_version)
309 db_code_version = _get_version(self.metadataCodeVersionKey, initial_version)
310
311 # For now there is no way to make read-only APDB instances, assume that
312 # any access can do updates.
313 if not self._schema.schemaVersion().checkCompatibility(db_schema_version, True):
314 raise IncompatibleVersionError(
315 f"Configured schema version {self._schema.schemaVersion()} "
316 f"is not compatible with database version {db_schema_version}"
317 )
318 if not self.apdbImplementationVersion().checkCompatibility(db_code_version, True):
319 raise IncompatibleVersionError(
320 f"Current code version {self.apdbImplementationVersion()} "
321 f"is not compatible with database version {db_code_version}"
322 )
323
324 # Check replica code version only if replica is enabled.
325 if self._schema.has_replica_chunks:
326 db_replica_version = _get_version(self.metadataReplicaVersionKey, initial_version)
327 code_replica_version = ApdbSqlReplica.apdbReplicaImplementationVersion()
328 if not code_replica_version.checkCompatibility(db_replica_version, True):
329 raise IncompatibleVersionError(
330 f"Current replication code version {code_replica_version} "
331 f"is not compatible with database version {db_replica_version}"
332 )
333

◆ apdbImplementationVersion()

VersionTuple lsst.dax.apdb.sql.apdbSql.ApdbSql.apdbImplementationVersion ( cls)
Return version number for current APDB implementation.

Returns
-------
version : `VersionTuple`
    Version of the code defined in implementation class.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 335 of file apdbSql.py.

335 def apdbImplementationVersion(cls) -> VersionTuple:
336 # Docstring inherited from base class.
337 return VERSION
338

◆ apdbSchemaVersion()

VersionTuple lsst.dax.apdb.sql.apdbSql.ApdbSql.apdbSchemaVersion ( self)
Return schema version number as defined in config file.

Returns
-------
version : `VersionTuple`
    Version of the schema defined in schema config file.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 427 of file apdbSql.py.

427 def apdbSchemaVersion(self) -> VersionTuple:
428 # Docstring inherited from base class.
429 return self._schema.schemaVersion()
430

◆ containsCcdVisit()

bool lsst.dax.apdb.sql.apdbSql.ApdbSql.containsCcdVisit ( self,
int ccdVisitId )
Test whether data for a given visit-detector is present in the APDB.

This method is a placeholder until `Apdb.containsVisitDetector` can
be implemented.

Parameters
----------
ccdVisitId : `int`
    The packed ID of the visit-detector to search for.

Returns
-------
present : `bool`
    `True` if some DiaSource records exist for the specified
    observation, `False` otherwise.

Definition at line 581 of file apdbSql.py.

581 def containsCcdVisit(self, ccdVisitId: int) -> bool:
582 """Test whether data for a given visit-detector is present in the APDB.
583
584 This method is a placeholder until `Apdb.containsVisitDetector` can
585 be implemented.
586
587 Parameters
588 ----------
589 ccdVisitId : `int`
590 The packed ID of the visit-detector to search for.
591
592 Returns
593 -------
594 present : `bool`
595 `True` if some DiaSource records exist for the specified
596 observation, `False` otherwise.
597 """
598 # TODO: remove this method in favor of containsVisitDetector on either
599 # DM-41671 or a ticket that removes ccdVisitId from these tables
600 src_table: sqlalchemy.schema.Table = self._schema.get_table(ApdbTables.DiaSource)
601 frcsrc_table: sqlalchemy.schema.Table = self._schema.get_table(ApdbTables.DiaForcedSource)
602 # Query should load only one leaf page of the index
603 query1 = sql.select(src_table.c.ccdVisitId).filter_by(ccdVisitId=ccdVisitId).limit(1)
604 # Backup query in case an image was processed but had no diaSources
605 query2 = sql.select(frcsrc_table.c.ccdVisitId).filter_by(ccdVisitId=ccdVisitId).limit(1)
606
607 with self._engine.begin() as conn:
608 result = conn.execute(query1).scalar_one_or_none()
609 if result is not None:
610 return True
611 else:
612 result = conn.execute(query2).scalar_one_or_none()
613 return result is not None
614

◆ containsVisitDetector()

bool lsst.dax.apdb.sql.apdbSql.ApdbSql.containsVisitDetector ( self,
int visit,
int detector )
Test whether data for a given visit-detector is present in the APDB.

Parameters
----------
visit, detector : `int`
    The ID of the visit-detector to search for.

Returns
-------
present : `bool`
    `True` if some DiaObject, DiaSource, or DiaForcedSource records
    exist for the specified observation, `False` otherwise.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 577 of file apdbSql.py.

577 def containsVisitDetector(self, visit: int, detector: int) -> bool:
578 # docstring is inherited from a base class
579 raise NotImplementedError()
580

◆ countUnassociatedObjects()

int lsst.dax.apdb.sql.apdbSql.ApdbSql.countUnassociatedObjects ( self)
Return the number of DiaObjects that have only one DiaSource
associated with them.

Used as part of ap_verify metrics.

Returns
-------
count : `int`
    Number of DiaObjects with exactly one associated DiaSource.

Notes
-----
This method can be very inefficient or slow in some implementations.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 712 of file apdbSql.py.

712 def countUnassociatedObjects(self) -> int:
713 # docstring is inherited from a base class
714
715 # Retrieve the DiaObject table.
716 table: sqlalchemy.schema.Table = self._schema.get_table(ApdbTables.DiaObject)
717
718 # Construct the sql statement.
719 stmt = sql.select(func.count()).select_from(table).where(table.c.nDiaSources == 1)
720 stmt = stmt.where(table.c.validityEnd == None) # noqa: E711
721
722 # Return the count.
723 with self._engine.begin() as conn:
724 count = conn.execute(stmt).scalar_one()
725
726 return count
727

◆ dailyJob()

None lsst.dax.apdb.sql.apdbSql.ApdbSql.dailyJob ( self)
Implement daily activities like cleanup/vacuum.

What should be done during daily activities is determined by the
specific implementation.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 708 of file apdbSql.py.

708 def dailyJob(self) -> None:
709 # docstring is inherited from a base class
710 pass
711

◆ get_replica()

ApdbSqlReplica lsst.dax.apdb.sql.apdbSql.ApdbSql.get_replica ( self)
Return `ApdbReplica` instance for this database.

Definition at line 431 of file apdbSql.py.

431 def get_replica(self) -> ApdbSqlReplica:
432 """Return `ApdbReplica` instance for this database."""
433 return ApdbSqlReplica(self._schema, self._engine)
434

◆ getDiaForcedSources()

pandas.DataFrame | None lsst.dax.apdb.sql.apdbSql.ApdbSql.getDiaForcedSources ( self,
Region region,
Iterable[int] | None object_ids,
astropy.time.Time visit_time )
Return catalog of DiaForcedSource instances from a given region.

Parameters
----------
region : `lsst.sphgeom.Region`
    Region to search for DIASources.
object_ids : iterable [ `int` ], optional
    List of DiaObject IDs to further constrain the set of returned
    sources. If the list is empty then an empty catalog with the
    correct schema is returned. If `None` then the returned sources
    are not constrained. Some implementations may not support the
    latter case.
visit_time : `astropy.time.Time`
    Time of the current visit.

Returns
-------
catalog : `pandas.DataFrame`, or `None`
    Catalog containing DiaSource records. `None` is returned if
    ``read_forced_sources_months`` configuration parameter is set to 0.

Raises
------
NotImplementedError
    May be raised by some implementations if ``object_ids`` is `None`.

Notes
-----
This method returns DiaForcedSource catalog for a region with
additional filtering based on DiaObject IDs. Only a subset of DiaSource
history is returned limited by ``read_forced_sources_months`` config
parameter, w.r.t. ``visit_time``. If ``object_ids`` is empty then an
empty catalog is always returned with the correct schema
(columns/types). If ``object_ids`` is `None` then no filtering is
performed and some of the returned records may be outside the specified
region.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 552 of file apdbSql.py.

552 def getDiaForcedSources(
553     self, region: Region, object_ids: Iterable[int] | None, visit_time: astropy.time.Time
554 ) -> pandas.DataFrame | None:
555 # docstring is inherited from a base class
556 if self.config.read_forced_sources_months == 0:
557 _LOG.debug("Skip DiaForceSources fetching")
558 return None
559
560 if object_ids is None:
561 # This implementation does not support region-based selection.
562 raise NotImplementedError("Region-based selection is not supported")
563
564 # TODO: DateTime.MJD must be consistent with code in ap_association,
565 # alternatively we can fill midpointMjdTai ourselves in store()
566 midpointMjdTai_start = _make_midpointMjdTai_start(visit_time, self.config.read_forced_sources_months)
567 _LOG.debug("midpointMjdTai_start = %.6f", midpointMjdTai_start)
568
569 with Timer("DiaForcedSource select", self.config.timer):
570 sources = self._getSourcesByIDs(
571 ApdbTables.DiaForcedSource, list(object_ids), midpointMjdTai_start
572 )
573
574 _LOG.debug("found %s DiaForcedSources", len(sources))
575 return sources
576

◆ getDiaObjects()

pandas.DataFrame lsst.dax.apdb.sql.apdbSql.ApdbSql.getDiaObjects ( self,
Region region )
Return catalog of DiaObject instances from a given region.

This method returns only the last version of each DiaObject. Some
records in the returned catalog may be outside the specified region;
it is up to the client to ignore those records or clean up the
catalog before further use.

Parameters
----------
region : `lsst.sphgeom.Region`
    Region to search for DIAObjects.

Returns
-------
catalog : `pandas.DataFrame`
    Catalog containing DiaObject records for a region that may be a
    superset of the specified region.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 507 of file apdbSql.py.

507 def getDiaObjects(self, region: Region) -> pandas.DataFrame:
508 # docstring is inherited from a base class
509
510 # decide what columns we need
511 if self.config.dia_object_index == "last_object_table":
512 table_enum = ApdbTables.DiaObjectLast
513 else:
514 table_enum = ApdbTables.DiaObject
515 table = self._schema.get_table(table_enum)
516 if not self.config.dia_object_columns:
517 columns = self._schema.get_apdb_columns(table_enum)
518 else:
519 columns = [table.c[col] for col in self.config.dia_object_columns]
520 query = sql.select(*columns)
521
522 # build selection
523 query = query.where(self._filterRegion(table, region))
524
525 # select latest version of objects
526 if self.config.dia_object_index != "last_object_table":
527 query = query.where(table.c.validityEnd == None) # noqa: E711
528
529 # _LOG.debug("query: %s", query)
530
531 # execute select
532 with Timer("DiaObject select", self.config.timer):
533 with self._engine.begin() as conn:
534 objects = pandas.read_sql_query(query, conn)
535 _LOG.debug("found %s DiaObjects", len(objects))
536 return objects
537
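
A hedged example of a region-based read, assuming `apdb` is a constructed
`ApdbSql` instance (region construction as in the `_htm_indices` sketch
above):

from lsst.sphgeom import Angle, Circle, LonLat, UnitVector3d

# Fetch the latest versions of DiaObjects overlapping a small circle;
# the result may be a superset of the region.
region = Circle(UnitVector3d(LonLat.fromDegrees(35.0, -4.0)), Angle.fromDegrees(0.1))
objects = apdb.getDiaObjects(region)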

◆ getDiaSources()

pandas.DataFrame | None lsst.dax.apdb.sql.apdbSql.ApdbSql.getDiaSources ( self,
Region region,
Iterable[int] | None object_ids,
astropy.time.Time visit_time )
Return catalog of DiaSource instances from a given region.

Parameters
----------
region : `lsst.sphgeom.Region`
    Region to search for DIASources.
object_ids : iterable [ `int` ], optional
    List of DiaObject IDs to further constrain the set of returned
    sources. If `None` then the returned sources are not constrained.
    If the list is empty then an empty catalog with the correct
    schema is returned.
visit_time : `astropy.time.Time`
    Time of the current visit.

Returns
-------
catalog : `pandas.DataFrame`, or `None`
    Catalog containing DiaSource records. `None` is returned if
    ``read_sources_months`` configuration parameter is set to 0.

Notes
-----
This method returns DiaSource catalog for a region with additional
filtering based on DiaObject IDs. Only a subset of DiaSource history
is returned limited by ``read_sources_months`` config parameter, w.r.t.
``visit_time``. If ``object_ids`` is empty then an empty catalog is
always returned with the correct schema (columns/types). If
``object_ids`` is `None` then no filtering is performed and some of the
returned records may be outside the specified region.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 538 of file apdbSql.py.

538 def getDiaSources(
539     self, region: Region, object_ids: Iterable[int] | None, visit_time: astropy.time.Time
540 ) -> pandas.DataFrame | None:
541 # docstring is inherited from a base class
542 if self.config.read_sources_months == 0:
543 _LOG.debug("Skip DiaSources fetching")
544 return None
545
546 if object_ids is None:
547 # region-based select
548 return self._getDiaSourcesInRegion(region, visit_time)
549 else:
550 return self._getDiaSourcesByIDs(list(object_ids), visit_time)
551
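
Continuing the sketch from `getDiaObjects`, the source reads take the
DiaObject IDs and a visit time that bounds the history window through
``read_sources_months`` and ``read_forced_sources_months`` (the timestamp
is illustrative):

import astropy.time

visit_time = astropy.time.Time("2025-01-01T00:00:00", scale="tai")
sources = apdb.getDiaSources(region, objects["diaObjectId"], visit_time)
forced_sources = apdb.getDiaForcedSources(region, objects["diaObjectId"], visit_time)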

◆ getSSObjects()

pandas.DataFrame lsst.dax.apdb.sql.apdbSql.ApdbSql.getSSObjects ( self)
Return catalog of SSObject instances.

Returns
-------
catalog : `pandas.DataFrame`
    Catalog containing SSObject records, all existing records are
    returned.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 615 of file apdbSql.py.

615 def getSSObjects(self) -> pandas.DataFrame:
616 # docstring is inherited from a base class
617
618 columns = self._schema.get_apdb_columns(ApdbTables.SSObject)
619 query = sql.select(*columns)
620
621 # execute select
622 with Timer("DiaObject select", self.config.timer):
623 with self._engine.begin() as conn:
624 objects = pandas.read_sql_query(query, conn)
625 _LOG.debug("found %s SSObjects", len(objects))
626 return objects
627

◆ init_database()

ApdbSqlConfig lsst.dax.apdb.sql.apdbSql.ApdbSql.init_database ( cls,
str db_url,
*, str | None schema_file = None,
str | None schema_name = None,
int | None read_sources_months = None,
int | None read_forced_sources_months = None,
bool use_insert_id = False,
int | None connection_timeout = None,
str | None dia_object_index = None,
int | None htm_level = None,
str | None htm_index_column = None,
list[str] | None ra_dec_columns = None,
str | None prefix = None,
str | None namespace = None,
bool drop = False )
Initialize new APDB instance and make configuration object for it.

Parameters
----------
db_url : `str`
    SQLAlchemy database URL.
schema_file : `str`, optional
    Location of (YAML) configuration file with APDB schema. If not
    specified then default location will be used.
schema_name : `str`, optional
    Name of the schema in YAML configuration file. If not specified
    then the default name will be used.
read_sources_months : `int`, optional
    Number of months of history to read from DiaSource.
read_forced_sources_months : `int`, optional
    Number of months of history to read from DiaForcedSource.
use_insert_id : `bool`
    If True, make additional tables used for replication to PPDB.
connection_timeout : `int`, optional
    Database connection timeout in seconds.
dia_object_index : `str`, optional
    Indexing mode for DiaObject table.
htm_level : `int`, optional
    HTM indexing level.
htm_index_column : `str`, optional
    Name of a HTM index column for DiaObject and DiaSource tables.
ra_dec_columns : `list` [`str`], optional
    Names of ra/dec columns in DiaObject table.
prefix : `str`, optional
    Optional prefix for all table names.
namespace : `str`, optional
    Name of the database schema for all APDB tables. If not specified
    then default schema is used.
drop : `bool`, optional
    If `True` then drop existing tables before re-creating the schema.

Returns
-------
config : `ApdbSqlConfig`
    Resulting configuration object for a created APDB instance.

Definition at line 340 of file apdbSql.py.

340 def init_database(
341     cls,
342     db_url: str,
343     *,
344     schema_file: str | None = None,
345     schema_name: str | None = None,
346     read_sources_months: int | None = None,
347     read_forced_sources_months: int | None = None,
348     use_insert_id: bool = False,
349     connection_timeout: int | None = None,
350     dia_object_index: str | None = None,
351     htm_level: int | None = None,
352     htm_index_column: str | None = None,
353     ra_dec_columns: list[str] | None = None,
354     prefix: str | None = None,
355     namespace: str | None = None,
356     drop: bool = False,
357 ) -> ApdbSqlConfig:
358 """Initialize new APDB instance and make configuration object for it.
359
360 Parameters
361 ----------
362 db_url : `str`
363 SQLAlchemy database URL.
364 schema_file : `str`, optional
365 Location of (YAML) configuration file with APDB schema. If not
366 specified then default location will be used.
367 schema_name : str | None
368 Name of the schema in YAML configuration file. If not specified
369 then default name will be used.
370 read_sources_months : `int`, optional
371 Number of months of history to read from DiaSource.
372 read_forced_sources_months : `int`, optional
373 Number of months of history to read from DiaForcedSource.
374 use_insert_id : `bool`
375 If True, make additional tables used for replication to PPDB.
376 connection_timeout : `int`, optional
377 Database connection timeout in seconds.
378 dia_object_index : `str`, optional
379 Indexing mode for DiaObject table.
380 htm_level : `int`, optional
381 HTM indexing level.
382 htm_index_column : `str`, optional
383 Name of a HTM index column for DiaObject and DiaSource tables.
384 ra_dec_columns : `list` [`str`], optional
385 Names of ra/dec columns in DiaObject table.
386 prefix : `str`, optional
387 Optional prefix for all table names.
388 namespace : `str`, optional
389 Name of the database schema for all APDB tables. If not specified
390 then default schema is used.
391 drop : `bool`, optional
392 If `True` then drop existing tables before re-creating the schema.
393
394 Returns
395 -------
396 config : `ApdbSqlConfig`
397 Resulting configuration object for a created APDB instance.
398 """
399 config = ApdbSqlConfig(db_url=db_url, use_insert_id=use_insert_id)
400 if schema_file is not None:
401 config.schema_file = schema_file
402 if schema_name is not None:
403 config.schema_name = schema_name
404 if read_sources_months is not None:
405 config.read_sources_months = read_sources_months
406 if read_forced_sources_months is not None:
407 config.read_forced_sources_months = read_forced_sources_months
408 if connection_timeout is not None:
409 config.connection_timeout = connection_timeout
410 if dia_object_index is not None:
411 config.dia_object_index = dia_object_index
412 if htm_level is not None:
413 config.htm_level = htm_level
414 if htm_index_column is not None:
415 config.htm_index_column = htm_index_column
416 if ra_dec_columns is not None:
417 config.ra_dec_columns = ra_dec_columns
418 if prefix is not None:
419 config.prefix = prefix
420 if namespace is not None:
421 config.namespace = namespace
422
423 cls._makeSchema(config, drop=drop)
424
425 return config
426
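
A hedged example with a few overrides (the URL and namespace are
placeholders):

# Initialize a new APDB in a dedicated PostgreSQL schema with
# replication tables enabled, then construct the interface.
config = ApdbSql.init_database(
    db_url="postgresql://server/apdb",
    namespace="apdb_schema",
    use_insert_id=True,
)
apdb = ApdbSql(config)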

◆ metadata()

ApdbMetadata lsst.dax.apdb.sql.apdbSql.ApdbSql.metadata ( self)
Object controlling access to APDB metadata (`ApdbMetadata`).

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 729 of file apdbSql.py.

729 def metadata(self) -> ApdbMetadata:
730 # docstring is inherited from a base class
731 if self._metadata is None:
732 raise RuntimeError("Database schema was not initialized.")
733 return self._metadata
734
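
A sketch of reading the version keys stored by ``_makeSchema``, assuming
``metadata`` is exposed as a property (as the accessor above suggests) and
that `ApdbMetadata` provides a ``get(key)`` method as used in ``__init__``:

# Keys are the static class attributes documented below.
schema_version = apdb.metadata.get(ApdbSql.metadataSchemaVersionKey)
code_version = apdb.metadata.get(ApdbSql.metadataCodeVersionKey)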

◆ reassignDiaSources()

None lsst.dax.apdb.sql.apdbSql.ApdbSql.reassignDiaSources ( self,
Mapping[int, int] idMap )
Associate DiaSources with SSObjects, disassociating them
from DiaObjects.

Parameters
----------
idMap : `Mapping`
    Maps DiaSource IDs to their new SSObject IDs.

Raises
------
ValueError
    Raised if a DiaSource ID does not exist in the database.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 688 of file apdbSql.py.

688 def reassignDiaSources(self, idMap: Mapping[int, int]) -> None:
689 # docstring is inherited from a base class
690
691 table = self._schema.get_table(ApdbTables.DiaSource)
692 query = table.update().where(table.columns["diaSourceId"] == sql.bindparam("srcId"))
693
694 with self._engine.begin() as conn:
695 # Need to make sure that every ID exists in the database, but
696 # executemany may not support rowcount, so iterate and check what
697 # is missing.
698 missing_ids: list[int] = []
699 for key, value in idMap.items():
700 params = dict(srcId=key, diaObjectId=0, ssObjectId=value)
701 result = conn.execute(query, params)
702 if result.rowcount == 0:
703 missing_ids.append(key)
704 if missing_ids:
705 missing = ",".join(str(item) for item in missing_ids)
706 raise ValueError(f"Following DiaSource IDs do not exist in the database: {missing}")
707
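
A minimal sketch (the IDs are illustrative); `ValueError` is raised if any
of the DiaSource IDs is missing from the database:

# Reassign two DiaSources to SSObject 67890; their DiaObject
# association is cleared (diaObjectId is set to 0).
apdb.reassignDiaSources({12345: 67890, 12346: 67890})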

◆ store()

None lsst.dax.apdb.sql.apdbSql.ApdbSql.store ( self,
astropy.time.Time visit_time,
pandas.DataFrame objects,
pandas.DataFrame | None sources = None,
pandas.DataFrame | None forced_sources = None )
Store all three types of catalogs in the database.

Parameters
----------
visit_time : `astropy.time.Time`
    Time of the visit.
objects : `pandas.DataFrame`
    Catalog with DiaObject records.
sources : `pandas.DataFrame`, optional
    Catalog with DiaSource records.
forced_sources : `pandas.DataFrame`, optional
    Catalog with DiaForcedSource records.

Notes
-----
This method takes DataFrame catalogs; their schemas must be
compatible with the schema of the APDB tables:

  - column names must correspond to database table columns
  - types and units of the columns must match database definitions,
    no unit conversion is performed presently
  - columns that have default values in the database schema can be
    omitted from the catalog
  - this method knows how to fill interval-related columns of DiaObject
    (validityStart, validityEnd); they do not need to appear in the
    catalog
  - source catalogs have a ``diaObjectId`` column associating sources
    with objects

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 628 of file apdbSql.py.

628 def store(
629     self,
630     visit_time: astropy.time.Time,
631     objects: pandas.DataFrame,
632     sources: pandas.DataFrame | None = None,
633     forced_sources: pandas.DataFrame | None = None,
634 ) -> None:
635 # docstring is inherited from a base class
636
637 # We want to run all inserts in one transaction.
638 with self._engine.begin() as connection:
639 replica_chunk: ReplicaChunk | None = None
640 if self._schema.has_replica_chunks:
641 replica_chunk = ReplicaChunk.make_replica_chunk(visit_time, self.config.replica_chunk_seconds)
642 self._storeReplicaChunk(replica_chunk, visit_time, connection)
643
644 # fill pixelId column for DiaObjects
645 objects = self._add_obj_htm_index(objects)
646 self._storeDiaObjects(objects, visit_time, replica_chunk, connection)
647
648 if sources is not None:
649 # copy pixelId column from DiaObjects to DiaSources
650 sources = self._add_src_htm_index(sources, objects)
651 self._storeDiaSources(sources, replica_chunk, connection)
652
653 if forced_sources is not None:
654 self._storeDiaForcedSources(forced_sources, replica_chunk, connection)
655
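
A sketch of the call shape, assuming catalogs whose schemas already match
the APDB tables (e.g. catalogs produced by the AP pipeline):

import astropy.time

# All three catalogs are written in a single transaction.
apdb.store(
    astropy.time.Time.now(),
    objects,
    sources=sources,
    forced_sources=forced_sources,
)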

◆ storeSSObjects()

None lsst.dax.apdb.sql.apdbSql.ApdbSql.storeSSObjects ( self,
pandas.DataFrame objects )
Store or update SSObject catalog.

Parameters
----------
objects : `pandas.DataFrame`
    Catalog with SSObject records.

Notes
-----
If SSObjects with matching IDs already exist in the database, their
records will be updated with the information from the provided
records.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 656 of file apdbSql.py.

656 def storeSSObjects(self, objects: pandas.DataFrame) -> None:
657 # docstring is inherited from a base class
658
659 idColumn = "ssObjectId"
660 table = self._schema.get_table(ApdbTables.SSObject)
661
662 # everything to be done in single transaction
663 with self._engine.begin() as conn:
664 # Find record IDs that already exist. Some types like np.int64 can
665 # cause issues with sqlalchemy, convert them to int.
666 ids = sorted(int(oid) for oid in objects[idColumn])
667
668 query = sql.select(table.columns[idColumn], table.columns[idColumn].in_(ids))
669 result = conn.execute(query)
670 knownIds = set(row.ssObjectId for row in result)
671
672 filter = objects[idColumn].isin(knownIds)
673 toUpdate = cast(pandas.DataFrame, objects[filter])
674 toInsert = cast(pandas.DataFrame, objects[~filter])
675
676 # insert new records
677 if len(toInsert) > 0:
678 toInsert.to_sql(table.name, conn, if_exists="append", index=False, schema=table.schema)
679
680 # update existing records
681 if len(toUpdate) > 0:
682 whereKey = f"{idColumn}_param"
683 update = table.update().where(table.columns[idColumn] == sql.bindparam(whereKey))
684 toUpdate = toUpdate.rename({idColumn: whereKey}, axis="columns")
685 values = toUpdate.to_dict("records")
686 result = conn.execute(update, values)
687

◆ tableDef()

Table | None lsst.dax.apdb.sql.apdbSql.ApdbSql.tableDef ( self,
ApdbTables table )
Return table schema definition for a given table.

Parameters
----------
table : `ApdbTables`
    One of the known APDB tables.

Returns
-------
tableSchema : `.schema_model.Table` or `None`
    Table schema description, `None` is returned if table is not
    defined by this implementation.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 459 of file apdbSql.py.

459 def tableDef(self, table: ApdbTables) -> Table | None:
460 # docstring is inherited from a base class
461 return self._schema.tableSchemas.get(table)
462

◆ tableRowCount()

dict[str, int] lsst.dax.apdb.sql.apdbSql.ApdbSql.tableRowCount ( self)
Return dictionary with the table names and row counts.

Used by ``ap_proto`` to keep track of the size of the database tables.
Depending on database technology this could be an expensive operation.

Returns
-------
row_counts : `dict`
    Dict where key is a table name and value is a row count.

Definition at line 435 of file apdbSql.py.

435 def tableRowCount(self) -> dict[str, int]:
436 """Return dictionary with the table names and row counts.
437
438 Used by ``ap_proto`` to keep track of the size of the database tables.
439 Depending on database technology this could be expensive operation.
440
441 Returns
442 -------
443 row_counts : `dict`
444 Dict where key is a table name and value is a row count.
445 """
446 res = {}
447 tables = [ApdbTables.DiaObject, ApdbTables.DiaSource, ApdbTables.DiaForcedSource]
448 if self.config.dia_object_index == "last_object_table":
449 tables.append(ApdbTables.DiaObjectLast)
450 with self._engine.begin() as conn:
451 for table in tables:
452 sa_table = self._schema.get_table(table)
453 stmt = sql.select(func.count()).select_from(sa_table)
454 count: int = conn.execute(stmt).scalar_one()
455 res[table.name] = count
456
457 return res
458

Member Data Documentation

◆ _engine

lsst.dax.apdb.sql.apdbSql.ApdbSql._engine
protected

Definition at line 213 of file apdbSql.py.

◆ _frozen_parameters

tuple lsst.dax.apdb.sql.apdbSql.ApdbSql._frozen_parameters
staticprotected
Initial value:
= (
"use_insert_id",
"dia_object_index",
"htm_level",
"htm_index_column",
"ra_dec_columns",
)

Definition at line 203 of file apdbSql.py.

◆ _metadata

lsst.dax.apdb.sql.apdbSql.ApdbSql._metadata
protected

Definition at line 221 of file apdbSql.py.

◆ _schema

lsst.dax.apdb.sql.apdbSql.ApdbSql._schema
protected

Definition at line 233 of file apdbSql.py.

◆ config

lsst.dax.apdb.sql.apdbSql.ApdbSql.config

Definition at line 228 of file apdbSql.py.

◆ ConfigClass

lsst.dax.apdb.sql.apdbSql.ApdbSql.ConfigClass = ApdbSqlConfig
static

Definition at line 189 of file apdbSql.py.

◆ metadataCodeVersionKey [1/2]

str lsst.dax.apdb.sql.apdbSql.ApdbSql.metadataCodeVersionKey = "version:ApdbSql"
static

Definition at line 194 of file apdbSql.py.

◆ metadataCodeVersionKey [2/2]

lsst.dax.apdb.sql.apdbSql.ApdbSql.metadataCodeVersionKey

Definition at line 494 of file apdbSql.py.

◆ metadataConfigKey [1/2]

str lsst.dax.apdb.sql.apdbSql.ApdbSql.metadataConfigKey = "config:apdb-sql.json"
static

Definition at line 200 of file apdbSql.py.

◆ metadataConfigKey [2/2]

lsst.dax.apdb.sql.apdbSql.ApdbSql.metadataConfigKey

Definition at line 505 of file apdbSql.py.

◆ metadataReplicaVersionKey [1/2]

str lsst.dax.apdb.sql.apdbSql.ApdbSql.metadataReplicaVersionKey = "version:ApdbSqlReplica"
static

Definition at line 197 of file apdbSql.py.

◆ metadataReplicaVersionKey [2/2]

lsst.dax.apdb.sql.apdbSql.ApdbSql.metadataReplicaVersionKey

Definition at line 498 of file apdbSql.py.

◆ metadataSchemaVersionKey [1/2]

str lsst.dax.apdb.sql.apdbSql.ApdbSql.metadataSchemaVersionKey = "version:schema"
static

Definition at line 191 of file apdbSql.py.

◆ metadataSchemaVersionKey [2/2]

lsst.dax.apdb.sql.apdbSql.ApdbSql.metadataSchemaVersionKey

Definition at line 493 of file apdbSql.py.

◆ pixelator

lsst.dax.apdb.sql.apdbSql.ApdbSql.pixelator

Definition at line 247 of file apdbSql.py.


The documentation for this class was generated from the following file: apdbSql.py