"""Module defining Ppdb class and related methods.
"""

__all__ = ["PpdbConfig", "Ppdb", "Visit"]

from collections import namedtuple
from contextlib import contextmanager
from datetime import datetime
import logging
import os

import numpy as np

import lsst.geom as geom
import lsst.afw.table as afwTable
import lsst.pex.config as pexConfig
from lsst.pex.config import Field, ChoiceField, ListField
from lsst.utils import getPackageDir
import sqlalchemy
from sqlalchemy import (func, sql)
from sqlalchemy.pool import NullPool
from . import timer, ppdbSchema


_LOG = logging.getLogger(__name__.partition(".")[2])  # strip leading "lsst."


class Timer(object):
    """Timer class defining context manager which tracks execution timing.

    Typical use:

        with Timer("timer_name"):
            do_something()

    On exit from the block it will print elapsed time.

    See also :py:mod:`timer` module.
    """

    def __init__(self, name, do_logging=True, log_before_cursor_execute=False):
        self._timer1 = timer.Timer(name)
        self._timer2 = timer.Timer(name + " (before/after cursor)")
        self._do_logging = do_logging
        self._log_before_cursor_execute = log_before_cursor_execute

    def __enter__(self):
        """Enter context, start timer."""
        self._timer1.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit context, stop and dump timer."""
        if exc_type is None:
            self._timer1.stop()
            if self._do_logging:
                self._timer1.dump()
        return False

    def _start_timer(self, conn, cursor, statement, parameters, context, executemany):
        """Start counting (for SQLAlchemy "before_cursor_execute" events)."""
        if self._log_before_cursor_execute:
            _LOG.info("before_cursor_execute")
        self._timer2.start()

    def _stop_timer(self, conn, cursor, statement, parameters, context, executemany):
        """Stop counting (for SQLAlchemy "after_cursor_execute" events)."""
        self._timer2.stop()
        if self._do_logging:
            self._timer2.dump()
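
# Example (illustrative sketch, not part of the original code): Timer is an
# ordinary context manager, so any block can be timed; "run_query" here is a
# hypothetical callable.
#
#     with Timer("DiaObject select", do_logging=True):
#         run_query()
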
def _split(seq, nItems):
    """Split a sequence into smaller sequences of length ``nItems`` (the
    last one may be shorter)."""
    seq = list(seq)
    while seq:
        yield seq[:nItems]
        del seq[:nItems]


# Information about a single visit, as stored in the visits table.
Visit = namedtuple('Visit', 'visitId visitTime lastObjectId lastSourceId')
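
# Example (illustrative sketch): _split is used below to keep SQL IN-lists
# bounded, e.g.
#
#     list(_split(range(5), 2))  ->  [[0, 1], [2, 3], [4]]
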
@contextmanager
def _ansi_session(engine):
    """Returns a connection, makes sure that ANSI mode is set for MySQL.
    """
    with engine.begin() as conn:
        if engine.name == 'mysql':
            conn.execute(sql.text("SET SESSION SQL_MODE = 'ANSI'"))
        yield conn
    return


def _data_file_name(basename):
    """Return path name of a data file.
    """
    return os.path.join(getPackageDir("dax_ppdb"), "data", basename)


class PpdbConfig(pexConfig.Config):

    db_url = Field(dtype=str, doc="SQLAlchemy database connection URI")
    isolation_level = ChoiceField(dtype=str,
                                  doc="Transaction isolation level",
                                  allowed={"READ_COMMITTED": "Read committed",
                                           "READ_UNCOMMITTED": "Read uncommitted",
                                           "REPEATABLE_READ": "Repeatable read",
                                           "SERIALIZABLE": "Serializable"},
                                  default="READ_COMMITTED")
    connection_pool = Field(dtype=bool,
                            doc=("If False then disable SQLAlchemy connection pool. "
                                 "Do not use connection pool when forking."),
                            default=True)
    connection_timeout = Field(dtype=float,
                               doc="Maximum time to wait for a database lock to be released before "
                                   "exiting. Defaults to sqlalchemy defaults if not set.",
                               optional=True)
    sql_echo = Field(dtype=bool,
                     doc="If True then pass SQLAlchemy echo option.",
                     default=False)
    dia_object_index = ChoiceField(dtype=str,
                                   doc="Indexing mode for DiaObject table",
                                   allowed={'baseline': "Index defined in baseline schema",
                                            'pix_id_iov': "(pixelId, objectId, iovStart) PK",
                                            'last_object_table': "Separate DiaObjectLast table"},
                                   default='baseline')
    dia_object_nightly = Field(dtype=bool,
                               doc="Use separate nightly table for DiaObject",
                               default=False)
    read_sources_months = Field(dtype=int,
                                doc="Number of months of history to read from DiaSource",
                                default=12)
    read_forced_sources_months = Field(dtype=int,
                                       doc="Number of months of history to read from DiaForcedSource",
                                       default=12)
    dia_object_columns = ListField(dtype=str,
                                   doc="List of columns to read from DiaObject, by default read all columns",
                                   default=[])
    object_last_replace = Field(dtype=bool,
                                doc="If True (default) then use \"upsert\" for DiaObjectsLast table",
                                default=True)
    schema_file = Field(dtype=str,
                        doc="Location of (YAML) configuration file with standard schema",
                        default=_data_file_name("ppdb-schema.yaml"))
    extra_schema_file = Field(dtype=str,
                              doc="Location of (YAML) configuration file with extra schema",
                              default=_data_file_name("ppdb-schema-extra.yaml"))
    column_map = Field(dtype=str,
                       doc="Location of (YAML) configuration file with column mapping",
                       default=_data_file_name("ppdb-afw-map.yaml"))
    prefix = Field(dtype=str,
                   doc="Prefix to add to table names and index names",
                   default="")
    explain = Field(dtype=bool,
                    doc="If True then run EXPLAIN SQL command on each executed query",
                    default=False)
    timer = Field(dtype=bool,
                  doc="If True then print/log timing information",
                  default=False)
    diaobject_index_hint = Field(dtype=str,
                                 doc="Name of the index to use with Oracle index hint",
                                 optional=True)
    dynamic_sampling_hint = Field(dtype=int,
                                  doc="If non-zero then use dynamic_sampling hint",
                                  default=0)
    cardinality_hint = Field(dtype=int,
                             doc="If non-zero then use cardinality hint",
                             default=0)

    def validate(self):
        super().validate()
        if self.isolation_level == "READ_COMMITTED" and self.db_url.startswith("sqlite"):
            raise ValueError("Attempting to run Ppdb with SQLITE and isolation level 'READ_COMMITTED.' "
                             "Use 'READ_UNCOMMITTED' instead.")


class Ppdb(object):
    """Interface to L1 database, hides all database access details.

    The implementation is configured via standard ``pex_config`` mechanism
    using `PpdbConfig` configuration class. For an example of different
    configurations check the config/ folder.

    Parameters
    ----------
    config : `PpdbConfig`
        Configuration object.
    afw_schemas : `dict`, optional
        Dictionary with table name for a key and `afw.table.Schema`
        for a value. Columns in schema will be added to standard persisted
        schema.
    """

    def __init__(self, config, afw_schemas=None):

        self.config = config

        # logging sanity check
        _LOG.info("PPDB Configuration:")
        _LOG.info("    dia_object_index: %s", self.config.dia_object_index)
        _LOG.info("    dia_object_nightly: %s", self.config.dia_object_nightly)
        _LOG.info("    read_sources_months: %s", self.config.read_sources_months)
        _LOG.info("    read_forced_sources_months: %s", self.config.read_forced_sources_months)
        _LOG.info("    dia_object_columns: %s", self.config.dia_object_columns)
        _LOG.info("    object_last_replace: %s", self.config.object_last_replace)
        _LOG.info("    schema_file: %s", self.config.schema_file)
        _LOG.info("    extra_schema_file: %s", self.config.extra_schema_file)
        _LOG.info("    column_map: %s", self.config.column_map)
        _LOG.info("    schema prefix: %s", self.config.prefix)

        # the engine is shared between all operations; disable the connection
        # pool via NullPool when configuration asks for it (e.g. when forking)
        kw = dict(echo=self.config.sql_echo)
        conn_args = dict()
        if not self.config.connection_pool:
            kw.update(poolclass=NullPool)
        if self.config.isolation_level is not None:
            kw.update(isolation_level=self.config.isolation_level)
        if self.config.connection_timeout is not None:
            if self.config.db_url.startswith("sqlite"):
                conn_args.update(timeout=self.config.connection_timeout)
            elif self.config.db_url.startswith(("postgresql", "mysql")):
                conn_args.update(connect_timeout=self.config.connection_timeout)
        kw.update(connect_args=conn_args)
        self._engine = sqlalchemy.create_engine(self.config.db_url, **kw)

        self._schema = ppdbSchema.PpdbSchema(engine=self._engine,
                                             dia_object_index=self.config.dia_object_index,
                                             dia_object_nightly=self.config.dia_object_nightly,
                                             schema_file=self.config.schema_file,
                                             extra_schema_file=self.config.extra_schema_file,
                                             column_map=self.config.column_map,
                                             afw_schemas=afw_schemas,
                                             prefix=self.config.prefix)
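
    # Example (illustrative sketch, not from the original module): minimal
    # construction for a local SQLite file; the file name is made up.
    #
    #     config = PpdbConfig()
    #     config.db_url = "sqlite:///ppdb.sqlite3"
    #     config.isolation_level = "READ_UNCOMMITTED"  # validate() rejects READ_COMMITTED for SQLite
    #     ppdb = Ppdb(config)
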
    def lastVisit(self):
        """Returns last visit information or `None` if visits table is empty.

        Visits table is used by ap_proto to track visit information, it is
        not a part of the regular PPDB schema.

        Returns
        -------
        visit : `Visit` or `None`
            Last stored visit info or `None` if there was nothing stored yet.
        """

        with self._engine.begin() as conn:

            stmnt = sql.select([sql.func.max(self._schema.visits.c.visitId),
                                sql.func.max(self._schema.visits.c.visitTime)])
            res = conn.execute(stmnt)
            row = res.fetchone()
            if row[0] is None:
                return None

            visitId = row[0]
            visitTime = row[1]
            _LOG.info("lastVisit: visitId: %s visitTime: %s (%s)", visitId,
                      visitTime, type(visitTime))

            # get max IDs from corresponding tables
            stmnt = sql.select([sql.func.max(self._schema.objects.c.diaObjectId)])
            lastObjectId = conn.scalar(stmnt)
            stmnt = sql.select([sql.func.max(self._schema.sources.c.diaSourceId)])
            lastSourceId = conn.scalar(stmnt)

            return Visit(visitId=visitId, visitTime=visitTime,
                         lastObjectId=lastObjectId, lastSourceId=lastSourceId)
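
    # Example (illustrative sketch): resuming ID generation after a restart;
    # attribute names come from the Visit namedtuple defined above.
    #
    #     visit = ppdb.lastVisit()
    #     next_object_id = 1 if visit is None else (visit.lastObjectId or 0) + 1
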
    def saveVisit(self, visitId, visitTime):
        """Store visit information.

        This method is only used by ``ap_proto`` script from ``l1dbproto``
        and is not intended for production pipelines.

        Parameters
        ----------
        visitId : `int`
            Visit identifier.
        visitTime : `datetime.datetime`
            Visit timestamp.
        """

        ins = self._schema.visits.insert().values(visitId=visitId,
                                                  visitTime=visitTime)
        self._engine.execute(ins)

    def tableRowCount(self):
        """Returns dictionary with the table names and row counts.

        Used by ``ap_proto`` to keep track of the size of the database tables.
        Depending on database technology this could be expensive operation.

        Returns
        -------
        row_counts : `dict`
            Dict where key is a table name and value is a row count.
        """
        res = {}
        tables = [self._schema.objects, self._schema.sources,
                  self._schema.forcedSources]
        if self.config.dia_object_index == 'last_object_table':
            tables.append(self._schema.objects_last)
        for table in tables:
            stmt = sql.select([func.count()]).select_from(table)
            count = self._engine.scalar(stmt)
            res[table.name] = count

        return res
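
    # Example (illustrative sketch) for getDiaObjects below: pixel_ranges is
    # computed by the caller from a sky region, e.g.
    #
    #     objects = ppdb.getDiaObjects([(10480, 10496), (10512, 10513)])
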
    def getDiaObjects(self, pixel_ranges):
        """Returns catalog of DiaObject instances from given region.

        Objects are searched based on pixelization index and region is
        determined by the set of indices. There is no assumption on a
        particular type of index, client is responsible for consistency
        when calculating pixelization indices.

        This method returns :doc:`/modules/lsst.afw.table/index` catalog
        with schema determined by the schema of PPDB table. Re-mapping of
        the column names is done for some columns (based on column map
        passed to constructor) but types or units are not changed.

        Returns only the last version of each DiaObject.

        Parameters
        ----------
        pixel_ranges : `list` of `tuple`
            Sequence of ranges, range is a tuple (minPixelID, maxPixelID).
            This defines set of pixel indices to be included in result.

        Returns
        -------
        catalog : `lsst.afw.table.SourceCatalog`
            Catalog containing DiaObject records.
        """

        # decide what columns we need
        if self.config.dia_object_index == 'last_object_table':
            table = self._schema.objects_last
        else:
            table = self._schema.objects
        if not self.config.dia_object_columns:
            query = table.select()
        else:
            columns = [table.c[col] for col in self.config.dia_object_columns]
            query = sql.select(columns)

        if self.config.diaobject_index_hint:
            val = self.config.diaobject_index_hint
            query = query.with_hint(table, 'index_rs_asc(%(name)s "{}")'.format(val))
        if self.config.dynamic_sampling_hint > 0:
            val = self.config.dynamic_sampling_hint
            query = query.with_hint(table, 'dynamic_sampling(%(name)s {})'.format(val))
        if self.config.cardinality_hint > 0:
            val = self.config.cardinality_hint
            query = query.with_hint(table, 'FIRST_ROWS_1 cardinality(%(name)s {})'.format(val))

        # build selection
        exprlist = []
        for low, upper in pixel_ranges:
            upper -= 1
            if low == upper:
                exprlist.append(table.c.pixelId == low)
            else:
                exprlist.append(sql.expression.between(table.c.pixelId, low, upper))
        query = query.where(sql.expression.or_(*exprlist))

        # select only the last version of each object
        if self.config.dia_object_index != 'last_object_table':
            query = query.where(table.c.validityEnd == None)  # noqa: E711

        _LOG.debug("query: %s", query)

        if self.config.explain:
            # run the same query with explain
            self._explain(query, self._engine)

        # execute select
        with Timer('DiaObject select', self.config.timer):
            with self._engine.begin() as conn:
                res = conn.execute(query)
                objects = self._convertResult(res, "DiaObject")
        _LOG.debug("found %s DiaObjects", len(objects))
        return objects
    def getDiaSourcesInRegion(self, pixel_ranges, dt):
        """Returns catalog of DiaSource instances from given region.

        Sources are searched based on pixelization index and region is
        determined by the set of indices. There is no assumption on a
        particular type of index, client is responsible for consistency
        when calculating pixelization indices.

        This method returns :doc:`/modules/lsst.afw.table/index` catalog
        with schema determined by the schema of PPDB table. Re-mapping of
        the column names is done for some columns (based on column map
        passed to constructor) but types or units are not changed.

        Parameters
        ----------
        pixel_ranges : `list` of `tuple`
            Sequence of ranges, range is a tuple (minPixelID, maxPixelID).
            This defines set of pixel indices to be included in result.
        dt : `datetime.datetime`
            Time of the current visit.

        Returns
        -------
        catalog : `lsst.afw.table.SourceCatalog` or `None`
            Catalog containing DiaSource records. `None` is returned if
            ``read_sources_months`` configuration parameter is set to 0.
        """

        if self.config.read_sources_months == 0:
            _LOG.info("Skip DiaSources fetching")
            return None

        table = self._schema.sources
        query = table.select()

        # build selection
        exprlist = []
        for low, upper in pixel_ranges:
            upper -= 1
            if low == upper:
                exprlist.append(table.c.pixelId == low)
            else:
                exprlist.append(sql.expression.between(table.c.pixelId, low, upper))
        query = query.where(sql.expression.or_(*exprlist))

        # execute select
        with Timer('DiaSource select', self.config.timer):
            with _ansi_session(self._engine) as conn:
                res = conn.execute(query)
                sources = self._convertResult(res, "DiaSource")
        _LOG.debug("found %s DiaSources", len(sources))
        return sources
    def getDiaSources(self, object_ids, dt):
        """Returns catalog of DiaSource instances given set of DiaObject IDs.

        This method returns :doc:`/modules/lsst.afw.table/index` catalog
        with schema determined by the schema of PPDB table. Re-mapping of
        the column names is done for some columns (based on column map
        passed to constructor) but types or units are not changed.

        Parameters
        ----------
        object_ids :
            Collection of DiaObject IDs.
        dt : `datetime.datetime`
            Time of the current visit.

        Returns
        -------
        catalog : `lsst.afw.table.SourceCatalog` or `None`
            Catalog containing DiaSource records. `None` is returned if
            ``read_sources_months`` configuration parameter is set to 0 or
            when ``object_ids`` is empty.
        """

        if self.config.read_sources_months == 0:
            _LOG.info("Skip DiaSources fetching")
            return None

        if len(object_ids) <= 0:
            _LOG.info("Skip DiaSources fetching - no Objects")
            # this should probably return an empty catalog, but the list of
            # columns is not known here
            return None

        table = self._schema.sources
        sources = None
        with Timer('DiaSource select', self.config.timer):
            with _ansi_session(self._engine) as conn:
                for ids in _split(sorted(object_ids), 1000):
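
                    # Illustrative note: each chunk produces SQL of the form
                    #   SELECT *  FROM "DiaSource" WHERE "diaObjectId" IN (1,2,3)
                    # chunking at 1000 IDs keeps the IN-list well below
                    # typical backend limits (an assumption, not stated here)
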
                    query = 'SELECT *  FROM "' + table.name + '" WHERE '

                    # select by object id
                    ids = ",".join(str(id) for id in ids)
                    query += '"diaObjectId" IN (' + ids + ') '

                    # execute select
                    res = conn.execute(sql.text(query))
                    sources = self._convertResult(res, "DiaSource", sources)

        _LOG.debug("found %s DiaSources", len(sources))
        return sources
    def getDiaForcedSources(self, object_ids, dt):
        """Returns catalog of DiaForcedSource instances matching given
        DiaObjects.

        This method returns :doc:`/modules/lsst.afw.table/index` catalog
        with schema determined by the schema of L1 database table.
        Re-mapping of the column names may be done for some columns (based
        on column map passed to constructor) but types or units are not
        changed.

        Parameters
        ----------
        object_ids :
            Collection of DiaObject IDs.
        dt : `datetime.datetime`
            Time of the current visit.

        Returns
        -------
        catalog : `lsst.afw.table.SourceCatalog` or `None`
            Catalog containing DiaForcedSource records. `None` is returned
            if ``read_forced_sources_months`` configuration parameter is set
            to 0 or when ``object_ids`` is empty.
        """

        if self.config.read_forced_sources_months == 0:
            _LOG.info("Skip DiaForcedSources fetching")
            return None

        if not object_ids:
            _LOG.info("Skip DiaForcedSources fetching - no Objects")
            # this should probably return an empty catalog, but the list of
            # columns is not known here
            return None

        table = self._schema.forcedSources
        sources = None

        with Timer('DiaForcedSource select', self.config.timer):
            with _ansi_session(self._engine) as conn:
                for ids in _split(sorted(object_ids), 1000):

                    query = 'SELECT *  FROM "' + table.name + '" WHERE '

                    # select by object id
                    ids = ",".join(str(id) for id in ids)
                    query += '"diaObjectId" IN (' + ids + ') '

                    # execute select
                    res = conn.execute(sql.text(query))
                    sources = self._convertResult(res, "DiaForcedSource", sources)

        _LOG.debug("found %s DiaForcedSources", len(sources))
        return sources
    def storeDiaObjects(self, objs, dt):
        """Store catalog of DiaObjects from current visit.

        This method takes :doc:`/modules/lsst.afw.table/index` catalog, its
        schema must be compatible with the schema of PPDB table:

          - column names must correspond to database table columns
          - some column names are re-mapped based on column map passed to
            constructor
          - types and units of the columns must match database definitions,
            no unit conversion is performed presently
          - columns that have default values in database schema can be
            omitted from afw schema
          - this method knows how to fill interval-related columns
            (validityStart, validityEnd); they do not need to appear in
            afw schema

        Parameters
        ----------
        objs : `lsst.afw.table.BaseCatalog`
            Catalog with DiaObject records.
        dt : `datetime.datetime`
            Time of the visit.
        """

        ids = sorted([obj['id'] for obj in objs])
        _LOG.debug("first object ID: %d", ids[0])

        # everything to be done in single transaction
        with _ansi_session(self._engine) as conn:

            ids = ",".join(str(id) for id in ids)

            if self.config.dia_object_index == 'last_object_table':

                # insert and replace all records in LAST table
                table = self._schema.objects_last
                do_replace = self.config.object_last_replace
                if not do_replace:
                    query = 'DELETE FROM "' + table.name + '" '
                    query += 'WHERE "diaObjectId" IN (' + ids + ') '

                    if self.config.explain:
                        # run the same query with explain
                        self._explain(query, conn)

                    with Timer(table.name + ' delete', self.config.timer):
                        res = conn.execute(sql.text(query))
                    _LOG.debug("deleted %s objects", res.rowcount)

                extra_columns = dict(lastNonForcedSource=dt)
                self._storeObjectsAfw(objs, conn, table, "DiaObjectLast",
                                      replace=do_replace,
                                      extra_columns=extra_columns)

            else:

                # truncate existing validity intervals
                table = self._schema.objects
                query = 'UPDATE "' + table.name + '" '
                query += "SET \"validityEnd\" = '" + str(dt) + "' "
                query += 'WHERE "diaObjectId" IN (' + ids + ') '
                query += 'AND "validityEnd" IS NULL'

                if self.config.explain:
                    # run the same query with explain
                    self._explain(query, conn)

                with Timer(table.name + ' truncate', self.config.timer):
                    res = conn.execute(sql.text(query))
                _LOG.debug("truncated %s intervals", res.rowcount)

            # insert new versions
            if self.config.dia_object_nightly:
                table = self._schema.objects_nightly
            else:
                table = self._schema.objects
            extra_columns = dict(lastNonForcedSource=dt, validityStart=dt,
                                 validityEnd=None)
            self._storeObjectsAfw(objs, conn, table, "DiaObject",
                                  extra_columns=extra_columns)
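
    # Example (illustrative sketch): in the default indexing mode the
    # validity-truncation statement built above has the shape
    #
    #     UPDATE "DiaObject" SET "validityEnd" = '2020-01-01 00:00:00'
    #     WHERE "diaObjectId" IN (1,2,3) AND "validityEnd" IS NULL
    #
    # after which the new object versions are inserted with NULL validityEnd.
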
    def storeDiaSources(self, sources):
        """Store catalog of DIASources from current visit.

        This method takes :doc:`/modules/lsst.afw.table/index` catalog, its
        schema must be compatible with the schema of L1 database table:

          - column names must correspond to database table columns
          - some column names may be re-mapped based on column map passed to
            constructor
          - types and units of the columns must match database definitions,
            no unit conversion is performed presently
          - columns that have default values in database schema can be
            omitted from afw schema

        Parameters
        ----------
        sources : `lsst.afw.table.BaseCatalog`
            Catalog containing DiaSource records.
        """

        # everything to be done in single transaction
        with _ansi_session(self._engine) as conn:

            table = self._schema.sources
            self._storeObjectsAfw(sources, conn, table, "DiaSource")
    def storeDiaForcedSources(self, sources):
        """Store a set of DIAForcedSources from current visit.

        This method takes :doc:`/modules/lsst.afw.table/index` catalog, its
        schema must be compatible with the schema of L1 database table:

          - column names must correspond to database table columns
          - some column names may be re-mapped based on column map passed to
            constructor
          - types and units of the columns must match database definitions,
            no unit conversion is performed presently
          - columns that have default values in database schema can be
            omitted from afw schema

        Parameters
        ----------
        sources : `lsst.afw.table.BaseCatalog`
            Catalog containing DiaForcedSource records.
        """

        # everything to be done in single transaction
        with _ansi_session(self._engine) as conn:

            table = self._schema.forcedSources
            self._storeObjectsAfw(sources, conn, table, "DiaForcedSource")
    def dailyJob(self):
        """Implement daily activities like cleanup/vacuum.

        What should be done during daily cleanup is determined by
        configuration/schema.
        """

        # move data from DiaObjectNightly into DiaObject
        if self.config.dia_object_nightly:
            with _ansi_session(self._engine) as conn:
                query = 'INSERT INTO "' + self._schema.objects.name + '" '
                query += 'SELECT * FROM "' + self._schema.objects_nightly.name + '"'
                with Timer('DiaObjectNightly copy', self.config.timer):
                    conn.execute(sql.text(query))

                query = 'DELETE FROM "' + self._schema.objects_nightly.name + '"'
                with Timer('DiaObjectNightly delete', self.config.timer):
                    conn.execute(sql.text(query))

        if self._engine.name == 'postgresql':

            # do VACUUM on all tables
            _LOG.info("Running VACUUM on all tables")
            connection = self._engine.raw_connection()
            # value of psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT
            ISOLATION_LEVEL_AUTOCOMMIT = 0
            connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
            cursor = connection.cursor()
            cursor.execute("VACUUM ANALYSE")
    def makeSchema(self, drop=False, mysql_engine='InnoDB', oracle_tablespace=None, oracle_iot=False):
        """Create or re-create all tables.

        Parameters
        ----------
        drop : `bool`, optional
            If True then drop tables before creating new ones.
        mysql_engine : `str`, optional
            Name of the MySQL engine to use for new tables.
        oracle_tablespace : `str`, optional
            Name of Oracle tablespace.
        oracle_iot : `bool`, optional
            Make Index-organized DiaObjectLast table.
        """
        self._schema.makeSchema(drop=drop, mysql_engine=mysql_engine,
                                oracle_tablespace=oracle_tablespace,
                                oracle_iot=oracle_iot)
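
    # Example (illustrative sketch): one-time schema creation for an empty
    # database, e.g. from an ad-hoc setup script:
    #
    #     ppdb.makeSchema(drop=False, mysql_engine='InnoDB')
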
    def _explain(self, query, conn):
        """Run the query with EXPLAIN and log the result.
        """

        _LOG.info("explain for query: %s...", query[:64])

        if conn.engine.name == 'mysql':
            query = "EXPLAIN EXTENDED " + query
        else:
            query = "EXPLAIN " + query

        res = conn.execute(sql.text(query))
        if res.returns_rows:
            _LOG.info("explain: %s", res.keys())
            for row in res:
                _LOG.info("explain: %s", row)
        else:
            _LOG.info("EXPLAIN returned nothing")
    def _storeObjectsAfw(self, objects, conn, table, schema_table_name,
                         replace=False, extra_columns=None):
        """Generic store method.

        Takes catalog of records and stores a bunch of objects in a table.

        Parameters
        ----------
        objects : `lsst.afw.table.BaseCatalog`
            Catalog containing object records.
        conn :
            Database connection.
        table : `sqlalchemy.Table`
            Database table.
        schema_table_name : `str`
            Name of the table to be used for finding table schema.
        replace : `bool`
            If `True` then use replace instead of INSERT (should be more
            efficient).
        extra_columns : `dict`, optional
            Mapping (column_name, column_value) which gives column values to
            add to every row, only if column is missing in catalog records.
        """

        def quoteValue(v):
            """Quote and escape values"""
            if v is None:
                v = "NULL"
            elif isinstance(v, datetime):
                v = "'" + str(v) + "'"
            elif isinstance(v, str):
                # we don't expect nasty stuff in strings
                v = "'" + v + "'"
            elif isinstance(v, geom.Angle):
                v = v.asDegrees()
                v = str(v) if np.isfinite(v) else "NULL"
            else:
                v = str(v) if np.isfinite(v) else "NULL"
            return v

        def quoteId(columnName):
            """Smart quoting for column names.
            Lower-case names are not quoted.
            """
            if not columnName.islower():
                columnName = '"' + columnName + '"'
            return columnName

        if conn.engine.name == "oracle":
            return self._storeObjectsAfwOracle(objects, conn, table,
                                               schema_table_name, replace,
                                               extra_columns)

        schema = objects.getSchema()
        afw_fields = [field.getName() for key, field in schema]

        column_map = self._schema.getAfwColumns(schema_table_name)

        # list of columns (as in database table)
        fields = [column_map[field].name for field in afw_fields if field in column_map]

        # use extra columns that are not in the catalog
        extra_fields = (extra_columns or {}).keys()
        extra_fields = [field for field in extra_fields if field not in fields]

        if replace and conn.engine.name in ('mysql', 'sqlite'):
            query = 'REPLACE INTO '
        else:
            query = 'INSERT INTO '
        qfields = [quoteId(field) for field in fields + extra_fields]
        query += quoteId(table.name) + ' (' + ','.join(qfields) + ') ' + 'VALUES '

        values = []
        for rec in objects:
            row = []
            for field in afw_fields:
                if field not in column_map:
                    continue
                value = rec[field]
                if column_map[field].type == "DATETIME" and \
                        not np.isnan(value):
                    # convert seconds into datetime
                    value = datetime.utcfromtimestamp(value)
                row.append(quoteValue(value))
            for field in extra_fields:
                row.append(quoteValue(extra_columns[field]))
            values.append('(' + ','.join(row) + ')')

        if self.config.explain:
            # run the same query with explain, only give it one row of data
            self._explain(query + values[0], conn)

        query += ','.join(values)

        if replace and conn.engine.name == 'postgresql':
            # use "upsert"; this relies on "replace" only being used for the
            # DiaObjectLast table whose primary key is (pixelId, diaObjectId)
            pks = ('pixelId', 'diaObjectId')
            query += " ON CONFLICT (\"{}\", \"{}\") DO UPDATE SET ".format(*pks)
            fields = [column_map[field].name for field in afw_fields]
            fields = ['"{0}" = EXCLUDED."{0}"'.format(field)
                      for field in fields if field not in pks]
            query += ', '.join(fields)
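
            # Illustrative note: the resulting PostgreSQL statement then has
            # the shape
            #   INSERT INTO "DiaObjectLast" (...) VALUES (...),(...)
            #   ON CONFLICT ("pixelId", "diaObjectId") DO UPDATE SET
            #     "col" = EXCLUDED."col", ...
            # (sketch only; the real column list comes from the afw schema)
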
        _LOG.info("%s: will store %d records", table.name, len(objects))
        with Timer(table.name + ' insert', self.config.timer):
            res = conn.execute(sql.text(query))
        _LOG.debug("inserted %s records", res.rowcount)
    def _storeObjectsAfwOracle(self, objects, conn, table, schema_table_name,
                               replace=False, extra_columns=None):
        """Store method for Oracle.

        Takes catalog of records and stores a bunch of objects in a table.

        Parameters
        ----------
        objects : `lsst.afw.table.BaseCatalog`
            Catalog containing object records.
        conn :
            Database connection.
        table : `sqlalchemy.Table`
            Database table.
        schema_table_name : `str`
            Name of the table to be used for finding table schema.
        replace : `bool`
            If `True` then use replace instead of INSERT (should be more
            efficient).
        extra_columns : `dict`, optional
            Mapping (column_name, column_value) which gives column values to
            add to every row, only if column is missing in catalog records.
        """

        def quoteId(columnName):
            """Smart quoting for column names.
            Lower-case names are not quoted (Oracle backend needs them
            unquoted).
            """
            if not columnName.islower():
                columnName = '"' + columnName + '"'
            return columnName

        schema = objects.getSchema()
        afw_fields = [field.getName() for key, field in schema]

        column_map = self._schema.getAfwColumns(schema_table_name)

        # list of columns (as in database table)
        fields = [column_map[field].name for field in afw_fields
                  if field in column_map]

        # use extra columns that are not in the catalog
        extra_fields = (extra_columns or {}).keys()
        extra_fields = [field for field in extra_fields if field not in fields]

        qfields = [quoteId(field) for field in fields + extra_fields]

        if not replace:
            vals = [":col{}".format(i) for i in range(len(fields))]
            vals += [":extcol{}".format(i) for i in range(len(extra_fields))]
            query = 'INSERT INTO ' + quoteId(table.name)
            query += ' (' + ','.join(qfields) + ') VALUES'
            query += ' (' + ','.join(vals) + ')'
        else:
            qvals = [":col{} {}".format(i, quoteId(field)) for i, field in enumerate(fields)]
            qvals += [":extcol{} {}".format(i, quoteId(field)) for i, field in enumerate(extra_fields)]
            pks = ('pixelId', 'diaObjectId')
            onexpr = ["SRC.{col} = DST.{col}".format(col=quoteId(col)) for col in pks]
            setexpr = ["DST.{col} = SRC.{col}".format(col=quoteId(col))
                       for col in fields + extra_fields if col not in pks]
            vals = ["SRC.{col}".format(col=quoteId(col)) for col in fields + extra_fields]
            query = "MERGE INTO {} DST ".format(quoteId(table.name))
            query += "USING (SELECT {} FROM DUAL) SRC ".format(", ".join(qvals))
            query += "ON ({}) ".format(" AND ".join(onexpr))
            query += "WHEN MATCHED THEN UPDATE SET {} ".format(" ,".join(setexpr))
            query += "WHEN NOT MATCHED THEN INSERT "
            query += "({}) VALUES ({})".format(','.join(qfields), ','.join(vals))
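
            # Illustrative note: the generated MERGE has the shape
            #   MERGE INTO "DiaObjectLast" DST
            #   USING (SELECT :col0 "pixelId", ... FROM DUAL) SRC
            #   ON (SRC."pixelId" = DST."pixelId" AND ...)
            #   WHEN MATCHED THEN UPDATE SET ...
            #   WHEN NOT MATCHED THEN INSERT (...) VALUES (...)
            # with one bind-parameter dict per record passed to execute()
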
        values = []
        for rec in objects:
            row = {}
            col = 0
            for field in afw_fields:
                if field not in column_map:
                    continue
                value = rec[field]
                if column_map[field].type == "DATETIME" and not np.isnan(value):
                    # convert seconds into datetime
                    value = datetime.utcfromtimestamp(value)
                elif isinstance(value, geom.Angle):
                    value = str(value.asDegrees())
                elif not np.isfinite(value):
                    value = None
                row["col{}".format(col)] = value
                col += 1
            for i, field in enumerate(extra_fields):
                row["extcol{}".format(i)] = extra_columns[field]
            values.append(row)

        _LOG.info("%s: will store %d records", table.name, len(objects))
        with Timer(table.name + ' insert', self.config.timer):
            res = conn.execute(sql.text(query), values)
        _LOG.debug("inserted %s records", res.rowcount)
    def _convertResult(self, res, table_name, catalog=None):
        """Convert result set into output catalog.

        Parameters
        ----------
        res : `sqlalchemy.ResultProxy`
            SQLAlchemy result set returned by query.
        table_name : `str`
            Name of the table.
        catalog : `lsst.afw.table.BaseCatalog`
            If not None then extend existing catalog.

        Returns
        -------
        catalog : `lsst.afw.table.SourceCatalog`
            If ``catalog`` is None then new instance is returned, otherwise
            ``catalog`` is updated and returned.
        """
        # make catalog schema
        columns = res.keys()
        schema, col_map = self._schema.getAfwSchema(table_name, columns)
        if catalog is None:
            _LOG.debug("_convertResult: schema: %s", schema)
            _LOG.debug("_convertResult: col_map: %s", col_map)
            catalog = afwTable.SourceCatalog(schema)

        # fill catalog
        for row in res:
            record = catalog.addNew()
            for col, value in row.items():
                # some columns may exist in database but not in afw schema
                col = col_map.get(col)
                if col is not None:
                    if isinstance(value, datetime):
                        # convert datetime to number of seconds
                        value = int((value - datetime.utcfromtimestamp(0)).total_seconds())
                    elif col.getTypeString() == 'Angle' and value is not None:
                        value = value * geom.degrees
                    if value is not None:
                        record.set(col, value)

        return catalog