22 """Module defining Ppdb class and related methods. 25 __all__ = [
"PpdbConfig",
"Ppdb",
"Visit"]
27 from collections
import namedtuple
28 from contextlib
import contextmanager
29 from datetime
import datetime
40 from sqlalchemy
import (func, sql)
41 from sqlalchemy.pool
import NullPool
42 from .
import timer, ppdbSchema
45 _LOG = logging.getLogger(__name__.partition(
".")[2])
49 """Timer class defining context manager which tracks execution timing. 53 with Timer("timer_name"): 56 On exit from block it will print elapsed time. 58 See also :py:mod:`timer` module. 60 def __init__(self, name, do_logging=True, log_before_cursor_execute=False):
68 Enter context, start timer 77 Exit context, stop and dump timer 87 def _start_timer(self, conn, cursor, statement, parameters, context, executemany):
90 _LOG.info(
"before_cursor_execute")
93 def _stop_timer(self, conn, cursor, statement, parameters, context, executemany):
100 def _split(seq, nItems):
101 """Split a sequence into smaller sequences""" 109 Visit = namedtuple(
'Visit',
'visitId visitTime lastObjectId lastSourceId')
113 def _ansi_session(engine):
114 """Returns a connection, makes sure that ANSI mode is set for MySQL 116 with engine.begin()
as conn:
117 if engine.name ==
'mysql':
118 conn.execute(sql.text(
"SET SESSION SQL_MODE = 'ANSI'"))
def _data_file_name(basename):
    """Return path name of a data file.

    The path is ``<dax_ppdb package directory>/data/<basename>``.
    """
    package_root = getPackageDir("dax_ppdb")
    return os.path.join(package_root, "data", basename)
131 db_url =
Field(dtype=str, doc=
"SQLAlchemy database connection URI")
133 doc=
"Transaction isolation level",
134 allowed={
"READ_COMMITTED":
"Read committed",
135 "READ_UNCOMMITTED":
"Read uncommitted",
136 "REPEATABLE_READ":
"Repeatable read",
137 "SERIALIZABLE":
"Serializable"},
138 default=
"READ_COMMITTED")
140 doc=(
"If False then disable SQLAlchemy connection pool. " 141 "Do not use connection pool when forking."),
143 connection_timeout =
Field(dtype=float,
144 doc=
"Maximum time to wait time for database lock to be released before " 145 "exiting. Defaults to sqlachemy defaults if not set.",
148 doc=
"If True then pass SQLAlchemy echo option.",
151 doc=
"Indexing mode for DiaObject table",
152 allowed={
'baseline':
"Index defined in baseline schema",
153 'pix_id_iov':
"(pixelId, objectId, iovStart) PK",
154 'last_object_table':
"Separate DiaObjectLast table"},
156 dia_object_nightly =
Field(dtype=bool,
157 doc=
"Use separate nightly table for DiaObject",
159 read_sources_months =
Field(dtype=int,
160 doc=
"Number of months of history to read from DiaSource",
162 read_forced_sources_months =
Field(dtype=int,
163 doc=
"Number of months of history to read from DiaForcedSource",
166 doc=
"List of columns to read from DiaObject, by default read all columns",
168 object_last_replace =
Field(dtype=bool,
169 doc=
"If True (default) then use \"upsert\" for DiaObjectsLast table",
172 doc=
"Location of (YAML) configuration file with standard schema",
173 default=_data_file_name(
"ppdb-schema.yaml"))
174 extra_schema_file =
Field(dtype=str,
175 doc=
"Location of (YAML) configuration file with extra schema",
176 default=_data_file_name(
"ppdb-schema-extra.yaml"))
178 doc=
"Location of (YAML) configuration file with column mapping",
179 default=_data_file_name(
"ppdb-afw-map.yaml"))
181 doc=
"Prefix to add to table names and index names",
184 doc=
"If True then run EXPLAIN SQL command on each executed query",
187 doc=
"If True then print/log timing information",
189 diaobject_index_hint =
Field(dtype=str,
190 doc=
"Name of the index to use with Oracle index hint",
192 dynamic_sampling_hint =
Field(dtype=int,
193 doc=
"If non-zero then use dynamic_sampling hint",
196 doc=
"If non-zero then use cardinality hint",
201 raise ValueError(
"Attempting to run Ppdb with SQLITE and isolation level 'READ_COMMITTED.' " 202 "Use 'READ_UNCOMMITTED' instead.")
206 """Interface to L1 database, hides all database access details. 208 The implementation is configured via standard ``pex_config`` mechanism 209 using `PpdbConfig` configuration class. For an example of different 210 configurations check config/ folder. 214 config : `PpdbConfig` 215 afw_schemas : `dict`, optional 216 Dictionary with table name for a key and `afw.table.Schema` 217 for a value. Columns in schema will be added to standard 226 _LOG.info(
"PPDB Configuration:")
227 _LOG.info(
" dia_object_index: %s", self.
config.dia_object_index)
228 _LOG.info(
" dia_object_nightly: %s", self.
config.dia_object_nightly)
229 _LOG.info(
" read_sources_months: %s", self.
config.read_sources_months)
230 _LOG.info(
" read_forced_sources_months: %s", self.
config.read_forced_sources_months)
231 _LOG.info(
" dia_object_columns: %s", self.
config.dia_object_columns)
232 _LOG.info(
" object_last_replace: %s", self.
config.object_last_replace)
233 _LOG.info(
" schema_file: %s", self.
config.schema_file)
234 _LOG.info(
" extra_schema_file: %s", self.
config.extra_schema_file)
235 _LOG.info(
" column_map: %s", self.
config.column_map)
236 _LOG.info(
" schema prefix: %s", self.
config.prefix)
240 kw = dict(echo=self.
config.sql_echo)
242 if not self.
config.connection_pool:
243 kw.update(poolclass=NullPool)
244 if self.
config.isolation_level
is not None:
245 kw.update(isolation_level=self.
config.isolation_level)
246 if self.
config.connection_timeout
is not None:
247 if self.
config.db_url.startswith(
"sqlite"):
248 conn_args.update(timeout=self.
config.connection_timeout)
249 elif self.
config.db_url.startswith((
"postgresql",
"mysql")):
250 conn_args.update(connect_timeout=self.
config.connection_timeout)
251 kw.update(connect_args=conn_args)
252 self.
_engine = sqlalchemy.create_engine(self.
config.db_url, **kw)
255 dia_object_index=self.
config.dia_object_index,
256 dia_object_nightly=self.
config.dia_object_nightly,
257 schema_file=self.
config.schema_file,
258 extra_schema_file=self.
config.extra_schema_file,
259 column_map=self.
config.column_map,
260 afw_schemas=afw_schemas,
261 prefix=self.
config.prefix)
264 """Returns last visit information or `None` if visits table is empty. 266 Visits table is used by ap_proto to track visit information, it is 267 not a part of the regular PPDB schema. 271 visit : `Visit` or `None` 272 Last stored visit info or `None` if there was nothing stored yet. 275 with self.
_engine.begin()
as conn:
277 stmnt = sql.select([sql.func.max(self.
_schema.visits.c.visitId),
278 sql.func.max(self.
_schema.visits.c.visitTime)])
279 res = conn.execute(stmnt)
286 _LOG.info(
"lastVisit: visitId: %s visitTime: %s (%s)", visitId,
287 visitTime,
type(visitTime))
290 stmnt = sql.select([sql.func.max(self.
_schema.objects.c.diaObjectId)])
291 lastObjectId = conn.scalar(stmnt)
292 stmnt = sql.select([sql.func.max(self.
_schema.sources.c.diaSourceId)])
293 lastSourceId = conn.scalar(stmnt)
295 return Visit(visitId=visitId, visitTime=visitTime,
296 lastObjectId=lastObjectId, lastSourceId=lastSourceId)
299 """Store visit information. 301 This method is only used by ``ap_proto`` script from ``l1dbproto`` 302 and is not intended for production pipelines. 308 visitTime : `datetime.datetime` 312 ins = self.
_schema.visits.insert().values(visitId=visitId,
317 """Returns dictionary with the table names and row counts. 319 Used by ``ap_proto`` to keep track of the size of the database tables. 320 Depending on database technology this could be expensive operation. 325 Dict where key is a table name and value is a row count. 329 if self.
config.dia_object_index ==
'last_object_table':
330 tables.append(self.
_schema.objects_last)
332 stmt = sql.select([func.count()]).select_from(table)
333 count = self.
_engine.scalar(stmt)
334 res[table.name] = count
339 """Returns catalog of DiaObject instances from given region. 341 Objects are searched based on pixelization index and region is 342 determined by the set of indices. There is no assumption on a 343 particular type of index, client is responsible for consistency 344 when calculating pixelization indices. 346 This method returns :doc:`/modules/lsst.afw.table/index` catalog with schema determined by 347 the schema of PPDB table. Re-mapping of the column names is done for 348 some columns (based on column map passed to constructor) but types 349 or units are not changed. 351 Returns only the last version of each DiaObject. 355 pixel_ranges : `list` of `tuple` 356 Sequence of ranges, range is a tuple (minPixelID, maxPixelID). 357 This defines set of pixel indices to be included in result. 361 catalog : `lsst.afw.table.SourceCatalog` 362 Catalog containing DiaObject records. 366 if self.
config.dia_object_index ==
'last_object_table':
367 table = self.
_schema.objects_last
370 if not self.
config.dia_object_columns:
371 query = table.select()
373 columns = [table.c[col]
for col
in self.
config.dia_object_columns]
374 query = sql.select(columns)
376 if self.
config.diaobject_index_hint:
377 val = self.
config.diaobject_index_hint
378 query = query.with_hint(table,
'index_rs_asc(%(name)s "{}")'.
format(val))
379 if self.
config.dynamic_sampling_hint > 0:
380 val = self.
config.dynamic_sampling_hint
381 query = query.with_hint(table,
'dynamic_sampling(%(name)s {})'.
format(val))
382 if self.
config.cardinality_hint > 0:
383 val = self.
config.cardinality_hint
384 query = query.with_hint(table,
'FIRST_ROWS_1 cardinality(%(name)s {})'.
format(val))
388 for low, upper
in pixel_ranges:
391 exprlist.append(table.c.pixelId == low)
393 exprlist.append(sql.expression.between(table.c.pixelId, low, upper))
394 query = query.where(sql.expression.or_(*exprlist))
397 if self.
config.dia_object_index !=
'last_object_table':
398 query = query.where(table.c.validityEnd ==
None)
400 _LOG.debug(
"query: %s", query)
408 with self.
_engine.begin()
as conn:
409 res = conn.execute(query)
411 _LOG.debug(
"found %s DiaObjects", len(objects))
415 """Returns catalog of DiaSource instances from given region. 417 Sources are searched based on pixelization index and region is 418 determined by the set of indices. There is no assumption on a 419 particular type of index, client is responsible for consistency 420 when calculating pixelization indices. 422 This method returns :doc:`/modules/lsst.afw.table/index` catalog with schema determined by 423 the schema of PPDB table. Re-mapping of the column names is done for 424 some columns (based on column map passed to constructor) but types or 425 units are not changed. 429 pixel_ranges : `list` of `tuple` 430 Sequence of ranges, range is a tuple (minPixelID, maxPixelID). 431 This defines set of pixel indices to be included in result. 432 dt : `datetime.datetime` 433 Time of the current visit 437 catalog : `lsst.afw.table.SourceCatalog` or `None` 438 Catalog containing DiaSource records. `None` is returned if 439 ``read_sources_months`` configuration parameter is set to 0. 442 if self.
config.read_sources_months == 0:
443 _LOG.info(
"Skip DiaSources fetching")
447 query = table.select()
451 for low, upper
in pixel_ranges:
454 exprlist.append(table.c.pixelId == low)
456 exprlist.append(sql.expression.between(table.c.pixelId, low, upper))
457 query = query.where(sql.expression.or_(*exprlist))
461 with _ansi_session(self.
_engine)
as conn:
462 res = conn.execute(query)
464 _LOG.debug(
"found %s DiaSources", len(sources))
468 """Returns catalog of DiaSource instances given set of DiaObject IDs. 470 This method returns :doc:`/modules/lsst.afw.table/index` catalog with schema determined by 471 the schema of PPDB table. Re-mapping of the column names is done for 472 some columns (based on column map passed to constructor) but types or 473 units are not changed. 478 Collection of DiaObject IDs 479 dt : `datetime.datetime` 480 Time of the current visit 484 catalog : `lsst.afw.table.SourceCatalog` or `None` 485 Catalog containing DiaSource records. `None` is returned if 486 ``read_sources_months`` configuration parameter is set to 0 or 487 when ``object_ids`` is empty. 490 if self.
config.read_sources_months == 0:
491 _LOG.info(
"Skip DiaSources fetching")
494 if len(object_ids) <= 0:
495 _LOG.info(
"Skip DiaSources fetching - no Objects")
502 with _ansi_session(self.
_engine)
as conn:
503 for ids
in _split(sorted(object_ids), 1000):
504 query =
'SELECT * FROM "' + table.name +
'" WHERE ' 507 ids =
",".join(
str(id)
for id
in ids)
508 query +=
'"diaObjectId" IN (' + ids +
') ' 511 res = conn.execute(sql.text(query))
514 _LOG.debug(
"found %s DiaSources", len(sources))
518 """Returns catalog of DiaForcedSource instances matching given 521 This method returns :doc:`/modules/lsst.afw.table/index` catalog with schema determined by 522 the schema of L1 database table. Re-mapping of the column names may 523 be done for some columns (based on column map passed to constructor) 524 but types or units are not changed. 529 Collection of DiaObject IDs 530 dt : `datetime.datetime` 531 Time of the current visit 535 catalog : `lsst.afw.table.SourceCatalog` or `None` 536 Catalog containing DiaForcedSource records. `None` is returned if 537 ``read_forced_sources_months`` configuration parameter is set to 0 or 538 when ``object_ids`` is empty. 541 if self.
config.read_forced_sources_months == 0:
542 _LOG.info(
"Skip DiaForceSources fetching")
546 _LOG.info(
"Skip DiaForceSources fetching - no Objects")
550 table = self.
_schema.forcedSources
553 with
Timer(
'DiaForcedSource select', self.
config.timer):
554 with _ansi_session(self.
_engine)
as conn:
555 for ids
in _split(sorted(object_ids), 1000):
557 query =
'SELECT * FROM "' + table.name +
'" WHERE ' 560 ids =
",".join(
str(id)
for id
in ids)
561 query +=
'"diaObjectId" IN (' + ids +
') ' 564 res = conn.execute(sql.text(query))
567 _LOG.debug(
"found %s DiaForcedSources", len(sources))
571 """Store catalog of DiaObjects from current visit. 573 This method takes :doc:`/modules/lsst.afw.table/index` catalog, its schema must be 574 compatible with the schema of PPDB table: 576 - column names must correspond to database table columns 577 - some column names are re-mapped based on column map passed to 579 - types and units of the columns must match database definitions, 580 no unit conversion is performed presently 581 - columns that have default values in database schema can be 582 omitted from afw schema 583 - this method knows how to fill interval-related columns 584 (validityStart, validityEnd) they do not need to appear in 589 objs : `lsst.afw.table.BaseCatalog` 590 Catalog with DiaObject records 591 dt : `datetime.datetime` 595 ids = sorted([obj[
'id']
for obj
in objs])
596 _LOG.debug(
"first object ID: %d", ids[0])
603 with _ansi_session(self.
_engine)
as conn:
605 ids =
",".join(
str(id)
for id
in ids)
607 if self.
config.dia_object_index ==
'last_object_table':
611 table = self.
_schema.objects_last
612 do_replace = self.
config.object_last_replace
614 query =
'DELETE FROM "' + table.name +
'" ' 615 query +=
'WHERE "diaObjectId" IN (' + ids +
') ' 621 with
Timer(table.name +
' delete', self.
config.timer):
622 res = conn.execute(sql.text(query))
623 _LOG.debug(
"deleted %s objects", res.rowcount)
625 extra_columns = dict(lastNonForcedSource=dt)
628 extra_columns=extra_columns)
634 query =
'UPDATE "' + table.name +
'" ' 635 query +=
"SET \"validityEnd\" = '" +
str(dt) +
"' " 636 query +=
'WHERE "diaObjectId" IN (' + ids +
') ' 637 query +=
'AND "validityEnd" IS NULL' 645 with
Timer(table.name +
' truncate', self.
config.timer):
646 res = conn.execute(sql.text(query))
647 _LOG.debug(
"truncated %s intervals", res.rowcount)
650 if self.
config.dia_object_nightly:
651 table = self.
_schema.objects_nightly
654 extra_columns = dict(lastNonForcedSource=dt, validityStart=dt,
657 extra_columns=extra_columns)
660 """Store catalog of DIASources from current visit. 662 This method takes :doc:`/modules/lsst.afw.table/index` catalog, its schema must be 663 compatible with the schema of L1 database table: 665 - column names must correspond to database table columns 666 - some column names may be re-mapped based on column map passed to 668 - types and units of the columns must match database definitions, 669 no unit conversion is performed presently 670 - columns that have default values in database schema can be 671 omitted from afw schema 675 sources : `lsst.afw.table.BaseCatalog` 676 Catalog containing DiaSource records 680 with _ansi_session(self.
_engine)
as conn:
686 """Store a set of DIAForcedSources from current visit. 688 This method takes :doc:`/modules/lsst.afw.table/index` catalog, its schema must be 689 compatible with the schema of L1 database table: 691 - column names must correspond to database table columns 692 - some column names may be re-mapped based on column map passed to 694 - types and units of the columns must match database definitions, 695 no unit conversion is performed presently 696 - columns that have default values in database schema can be 697 omitted from afw schema 701 sources : `lsst.afw.table.BaseCatalog` 702 Catalog containing DiaForcedSource records 706 with _ansi_session(self.
_engine)
as conn:
708 table = self.
_schema.forcedSources
712 """Implement daily activities like cleanup/vacuum. 714 What should be done during daily cleanup is determined by 715 configuration/schema. 719 if self.
config.dia_object_nightly:
720 with _ansi_session(self.
_engine)
as conn:
721 query =
'INSERT INTO "' + self.
_schema.objects.name +
'" ' 722 query +=
'SELECT * FROM "' + self.
_schema.objects_nightly.name +
'"' 723 with
Timer(
'DiaObjectNightly copy', self.
config.timer):
724 conn.execute(sql.text(query))
726 query =
'DELETE FROM "' + self.
_schema.objects_nightly.name +
'"' 727 with
Timer(
'DiaObjectNightly delete', self.
config.timer):
728 conn.execute(sql.text(query))
730 if self.
_engine.name ==
'postgresql':
733 _LOG.info(
"Running VACUUM on all tables")
734 connection = self.
_engine.raw_connection()
735 ISOLATION_LEVEL_AUTOCOMMIT = 0
736 connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
737 cursor = connection.cursor()
738 cursor.execute(
"VACUUM ANALYSE")
740 def makeSchema(self, drop=False, mysql_engine='InnoDB', oracle_tablespace=None, oracle_iot=False):
741 """Create or re-create all tables. 746 If True then drop tables before creating new ones. 747 mysql_engine : `str`, optional 748 Name of the MySQL engine to use for new tables. 749 oracle_tablespace : `str`, optional 750 Name of Oracle tablespace. 751 oracle_iot : `bool`, optional 752 Make Index-organized DiaObjectLast table. 755 oracle_tablespace=oracle_tablespace,
756 oracle_iot=oracle_iot)
758 def _explain(self, query, conn):
759 """Run the query with explain 762 _LOG.info(
"explain for query: %s...", query[:64])
764 if conn.engine.name ==
'mysql':
765 query =
"EXPLAIN EXTENDED " + query
767 query =
"EXPLAIN " + query
769 res = conn.execute(sql.text(query))
771 _LOG.info(
"explain: %s", res.keys())
773 _LOG.info(
"explain: %s", row)
775 _LOG.info(
"EXPLAIN returned nothing")
777 def _storeObjectsAfw(self, objects, conn, table, schema_table_name,
778 replace=False, extra_columns=None):
779 """Generic store method. 781 Takes catalog of records and stores a bunch of objects in a table. 785 objects : `lsst.afw.table.BaseCatalog` 786 Catalog containing object records 789 table : `sqlalchemy.Table` 791 schema_table_name : `str` 792 Name of the table to be used for finding table schema. 794 If `True` then use replace instead of INSERT (should be more efficient) 795 extra_columns : `dict`, optional 796 Mapping (column_name, column_value) which gives column values to add 797 to every row, only if column is missing in catalog records. 801 """Quote and escape values""" 804 elif isinstance(v, datetime):
805 v =
"'" +
str(v) +
"'" 806 elif isinstance(v, str):
822 def quoteId(columnName):
823 """Smart quoting for column names. 824 Lower-case names are not quoted. 826 if not columnName.islower():
827 columnName =
'"' + columnName +
'"' 830 if conn.engine.name ==
"oracle":
832 schema_table_name, replace,
835 schema = objects.getSchema()
836 afw_fields = [field.getName()
for key, field
in schema]
838 column_map = self.
_schema.getAfwColumns(schema_table_name)
841 fields = [column_map[field].name
for field
in afw_fields
if field
in column_map]
844 extra_fields = (extra_columns
or {}).
keys()
845 extra_fields = [field
for field
in extra_fields
if field
not in fields]
847 if replace
and conn.engine.name
in (
'mysql',
'sqlite'):
848 query =
'REPLACE INTO ' 850 query =
'INSERT INTO ' 851 qfields = [quoteId(field)
for field
in fields + extra_fields]
852 query += quoteId(table.name) +
' (' +
','.join(qfields) +
') ' +
'VALUES ' 857 for field
in afw_fields:
858 if field
not in column_map:
861 if column_map[field].type ==
"DATETIME" and \
864 value = datetime.utcfromtimestamp(value)
865 row.append(quoteValue(value))
866 for field
in extra_fields:
867 row.append(quoteValue(extra_columns[field]))
868 values.append(
'(' +
','.join(row) +
')')
872 self.
_explain(query + values[0], conn)
874 query +=
','.join(values)
876 if replace
and conn.engine.name ==
'postgresql':
878 pks = (
'pixelId',
'diaObjectId')
879 query +=
" ON CONFLICT (\"{}\", \"{}\") DO UPDATE SET ".
format(*pks)
880 fields = [column_map[field].name
for field
in afw_fields]
881 fields = [
'"{0}" = EXCLUDED."{0}"'.
format(field)
882 for field
in fields
if field
not in pks]
883 query +=
', '.join(fields)
886 _LOG.info(
"%s: will store %d records", table.name, len(objects))
887 with
Timer(table.name +
' insert', self.
config.timer):
888 res = conn.execute(sql.text(query))
889 _LOG.debug(
"inserted %s intervals", res.rowcount)
891 def _storeObjectsAfwOracle(self, objects, conn, table, schema_table_name,
892 replace=False, extra_columns=None):
893 """Store method for Oracle. 895 Takes catalog of records and stores a bunch of objects in a table. 899 objects : `lsst.afw.table.BaseCatalog` 900 Catalog containing object records 903 table : `sqlalchemy.Table` 905 schema_table_name : `str` 906 Name of the table to be used for finding table schema. 908 If `True` then use replace instead of INSERT (should be more efficient) 909 extra_columns : `dict`, optional 910 Mapping (column_name, column_value) which gives column values to add 911 to every row, only if column is missing in catalog records. 914 def quoteId(columnName):
915 """Smart quoting for column names. 916 Lower-case names are not quoted (Oracle backend needs them unquoted). 918 if not columnName.islower():
919 columnName =
'"' + columnName +
'"' 922 schema = objects.getSchema()
923 afw_fields = [field.getName()
for key, field
in schema]
926 column_map = self.
_schema.getAfwColumns(schema_table_name)
930 fields = [column_map[field].name
for field
in afw_fields
931 if field
in column_map]
935 extra_fields = (extra_columns
or {}).
keys()
936 extra_fields = [field
for field
in extra_fields
if field
not in fields]
938 qfields = [quoteId(field)
for field
in fields + extra_fields]
941 vals = [
":col{}".
format(i)
for i
in range(len(fields))]
942 vals += [
":extcol{}".
format(i)
for i
in range(len(extra_fields))]
943 query =
'INSERT INTO ' + quoteId(table.name)
944 query +=
' (' +
','.join(qfields) +
') VALUES' 945 query +=
' (' +
','.join(vals) +
')' 947 qvals = [
":col{} {}".
format(i, quoteId(field))
for i, field
in enumerate(fields)]
948 qvals += [
":extcol{} {}".
format(i, quoteId(field))
for i, field
in enumerate(extra_fields)]
949 pks = (
'pixelId',
'diaObjectId')
950 onexpr = [
"SRC.{col} = DST.{col}".
format(col=quoteId(col))
for col
in pks]
951 setexpr = [
"DST.{col} = SRC.{col}".
format(col=quoteId(col))
952 for col
in fields + extra_fields
if col
not in pks]
953 vals = [
"SRC.{col}".
format(col=quoteId(col))
for col
in fields + extra_fields]
954 query =
"MERGE INTO {} DST ".
format(quoteId(table.name))
955 query +=
"USING (SELECT {} FROM DUAL) SRC ".
format(
", ".join(qvals))
956 query +=
"ON ({}) ".
format(
" AND ".join(onexpr))
957 query +=
"WHEN MATCHED THEN UPDATE SET {} ".
format(
" ,".join(setexpr))
958 query +=
"WHEN NOT MATCHED THEN INSERT " 959 query +=
"({}) VALUES ({})".
format(
','.join(qfields),
','.join(vals))
966 for field
in afw_fields:
967 if field
not in column_map:
970 if column_map[field].type ==
"DATETIME" and not np.isnan(value):
972 value = datetime.utcfromtimestamp(value)
974 value =
str(value.asDegrees())
975 elif not np.isfinite(value):
977 row[
"col{}".
format(col)] = value
979 for i, field
in enumerate(extra_fields):
980 row[
"extcol{}".
format(i)] = extra_columns[field]
984 _LOG.info(
"%s: will store %d records", table.name, len(objects))
985 with
Timer(table.name +
' insert', self.
config.timer):
986 res = conn.execute(sql.text(query), values)
987 _LOG.debug(
"inserted %s intervals", res.rowcount)
989 def _convertResult(self, res, table_name, catalog=None):
990 """Convert result set into output catalog. 994 res : `sqlalchemy.ResultProxy` 995 SQLAlchemy result set returned by query. 998 catalog : `lsst.afw.table.BaseCatalog` 999 If not None then extend existing catalog 1003 catalog : `lsst.afw.table.SourceCatalog` 1004 If ``catalog`` is None then new instance is returned, otherwise 1005 ``catalog`` is updated and returned. 1008 columns = res.keys()
1009 schema, col_map = self.
_schema.getAfwSchema(table_name, columns)
1011 _LOG.debug(
"_convertResult: schema: %s", schema)
1012 _LOG.debug(
"_convertResult: col_map: %s", col_map)
1017 record = catalog.addNew()
1018 for col, value
in row.items():
1020 col = col_map.get(col)
1022 if isinstance(value, datetime):
1024 value =
int((value - datetime.utcfromtimestamp(0)).total_seconds())
1025 elif col.getTypeString() ==
'Angle' and value
is not None:
1026 value = value * geom.degrees
1027 if value
is not None:
1028 record.set(col, value)
def __exit__(self, exc_type, exc_val, exc_tb)
def _storeObjectsAfwOracle(self, objects, conn, table, schema_table_name, replace=False, extra_columns=None)
def saveVisit(self, visitId, visitTime)
def _convertResult(self, res, table_name, catalog=None)
def __init__(self, config, afw_schemas=None)
def __init__(self, name, do_logging=True, log_before_cursor_execute=False)
def _storeObjectsAfw(self, objects, conn, table, schema_table_name, replace=False, extra_columns=None)
def getDiaSources(self, object_ids, dt)
A class representing an angle.
std::string getPackageDir(std::string const &packageName)
return the root directory of a setup package
def _explain(self, query, conn)
def makeSchema(self, drop=False, mysql_engine='InnoDB', oracle_tablespace=None, oracle_iot=False)
def getDiaForcedSources(self, object_ids, dt)
def getDiaObjects(self, pixel_ranges)
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)
def storeDiaObjects(self, objs, dt)
def getDiaSourcesInRegion(self, pixel_ranges, dt)
def storeDiaForcedSources(self, sources)
def storeDiaSources(self, sources)
daf::base::PropertyList * list
_log_before_cursor_execute