LSST Data Management Base Package
lsst.dax.apdb.sql.apdbSql.ApdbSql Class Reference
Inheritance diagram for lsst.dax.apdb.sql.apdbSql.ApdbSql:
lsst.dax.apdb.apdb.Apdb

Public Member Functions

 __init__ (self, ApdbSqlConfig config)
 
VersionTuple apdbImplementationVersion (cls)
 
ApdbSqlConfig init_database (cls, str db_url, *, str|None schema_file=None, str|None schema_name=None, int|None read_sources_months=None, int|None read_forced_sources_months=None, bool enable_replica=False, int|None connection_timeout=None, str|None dia_object_index=None, int|None htm_level=None, str|None htm_index_column=None, tuple[str, str]|None ra_dec_columns=None, str|None prefix=None, str|None namespace=None, bool drop=False)
 
ApdbSqlReplica get_replica (self)
 
dict[str, int] tableRowCount (self)
 
ApdbSqlConfig getConfig (self)
 
Table|None tableDef (self, ApdbTables table)
 
pandas.DataFrame getDiaObjects (self, Region region)
 
pandas.DataFrame|None getDiaSources (self, Region region, Iterable[int]|None object_ids, astropy.time.Time visit_time, astropy.time.Time|None start_time=None)
 
pandas.DataFrame|None getDiaForcedSources (self, Region region, Iterable[int]|None object_ids, astropy.time.Time visit_time, astropy.time.Time|None start_time=None)
 
bool containsVisitDetector (self, int visit, int detector, Region region, astropy.time.Time visit_time)
 
pandas.DataFrame getSSObjects (self)
 
None store (self, astropy.time.Time visit_time, pandas.DataFrame objects, pandas.DataFrame|None sources=None, pandas.DataFrame|None forced_sources=None)
 
None storeSSObjects (self, pandas.DataFrame objects)
 
None reassignDiaSources (self, Mapping[int, int] idMap)
 
None dailyJob (self)
 
int countUnassociatedObjects (self)
 
ApdbMetadata metadata (self)
 
ApdbSqlAdmin admin (self)
 
Apdb from_config (cls, ApdbConfig config)
 
Apdb from_uri (cls, ResourcePathExpression uri)
 

Public Attributes

 config = freezer.update(config, config_json)
 
 pixelator = HtmPixelization(self.config.pixelization.htm_level)
 

Static Public Attributes

str metadataSchemaVersionKey = "version:schema"
 
str metadataCodeVersionKey = "version:ApdbSql"
 
str metadataReplicaVersionKey = "version:ApdbSqlReplica"
 
str metadataConfigKey = "config:apdb-sql.json"
 

Protected Member Functions

Timer _timer (self, str name, *, Mapping[str, str|int]|None tags=None)
 
sqlalchemy.engine.Engine _makeEngine (cls, ApdbSqlConfig config, *, bool create)
 
sqlalchemy.engine.URL|str _connection_url (cls, str config_url, *, bool create)
 
str _update_sqlite_url (cls, str url_string)
 
VersionTuple _versionCheck (self, ApdbMetadataSql metadata)
 
None _makeSchema (cls, ApdbConfig config, bool drop=False)
 
pandas.DataFrame _getDiaSourcesInRegion (self, Region region, float start_time_mjdTai)
 
pandas.DataFrame _getDiaSourcesByIDs (self, list[int] object_ids, float start_time_mjdTai)
 
pandas.DataFrame _getSourcesByIDs (self, ApdbTables table_enum, list[int] object_ids, float midpointMjdTai_start)
 
None _storeReplicaChunk (self, ReplicaChunk replica_chunk, sqlalchemy.engine.Connection connection)
 
None _storeDiaObjects (self, pandas.DataFrame objs, astropy.time.Time visit_time, ReplicaChunk|None replica_chunk, sqlalchemy.engine.Connection connection)
 
None _storeDiaSources (self, pandas.DataFrame sources, ReplicaChunk|None replica_chunk, sqlalchemy.engine.Connection connection)
 
None _storeDiaForcedSources (self, pandas.DataFrame sources, ReplicaChunk|None replica_chunk, sqlalchemy.engine.Connection connection)
 
None _storeUpdateRecords (self, Iterable[ApdbUpdateRecord] records, ReplicaChunk chunk, *, bool store_chunk=False, sqlalchemy.engine.Connection|None connection=None)
 
list[tuple[int, int]] _htm_indices (self, Region region)
 
sql.ColumnElement _filterRegion (self, sqlalchemy.schema.Table table, Region region)
 
pandas.DataFrame _add_spatial_index (self, pandas.DataFrame df)
 
pandas.DataFrame _fix_input_timestamps (self, pandas.DataFrame df)
 
pandas.DataFrame _fix_result_timestamps (self, pandas.DataFrame df)
 

Protected Attributes

sqlalchemy.engine.Engine _engine = self._makeEngine(config, create=False)
 
 _metadata = ApdbMetadataSql(self._engine, meta_table)
 
 _schema
 
VersionTuple _db_schema_version = self._versionCheck(self._metadata)
 

Static Protected Attributes

tuple _frozen_parameters
 

Detailed Description

Implementation of APDB interface based on SQL database.

The implementation is configured via the standard ``pex_config`` mechanism
using the `ApdbSqlConfig` configuration class. For examples of different
configurations, check the ``config/`` folder.

Parameters
----------
config : `ApdbSqlConfig`
    Configuration object.

Definition at line 118 of file apdbSql.py.
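
A minimal usage sketch (the SQLite path below is hypothetical; ``init_database``
creates a new, empty APDB and returns the corresponding configuration object):

from lsst.dax.apdb.sql.apdbSql import ApdbSql

# Create a new SQLite-backed APDB and instantiate the interface for it.
config = ApdbSql.init_database(db_url="sqlite:////tmp/apdb.sqlite3")
apdb = ApdbSql.from_config(config)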

Constructor & Destructor Documentation

◆ __init__()

lsst.dax.apdb.sql.apdbSql.ApdbSql.__init__ ( self,
ApdbSqlConfig config )

Definition at line 152 of file apdbSql.py.

152 def __init__(self, config: ApdbSqlConfig):
153 self._engine = self._makeEngine(config, create=False)
154
155 sa_metadata = sqlalchemy.MetaData(schema=config.namespace)
156 meta_table_name = ApdbTables.metadata.table_name(prefix=config.prefix)
157 meta_table = sqlalchemy.schema.Table(meta_table_name, sa_metadata, autoload_with=self._engine)
158 self._metadata = ApdbMetadataSql(self._engine, meta_table)
159
160 # Read frozen config from metadata.
161 config_json = self._metadata.get(self.metadataConfigKey)
162 if config_json is not None:
163 # Update config from metadata.
164 freezer = ApdbConfigFreezer[ApdbSqlConfig](self._frozen_parameters)
165 self.config = freezer.update(config, config_json)
166 else:
167 self.config = config
168
169 self._schema = ApdbSqlSchema(
170 engine=self._engine,
171 dia_object_index=self.config.dia_object_index,
172 schema_file=self.config.schema_file,
173 schema_name=self.config.schema_name,
174 prefix=self.config.prefix,
175 namespace=self.config.namespace,
176 htm_index_column=self.config.pixelization.htm_index_column,
177 enable_replica=self.config.enable_replica,
178 )
179
180 self._db_schema_version = self._versionCheck(self._metadata)
181
182 self.pixelator = HtmPixelization(self.config.pixelization.htm_level)
183
184 if _LOG.isEnabledFor(logging.DEBUG):
185 _LOG.debug("ApdbSql Configuration: %s", self.config.model_dump())
186

Member Function Documentation

◆ _add_spatial_index()

pandas.DataFrame lsst.dax.apdb.sql.apdbSql.ApdbSql._add_spatial_index ( self,
pandas.DataFrame df )
protected
Calculate spatial index for each record and add it to a DataFrame.

Parameters
----------
df : `pandas.DataFrame`
    DataFrame that must contain ra/dec columns; the names of these
    columns are defined by the ``ra_dec_columns`` configuration field.

Returns
-------
df : `pandas.DataFrame`
    DataFrame with ``pixelId`` column which contains pixel index
    for ra/dec coordinates.

Notes
-----
This overrides any existing column in the DataFrame with the same name
(``pixelId``). The original DataFrame is not changed; a copy is
returned.

Definition at line 1251 of file apdbSql.py.

1251 def _add_spatial_index(self, df: pandas.DataFrame) -> pandas.DataFrame:
1252 """Calculate spatial index for each record and add it to a DataFrame.
1253
1254 Parameters
1255 ----------
1256 df : `pandas.DataFrame`
1257 DataFrame which has to contain ra/dec columns, names of these
1258 columns are defined by configuration ``ra_dec_columns`` field.
1259
1260 Returns
1261 -------
1262 df : `pandas.DataFrame`
1263 DataFrame with ``pixelId`` column which contains pixel index
1264 for ra/dec coordinates.
1265
1266 Notes
1267 -----
1268 This overrides any existing column in a DataFrame with the same name
1269 (pixelId). Original DataFrame is not changed, copy of a DataFrame is
1270 returned.
1271 """
1272 # calculate HTM index for every DiaObject
1273 htm_index = np.zeros(df.shape[0], dtype=np.int64)
1274 ra_col, dec_col = self.config.ra_dec_columns
1275 for i, (ra, dec) in enumerate(zip(df[ra_col], df[dec_col])):
1276 uv3d = UnitVector3d(LonLat.fromDegrees(ra, dec))
1277 idx = self.pixelator.index(uv3d)
1278 htm_index[i] = idx
1279 df = df.copy()
1280 df[self.config.pixelization.htm_index_column] = htm_index
1281 return df
1282
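
A standalone sketch of the per-row computation used here, calling
``lsst.sphgeom`` directly (the HTM level and coordinates are illustrative; the
real level comes from ``config.pixelization.htm_level``):

from lsst.sphgeom import HtmPixelization, LonLat, UnitVector3d

pixelator = HtmPixelization(20)
uv3d = UnitVector3d(LonLat.fromDegrees(35.0, -5.0))
pixel_id = pixelator.index(uv3d)  # value stored in the HTM index column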

◆ _connection_url()

sqlalchemy.engine.URL | str lsst.dax.apdb.sql.apdbSql.ApdbSql._connection_url ( cls,
str config_url,
* ,
bool create )
protected
Generate a complete URL for database with proper credentials.

Parameters
----------
config_url : `str`
    Database URL as specified in configuration.
create : `bool`
    Whether to try to create new database file, only relevant for
    SQLite backend which always creates new files by default.

Returns
-------
connection_url : `sqlalchemy.engine.URL` or `str`
    Connection URL including credentials.

Definition at line 229 of file apdbSql.py.

229 def _connection_url(cls, config_url: str, *, create: bool) -> sqlalchemy.engine.URL | str:
230 """Generate a complete URL for database with proper credentials.
231
232 Parameters
233 ----------
234 config_url : `str`
235 Database URL as specified in configuration.
236 create : `bool`
237 Whether to try to create new database file, only relevant for
238 SQLite backend which always creates new files by default.
239
240 Returns
241 -------
242 connection_url : `sqlalchemy.engine.URL` or `str`
243 Connection URL including credentials.
244 """
245 # Allow 3rd party authentication mechanisms by assuming connection
246 # string is correct when we can not recognize (dialect, host, database)
247 # matching keys.
248 components = urllib.parse.urlparse(config_url)
249 if all((components.scheme is not None, components.hostname is not None, components.path is not None)):
250 try:
251 db_auth = DbAuth()
252 config_url = db_auth.getUrl(config_url)
253 except DbAuthNotFoundError:
254 # Credentials file doesn't exist or no matching credentials,
255 # use default auth.
256 pass
257
258 # SQLite has a nasty habit creating empty databases when they do not
259 # exist, tell it not to do that unless we do need to create it.
260 if not create:
261 config_url = cls._update_sqlite_url(config_url)
262
263 return config_url
264

◆ _filterRegion()

sql.ColumnElement lsst.dax.apdb.sql.apdbSql.ApdbSql._filterRegion ( self,
sqlalchemy.schema.Table table,
Region region )
protected
Make SQLAlchemy expression for selecting records in a region.

Definition at line 1237 of file apdbSql.py.

1237 def _filterRegion(self, table: sqlalchemy.schema.Table, region: Region) -> sql.ColumnElement:
1238 """Make SQLAlchemy expression for selecting records in a region."""
1239 htm_index_column = table.columns[self.config.pixelization.htm_index_column]
1240 exprlist = []
1241 pixel_ranges = self._htm_indices(region)
1242 for low, upper in pixel_ranges:
1243 upper -= 1
1244 if low == upper:
1245 exprlist.append(htm_index_column == low)
1246 else:
1247 exprlist.append(sql.expression.between(htm_index_column, low, upper))
1248
1249 return sql.expression.or_(*exprlist)
1250

◆ _fix_input_timestamps()

pandas.DataFrame lsst.dax.apdb.sql.apdbSql.ApdbSql._fix_input_timestamps ( self,
pandas.DataFrame df )
protected
Update timestamp columns in the input DataFrame to be timezone-aware
datetime type in UTC.

The AP pipeline generates naive datetime instances; we want them to be
timezone-aware before they go to the database. All naive timestamps are
assumed to be in the UTC timezone (they should be TAI).

Definition at line 1283 of file apdbSql.py.

1283 def _fix_input_timestamps(self, df: pandas.DataFrame) -> pandas.DataFrame:
1284 """Update timestamp columns in input DataFrame to be aware datetime
1285 type in UTC.
1286
1287 AP pipeline generates naive datetime instances, we want them to be
1288 aware before they go to database. All naive timestamps are assumed to
1289 be in UTC timezone (they should be TAI).
1290 """
1291 # Find all columns with aware non-UTC timestamps and convert to UTC.
1292 columns = [
1293 column
1294 for column, dtype in df.dtypes.items()
1295 if isinstance(dtype, pandas.DatetimeTZDtype) and dtype.tz is not datetime.UTC
1296 ]
1297 for column in columns:
1298 df[column] = df[column].dt.tz_convert(datetime.UTC)
1299 # Find all columns with naive timestamps and add UTC timezone.
1300 columns = [
1301 column for column, dtype in df.dtypes.items() if pandas.api.types.is_datetime64_dtype(dtype)
1302 ]
1303 for column in columns:
1304 df[column] = df[column].dt.tz_localize(datetime.UTC)
1305 return df
1306
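
A standalone pandas sketch of the conversions applied here (the column name is
illustrative): naive columns are localized to UTC, while aware non-UTC columns
would be converted with ``tz_convert``:

import datetime
import pandas

df = pandas.DataFrame({"time_processed": pandas.to_datetime(["2025-11-01T05:00:00"])})
df["time_processed"] = df["time_processed"].dt.tz_localize(datetime.timezone.utc)  # naive -> aware UTC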

◆ _fix_result_timestamps()

pandas.DataFrame lsst.dax.apdb.sql.apdbSql.ApdbSql._fix_result_timestamps ( self,
pandas.DataFrame df )
protected
Update timestamp columns to be naive datetime type in returned
DataFrame.

AP pipeline code expects DataFrames to contain naive datetime columns,
while Postgres queries return timezone-aware type. This method converts
those columns to naive datetime in UTC timezone.

Definition at line 1307 of file apdbSql.py.

1307 def _fix_result_timestamps(self, df: pandas.DataFrame) -> pandas.DataFrame:
1308 """Update timestamp columns to be naive datetime type in returned
1309 DataFrame.
1310
1311 AP pipeline code expects DataFrames to contain naive datetime columns,
1312 while Postgres queries return timezone-aware type. This method converts
1313 those columns to naive datetime in UTC timezone.
1314 """
1315 # Find all columns with aware timestamps.
1316 columns = [column for column, dtype in df.dtypes.items() if isinstance(dtype, pandas.DatetimeTZDtype)]
1317 for column in columns:
1318 # tz_convert(None) will convert to UTC and drop timezone.
1319 df[column] = df[column].dt.tz_convert(None)
1320 return df

◆ _getDiaSourcesByIDs()

pandas.DataFrame lsst.dax.apdb.sql.apdbSql.ApdbSql._getDiaSourcesByIDs ( self,
list[int] object_ids,
float start_time_mjdTai )
protected
Return catalog of DiaSource instances given set of DiaObject IDs.

Parameters
----------
object_ids :
    Collection of DiaObject IDs
start_time_mjdTai : `float`
    Lower bound of time window for the query.

Returns
-------
catalog : `pandas.DataFrame`
    Catalog containing DiaSource records.

Definition at line 831 of file apdbSql.py.

831 def _getDiaSourcesByIDs(self, object_ids: list[int], start_time_mjdTai: float) -> pandas.DataFrame:
832 """Return catalog of DiaSource instances given set of DiaObject IDs.
833
834 Parameters
835 ----------
836 object_ids :
837 Collection of DiaObject IDs
838 start_time_mjdTai : `float`
839 Lower bound of time window for the query.
840
841 Returns
842 -------
843 catalog : `pandas.DataFrame`
844 Catalog containing DiaSource records.
845 """
846 with self._timer("select_time", tags={"table": "DiaSource"}) as timer:
847 sources = self._getSourcesByIDs(ApdbTables.DiaSource, object_ids, start_time_mjdTai)
848 timer.add_values(row_count=len(sources))
849
850 _LOG.debug("found %s DiaSources", len(sources))
851 return sources
852

◆ _getDiaSourcesInRegion()

pandas.DataFrame lsst.dax.apdb.sql.apdbSql.ApdbSql._getDiaSourcesInRegion ( self,
Region region,
float start_time_mjdTai )
protected
Return catalog of DiaSource instances from given region.

Parameters
----------
region : `lsst.sphgeom.Region`
    Region to search for DIASources.
start_time_mjdTai : `float`
    Lower bound of time window for the query.

Returns
-------
catalog : `pandas.DataFrame`
    Catalog containing DiaSource records.

Definition at line 799 of file apdbSql.py.

799 def _getDiaSourcesInRegion(self, region: Region, start_time_mjdTai: float) -> pandas.DataFrame:
800 """Return catalog of DiaSource instances from given region.
801
802 Parameters
803 ----------
804 region : `lsst.sphgeom.Region`
805 Region to search for DIASources.
806 start_time_mjdTai : `float`
807 Lower bound of time window for the query.
808
809 Returns
810 -------
811 catalog : `pandas.DataFrame`
812 Catalog containing DiaSource records.
813 """
814 table = self._schema.get_table(ApdbTables.DiaSource)
815 columns = self._schema.get_apdb_columns(ApdbTables.DiaSource)
816 query = sql.select(*columns)
817
818 # build selection
819 time_filter = table.columns["midpointMjdTai"] > start_time_mjdTai
820 where = sql.expression.and_(self._filterRegion(table, region), time_filter)
821 query = query.where(where)
822
823 # execute select
824 with self._timer("DiaSource_select_time", tags={"table": "DiaSource"}) as timer:
825 with self._engine.begin() as conn:
826 sources = pandas.read_sql_query(query, conn)
827 timer.add_values(row_counts=len(sources))
828 _LOG.debug("found %s DiaSources", len(sources))
829 return self._fix_result_timestamps(sources)
830

◆ _getSourcesByIDs()

pandas.DataFrame lsst.dax.apdb.sql.apdbSql.ApdbSql._getSourcesByIDs ( self,
ApdbTables table_enum,
list[int] object_ids,
float midpointMjdTai_start )
protected
Return catalog of DiaSource or DiaForcedSource instances given set
of DiaObject IDs.

Parameters
----------
table : `sqlalchemy.schema.Table`
    Database table.
object_ids :
    Collection of DiaObject IDs
midpointMjdTai_start : `float`
    Earliest midpointMjdTai to retrieve.

Returns
-------
catalog : `pandas.DataFrame`
    Catalog containing DiaSource records. `None` is returned if
    ``read_sources_months`` configuration parameter is set to 0 or
    when ``object_ids`` is empty.

Definition at line 853 of file apdbSql.py.

853 def _getSourcesByIDs(
854 self, table_enum: ApdbTables, object_ids: list[int], midpointMjdTai_start: float
855 ) -> pandas.DataFrame:
856 """Return catalog of DiaSource or DiaForcedSource instances given set
857 of DiaObject IDs.
858
859 Parameters
860 ----------
861 table : `sqlalchemy.schema.Table`
862 Database table.
863 object_ids :
864 Collection of DiaObject IDs
865 midpointMjdTai_start : `float`
866 Earliest midpointMjdTai to retrieve.
867
868 Returns
869 -------
870 catalog : `pandas.DataFrame`
871 Catalog containing DiaSource records. `None` is returned if
872 ``read_sources_months`` configuration parameter is set to 0 or
873 when ``object_ids`` is empty.
874 """
875 table = self._schema.get_table(table_enum)
876 columns = self._schema.get_apdb_columns(table_enum)
877
878 sources: pandas.DataFrame | None = None
879 if len(object_ids) <= 0:
880 _LOG.debug("ID list is empty, just fetch empty result")
881 query = sql.select(*columns).where(sql.literal(False))
882 with self._engine.begin() as conn:
883 sources = pandas.read_sql_query(query, conn)
884 else:
885 data_frames: list[pandas.DataFrame] = []
886 for ids in chunk_iterable(sorted(object_ids), 1000):
887 query = sql.select(*columns)
888
889 # Some types like np.int64 can cause issues with
890 # sqlalchemy, convert them to int.
891 int_ids = [int(oid) for oid in ids]
892
893 # select by object id
894 query = query.where(
895 sql.expression.and_(
896 table.columns["diaObjectId"].in_(int_ids),
897 table.columns["midpointMjdTai"] > midpointMjdTai_start,
898 )
899 )
900
901 # execute select
902 with self._engine.begin() as conn:
903 data_frames.append(pandas.read_sql_query(query, conn))
904
905 if len(data_frames) == 1:
906 sources = data_frames[0]
907 else:
908 sources = pandas.concat(data_frames)
909 assert sources is not None, "Catalog cannot be None"
910 return self._fix_result_timestamps(sources)
911

◆ _htm_indices()

list[tuple[int, int]] lsst.dax.apdb.sql.apdbSql.ApdbSql._htm_indices ( self,
Region region )
protected
Generate a set of HTM indices covering specified region.

Parameters
----------
region: `sphgeom.Region`
    Region that needs to be indexed.

Returns
-------
Sequence of ranges; each range is a tuple (minHtmID, maxHtmID).

Definition at line 1220 of file apdbSql.py.

1220 def _htm_indices(self, region: Region) -> list[tuple[int, int]]:
1221 """Generate a set of HTM indices covering specified region.
1222
1223 Parameters
1224 ----------
1225 region: `sphgeom.Region`
1226 Region that needs to be indexed.
1227
1228 Returns
1229 -------
1230 Sequence of ranges, range is a tuple (minHtmID, maxHtmID).
1231 """
1232 _LOG.debug("region: %s", region)
1233 indices = self.pixelator.envelope(region, self.config.pixelization.htm_max_ranges)
1234
1235 return indices.ranges()
1236
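
A sketch of the region-to-ranges step, assuming HTM level 20 and a small
circular region (``64`` stands in for the ``htm_max_ranges`` setting):

from lsst.sphgeom import Angle, Circle, HtmPixelization, LonLat, UnitVector3d

pixelator = HtmPixelization(20)
center = UnitVector3d(LonLat.fromDegrees(35.0, -5.0))
region = Circle(center, Angle.fromDegrees(0.1))
ranges = pixelator.envelope(region, 64).ranges()  # list of (minHtmID, maxHtmID) tuples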

◆ _makeEngine()

sqlalchemy.engine.Engine lsst.dax.apdb.sql.apdbSql.ApdbSql._makeEngine ( cls,
ApdbSqlConfig config,
* ,
bool create )
protected
Make SQLAlchemy engine based on configured parameters.

Parameters
----------
config : `ApdbSqlConfig`
    Configuration object.
create : `bool`
    Whether to try to create new database file, only relevant for
    SQLite backend which always creates new files by default.

Definition at line 192 of file apdbSql.py.

192 def _makeEngine(cls, config: ApdbSqlConfig, *, create: bool) -> sqlalchemy.engine.Engine:
193 """Make SQLALchemy engine based on configured parameters.
194
195 Parameters
196 ----------
197 config : `ApdbSqlConfig`
198 Configuration object.
199 create : `bool`
200 Whether to try to create new database file, only relevant for
201 SQLite backend which always creates new files by default.
202 """
203 # engine is reused between multiple processes, make sure that we don't
204 # share connections by disabling pool (by using NullPool class)
205 kw: MutableMapping[str, Any] = dict(config.connection_config.extra_parameters)
206 conn_args: dict[str, Any] = {}
207 if not config.connection_config.connection_pool:
208 kw.update(poolclass=NullPool)
209 if config.connection_config.isolation_level is not None:
210 kw.update(isolation_level=config.connection_config.isolation_level)
211 elif config.db_url.startswith("sqlite"):
212 # Use READ_UNCOMMITTED as default value for sqlite.
213 kw.update(isolation_level="READ_UNCOMMITTED")
214 if config.connection_config.connection_timeout is not None:
215 if config.db_url.startswith("sqlite"):
216 conn_args.update(timeout=config.connection_config.connection_timeout)
217 elif config.db_url.startswith(("postgresql", "mysql")):
218 conn_args.update(connect_timeout=int(config.connection_config.connection_timeout))
219 kw.update(connect_args=conn_args)
220 engine = sqlalchemy.create_engine(cls._connection_url(config.db_url, create=create), **kw)
221
222 if engine.dialect.name == "sqlite":
223 # Need to enable foreign keys on every new connection.
224 sqlalchemy.event.listen(engine, "connect", _onSqlite3Connect)
225
226 return engine
227

◆ _makeSchema()

None lsst.dax.apdb.sql.apdbSql.ApdbSql._makeSchema ( cls,
ApdbConfig config,
bool drop = False )
protected

Definition at line 493 of file apdbSql.py.

493 def _makeSchema(cls, config: ApdbConfig, drop: bool = False) -> None:
494 # docstring is inherited from a base class
495
496 if not isinstance(config, ApdbSqlConfig):
497 raise TypeError(f"Unexpected type of configuration object: {type(config)}")
498
499 engine = cls._makeEngine(config, create=True)
500
501 # Ask schema class to create all tables.
502 schema = ApdbSqlSchema(
503 engine=engine,
504 dia_object_index=config.dia_object_index,
505 schema_file=config.schema_file,
506 schema_name=config.schema_name,
507 prefix=config.prefix,
508 namespace=config.namespace,
509 htm_index_column=config.pixelization.htm_index_column,
510 enable_replica=config.enable_replica,
511 )
512 schema.makeSchema(drop=drop)
513
514 # Need metadata table to store few items in it.
515 meta_table = schema.get_table(ApdbTables.metadata)
516 apdb_meta = ApdbMetadataSql(engine, meta_table)
517
518 # Fill version numbers, overwrite if they are already there.
519 apdb_meta.set(cls.metadataSchemaVersionKey, str(schema.schemaVersion()), force=True)
520 apdb_meta.set(cls.metadataCodeVersionKey, str(cls.apdbImplementationVersion()), force=True)
521 if config.enable_replica:
522 # Only store replica code version if replica is enabled.
523 apdb_meta.set(
524 cls.metadataReplicaVersionKey,
525 str(ApdbSqlReplica.apdbReplicaImplementationVersion()),
526 force=True,
527 )
528
529 # Store frozen part of a configuration in metadata.
530 freezer = ApdbConfigFreezer[ApdbSqlConfig](cls._frozen_parameters)
531 apdb_meta.set(cls.metadataConfigKey, freezer.to_json(config), force=True)
532

◆ _storeDiaForcedSources()

None lsst.dax.apdb.sql.apdbSql.ApdbSql._storeDiaForcedSources ( self,
pandas.DataFrame sources,
ReplicaChunk | None replica_chunk,
sqlalchemy.engine.Connection connection )
protected
Store a set of DiaForcedSources from current visit.

Parameters
----------
sources : `pandas.DataFrame`
    Catalog containing DiaForcedSource records

Definition at line 1114 of file apdbSql.py.

1114 def _storeDiaForcedSources(
1115 self,
1116 sources: pandas.DataFrame,
1117 replica_chunk: ReplicaChunk | None,
1118 connection: sqlalchemy.engine.Connection,
1119 ) -> None:
1120 """Store a set of DiaForcedSources from current visit.
1121
1122 Parameters
1123 ----------
1124 sources : `pandas.DataFrame`
1125 Catalog containing DiaForcedSource records
1126 """
1127 table = self._schema.get_table(ApdbTables.DiaForcedSource)
1128
1129 # Insert replica data
1130 replica_data: list[dict] = []
1131 replica_stmt: Any = None
1132 replica_table_name = ""
1133 if replica_chunk is not None:
1134 pk_names = [column.name for column in table.primary_key]
1135 replica_data = sources[pk_names].to_dict("records")
1136 if replica_data:
1137 for row in replica_data:
1138 row["apdb_replica_chunk"] = replica_chunk.id
1139 replica_table = self._schema.get_table(ExtraTables.DiaForcedSourceChunks)
1140 replica_table_name = replica_table.name
1141 replica_stmt = replica_table.insert()
1142
1143 # everything to be done in single transaction
1144 with self._timer("insert_time", tags={"table": table.name}) as timer:
1145 sources = _coerce_uint64(sources)
1146 sources.to_sql(table.name, connection, if_exists="append", index=False, schema=table.schema)
1147 timer.add_values(row_count=len(sources))
1148 if replica_stmt is not None:
1149 with self._timer("insert_time", tags={"table": replica_table_name}) as timer:
1150 connection.execute(replica_stmt, replica_data)
1151 timer.add_values(row_count=len(replica_data))
1152

◆ _storeDiaObjects()

None lsst.dax.apdb.sql.apdbSql.ApdbSql._storeDiaObjects ( self,
pandas.DataFrame objs,
astropy.time.Time visit_time,
ReplicaChunk | None replica_chunk,
sqlalchemy.engine.Connection connection )
protected
Store catalog of DiaObjects from current visit.

Parameters
----------
objs : `pandas.DataFrame`
    Catalog with DiaObject records.
visit_time : `astropy.time.Time`
    Time of the visit.
replica_chunk : `ReplicaChunk`
    Insert identifier.

Definition at line 938 of file apdbSql.py.

938 def _storeDiaObjects(
939 self,
940 objs: pandas.DataFrame,
941 visit_time: astropy.time.Time,
942 replica_chunk: ReplicaChunk | None,
943 connection: sqlalchemy.engine.Connection,
944 ) -> None:
945 """Store catalog of DiaObjects from current visit.
946
947 Parameters
948 ----------
949 objs : `pandas.DataFrame`
950 Catalog with DiaObject records.
951 visit_time : `astropy.time.Time`
952 Time of the visit.
953 replica_chunk : `ReplicaChunk`
954 Insert identifier.
955 """
956 if len(objs) == 0:
957 _LOG.debug("No objects to write to database.")
958 return
959
960 # Some types like np.int64 can cause issues with sqlalchemy, convert
961 # them to int.
962 ids = sorted(int(oid) for oid in objs["diaObjectId"])
963 _LOG.debug("first object ID: %d", ids[0])
964
965 if self._schema.has_mjd_timestamps:
966 validity_start_column = "validityStartMjdTai"
967 validity_end_column = "validityEndMjdTai"
968 timestamp = float(visit_time.tai.mjd)
969 else:
970 validity_start_column = "validityStart"
971 validity_end_column = "validityEnd"
972 timestamp = visit_time.datetime
973
974 # everything to be done in single transaction
975 if self.config.dia_object_index == "last_object_table":
976 # Insert and replace all records in LAST table.
977 table = self._schema.get_table(ApdbTables.DiaObjectLast)
978
979 # DiaObjectLast did not have this column in the past.
980 use_validity_start = self._schema.check_column(ApdbTables.DiaObjectLast, validity_start_column)
981
982 # Drop the previous objects (pandas cannot upsert).
983 query = table.delete().where(table.columns["diaObjectId"].in_(ids))
984
985 with self._timer("delete_time", tags={"table": table.name}) as timer:
986 res = connection.execute(query)
987 timer.add_values(row_count=res.rowcount)
988 _LOG.debug("deleted %s objects", res.rowcount)
989
990 # DiaObjectLast is a subset of DiaObject, strip missing columns
991 last_column_names = [column.name for column in table.columns]
992 if validity_start_column in last_column_names and validity_start_column not in objs.columns:
993 last_column_names.remove(validity_start_column)
994 last_objs = objs[last_column_names]
995 last_objs = _coerce_uint64(last_objs)
996
997 # Fill validityStart, only when it is in the schema.
998 if use_validity_start:
999 if validity_start_column in last_objs:
1000 last_objs[validity_start_column] = timestamp
1001 else:
1002 extra_column = pandas.Series([timestamp] * len(last_objs), name=validity_start_column)
1003 last_objs.set_index(extra_column.index, inplace=True)
1004 last_objs = pandas.concat([last_objs, extra_column], axis="columns")
1005
1006 with self._timer("insert_time", tags={"table": "DiaObjectLast"}) as timer:
1007 last_objs.to_sql(
1008 table.name,
1009 connection,
1010 if_exists="append",
1011 index=False,
1012 schema=table.schema,
1013 )
1014 timer.add_values(row_count=len(last_objs))
1015 else:
1016 # truncate existing validity intervals
1017 table = self._schema.get_table(ApdbTables.DiaObject)
1018
1019 update = (
1020 table.update()
1021 .values(**{validity_end_column: timestamp})
1022 .where(
1023 sql.expression.and_(
1024 table.columns["diaObjectId"].in_(ids),
1025 table.columns[validity_end_column].is_(None),
1026 )
1027 )
1028 )
1029
1030 with self._timer("truncate_time", tags={"table": table.name}) as timer:
1031 res = connection.execute(update)
1032 timer.add_values(row_count=res.rowcount)
1033 _LOG.debug("truncated %s intervals", res.rowcount)
1034
1035 objs = _coerce_uint64(objs)
1036
1037 # Fill additional columns
1038 extra_columns: list[pandas.Series] = []
1039 if validity_start_column in objs.columns:
1040 objs[validity_start_column] = timestamp
1041 else:
1042 extra_columns.append(pandas.Series([timestamp] * len(objs), name=validity_start_column))
1043 if validity_end_column in objs.columns:
1044 objs[validity_end_column] = None
1045 else:
1046 extra_columns.append(pandas.Series([None] * len(objs), name=validity_end_column))
1047 if extra_columns:
1048 objs.set_index(extra_columns[0].index, inplace=True)
1049 objs = pandas.concat([objs] + extra_columns, axis="columns")
1050
1051 # Insert replica data
1052 table = self._schema.get_table(ApdbTables.DiaObject)
1053 replica_data: list[dict] = []
1054 replica_stmt: Any = None
1055 replica_table_name = ""
1056 if replica_chunk is not None:
1057 pk_names = [column.name for column in table.primary_key]
1058 replica_data = objs[pk_names].to_dict("records")
1059 if replica_data:
1060 for row in replica_data:
1061 row["apdb_replica_chunk"] = replica_chunk.id
1062 replica_table = self._schema.get_table(ExtraTables.DiaObjectChunks)
1063 replica_table_name = replica_table.name
1064 replica_stmt = replica_table.insert()
1065
1066 # insert new versions
1067 with self._timer("insert_time", tags={"table": table.name}) as timer:
1068 objs.to_sql(table.name, connection, if_exists="append", index=False, schema=table.schema)
1069 timer.add_values(row_count=len(objs))
1070 if replica_stmt is not None:
1071 with self._timer("insert_time", tags={"table": replica_table_name}) as timer:
1072 connection.execute(replica_stmt, replica_data)
1073 timer.add_values(row_count=len(replica_data))
1074

◆ _storeDiaSources()

None lsst.dax.apdb.sql.apdbSql.ApdbSql._storeDiaSources ( self,
pandas.DataFrame sources,
ReplicaChunk | None replica_chunk,
sqlalchemy.engine.Connection connection )
protected
Store catalog of DiaSources from current visit.

Parameters
----------
sources : `pandas.DataFrame`
    Catalog containing DiaSource records

Definition at line 1075 of file apdbSql.py.

1075 def _storeDiaSources(
1076 self,
1077 sources: pandas.DataFrame,
1078 replica_chunk: ReplicaChunk | None,
1079 connection: sqlalchemy.engine.Connection,
1080 ) -> None:
1081 """Store catalog of DiaSources from current visit.
1082
1083 Parameters
1084 ----------
1085 sources : `pandas.DataFrame`
1086 Catalog containing DiaSource records
1087 """
1088 table = self._schema.get_table(ApdbTables.DiaSource)
1089
1090 # Insert replica data
1091 replica_data: list[dict] = []
1092 replica_stmt: Any = None
1093 replica_table_name = ""
1094 if replica_chunk is not None:
1095 pk_names = [column.name for column in table.primary_key]
1096 replica_data = sources[pk_names].to_dict("records")
1097 if replica_data:
1098 for row in replica_data:
1099 row["apdb_replica_chunk"] = replica_chunk.id
1100 replica_table = self._schema.get_table(ExtraTables.DiaSourceChunks)
1101 replica_table_name = replica_table.name
1102 replica_stmt = replica_table.insert()
1103
1104 # everything to be done in single transaction
1105 with self._timer("insert_time", tags={"table": table.name}) as timer:
1106 sources = _coerce_uint64(sources)
1107 sources.to_sql(table.name, connection, if_exists="append", index=False, schema=table.schema)
1108 timer.add_values(row_count=len(sources))
1109 if replica_stmt is not None:
1110 with self._timer("replica_insert_time", tags={"table": replica_table_name}) as timer:
1111 connection.execute(replica_stmt, replica_data)
1112 timer.add_values(row_count=len(replica_data))
1113

◆ _storeReplicaChunk()

None lsst.dax.apdb.sql.apdbSql.ApdbSql._storeReplicaChunk ( self,
ReplicaChunk replica_chunk,
sqlalchemy.engine.Connection connection )
protected

Definition at line 912 of file apdbSql.py.

912 def _storeReplicaChunk(
913 self,
914 replica_chunk: ReplicaChunk,
915 connection: sqlalchemy.engine.Connection,
916 ) -> None:
917 # `visit_time.datetime` returns naive datetime, even though all astropy
918 # times are in UTC. Add UTC timezone to timestamp so that database
919 # can store a correct value.
920 dt = datetime.datetime.fromtimestamp(replica_chunk.last_update_time.unix_tai, tz=datetime.UTC)
921
922 table = self._schema.get_table(ExtraTables.ApdbReplicaChunks)
923
924 # We need UPSERT which is dialect-specific construct
925 values = {"last_update_time": dt, "unique_id": replica_chunk.unique_id}
926 row = {"apdb_replica_chunk": replica_chunk.id} | values
927 if connection.dialect.name == "sqlite":
928 insert_sqlite = sqlalchemy.dialects.sqlite.insert(table)
929 insert_sqlite = insert_sqlite.on_conflict_do_update(index_elements=table.primary_key, set_=values)
930 connection.execute(insert_sqlite, row)
931 elif connection.dialect.name == "postgresql":
932 insert_pg = sqlalchemy.dialects.postgresql.dml.insert(table)
933 insert_pg = insert_pg.on_conflict_do_update(constraint=table.primary_key, set_=values)
934 connection.execute(insert_pg, row)
935 else:
936 raise TypeError(f"Unsupported dialect {connection.dialect.name} for upsert.")
937

◆ _storeUpdateRecords()

None lsst.dax.apdb.sql.apdbSql.ApdbSql._storeUpdateRecords ( self,
Iterable[ApdbUpdateRecord] records,
ReplicaChunk chunk,
* ,
bool store_chunk = False,
sqlalchemy.engine.Connection | None connection = None )
protected
Store ApdbUpdateRecords in the replica table for those records.

Parameters
----------
records : `list` [`ApdbUpdateRecord`]
    Records to store.
chunk : `ReplicaChunk`
    Replica chunk for these records.
store_chunk : `bool`
    If True then also store replica chunk.
connection : `sqlalchemy.engine.Connection`
    SQLAlchemy connection to use; if `None`, a new connection will be
    made. `None` is useful for tests only; regular use will call this
    method in the same transaction that saves other types of records.

Raises
------
TypeError
    Raised if replication is not enabled for this instance.

Definition at line 1153 of file apdbSql.py.

1153 def _storeUpdateRecords(
1154 self,
1155 records: Iterable[ApdbUpdateRecord],
1156 chunk: ReplicaChunk,
1157 *,
1158 store_chunk: bool = False,
1159 connection: sqlalchemy.engine.Connection | None = None,
1160 ) -> None:
1161 """Store ApdbUpdateRecords in the replica table for those records.
1162
1163 Parameters
1164 ----------
1165 records : `list` [`ApdbUpdateRecord`]
1166 Records to store.
1167 chunk : `ReplicaChunk`
1168 Replica chunk for these records.
1169 store_chunk : `bool`
1170 If True then also store replica chunk.
1171 connection : `sqlalchemy.engine.Connection`
1172 SQLAlchemy connection to use, if `None` a new connection will be
1173 made. `None` is useful for tests only, regular use will call this
1174 method in the same transaction that saves other types of records.
1175
1176 Raises
1177 ------
1178 TypeError
1179 Raised if replication is not enabled for this instance.
1180 """
1181 if not self._schema.replication_enabled:
1182 raise TypeError("Replication is not enabled for this APDB instance.")
1183
1184 apdb_replica_chunk = chunk.id
1185 # Do not use unique_id from ReplicaChunk as it could be reused in
1186 # multiple calls to this method.
1187 update_unique_id = uuid.uuid4()
1188
1189 record_dicts = []
1190 for record in records:
1191 record_dicts.append(
1192 {
1193 "apdb_replica_chunk": apdb_replica_chunk,
1194 "update_time_ns": record.update_time_ns,
1195 "update_order": record.update_order,
1196 "update_unique_id": update_unique_id,
1197 "update_payload": record.to_json(),
1198 }
1199 )
1200
1201 if not record_dicts:
1202 return
1203
1204 # TODO: Need to check that table exists.
1205 table = self._schema.get_table(ExtraTables.ApdbUpdateRecordChunks)
1206
1207 def _do_store(connection: sqlalchemy.engine.Connection) -> None:
1208 if store_chunk:
1209 self._storeReplicaChunk(chunk, connection)
1210 with self._timer("insert_time", tags={"table": table.name}) as timer:
1211 connection.execute(table.insert(), record_dicts)
1212 timer.add_values(row_count=len(record_dicts))
1213
1214 if connection is None:
1215 with self._engine.begin() as connection:
1216 _do_store(connection)
1217 else:
1218 _do_store(connection)
1219

◆ _timer()

Timer lsst.dax.apdb.sql.apdbSql.ApdbSql._timer ( self,
str name,
* ,
Mapping[str, str | int] | None tags = None )
protected
Create `Timer` instance given its name.

Definition at line 187 of file apdbSql.py.

187 def _timer(self, name: str, *, tags: Mapping[str, str | int] | None = None) -> Timer:
188 """Create `Timer` instance given its name."""
189 return Timer(name, _MON, tags=tags)
190

◆ _update_sqlite_url()

str lsst.dax.apdb.sql.apdbSql.ApdbSql._update_sqlite_url ( cls,
str url_string )
protected
If URL refers to sqlite dialect, update it so that the backend does
not try to create database file if it does not exist already.

Parameters
----------
url_string : `str`
    Connection string.

Returns
-------
url_string : `str`
    Possibly updated connection string.

Definition at line 266 of file apdbSql.py.

266 def _update_sqlite_url(cls, url_string: str) -> str:
267 """If URL refers to sqlite dialect, update it so that the backend does
268 not try to create database file if it does not exist already.
269
270 Parameters
271 ----------
272 url_string : `str`
273 Connection string.
274
275 Returns
276 -------
277 url_string : `str`
278 Possibly updated connection string.
279 """
280 try:
281 url = sqlalchemy.make_url(url_string)
282 except sqlalchemy.exc.SQLAlchemyError:
283 # If parsing fails it means some special format, likely not
284 # sqlite so we just return it unchanged.
285 return url_string
286
287 if url.get_backend_name() == "sqlite":
288 # Massage url so that database name starts with "file:" and
289 # option string has "mode=rw&uri=true". Database name
290 # should look like a path (:memory: is not supported by
291 # Apdb, but someone could still try to use it).
292 database = url.database
293 if database and not database.startswith((":", "file:")):
294 query = dict(url.query, mode="rw", uri="true")
295 # If ``database`` is an absolute path then original URL should
296 # include four slashes after "sqlite:". Humans are bad at
297 # counting things beyond four and sometimes an extra slash gets
298 # added unintentionally, which causes sqlite to treat initial
299 # element as "authority" and to complain. Strip extra slashes
300 # at the start of the path to avoid that (DM-46077).
301 if database.startswith("//"):
302 warnings.warn(
303 f"Database URL contains extra leading slashes which will be removed: {url}",
304 stacklevel=3,
305 )
306 database = "/" + database.lstrip("/")
307 url = url.set(database=f"file:{database}", query=query)
308 url_string = url.render_as_string()
309
310 return url_string
311
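
A sketch of the same URL massaging done directly with SQLAlchemy; the printed
URL is illustrative rather than a guaranteed byte-for-byte match of this
method's output:

from sqlalchemy.engine import make_url

url = make_url("sqlite:////data/apdb/apdb.sqlite3")
if url.get_backend_name() == "sqlite":
    database = url.database
    if database and not database.startswith((":", "file:")):
        query = dict(url.query, mode="rw", uri="true")
        url = url.set(database=f"file:{database}", query=query)
print(url.render_as_string())  # e.g. sqlite:///file:/data/apdb/apdb.sqlite3?mode=rw&uri=true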

◆ _versionCheck()

VersionTuple lsst.dax.apdb.sql.apdbSql.ApdbSql._versionCheck ( self,
ApdbMetadataSql metadata )
protected
Check schema version compatibility and return the database schema
version.

Definition at line 312 of file apdbSql.py.

312 def _versionCheck(self, metadata: ApdbMetadataSql) -> VersionTuple:
313 """Check schema version compatibility and return the database schema
314 version.
315 """
316
317 def _get_version(key: str) -> VersionTuple:
318 """Retrieve version number from given metadata key."""
319 version_str = metadata.get(key)
320 if version_str is None:
321 # Should not happen with existing metadata table.
322 raise RuntimeError(f"Version key {key!r} does not exist in metadata table.")
323 return VersionTuple.fromString(version_str)
324
325 db_schema_version = _get_version(self.metadataSchemaVersionKey)
326 db_code_version = _get_version(self.metadataCodeVersionKey)
327
328 # For now there is no way to make read-only APDB instances, assume that
329 # any access can do updates.
330 if not self._schema.schemaVersion().checkCompatibility(db_schema_version):
331 raise IncompatibleVersionError(
332 f"Configured schema version {self._schema.schemaVersion()} "
333 f"is not compatible with database version {db_schema_version}"
334 )
335 if not self.apdbImplementationVersion().checkCompatibility(db_code_version):
336 raise IncompatibleVersionError(
337 f"Current code version {self.apdbImplementationVersion()} "
338 f"is not compatible with database version {db_code_version}"
339 )
340
341 # Check replica code version only if replica is enabled.
342 if self._schema.replication_enabled:
343 db_replica_version = _get_version(self.metadataReplicaVersionKey)
344 code_replica_version = ApdbSqlReplica.apdbReplicaImplementationVersion()
345 if not code_replica_version.checkCompatibility(db_replica_version):
346 raise IncompatibleVersionError(
347 f"Current replication code version {code_replica_version} "
348 f"is not compatible with database version {db_replica_version}"
349 )
350
351 return db_schema_version
352
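
A sketch of the underlying compatibility check; the import location of
``VersionTuple`` (the top-level ``lsst.dax.apdb`` package) and the version
strings are assumptions for illustration:

from lsst.dax.apdb import VersionTuple

db_schema_version = VersionTuple.fromString("0.1.1")
code_schema_version = VersionTuple.fromString("0.1.0")
compatible = code_schema_version.checkCompatibility(db_schema_version)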

◆ admin()

ApdbSqlAdmin lsst.dax.apdb.sql.apdbSql.ApdbSql.admin ( self)
Object providing administrative interface for APDB (`ApdbAdmin`).

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 795 of file apdbSql.py.

795 def admin(self) -> ApdbSqlAdmin:
796 # docstring is inherited from a base class
797 return ApdbSqlAdmin(self.pixelator)
798

◆ apdbImplementationVersion()

VersionTuple lsst.dax.apdb.sql.apdbSql.ApdbSql.apdbImplementationVersion ( cls)
Return version number for current APDB implementation.

Returns
-------
version : `VersionTuple`
    Version of the code defined in implementation class.

Definition at line 354 of file apdbSql.py.

354 def apdbImplementationVersion(cls) -> VersionTuple:
355 """Return version number for current APDB implementation.
356
357 Returns
358 -------
359 version : `VersionTuple`
360 Version of the code defined in implementation class.
361 """
362 return VERSION
363

◆ containsVisitDetector()

bool lsst.dax.apdb.sql.apdbSql.ApdbSql.containsVisitDetector ( self,
int visit,
int detector,
Region region,
astropy.time.Time visit_time )
Test whether any sources for a given visit-detector are present in
the APDB.

Parameters
----------
visit, detector : `int`
    The ID of the visit-detector to search for.
region : `lsst.sphgeom.Region`
    Region corresponding to the visit/detector combination.
visit_time : `astropy.time.Time`
    Visit time (as opposed to visit processing time). This can be any
    timestamp in the visit timespan, e.g. its begin or end time.

Returns
-------
present : `bool`
    `True` if at least one DiaSource or DiaForcedSource record
    may exist for the specified observation, `False` otherwise.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 628 of file apdbSql.py.

628 def containsVisitDetector(
629 self,
630 visit: int,
631 detector: int,
632 region: Region,
633 visit_time: astropy.time.Time,
634 ) -> bool:
635 # docstring is inherited from a base class
636 src_table: sqlalchemy.schema.Table = self._schema.get_table(ApdbTables.DiaSource)
637 frcsrc_table: sqlalchemy.schema.Table = self._schema.get_table(ApdbTables.DiaForcedSource)
638 # Query should load only one leaf page of the index
639 query1 = sql.select(src_table.c.visit).filter_by(visit=visit, detector=detector).limit(1)
640
641 with self._engine.begin() as conn:
642 result = conn.execute(query1).scalar_one_or_none()
643 if result is not None:
644 return True
645 else:
646 # Backup query if an image was processed but had no diaSources
647 query2 = sql.select(frcsrc_table.c.visit).filter_by(visit=visit, detector=detector).limit(1)
648 result = conn.execute(query2).scalar_one_or_none()
649 return result is not None
650
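
A usage sketch, assuming ``apdb`` is an existing `ApdbSql` instance; the visit
and detector IDs, region, and timestamp are illustrative values:

import astropy.time
from lsst.sphgeom import Angle, Circle, LonLat, UnitVector3d

region = Circle(UnitVector3d(LonLat.fromDegrees(35.0, -5.0)), Angle.fromDegrees(1.0))
visit_time = astropy.time.Time("2025-11-01T05:00:00", scale="tai")
already_stored = apdb.containsVisitDetector(12345, 42, region, visit_time)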

◆ countUnassociatedObjects()

int lsst.dax.apdb.sql.apdbSql.ApdbSql.countUnassociatedObjects ( self)
Return the number of DiaObjects that have only one DiaSource
associated with them.

Used as part of ap_verify metrics.

Returns
-------
count : `int`
    Number of DiaObjects with exactly one associated DiaSource.

Notes
-----
This method can be very inefficient or slow in some implementations.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 768 of file apdbSql.py.

768 def countUnassociatedObjects(self) -> int:
769 # docstring is inherited from a base class
770
771 # Retrieve the DiaObject table.
772 table: sqlalchemy.schema.Table = self._schema.get_table(ApdbTables.DiaObject)
773
774 if self._schema.has_mjd_timestamps:
775 validity_end_column = "validityEndMjdTai"
776 else:
777 validity_end_column = "validityEnd"
778
779 # Construct the sql statement.
780 stmt = sql.select(func.count()).select_from(table).where(table.c.nDiaSources == 1)
781 stmt = stmt.where(table.columns[validity_end_column] == None) # noqa: E711
782
783 # Return the count.
784 with self._engine.begin() as conn:
785 count = conn.execute(stmt).scalar_one()
786
787 return count
788

◆ dailyJob()

None lsst.dax.apdb.sql.apdbSql.ApdbSql.dailyJob ( self)
Implement daily activities like cleanup/vacuum.

What should be done during daily activities is determined by
specific implementation.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 764 of file apdbSql.py.

764 def dailyJob(self) -> None:
765 # docstring is inherited from a base class
766 pass
767

◆ from_config()

Apdb lsst.dax.apdb.apdb.Apdb.from_config ( cls,
ApdbConfig config )
inherited
Create Apdb instance from configuration object.

Parameters
----------
config : `ApdbConfig`
    Configuration object, type of this object determines type of the
    Apdb implementation.

Returns
-------
apdb : `Apdb`
    Instance of `Apdb` class.

Definition at line 50 of file apdb.py.

50 def from_config(cls, config: ApdbConfig) -> Apdb:
51 """Create Ppdb instance from configuration object.
52
53 Parameters
54 ----------
55 config : `ApdbConfig`
56 Configuration object, type of this object determines type of the
57 Apdb implementation.
58
59 Returns
60 -------
61 apdb : `apdb`
62 Instance of `Apdb` class.
63 """
64 return make_apdb(config)
65

◆ from_uri()

Apdb lsst.dax.apdb.apdb.Apdb.from_uri ( cls,
ResourcePathExpression uri )
inherited
Make Apdb instance from a serialized configuration.

Parameters
----------
uri : `~lsst.resources.ResourcePathExpression`
    URI or local file path pointing to a file with serialized
    configuration, or a string with a "label:" prefix. In the latter
    case, the configuration will be looked up from an APDB index file
    using the label name that follows the prefix. The APDB index file's
    location is determined by the ``DAX_APDB_INDEX_URI`` environment
    variable.

Returns
-------
apdb : `Apdb`
    Instance of `Apdb` class, the type of the returned instance is
    determined by configuration.

Definition at line 67 of file apdb.py.

67 def from_uri(cls, uri: ResourcePathExpression) -> Apdb:
68 """Make Apdb instance from a serialized configuration.
69
70 Parameters
71 ----------
72 uri : `~lsst.resources.ResourcePathExpression`
73 URI or local file path pointing to a file with serialized
74 configuration, or a string with a "label:" prefix. In the latter
75 case, the configuration will be looked up from an APDB index file
76 using the label name that follows the prefix. The APDB index file's
77 location is determined by the ``DAX_APDB_INDEX_URI`` environment
78 variable.
79
80 Returns
81 -------
82 apdb : `apdb`
83 Instance of `Apdb` class, the type of the returned instance is
84 determined by configuration.
85 """
86 config = ApdbConfig.from_uri(uri)
87 return make_apdb(config)
88
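
A usage sketch of both documented forms; the file path is hypothetical, and the
``label:`` form assumes a matching entry in the APDB index file referenced by
the ``DAX_APDB_INDEX_URI`` environment variable:

from lsst.dax.apdb import Apdb

apdb = Apdb.from_uri("/path/to/apdb-config.yaml")
apdb = Apdb.from_uri("label:my-apdb")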

◆ get_replica()

ApdbSqlReplica lsst.dax.apdb.sql.apdbSql.ApdbSql.get_replica ( self)
Return `ApdbReplica` instance for this database.

Definition at line 456 of file apdbSql.py.

456 def get_replica(self) -> ApdbSqlReplica:
457 """Return `ApdbReplica` instance for this database."""
458 return ApdbSqlReplica(self._schema, self._engine, self._db_schema_version)
459

◆ getConfig()

ApdbSqlConfig lsst.dax.apdb.sql.apdbSql.ApdbSql.getConfig ( self)
Return APDB configuration for this instance, including any updates
that may be read from database.

Returns
-------
config : `ApdbConfig`
    APDB configuration.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 484 of file apdbSql.py.

484 def getConfig(self) -> ApdbSqlConfig:
485 # docstring is inherited from a base class
486 return self.config
487

◆ getDiaForcedSources()

pandas.DataFrame | None lsst.dax.apdb.sql.apdbSql.ApdbSql.getDiaForcedSources ( self,
Region region,
Iterable[int] | None object_ids,
astropy.time.Time visit_time,
astropy.time.Time | None start_time = None )
Return catalog of DiaForcedSource instances from a given region.

Parameters
----------
region : `lsst.sphgeom.Region`
    Region to search for DIASources.
object_ids : iterable [ `int` ], optional
    List of DiaObject IDs to further constrain the set of returned
    sources. If list is empty then empty catalog is returned with a
    correct schema. If `None` then returned sources are not
    constrained.
visit_time : `astropy.time.Time`
    Time of the current visit. If APDB contains records later than this
    time they may also be returned.
start_time : `astropy.time.Time`, optional
    Lower bound of time window for the query. If not specified then
    it is calculated using ``visit_time`` and
    ``read_forced_sources_months`` configuration parameter.

Returns
-------
catalog : `pandas.DataFrame`, or `None`
    Catalog containing DiaForcedSource records. `None` is returned if
    ``start_time`` is not specified and ``read_forced_sources_months``
    configuration parameter is set to 0.

Raises
------
NotImplementedError
    May be raised by some implementations if ``object_ids`` is `None`.

Notes
-----
This method returns DiaForcedSource catalog for a region with
additional filtering based on DiaObject IDs. Only a subset of DiaSource
history is returned limited by ``read_forced_sources_months`` config
parameter, w.r.t. ``visit_time``. If ``object_ids`` is empty then an
empty catalog is always returned with the correct schema
(columns/types). If ``object_ids`` is `None` then no filtering is
performed and some of the returned records may be outside the specified
region.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 594 of file apdbSql.py.

594 def getDiaForcedSources(
595 self,
596 region: Region,
597 object_ids: Iterable[int] | None,
598 visit_time: astropy.time.Time,
599 start_time: astropy.time.Time | None = None,
600 ) -> pandas.DataFrame | None:
601 # docstring is inherited from a base class
602 if start_time is None and self.config.read_forced_sources_months == 0:
603 _LOG.debug("Skip DiaForceSources fetching")
604 return None
605
606 if object_ids is None:
607 # This implementation does not support region-based selection. In
608 # the past DiaForcedSource schema did not have ra/dec columns (it
609 # had x/y columns). ra/dec were added at some point, so we could
610 add pixelId column to this table if/when needed.
611 raise NotImplementedError("Region-based selection is not supported")
612
613 # TODO: DateTime.MJD must be consistent with code in ap_association,
614 # alternatively we can fill midpointMjdTai ourselves in store()
615 if start_time is None:
616 start_time_mjdTai = _make_midpointMjdTai_start(visit_time, self.config.read_forced_sources_months)
617 else:
618 start_time_mjdTai = float(start_time.tai.mjd)
619 _LOG.debug("start_time_mjdTai = %.6f", start_time_mjdTai)
620
621 with self._timer("select_time", tags={"table": "DiaForcedSource"}) as timer:
622 sources = self._getSourcesByIDs(ApdbTables.DiaForcedSource, list(object_ids), start_time_mjdTai)
623 timer.add_values(row_count=len(sources))
624
625 _LOG.debug("found %s DiaForcedSources", len(sources))
626 return sources
627

◆ getDiaObjects()

pandas.DataFrame lsst.dax.apdb.sql.apdbSql.ApdbSql.getDiaObjects ( self,
Region region )
Return catalog of DiaObject instances from a given region.

This method returns only the last version of each DiaObject,
and may return only the subset of the DiaObject columns needed
for AP association. Some records in the returned catalog may be
outside the specified region; it is up to the client to ignore those
records or clean up the catalog before further use.

Parameters
----------
region : `lsst.sphgeom.Region`
    Region to search for DIAObjects.

Returns
-------
catalog : `pandas.DataFrame`
    Catalog containing DiaObject records for a region that may be a
    superset of the specified region.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 533 of file apdbSql.py.

533 def getDiaObjects(self, region: Region) -> pandas.DataFrame:
534 # docstring is inherited from a base class
535
536 # decide what columns we need
537 if self.config.dia_object_index == "last_object_table":
538 table_enum = ApdbTables.DiaObjectLast
539 else:
540 table_enum = ApdbTables.DiaObject
541 table = self._schema.get_table(table_enum)
542 if not self.config.dia_object_columns:
543 columns = self._schema.get_apdb_columns(table_enum)
544 else:
545 columns = [table.c[col] for col in self.config.dia_object_columns]
546 query = sql.select(*columns)
547
548 # build selection
549 query = query.where(self._filterRegion(table, region))
550
551 if self._schema.has_mjd_timestamps:
552 validity_end_column = "validityEndMjdTai"
553 else:
554 validity_end_column = "validityEnd"
555
556 # select latest version of objects
557 if self.config.dia_object_index != "last_object_table":
558 query = query.where(table.columns[validity_end_column] == None) # noqa: E711
559
560 # _LOG.debug("query: %s", query)
561
562 # execute select
563 with self._timer("select_time", tags={"table": "DiaObject"}) as timer:
564 with self._engine.begin() as conn:
565 objects = pandas.read_sql_query(query, conn)
566 timer.add_values(row_count=len(objects))
567 _LOG.debug("found %s DiaObjects", len(objects))
568 return self._fix_result_timestamps(objects)
569
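
A query sketch, assuming ``apdb`` is an existing `ApdbSql` instance; the
region's coordinates and radius are illustrative:

from lsst.sphgeom import Angle, Circle, LonLat, UnitVector3d

region = Circle(UnitVector3d(LonLat.fromDegrees(35.0, -5.0)), Angle.fromDegrees(0.2))
objects = apdb.getDiaObjects(region)  # pandas.DataFrame, may be a superset of the region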

◆ getDiaSources()

pandas.DataFrame | None lsst.dax.apdb.sql.apdbSql.ApdbSql.getDiaSources ( self,
Region region,
Iterable[int] | None object_ids,
astropy.time.Time visit_time,
astropy.time.Time | None start_time = None )
Return catalog of DiaSource instances from a given region.

Parameters
----------
region : `lsst.sphgeom.Region`
    Region to search for DIASources.
object_ids : iterable [ `int` ], optional
    List of DiaObject IDs to further constrain the set of returned
    sources. If `None` then returned sources are not constrained. If
    list is empty then empty catalog is returned with a correct
    schema.
visit_time : `astropy.time.Time`
    Time of the current visit. If APDB contains records later than this
    time they may also be returned.
start_time : `astropy.time.Time`, optional
    Lower bound of time window for the query. If not specified then
    it is calculated using ``visit_time`` and
    ``read_sources_months`` configuration parameter.

Returns
-------
catalog : `pandas.DataFrame`, or `None`
    Catalog containing DiaSource records. `None` is returned if
    ``start_time`` is not specified and ``read_sources_months``
    configuration parameter is set to 0.

Notes
-----
This method returns DiaSource catalog for a region with additional
filtering based on DiaObject IDs. Only a subset of DiaSource history
is returned limited by ``read_sources_months`` config parameter, w.r.t.
``visit_time``. If ``object_ids`` is empty then an empty catalog is
always returned with the correct schema (columns/types). If
``object_ids`` is `None` then no filtering is performed and some of the
returned records may be outside the specified region.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 570 of file apdbSql.py.

576 ) -> pandas.DataFrame | None:
577 # docstring is inherited from a base class
578 if start_time is None and self.config.read_sources_months == 0:
579 _LOG.debug("Skip DiaSources fetching")
580 return None
581
582 if start_time is None:
583 start_time_mjdTai = _make_midpointMjdTai_start(visit_time, self.config.read_sources_months)
584 else:
585 start_time_mjdTai = float(start_time.tai.mjd)
586 _LOG.debug("start_time_mjdTai = %.6f", start_time_mjdTai)
587
588 if object_ids is None:
589 # region-based select
590 return self._getDiaSourcesInRegion(region, start_time_mjdTai)
591 else:
592 return self._getDiaSourcesByIDs(list(object_ids), start_time_mjdTai)
593
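A continuation of the hedged sketch above (it assumes the ``apdb``, ``region`` and ``objects`` variables from the getDiaObjects example; the visit time is made up, and the ``diaObjectId`` column name follows the notes in this documentation):

import astropy.time

# Assumed visit time; in a pipeline this comes from the visit being processed.
visit_time = astropy.time.Time("2025-11-15T06:00:00", scale="tai")
object_ids = [int(oid) for oid in objects["diaObjectId"]]

sources = apdb.getDiaSources(region, object_ids, visit_time)
if sources is None:
    # start_time was not given and read_sources_months == 0: history reads disabled.
    print("DiaSource history disabled by configuration")
else:
    print(f"retrieved {len(sources)} DiaSource records")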

◆ getSSObjects()

pandas.DataFrame lsst.dax.apdb.sql.apdbSql.ApdbSql.getSSObjects ( self)
Return catalog of SSObject instances.

Returns
-------
catalog : `pandas.DataFrame`
    Catalog containing SSObject records; all existing records are
    returned.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 651 of file apdbSql.py.

651 def getSSObjects(self) -> pandas.DataFrame:
652 # docstring is inherited from a base class
653
654 columns = self._schema.get_apdb_columns(ApdbTables.SSObject)
655 query = sql.select(*columns)
656
657 # execute select
658 with self._timer("SSObject_select_time", tags={"table": "SSObject"}) as timer:
659 with self._engine.begin() as conn:
660 objects = pandas.read_sql_query(query, conn)
661 timer.add_values(row_count=len(objects))
662 _LOG.debug("found %s SSObjects", len(objects))
663 return self._fix_result_timestamps(objects)
664

◆ init_database()

ApdbSqlConfig lsst.dax.apdb.sql.apdbSql.ApdbSql.init_database ( cls,
str db_url,
* ,
str | None schema_file = None,
str | None schema_name = None,
int | None read_sources_months = None,
int | None read_forced_sources_months = None,
bool enable_replica = False,
int | None connection_timeout = None,
str | None dia_object_index = None,
int | None htm_level = None,
str | None htm_index_column = None,
tuple[str, str] | None ra_dec_columns = None,
str | None prefix = None,
str | None namespace = None,
bool drop = False )
Initialize new APDB instance and make configuration object for it.

Parameters
----------
db_url : `str`
    SQLAlchemy database URL.
schema_file : `str`, optional
    Location of (YAML) configuration file with APDB schema. If not
    specified then the default location will be used.
schema_name : `str`, optional
    Name of the schema in the YAML configuration file. If not specified
    then the default name will be used.
read_sources_months : `int`, optional
    Number of months of history to read from DiaSource.
read_forced_sources_months : `int`, optional
    Number of months of history to read from DiaForcedSource.
enable_replica : `bool`
    If `True`, create additional tables used for replication to PPDB.
connection_timeout : `int`, optional
    Database connection timeout in seconds.
dia_object_index : `str`, optional
    Indexing mode for DiaObject table.
htm_level : `int`, optional
    HTM indexing level.
htm_index_column : `str`, optional
    Name of a HTM index column for DiaObject and DiaSource tables.
ra_dec_columns : `tuple` [`str`, `str`], optional
    Names of ra/dec columns in DiaObject table.
prefix : `str`, optional
    Optional prefix for all table names.
namespace : `str`, optional
    Name of the database schema for all APDB tables. If not specified
    then the default schema is used.
drop : `bool`, optional
    If `True` then drop existing tables before re-creating the schema.

Returns
-------
config : `ApdbSqlConfig`
    Resulting configuration object for a created APDB instance.

Definition at line 365 of file apdbSql.py.

382 ) -> ApdbSqlConfig:
383 """Initialize new APDB instance and make configuration object for it.
384
385 Parameters
386 ----------
387 db_url : `str`
388 SQLAlchemy database URL.
389 schema_file : `str`, optional
390 Location of (YAML) configuration file with APDB schema. If not
391 specified then default location will be used.
392 schema_name : str | None
393 Name of the schema in YAML configuration file. If not specified
394 then default name will be used.
395 read_sources_months : `int`, optional
396 Number of months of history to read from DiaSource.
397 read_forced_sources_months : `int`, optional
398 Number of months of history to read from DiaForcedSource.
399 enable_replica : `bool`
400 If True, make additional tables used for replication to PPDB.
401 connection_timeout : `int`, optional
402 Database connection timeout in seconds.
403 dia_object_index : `str`, optional
404 Indexing mode for DiaObject table.
405 htm_level : `int`, optional
406 HTM indexing level.
407 htm_index_column : `str`, optional
408 Name of a HTM index column for DiaObject and DiaSource tables.
409 ra_dec_columns : `tuple` [`str`, `str`], optional
410 Names of ra/dec columns in DiaObject table.
411 prefix : `str`, optional
412 Optional prefix for all table names.
413 namespace : `str`, optional
414 Name of the database schema for all APDB tables. If not specified
415 then default schema is used.
416 drop : `bool`, optional
417 If `True` then drop existing tables before re-creating the schema.
418
419 Returns
420 -------
421 config : `ApdbSqlConfig`
422 Resulting configuration object for a created APDB instance.
423 """
424 config = ApdbSqlConfig(db_url=db_url, enable_replica=enable_replica)
425 if schema_file is not None:
426 config.schema_file = schema_file
427 if schema_name is not None:
428 config.schema_name = schema_name
429 if read_sources_months is not None:
430 config.read_sources_months = read_sources_months
431 if read_forced_sources_months is not None:
432 config.read_forced_sources_months = read_forced_sources_months
433 if connection_timeout is not None:
434 config.connection_config.connection_timeout = connection_timeout
435 if dia_object_index is not None:
436 config.dia_object_index = dia_object_index
437 if htm_level is not None:
438 config.pixelization.htm_level = htm_level
439 if htm_index_column is not None:
440 config.pixelization.htm_index_column = htm_index_column
441 if ra_dec_columns is not None:
442 config.ra_dec_columns = ra_dec_columns
443 if prefix is not None:
444 config.prefix = prefix
445 if namespace is not None:
446 config.namespace = namespace
447
448 cls._makeSchema(config, drop=drop)
449
450 # SQLite has a nasty habit of creating empty database by default,
451 # update URL in config file to disable that behavior.
452 config.db_url = cls._update_sqlite_url(config.db_url)
453
454 return config
455
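A minimal sketch of creating a new SQLite-backed APDB and constructing a working instance from the returned configuration; the file path and parameter values are placeholder assumptions:

from lsst.dax.apdb.sql.apdbSql import ApdbSql

# Placeholder SQLite URL; any SQLAlchemy URL supported by the backend works here.
config = ApdbSql.init_database(
    db_url="sqlite:////tmp/apdb-example.db",
    read_sources_months=12,
    read_forced_sources_months=12,
)

# The returned ApdbSqlConfig can be used directly to build an instance.
apdb = ApdbSql.from_config(config)
print(apdb.tableRowCount())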

◆ metadata()

ApdbMetadata lsst.dax.apdb.sql.apdbSql.ApdbSql.metadata ( self)
Object controlling access to APDB metadata (`ApdbMetadata`).

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 790 of file apdbSql.py.

790 def metadata(self) -> ApdbMetadata:
791 # docstring is inherited from a base class
792 return self._metadata
793

◆ reassignDiaSources()

None lsst.dax.apdb.sql.apdbSql.ApdbSql.reassignDiaSources ( self,
Mapping[int, int] idMap )
Associate DiaSources with SSObjects, disassociating them from
DiaObjects.

Parameters
----------
idMap : `Mapping`
    Maps DiaSource IDs to their new SSObject IDs.

Raises
------
ValueError
    Raised if a DiaSource ID does not exist in the database.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 731 of file apdbSql.py.

731 def reassignDiaSources(self, idMap: Mapping[int, int]) -> None:
732 # docstring is inherited from a base class
733
734 timestamp: float | datetime.datetime
735 if self._schema.has_mjd_timestamps:
736 timestamp_column = "ssObjectReassocTimeMjdTai"
737 timestamp = float(astropy.time.Time.now().tai.mjd)
738 else:
739 timestamp_column = "ssObjectReassocTime"
740 timestamp = datetime.datetime.now(tz=datetime.UTC)
741
742 table = self._schema.get_table(ApdbTables.DiaSource)
743 query = table.update().where(table.columns["diaSourceId"] == sql.bindparam("srcId"))
744
745 with self._engine.begin() as conn:
746 # Need to make sure that every ID exists in the database, but
747 # executemany may not support rowcount, so iterate and check what
748 # is missing.
749 missing_ids: list[int] = []
750 for key, value in idMap.items():
751 params = {
752 "srcId": key,
753 "diaObjectId": 0,
754 "ssObjectId": value,
755 timestamp_column: timestamp,
756 }
757 result = conn.execute(query, params)
758 if result.rowcount == 0:
759 missing_ids.append(key)
760 if missing_ids:
761 missing = ",".join(str(item) for item in missing_ids)
762 raise ValueError(f"Following DiaSource IDs do not exist in the database: {missing}")
763
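A hedged usage sketch; the DiaSource and SSObject IDs are invented, and the ``apdb`` instance is assumed to come from one of the earlier sketches:

# Map DiaSource IDs to their new SSObject IDs (the values here are made up).
id_map = {
    123456789: 42,
    123456790: 42,
}
try:
    apdb.reassignDiaSources(id_map)
except ValueError as exc:
    # Raised when one or more of the DiaSource IDs are not in the database.
    print(f"reassignment failed: {exc}")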

◆ store()

None lsst.dax.apdb.sql.apdbSql.ApdbSql.store ( self,
astropy.time.Time visit_time,
pandas.DataFrame objects,
pandas.DataFrame | None sources = None,
pandas.DataFrame | None forced_sources = None )
Store all three types of catalogs in the database.

Parameters
----------
visit_time : `astropy.time.Time`
    Time of the visit.
objects : `pandas.DataFrame`
    Catalog with DiaObject records.
sources : `pandas.DataFrame`, optional
    Catalog with DiaSource records.
forced_sources : `pandas.DataFrame`, optional
    Catalog with DiaForcedSource records.

Notes
-----
This method takes DataFrame catalogs; their schema must be
compatible with the schema of the corresponding APDB tables:

  - column names must correspond to database table columns
  - types and units of the columns must match database definitions;
    no unit conversion is performed presently
  - columns that have default values in the database schema can be
    omitted from the catalog
  - this method knows how to fill the interval-related columns of
    DiaObject (validityStart, validityEnd); they do not need to appear
    in the catalog
  - source catalogs have a ``diaObjectId`` column associating sources
    with objects

This operation need not be atomic, but DiaSources and DiaForcedSources
will not be stored until all DiaObjects are stored.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 665 of file apdbSql.py.

671 ) -> None:
672 # docstring is inherited from a base class
673 objects = self._fix_input_timestamps(objects)
674 if sources is not None:
675 sources = self._fix_input_timestamps(sources)
676 if forced_sources is not None:
677 forced_sources = self._fix_input_timestamps(forced_sources)
678
679 # We want to run all inserts in one transaction.
680 with self._engine.begin() as connection:
681 replica_chunk: ReplicaChunk | None = None
682 if self._schema.replication_enabled:
683 replica_chunk = ReplicaChunk.make_replica_chunk(visit_time, self.config.replica_chunk_seconds)
684 self._storeReplicaChunk(replica_chunk, connection)
685
686 # fill pixelId column for DiaObjects
687 objects = self._add_spatial_index(objects)
688 self._storeDiaObjects(objects, visit_time, replica_chunk, connection)
689
690 if sources is not None:
691 # fill pixelId column for DiaSources
692 sources = self._add_spatial_index(sources)
693 self._storeDiaSources(sources, replica_chunk, connection)
694
695 if forced_sources is not None:
696 self._storeDiaForcedSources(forced_sources, replica_chunk, connection)
697
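For illustration only: a real call passes catalogs carrying the full set of columns required by the APDB schema, so the tiny DataFrames below are schematic rather than valid records; the column subset (``diaObjectId``, ``diaSourceId``, ra/dec) follows the notes above, everything else is an assumption, and ``apdb`` comes from an earlier sketch.

import astropy.time
import pandas

visit_time = astropy.time.Time.now()

# Incomplete, schematic catalogs; the real schema defines many more columns.
objects = pandas.DataFrame(
    {"diaObjectId": [1, 2], "ra": [35.01, 35.02], "dec": [-4.50, -4.51]}
)
sources = pandas.DataFrame(
    {"diaSourceId": [10, 11], "diaObjectId": [1, 2],
     "ra": [35.01, 35.02], "dec": [-4.50, -4.51]}
)

# DiaObjects are written first; sources are stored only after all objects.
apdb.store(visit_time, objects, sources=sources)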

◆ storeSSObjects()

None lsst.dax.apdb.sql.apdbSql.ApdbSql.storeSSObjects ( self,
pandas.DataFrame objects )
Store or update SSObject catalog.

Parameters
----------
objects : `pandas.DataFrame`
    Catalog with SSObject records.

Notes
-----
If SSObjects with matching IDs already exist in the database, their
records will be updated with the information from the provided records.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 698 of file apdbSql.py.

698 def storeSSObjects(self, objects: pandas.DataFrame) -> None:
699 # docstring is inherited from a base class
700 objects = self._fix_input_timestamps(objects)
701
702 idColumn = "ssObjectId"
703 table = self._schema.get_table(ApdbTables.SSObject)
704
705 # everything to be done in single transaction
706 with self._engine.begin() as conn:
707 # Find record IDs that already exist. Some types like np.int64 can
708 # cause issues with sqlalchemy, convert them to int.
709 ids = sorted(int(oid) for oid in objects[idColumn])
710
711 query = sql.select(table.columns[idColumn], table.columns[idColumn].in_(ids))
712 result = conn.execute(query)
713 knownIds = {row.ssObjectId for row in result}
714
715 filter = objects[idColumn].isin(knownIds)
716 toUpdate = cast(pandas.DataFrame, objects[filter])
717 toInsert = cast(pandas.DataFrame, objects[~filter])
718
719 # insert new records
720 if len(toInsert) > 0:
721 toInsert.to_sql(table.name, conn, if_exists="append", index=False, schema=table.schema)
722
723 # update existing records
724 if len(toUpdate) > 0:
725 whereKey = f"{idColumn}_param"
726 update = table.update().where(table.columns[idColumn] == sql.bindparam(whereKey))
727 toUpdate = toUpdate.rename({idColumn: whereKey}, axis="columns")
728 values = toUpdate.to_dict("records")
729 result = conn.execute(update, values)
730
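A minimal sketch, assuming an ``apdb`` instance from the earlier examples; ``ssObjectId`` is the matching column used by this method, and the remaining SSObject columns (required in practice) are omitted here for brevity:

import pandas

# Schematic catalog; a real one carries the full SSObject schema.
new_ssobjects = pandas.DataFrame({"ssObjectId": [42, 43]})
apdb.storeSSObjects(new_ssobjects)

# Calling again with an existing ssObjectId updates that record in place
# instead of inserting a duplicate.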

◆ tableDef()

Table | None lsst.dax.apdb.sql.apdbSql.ApdbSql.tableDef ( self,
ApdbTables table )
Return table schema definition for a given table.

Parameters
----------
table : `ApdbTables`
    One of the known APDB tables.

Returns
-------
tableSchema : `.schema_model.Table` or `None`
    Table schema description; `None` is returned if the table is not
    defined by this implementation.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 488 of file apdbSql.py.

488 def tableDef(self, table: ApdbTables) -> Table | None:
489 # docstring is inherited from a base class
490 return self._schema.tableSchemas.get(table)
491
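A small sketch of inspecting a table definition, assuming ``ApdbTables`` is importable from the top-level ``lsst.dax.apdb`` package and that the returned schema model exposes a ``columns`` sequence of objects with ``name`` attributes (both assumptions, not confirmed by this page):

from lsst.dax.apdb import ApdbTables

table_schema = apdb.tableDef(ApdbTables.DiaObject)
if table_schema is not None:
    # Assumed schema-model attributes: a list of column objects with a name.
    print([column.name for column in table_schema.columns])
else:
    print("DiaObject is not defined by this implementation")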

◆ tableRowCount()

dict[str, int] lsst.dax.apdb.sql.apdbSql.ApdbSql.tableRowCount ( self)
Return dictionary with the table names and row counts.

Used by ``ap_proto`` to keep track of the size of the database tables.
Depending on the database technology this could be an expensive operation.

Returns
-------
row_counts : `dict`
    Dict where key is a table name and value is a row count.

Definition at line 460 of file apdbSql.py.

460 def tableRowCount(self) -> dict[str, int]:
461 """Return dictionary with the table names and row counts.
462
463 Used by ``ap_proto`` to keep track of the size of the database tables.
464 Depending on database technology this could be expensive operation.
465
466 Returns
467 -------
468 row_counts : `dict`
469 Dict where key is a table name and value is a row count.
470 """
471 res = {}
472 tables = [ApdbTables.DiaObject, ApdbTables.DiaSource, ApdbTables.DiaForcedSource]
473 if self.config.dia_object_index == "last_object_table":
474 tables.append(ApdbTables.DiaObjectLast)
475 with self._engine.begin() as conn:
476 for table in tables:
477 sa_table = self._schema.get_table(table)
478 stmt = sql.select(func.count()).select_from(sa_table)
479 count: int = conn.execute(stmt).scalar_one()
480 res[table.name] = count
481
482 return res
483
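A trivial usage sketch, again assuming an ``apdb`` instance from the earlier examples; note that this can be an expensive call on large databases:

# Row counts keyed by table name.
for table_name, count in apdb.tableRowCount().items():
    print(f"{table_name}: {count}")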

Member Data Documentation

◆ _db_schema_version

lsst.dax.apdb.sql.apdbSql.ApdbSql._db_schema_version = self._versionCheck(self._metadata)
protected

Definition at line 180 of file apdbSql.py.

◆ _engine

lsst.dax.apdb.sql.apdbSql.ApdbSql._engine = self._makeEngine(config, create=False)
protected

Definition at line 153 of file apdbSql.py.

◆ _frozen_parameters

tuple lsst.dax.apdb.sql.apdbSql.ApdbSql._frozen_parameters
staticprotected
Initial value:
= (
"enable_replica",
"dia_object_index",
"pixelization.htm_level",
"pixelization.htm_index_column",
"ra_dec_columns",
)

Definition at line 143 of file apdbSql.py.

◆ _metadata

lsst.dax.apdb.sql.apdbSql.ApdbSql._metadata = ApdbMetadataSql(self._engine, meta_table)
protected

Definition at line 158 of file apdbSql.py.

◆ _schema

lsst.dax.apdb.sql.apdbSql.ApdbSql._schema
protected
Initial value:
= ApdbSqlSchema(
engine=self._engine,
dia_object_index=self.config.dia_object_index,
schema_file=self.config.schema_file,
schema_name=self.config.schema_name,
prefix=self.config.prefix,
namespace=self.config.namespace,
htm_index_column=self.config.pixelization.htm_index_column,
enable_replica=self.config.enable_replica,
)

Definition at line 169 of file apdbSql.py.

◆ config

lsst.dax.apdb.sql.apdbSql.ApdbSql.config = freezer.update(config, config_json)

Definition at line 165 of file apdbSql.py.

◆ metadataCodeVersionKey

lsst.dax.apdb.sql.apdbSql.ApdbSql.metadataCodeVersionKey = "version:ApdbSql"
static

Definition at line 134 of file apdbSql.py.

◆ metadataConfigKey

lsst.dax.apdb.sql.apdbSql.ApdbSql.metadataConfigKey = "config:apdb-sql.json"
static

Definition at line 140 of file apdbSql.py.

◆ metadataReplicaVersionKey

lsst.dax.apdb.sql.apdbSql.ApdbSql.metadataReplicaVersionKey = "version:ApdbSqlReplica"
static

Definition at line 137 of file apdbSql.py.

◆ metadataSchemaVersionKey

lsst.dax.apdb.sql.apdbSql.ApdbSql.metadataSchemaVersionKey = "version:schema"
static

Definition at line 131 of file apdbSql.py.

◆ pixelator

lsst.dax.apdb.sql.apdbSql.ApdbSql.pixelator = HtmPixelization(self.config.pixelization.htm_level)

Definition at line 182 of file apdbSql.py.


The documentation for this class was generated from the following file: