22 """Module defining Apdb class and related methods.
25 __all__ = [
"ApdbConfig",
"Apdb",
"Visit"]
import logging
import os
from collections import namedtuple
from contextlib import contextmanager
from datetime import datetime

from sqlalchemy import (func, sql)
from sqlalchemy.pool import NullPool

from . import timer, apdbSchema
45 _LOG = logging.getLogger(__name__.partition(
".")[2])
49 """Timer class defining context manager which tracks execution timing.
53 with Timer("timer_name"):
56 On exit from block it will print elapsed time.
58 See also :py:mod:`timer` module.
60 def __init__(self, name, do_logging=True, log_before_cursor_execute=False):
68 Enter context, start timer
77 Exit context, stop and dump timer
87 def _start_timer(self, conn, cursor, statement, parameters, context, executemany):
90 _LOG.info(
"before_cursor_execute")
93 def _stop_timer(self, conn, cursor, statement, parameters, context, executemany):
100 def _split(seq, nItems):
101 """Split a sequence into smaller sequences"""
108 def _coerce_uint64(df: pandas.DataFrame) -> pandas.DataFrame:
109 """Change type of the uint64 columns to int64, return copy of data frame.
111 names = [c[0]
for c
in df.dtypes.items()
if c[1] == np.uint64]
112 return df.astype({name: np.int64
for name
in names})
# Record describing the last processed visit; the visits table this mirrors
# is used by ap_proto to track visit information and is not part of the
# regular APDB schema.
Visit = namedtuple('Visit', 'visitId visitTime lastObjectId lastSourceId')
120 def _ansi_session(engine):
121 """Returns a connection, makes sure that ANSI mode is set for MySQL
123 with engine.begin()
as conn:
124 if engine.name ==
'mysql':
125 conn.execute(sql.text(
"SET SESSION SQL_MODE = 'ANSI'"))
130 def _data_file_name(basename):
131 """Return path name of a data file.
133 return os.path.join(
"${DAX_APDB_DIR}",
"data", basename)
138 db_url =
Field(dtype=str, doc=
"SQLAlchemy database connection URI")
140 doc=
"Transaction isolation level",
141 allowed={
"READ_COMMITTED":
"Read committed",
142 "READ_UNCOMMITTED":
"Read uncommitted",
143 "REPEATABLE_READ":
"Repeatable read",
144 "SERIALIZABLE":
"Serializable"},
145 default=
"READ_COMMITTED",
148 doc=(
"If False then disable SQLAlchemy connection pool. "
149 "Do not use connection pool when forking."),
151 connection_timeout =
Field(dtype=float,
152 doc=
"Maximum time to wait time for database lock to be released before "
153 "exiting. Defaults to sqlachemy defaults if not set.",
157 doc=
"If True then pass SQLAlchemy echo option.",
160 doc=
"Indexing mode for DiaObject table",
161 allowed={
'baseline':
"Index defined in baseline schema",
162 'pix_id_iov':
"(pixelId, objectId, iovStart) PK",
163 'last_object_table':
"Separate DiaObjectLast table"},
165 dia_object_nightly =
Field(dtype=bool,
166 doc=
"Use separate nightly table for DiaObject",
168 read_sources_months =
Field(dtype=int,
169 doc=
"Number of months of history to read from DiaSource",
171 read_forced_sources_months =
Field(dtype=int,
172 doc=
"Number of months of history to read from DiaForcedSource",
175 doc=
"List of columns to read from DiaObject, by default read all columns",
177 object_last_replace =
Field(dtype=bool,
178 doc=
"If True (default) then use \"upsert\" for DiaObjectsLast table",
181 doc=
"Location of (YAML) configuration file with standard schema",
182 default=_data_file_name(
"apdb-schema.yaml"))
183 extra_schema_file =
Field(dtype=str,
184 doc=
"Location of (YAML) configuration file with extra schema",
185 default=_data_file_name(
"apdb-schema-extra.yaml"))
187 doc=
"Location of (YAML) configuration file with column mapping",
188 default=_data_file_name(
"apdb-afw-map.yaml"))
190 doc=
"Prefix to add to table names and index names",
193 doc=
"If True then run EXPLAIN SQL command on each executed query",
196 doc=
"If True then print/log timing information",
198 diaobject_index_hint =
Field(dtype=str,
199 doc=
"Name of the index to use with Oracle index hint",
202 dynamic_sampling_hint =
Field(dtype=int,
203 doc=
"If non-zero then use dynamic_sampling hint",
206 doc=
"If non-zero then use cardinality hint",
211 if self.
isolation_levelisolation_level ==
"READ_COMMITTED" and self.
db_urldb_url.startswith(
"sqlite"):
212 raise ValueError(
"Attempting to run Apdb with SQLITE and isolation level 'READ_COMMITTED.' "
213 "Use 'READ_UNCOMMITTED' instead.")
217 """Interface to L1 database, hides all database access details.
219 The implementation is configured via standard ``pex_config`` mechanism
220 using `ApdbConfig` configuration class. For an example of different
221 configurations check config/ folder.
225 config : `ApdbConfig`
226 afw_schemas : `dict`, optional
227 Dictionary with table name for a key and `afw.table.Schema`
228 for a value. Columns in schema will be added to standard
237 _LOG.debug(
"APDB Configuration:")
238 _LOG.debug(
" dia_object_index: %s", self.
configconfig.dia_object_index)
239 _LOG.debug(
" dia_object_nightly: %s", self.
configconfig.dia_object_nightly)
240 _LOG.debug(
" read_sources_months: %s", self.
configconfig.read_sources_months)
241 _LOG.debug(
" read_forced_sources_months: %s", self.
configconfig.read_forced_sources_months)
242 _LOG.debug(
" dia_object_columns: %s", self.
configconfig.dia_object_columns)
243 _LOG.debug(
" object_last_replace: %s", self.
configconfig.object_last_replace)
244 _LOG.debug(
" schema_file: %s", self.
configconfig.schema_file)
245 _LOG.debug(
" extra_schema_file: %s", self.
configconfig.extra_schema_file)
246 _LOG.debug(
" column_map: %s", self.
configconfig.column_map)
247 _LOG.debug(
" schema prefix: %s", self.
configconfig.prefix)
251 kw = dict(echo=self.
configconfig.sql_echo)
253 if not self.
configconfig.connection_pool:
254 kw.update(poolclass=NullPool)
255 if self.
configconfig.isolation_level
is not None:
256 kw.update(isolation_level=self.
configconfig.isolation_level)
257 if self.
configconfig.connection_timeout
is not None:
258 if self.
configconfig.db_url.startswith(
"sqlite"):
259 conn_args.update(timeout=self.
configconfig.connection_timeout)
260 elif self.
configconfig.db_url.startswith((
"postgresql",
"mysql")):
261 conn_args.update(connect_timeout=self.
configconfig.connection_timeout)
262 kw.update(connect_args=conn_args)
263 self.
_engine_engine = sqlalchemy.create_engine(self.
configconfig.db_url, **kw)
266 dia_object_index=self.
configconfig.dia_object_index,
267 dia_object_nightly=self.
configconfig.dia_object_nightly,
268 schema_file=self.
configconfig.schema_file,
269 extra_schema_file=self.
configconfig.extra_schema_file,
270 column_map=self.
configconfig.column_map,
271 afw_schemas=afw_schemas,
272 prefix=self.
configconfig.prefix)
275 """Returns last visit information or `None` if visits table is empty.
277 Visits table is used by ap_proto to track visit information, it is
278 not a part of the regular APDB schema.
282 visit : `Visit` or `None`
283 Last stored visit info or `None` if there was nothing stored yet.
286 with self.
_engine_engine.begin()
as conn:
288 stmnt = sql.select([sql.func.max(self.
_schema_schema.visits.c.visitId),
289 sql.func.max(self.
_schema_schema.visits.c.visitTime)])
290 res = conn.execute(stmnt)
297 _LOG.info(
"lastVisit: visitId: %s visitTime: %s (%s)", visitId,
298 visitTime,
type(visitTime))
301 stmnt = sql.select([sql.func.max(self.
_schema_schema.objects.c.diaObjectId)])
302 lastObjectId = conn.scalar(stmnt)
303 stmnt = sql.select([sql.func.max(self.
_schema_schema.sources.c.diaSourceId)])
304 lastSourceId = conn.scalar(stmnt)
306 return Visit(visitId=visitId, visitTime=visitTime,
307 lastObjectId=lastObjectId, lastSourceId=lastSourceId)
310 """Store visit information.
312 This method is only used by ``ap_proto`` script from ``l1dbproto``
313 and is not intended for production pipelines.
319 visitTime : `datetime.datetime`
323 ins = self.
_schema_schema.visits.insert().values(visitId=visitId,
325 self.
_engine_engine.execute(ins)
328 """Returns dictionary with the table names and row counts.
330 Used by ``ap_proto`` to keep track of the size of the database tables.
331 Depending on database technology this could be expensive operation.
336 Dict where key is a table name and value is a row count.
339 tables = [self.
_schema_schema.objects, self.
_schema_schema.sources, self.
_schema_schema.forcedSources]
340 if self.
configconfig.dia_object_index ==
'last_object_table':
341 tables.append(self.
_schema_schema.objects_last)
343 stmt = sql.select([func.count()]).select_from(table)
344 count = self.
_engine_engine.scalar(stmt)
345 res[table.name] = count
350 """Returns catalog of DiaObject instances from given region.
352 Objects are searched based on pixelization index and region is
353 determined by the set of indices. There is no assumption on a
354 particular type of index, client is responsible for consistency
355 when calculating pixelization indices.
357 This method returns :doc:`/modules/lsst.afw.table/index` catalog with schema determined by
358 the schema of APDB table. Re-mapping of the column names is done for
359 some columns (based on column map passed to constructor) but types
360 or units are not changed.
362 Returns only the last version of each DiaObject.
366 pixel_ranges : `list` of `tuple`
367 Sequence of ranges, range is a tuple (minPixelID, maxPixelID).
368 This defines set of pixel indices to be included in result.
369 return_pandas : `bool`
370 Return a `pandas.DataFrame` instead of
371 `lsst.afw.table.SourceCatalog`.
375 catalog : `lsst.afw.table.SourceCatalog` or `pandas.DataFrame`
376 Catalog containing DiaObject records.
380 if self.
configconfig.dia_object_index ==
'last_object_table':
381 table = self.
_schema_schema.objects_last
383 table = self.
_schema_schema.objects
384 if not self.
configconfig.dia_object_columns:
385 query = table.select()
387 columns = [table.c[col]
for col
in self.
configconfig.dia_object_columns]
388 query = sql.select(columns)
390 if self.
configconfig.diaobject_index_hint:
391 val = self.
configconfig.diaobject_index_hint
392 query = query.with_hint(table,
'index_rs_asc(%(name)s "{}")'.
format(val))
393 if self.
configconfig.dynamic_sampling_hint > 0:
394 val = self.
configconfig.dynamic_sampling_hint
395 query = query.with_hint(table,
'dynamic_sampling(%(name)s {})'.
format(val))
396 if self.
configconfig.cardinality_hint > 0:
397 val = self.
configconfig.cardinality_hint
398 query = query.with_hint(table,
'FIRST_ROWS_1 cardinality(%(name)s {})'.
format(val))
402 for low, upper
in pixel_ranges:
405 exprlist.append(table.c.pixelId == low)
407 exprlist.append(sql.expression.between(table.c.pixelId, low, upper))
408 query = query.where(sql.expression.or_(*exprlist))
411 if self.
configconfig.dia_object_index !=
'last_object_table':
412 query = query.where(table.c.validityEnd ==
None)
414 _LOG.debug(
"query: %s", query)
416 if self.
configconfig.explain:
421 with Timer(
'DiaObject select', self.
configconfig.timer):
422 with self.
_engine_engine.begin()
as conn:
424 objects = pandas.read_sql_query(query, conn)
426 res = conn.execute(query)
428 _LOG.debug(
"found %s DiaObjects", len(objects))
432 """Returns catalog of DiaSource instances from given region.
434 Sources are searched based on pixelization index and region is
435 determined by the set of indices. There is no assumption on a
436 particular type of index, client is responsible for consistency
437 when calculating pixelization indices.
439 This method returns :doc:`/modules/lsst.afw.table/index` catalog with schema determined by
440 the schema of APDB table. Re-mapping of the column names is done for
441 some columns (based on column map passed to constructor) but types or
442 units are not changed.
446 pixel_ranges : `list` of `tuple`
447 Sequence of ranges, range is a tuple (minPixelID, maxPixelID).
448 This defines set of pixel indices to be included in result.
449 dt : `datetime.datetime`
450 Time of the current visit
451 return_pandas : `bool`
452 Return a `pandas.DataFrame` instead of
453 `lsst.afw.table.SourceCatalog`.
457 catalog : `lsst.afw.table.SourceCatalog`, `pandas.DataFrame`, or `None`
458 Catalog containing DiaSource records. `None` is returned if
459 ``read_sources_months`` configuration parameter is set to 0.
462 if self.
configconfig.read_sources_months == 0:
463 _LOG.info(
"Skip DiaSources fetching")
466 table = self.
_schema_schema.sources
467 query = table.select()
471 for low, upper
in pixel_ranges:
474 exprlist.append(table.c.pixelId == low)
476 exprlist.append(sql.expression.between(table.c.pixelId, low, upper))
477 query = query.where(sql.expression.or_(*exprlist))
480 with Timer(
'DiaSource select', self.
configconfig.timer):
481 with _ansi_session(self.
_engine_engine)
as conn:
483 sources = pandas.read_sql_query(query, conn)
485 res = conn.execute(query)
487 _LOG.debug(
"found %s DiaSources", len(sources))
491 """Returns catalog of DiaSource instances given set of DiaObject IDs.
493 This method returns :doc:`/modules/lsst.afw.table/index` catalog with schema determined by
494 the schema of APDB table. Re-mapping of the column names is done for
495 some columns (based on column map passed to constructor) but types or
496 units are not changed.
501 Collection of DiaObject IDs
502 dt : `datetime.datetime`
503 Time of the current visit
504 return_pandas : `bool`
505 Return a `pandas.DataFrame` instead of
506 `lsst.afw.table.SourceCatalog`.
511 catalog : `lsst.afw.table.SourceCatalog`, `pandas.DataFrame`, or `None`
512 Catalog contaning DiaSource records. `None` is returned if
513 ``read_sources_months`` configuration parameter is set to 0 or
514 when ``object_ids`` is empty.
517 if self.
configconfig.read_sources_months == 0:
518 _LOG.info(
"Skip DiaSources fetching")
521 if len(object_ids) <= 0:
522 _LOG.info(
"Skip DiaSources fetching - no Objects")
526 table = self.
_schema_schema.sources
528 with Timer(
'DiaSource select', self.
configconfig.timer):
529 with _ansi_session(self.
_engine_engine)
as conn:
530 for ids
in _split(sorted(object_ids), 1000):
531 query =
'SELECT * FROM "' + table.name +
'" WHERE '
534 ids =
",".join(str(id)
for id
in ids)
535 query +=
'"diaObjectId" IN (' + ids +
') '
539 df = pandas.read_sql_query(sql.text(query), conn)
543 sources = sources.append(df)
545 res = conn.execute(sql.text(query))
546 sources = self.
_convertResult_convertResult(res,
"DiaSource", sources)
548 _LOG.debug(
"found %s DiaSources", len(sources))
552 """Returns catalog of DiaForcedSource instances matching given
555 This method returns :doc:`/modules/lsst.afw.table/index` catalog with schema determined by
556 the schema of L1 database table. Re-mapping of the column names may
557 be done for some columns (based on column map passed to constructor)
558 but types or units are not changed.
563 Collection of DiaObject IDs
564 dt : `datetime.datetime`
565 Time of the current visit
566 return_pandas : `bool`
567 Return a `pandas.DataFrame` instead of
568 `lsst.afw.table.SourceCatalog`.
572 catalog : `lsst.afw.table.SourceCatalog` or `None`
573 Catalog contaning DiaForcedSource records. `None` is returned if
574 ``read_sources_months`` configuration parameter is set to 0 or
575 when ``object_ids`` is empty.
578 if self.
configconfig.read_forced_sources_months == 0:
579 _LOG.info(
"Skip DiaForceSources fetching")
582 if len(object_ids) <= 0:
583 _LOG.info(
"Skip DiaForceSources fetching - no Objects")
587 table = self.
_schema_schema.forcedSources
590 with Timer(
'DiaForcedSource select', self.
configconfig.timer):
591 with _ansi_session(self.
_engine_engine)
as conn:
592 for ids
in _split(sorted(object_ids), 1000):
594 query =
'SELECT * FROM "' + table.name +
'" WHERE '
597 ids =
",".join(str(id)
for id
in ids)
598 query +=
'"diaObjectId" IN (' + ids +
') '
602 df = pandas.read_sql_query(sql.text(query), conn)
606 sources = sources.append(df)
608 res = conn.execute(sql.text(query))
609 sources = self.
_convertResult_convertResult(res,
"DiaForcedSource", sources)
611 _LOG.debug(
"found %s DiaForcedSources", len(sources))
615 """Store catalog of DiaObjects from current visit.
617 This methods takes :doc:`/modules/lsst.afw.table/index` catalog, its schema must be
618 compatible with the schema of APDB table:
620 - column names must correspond to database table columns
621 - some columns names are re-mapped based on column map passed to
623 - types and units of the columns must match database definitions,
624 no unit conversion is performed presently
625 - columns that have default values in database schema can be
626 omitted from afw schema
627 - this method knows how to fill interval-related columns
628 (validityStart, validityEnd) they do not need to appear in
633 objs : `lsst.afw.table.BaseCatalog` or `pandas.DataFrame`
634 Catalog with DiaObject records
635 dt : `datetime.datetime`
639 if isinstance(objs, pandas.DataFrame):
640 ids = sorted(objs[
'diaObjectId'])
642 ids = sorted([obj[
'id']
for obj
in objs])
643 _LOG.debug(
"first object ID: %d", ids[0])
647 table = self.
_schema_schema.objects
650 with _ansi_session(self.
_engine_engine)
as conn:
652 ids =
",".join(str(id)
for id
in ids)
654 if self.
configconfig.dia_object_index ==
'last_object_table':
658 table = self.
_schema_schema.objects_last
659 do_replace = self.
configconfig.object_last_replace
663 if not do_replace
or isinstance(objs, pandas.DataFrame):
664 query =
'DELETE FROM "' + table.name +
'" '
665 query +=
'WHERE "diaObjectId" IN (' + ids +
') '
667 if self.
configconfig.explain:
671 with Timer(table.name +
' delete', self.
configconfig.timer):
672 res = conn.execute(sql.text(query))
673 _LOG.debug(
"deleted %s objects", res.rowcount)
675 extra_columns = dict(lastNonForcedSource=dt)
676 if isinstance(objs, pandas.DataFrame):
677 with Timer(
"DiaObjectLast insert", self.
configconfig.timer):
678 objs = _coerce_uint64(objs)
679 for col, data
in extra_columns.items():
681 objs.to_sql(
"DiaObjectLast", conn, if_exists=
'append',
686 extra_columns=extra_columns)
691 table = self.
_schema_schema.objects
692 query =
'UPDATE "' + table.name +
'" '
693 query +=
"SET \"validityEnd\" = '" + str(dt) +
"' "
694 query +=
'WHERE "diaObjectId" IN (' + ids +
') '
695 query +=
'AND "validityEnd" IS NULL'
699 if self.
configconfig.explain:
703 with Timer(table.name +
' truncate', self.
configconfig.timer):
704 res = conn.execute(sql.text(query))
705 _LOG.debug(
"truncated %s intervals", res.rowcount)
708 if self.
configconfig.dia_object_nightly:
709 table = self.
_schema_schema.objects_nightly
711 table = self.
_schema_schema.objects
712 extra_columns = dict(lastNonForcedSource=dt, validityStart=dt,
714 if isinstance(objs, pandas.DataFrame):
715 with Timer(
"DiaObject insert", self.
configconfig.timer):
716 objs = _coerce_uint64(objs)
717 for col, data
in extra_columns.items():
719 objs.to_sql(
"DiaObject", conn, if_exists=
'append',
723 extra_columns=extra_columns)
726 """Store catalog of DIASources from current visit.
728 This methods takes :doc:`/modules/lsst.afw.table/index` catalog, its schema must be
729 compatible with the schema of L1 database table:
731 - column names must correspond to database table columns
732 - some columns names may be re-mapped based on column map passed to
734 - types and units of the columns must match database definitions,
735 no unit conversion is performed presently
736 - columns that have default values in database schema can be
737 omitted from afw schema
741 sources : `lsst.afw.table.BaseCatalog` or `pandas.DataFrame`
742 Catalog containing DiaSource records
746 with _ansi_session(self.
_engine_engine)
as conn:
748 if isinstance(sources, pandas.DataFrame):
749 with Timer(
"DiaSource insert", self.
configconfig.timer):
750 sources = _coerce_uint64(sources)
751 sources.to_sql(
"DiaSource", conn, if_exists=
'append',
754 table = self.
_schema_schema.sources
758 """Store a set of DIAForcedSources from current visit.
760 This methods takes :doc:`/modules/lsst.afw.table/index` catalog, its schema must be
761 compatible with the schema of L1 database table:
763 - column names must correspond to database table columns
764 - some columns names may be re-mapped based on column map passed to
766 - types and units of the columns must match database definitions,
767 no unit conversion is performed presently
768 - columns that have default values in database schema can be
769 omitted from afw schema
773 sources : `lsst.afw.table.BaseCatalog` or `pandas.DataFrame`
774 Catalog containing DiaForcedSource records
778 with _ansi_session(self.
_engine_engine)
as conn:
780 if isinstance(sources, pandas.DataFrame):
781 with Timer(
"DiaForcedSource insert", self.
configconfig.timer):
782 sources = _coerce_uint64(sources)
783 sources.to_sql(
"DiaForcedSource", conn, if_exists=
'append',
786 table = self.
_schema_schema.forcedSources
787 self.
_storeObjectsAfw_storeObjectsAfw(sources, conn, table,
"DiaForcedSource")
790 """Return the number of DiaObjects that have only one DiaSource associated
793 Used as part of ap_verify metrics.
798 Number of DiaObjects with exactly one associated DiaSource.
801 table = self.
_schema_schema.objects
804 stmt = sql.select([func.count()]).select_from(table).where(table.c.nDiaSources == 1)
805 stmt = stmt.where(table.c.validityEnd ==
None)
808 count = self.
_engine_engine.scalar(stmt)
813 """Test whether data from an image has been loaded into the database.
815 Used as part of ap_verify metrics.
819 visitInfo : `lsst.afw.image.VisitInfo`
820 The metadata for the image of interest.
825 `True` if the data are present, `False` otherwise.
827 id = visitInfo.getExposureId()
828 table = self.
_schema_schema.sources
829 idField = table.c.ccdVisitId
832 query = sql.select([idField]).select_from(table) \
833 .where(idField == id).limit(1)
835 return self.
_engine_engine.scalar(query)
is not None
838 """Implement daily activities like cleanup/vacuum.
840 What should be done during daily cleanup is determined by
841 configuration/schema.
845 if self.
configconfig.dia_object_nightly:
846 with _ansi_session(self.
_engine_engine)
as conn:
847 query =
'INSERT INTO "' + self.
_schema_schema.objects.name +
'" '
848 query +=
'SELECT * FROM "' + self.
_schema_schema.objects_nightly.name +
'"'
849 with Timer(
'DiaObjectNightly copy', self.
configconfig.timer):
850 conn.execute(sql.text(query))
852 query =
'DELETE FROM "' + self.
_schema_schema.objects_nightly.name +
'"'
853 with Timer(
'DiaObjectNightly delete', self.
configconfig.timer):
854 conn.execute(sql.text(query))
856 if self.
_engine_engine.name ==
'postgresql':
859 _LOG.info(
"Running VACUUM on all tables")
860 connection = self.
_engine_engine.raw_connection()
861 ISOLATION_LEVEL_AUTOCOMMIT = 0
862 connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
863 cursor = connection.cursor()
864 cursor.execute(
"VACUUM ANALYSE")
866 def makeSchema(self, drop=False, mysql_engine='InnoDB', oracle_tablespace=None, oracle_iot=False):
867 """Create or re-create all tables.
872 If True then drop tables before creating new ones.
873 mysql_engine : `str`, optional
874 Name of the MySQL engine to use for new tables.
875 oracle_tablespace : `str`, optional
876 Name of Oracle tablespace.
877 oracle_iot : `bool`, optional
878 Make Index-organized DiaObjectLast table.
881 oracle_tablespace=oracle_tablespace,
882 oracle_iot=oracle_iot)
884 def _explain(self, query, conn):
885 """Run the query with explain
888 _LOG.info(
"explain for query: %s...", query[:64])
890 if conn.engine.name ==
'mysql':
891 query =
"EXPLAIN EXTENDED " + query
893 query =
"EXPLAIN " + query
895 res = conn.execute(sql.text(query))
897 _LOG.info(
"explain: %s", res.keys())
899 _LOG.info(
"explain: %s", row)
901 _LOG.info(
"EXPLAIN returned nothing")
903 def _storeObjectsAfw(self, objects, conn, table, schema_table_name,
904 replace=False, extra_columns=None):
905 """Generic store method.
907 Takes catalog of records and stores a bunch of objects in a table.
911 objects : `lsst.afw.table.BaseCatalog`
912 Catalog containing object records
915 table : `sqlalchemy.Table`
917 schema_table_name : `str`
918 Name of the table to be used for finding table schema.
920 If `True` then use replace instead of INSERT (should be more efficient)
921 extra_columns : `dict`, optional
922 Mapping (column_name, column_value) which gives column values to add
923 to every row, only if column is missing in catalog records.
927 """Quote and escape values"""
930 elif isinstance(v, datetime):
931 v =
"'" + str(v) +
"'"
932 elif isinstance(v, str):
948 def quoteId(columnName):
949 """Smart quoting for column names.
950 Lower-case names are not quoted.
952 if not columnName.islower():
953 columnName =
'"' + columnName +
'"'
956 if conn.engine.name ==
"oracle":
958 schema_table_name, replace,
961 schema = objects.getSchema()
963 extra_fields =
list((extra_columns
or {}).
keys())
965 afw_fields = [field.getName()
for key, field
in schema
966 if field.getName()
not in extra_fields]
968 column_map = self.
_schema_schema.getAfwColumns(schema_table_name)
970 fields = [column_map[field].name
for field
in afw_fields
if field
in column_map]
972 if replace
and conn.engine.name
in (
'mysql',
'sqlite'):
973 query =
'REPLACE INTO '
975 query =
'INSERT INTO '
976 qfields = [quoteId(field)
for field
in fields + extra_fields]
977 query += quoteId(table.name) +
' (' +
','.join(qfields) +
') ' +
'VALUES '
982 for field
in afw_fields:
983 if field
not in column_map:
986 if column_map[field].type ==
"DATETIME" and \
989 value = datetime.utcfromtimestamp(value)
990 row.append(quoteValue(value))
991 for field
in extra_fields:
992 row.append(quoteValue(extra_columns[field]))
993 values.append(
'(' +
','.join(row) +
')')
995 if self.
configconfig.explain:
997 self.
_explain_explain(query + values[0], conn)
999 query +=
','.join(values)
1001 if replace
and conn.engine.name ==
'postgresql':
1003 pks = (
'pixelId',
'diaObjectId')
1004 query +=
" ON CONFLICT (\"{}\", \"{}\") DO UPDATE SET ".
format(*pks)
1005 fields = [column_map[field].name
for field
in afw_fields
if field
in column_map]
1006 fields = [
'"{0}" = EXCLUDED."{0}"'.
format(field)
1007 for field
in fields
if field
not in pks]
1008 query +=
', '.join(fields)
1011 _LOG.info(
"%s: will store %d records", table.name, len(objects))
1012 with Timer(table.name +
' insert', self.
configconfig.timer):
1013 res = conn.execute(sql.text(query))
1014 _LOG.debug(
"inserted %s intervals", res.rowcount)
1016 def _storeObjectsAfwOracle(self, objects, conn, table, schema_table_name,
1017 replace=False, extra_columns=None):
1018 """Store method for Oracle.
1020 Takes catalog of records and stores a bunch of objects in a table.
1024 objects : `lsst.afw.table.BaseCatalog`
1025 Catalog containing object records
1028 table : `sqlalchemy.Table`
1030 schema_table_name : `str`
1031 Name of the table to be used for finding table schema.
1033 If `True` then use replace instead of INSERT (should be more efficient)
1034 extra_columns : `dict`, optional
1035 Mapping (column_name, column_value) which gives column values to add
1036 to every row, only if column is missing in catalog records.
1039 def quoteId(columnName):
1040 """Smart quoting for column names.
1041 Lower-case naems are not quoted (Oracle backend needs them unquoted).
1043 if not columnName.islower():
1044 columnName =
'"' + columnName +
'"'
1047 schema = objects.getSchema()
1050 extra_fields =
list((extra_columns
or {}).
keys())
1052 afw_fields = [field.getName()
for key, field
in schema
1053 if field.getName()
not in extra_fields]
1056 column_map = self.
_schema_schema.getAfwColumns(schema_table_name)
1060 fields = [column_map[field].name
for field
in afw_fields
1061 if field
in column_map]
1064 qfields = [quoteId(field)
for field
in fields + extra_fields]
1067 vals = [
":col{}".
format(i)
for i
in range(len(fields))]
1068 vals += [
":extcol{}".
format(i)
for i
in range(len(extra_fields))]
1069 query =
'INSERT INTO ' + quoteId(table.name)
1070 query +=
' (' +
','.join(qfields) +
') VALUES'
1071 query +=
' (' +
','.join(vals) +
')'
1073 qvals = [
":col{} {}".
format(i, quoteId(field))
for i, field
in enumerate(fields)]
1074 qvals += [
":extcol{} {}".
format(i, quoteId(field))
for i, field
in enumerate(extra_fields)]
1075 pks = (
'pixelId',
'diaObjectId')
1076 onexpr = [
"SRC.{col} = DST.{col}".
format(col=quoteId(col))
for col
in pks]
1077 setexpr = [
"DST.{col} = SRC.{col}".
format(col=quoteId(col))
1078 for col
in fields + extra_fields
if col
not in pks]
1079 vals = [
"SRC.{col}".
format(col=quoteId(col))
for col
in fields + extra_fields]
1080 query =
"MERGE INTO {} DST ".
format(quoteId(table.name))
1081 query +=
"USING (SELECT {} FROM DUAL) SRC ".
format(
", ".join(qvals))
1082 query +=
"ON ({}) ".
format(
" AND ".join(onexpr))
1083 query +=
"WHEN MATCHED THEN UPDATE SET {} ".
format(
" ,".join(setexpr))
1084 query +=
"WHEN NOT MATCHED THEN INSERT "
1085 query +=
"({}) VALUES ({})".
format(
','.join(qfields),
','.join(vals))
1092 for field
in afw_fields:
1093 if field
not in column_map:
1096 if column_map[field].type ==
"DATETIME" and not np.isnan(value):
1098 value = datetime.utcfromtimestamp(value)
1100 value = str(value.asDegrees())
1101 elif not np.isfinite(value):
1103 row[
"col{}".
format(col)] = value
1105 for i, field
in enumerate(extra_fields):
1106 row[
"extcol{}".
format(i)] = extra_columns[field]
1110 _LOG.info(
"%s: will store %d records", table.name, len(objects))
1111 with Timer(table.name +
' insert', self.
configconfig.timer):
1112 res = conn.execute(sql.text(query), values)
1113 _LOG.debug(
"inserted %s intervals", res.rowcount)
1115 def _convertResult(self, res, table_name, catalog=None):
1116 """Convert result set into output catalog.
1120 res : `sqlalchemy.ResultProxy`
1121 SQLAlchemy result set returned by query.
1124 catalog : `lsst.afw.table.BaseCatalog`
1125 If not None then extend existing catalog
1129 catalog : `lsst.afw.table.SourceCatalog`
1130 If ``catalog`` is None then new instance is returned, otherwise
1131 ``catalog`` is updated and returned.
1134 columns = res.keys()
1135 schema, col_map = self.
_schema_schema.getAfwSchema(table_name, columns)
1137 _LOG.debug(
"_convertResult: schema: %s", schema)
1138 _LOG.debug(
"_convertResult: col_map: %s", col_map)
1143 record = catalog.addNew()
1144 for col, value
in row.items():
1146 col = col_map.get(col)
1148 if isinstance(value, datetime):
1150 value = int((value - datetime.utcfromtimestamp(0)).total_seconds())
1151 elif col.getTypeString() ==
'Angle' and value
is not None:
1152 value = value * geom.degrees
1153 if value
is not None:
1154 record.set(col, value)
def storeDiaObjects(self, objs, dt)
def saveVisit(self, visitId, visitTime)
def __init__(self, config, afw_schemas=None)
def _storeObjectsAfwOracle(self, objects, conn, table, schema_table_name, replace=False, extra_columns=None)
def storeDiaSources(self, sources)
def _convertResult(self, res, table_name, catalog=None)
def isVisitProcessed(self, visitInfo)
def _storeObjectsAfw(self, objects, conn, table, schema_table_name, replace=False, extra_columns=None)
def getDiaSourcesInRegion(self, pixel_ranges, dt, return_pandas=False)
def getDiaObjects(self, pixel_ranges, return_pandas=False)
def makeSchema(self, drop=False, mysql_engine='InnoDB', oracle_tablespace=None, oracle_iot=False)
def _explain(self, query, conn)
def storeDiaForcedSources(self, sources)
def countUnassociatedObjects(self)
def getDiaForcedSources(self, object_ids, dt, return_pandas=False)
def getDiaSources(self, object_ids, dt, return_pandas=False)
def __init__(self, name, do_logging=True, log_before_cursor_execute=False)
_log_before_cursor_execute
def __exit__(self, exc_type, exc_val, exc_tb)
A class representing an angle.
daf::base::PropertyList * list
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)