22 """Module defining Ppdb class and related methods. 25 __all__ = [
"PpdbConfig",
"Ppdb",
"Visit"]
from collections import namedtuple
from contextlib import contextmanager
from datetime import datetime
import logging
import os

import numpy as np
import pandas

import lsst.geom as geom
import lsst.afw.table as afwTable
import lsst.pex.config as pexConfig
from lsst.pex.config import ChoiceField, Field, ListField
from lsst.utils import getPackageDir
import sqlalchemy
from sqlalchemy import (func, sql)
from sqlalchemy.pool import NullPool
from . import timer, ppdbSchema
_LOG = logging.getLogger(__name__.partition(".")[2])
50 """Timer class defining context manager which tracks execution timing. 54 with Timer("timer_name"): 57 On exit from block it will print elapsed time. 59 See also :py:mod:`timer` module. 61 def __init__(self, name, do_logging=True, log_before_cursor_execute=False):
69 Enter context, start timer 78 Exit context, stop and dump timer 88 def _start_timer(self, conn, cursor, statement, parameters, context, executemany):
91 _LOG.info(
"before_cursor_execute")
94 def _stop_timer(self, conn, cursor, statement, parameters, context, executemany):
def _split(seq, nItems):
    """Split a sequence into smaller sequences"""
    seq = list(seq)
    while seq:
        yield seq[:nItems]
        del seq[:nItems]


# Information about single visit
Visit = namedtuple('Visit', 'visitId visitTime lastObjectId lastSourceId')
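# Illustrative sketch (kept as a comment so nothing runs on import): _split is
# used below to chunk long ID lists so that generated "IN (...)" lists stay at
# a manageable size:
#
#     >>> list(_split([1, 2, 3, 4, 5], 2))
#     [[1, 2], [3, 4], [5]]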
@contextmanager
def _ansi_session(engine):
    """Returns a connection, makes sure that ANSI mode is set for MySQL"""
    with engine.begin() as conn:
        if engine.name == 'mysql':
            conn.execute(sql.text("SET SESSION SQL_MODE = 'ANSI'"))
        yield conn
    return
def _data_file_name(basename):
    """Return path name of a data file."""
    return os.path.join(getPackageDir("dax_ppdb"), "data", basename)
class PpdbConfig(pexConfig.Config):

    db_url = Field(dtype=str, doc="SQLAlchemy database connection URI")
    isolation_level = ChoiceField(dtype=str,
                                  doc="Transaction isolation level",
                                  allowed={"READ_COMMITTED": "Read committed",
                                           "READ_UNCOMMITTED": "Read uncommitted",
                                           "REPEATABLE_READ": "Repeatable read",
                                           "SERIALIZABLE": "Serializable"},
                                  default="READ_COMMITTED")
    connection_pool = Field(dtype=bool,
                            doc=("If False then disable SQLAlchemy connection pool. "
                                 "Do not use connection pool when forking."),
                            default=True)
    connection_timeout = Field(dtype=float,
                               doc=("Maximum time to wait for a database lock to be "
                                    "released before exiting. Defaults to SQLAlchemy "
                                    "defaults if not set."),
                               optional=True)
    sql_echo = Field(dtype=bool,
                     doc="If True then pass SQLAlchemy echo option.",
                     default=False)
    dia_object_index = ChoiceField(dtype=str,
                                   doc="Indexing mode for DiaObject table",
                                   allowed={'baseline': "Index defined in baseline schema",
                                            'pix_id_iov': "(pixelId, objectId, iovStart) PK",
                                            'last_object_table': "Separate DiaObjectLast table"},
                                   default='baseline')
    dia_object_nightly = Field(dtype=bool,
                               doc="Use separate nightly table for DiaObject",
                               default=False)
    read_sources_months = Field(dtype=int,
                                doc="Number of months of history to read from DiaSource",
                                default=12)
    read_forced_sources_months = Field(dtype=int,
                                       doc="Number of months of history to read from DiaForcedSource",
                                       default=12)
    dia_object_columns = ListField(dtype=str,
                                   doc="List of columns to read from DiaObject, by default read all columns",
                                   default=[])
    object_last_replace = Field(dtype=bool,
                                doc="If True (default) then use \"upsert\" for DiaObjectsLast table",
                                default=True)
    schema_file = Field(dtype=str,
                        doc="Location of (YAML) configuration file with standard schema",
                        default=_data_file_name("ppdb-schema.yaml"))
    extra_schema_file = Field(dtype=str,
                              doc="Location of (YAML) configuration file with extra schema",
                              default=_data_file_name("ppdb-schema-extra.yaml"))
    column_map = Field(dtype=str,
                       doc="Location of (YAML) configuration file with column mapping",
                       default=_data_file_name("ppdb-afw-map.yaml"))
    prefix = Field(dtype=str,
                   doc="Prefix to add to table names and index names",
                   default="")
    explain = Field(dtype=bool,
                    doc="If True then run EXPLAIN SQL command on each executed query",
                    default=False)
    timer = Field(dtype=bool,
                  doc="If True then print/log timing information",
                  default=False)
    diaobject_index_hint = Field(dtype=str,
                                 doc="Name of the index to use with Oracle index hint",
                                 optional=True)
    dynamic_sampling_hint = Field(dtype=int,
                                  doc="If non-zero then use dynamic_sampling hint",
                                  default=0)
    cardinality_hint = Field(dtype=int,
                             doc="If non-zero then use cardinality hint",
                             default=0)

    def validate(self):
        super().validate()
        if self.isolation_level == "READ_COMMITTED" and self.db_url.startswith("sqlite"):
            raise ValueError("Attempting to run Ppdb with SQLITE and isolation level 'READ_COMMITTED'. "
                             "Use 'READ_UNCOMMITTED' instead.")
211 """Interface to L1 database, hides all database access details. 213 The implementation is configured via standard ``pex_config`` mechanism 214 using `PpdbConfig` configuration class. For an example of different 215 configurations check config/ folder. 219 config : `PpdbConfig` 220 afw_schemas : `dict`, optional 221 Dictionary with table name for a key and `afw.table.Schema` 222 for a value. Columns in schema will be added to standard 231 _LOG.debug(
"PPDB Configuration:")
232 _LOG.debug(
" dia_object_index: %s", self.
config.dia_object_index)
233 _LOG.debug(
" dia_object_nightly: %s", self.
config.dia_object_nightly)
234 _LOG.debug(
" read_sources_months: %s", self.
config.read_sources_months)
235 _LOG.debug(
" read_forced_sources_months: %s", self.
config.read_forced_sources_months)
236 _LOG.debug(
" dia_object_columns: %s", self.
config.dia_object_columns)
237 _LOG.debug(
" object_last_replace: %s", self.
config.object_last_replace)
238 _LOG.debug(
" schema_file: %s", self.
config.schema_file)
239 _LOG.debug(
" extra_schema_file: %s", self.
config.extra_schema_file)
240 _LOG.debug(
" column_map: %s", self.
config.column_map)
241 _LOG.debug(
" schema prefix: %s", self.
config.prefix)
245 kw = dict(echo=self.
config.sql_echo)
247 if not self.
config.connection_pool:
248 kw.update(poolclass=NullPool)
249 if self.
config.isolation_level
is not None:
250 kw.update(isolation_level=self.
config.isolation_level)
251 if self.
config.connection_timeout
is not None:
252 if self.
config.db_url.startswith(
"sqlite"):
253 conn_args.update(timeout=self.
config.connection_timeout)
254 elif self.
config.db_url.startswith((
"postgresql",
"mysql")):
255 conn_args.update(connect_timeout=self.
config.connection_timeout)
256 kw.update(connect_args=conn_args)
257 self.
_engine = sqlalchemy.create_engine(self.
config.db_url, **kw)
260 dia_object_index=self.
config.dia_object_index,
261 dia_object_nightly=self.
config.dia_object_nightly,
262 schema_file=self.
config.schema_file,
263 extra_schema_file=self.
config.extra_schema_file,
264 column_map=self.
config.column_map,
265 afw_schemas=afw_schemas,
266 prefix=self.
config.prefix)
269 """Returns last visit information or `None` if visits table is empty. 271 Visits table is used by ap_proto to track visit information, it is 272 not a part of the regular PPDB schema. 276 visit : `Visit` or `None` 277 Last stored visit info or `None` if there was nothing stored yet. 280 with self.
_engine.begin()
as conn:
282 stmnt = sql.select([sql.func.max(self.
_schema.visits.c.visitId),
283 sql.func.max(self.
_schema.visits.c.visitTime)])
284 res = conn.execute(stmnt)
291 _LOG.info(
"lastVisit: visitId: %s visitTime: %s (%s)", visitId,
292 visitTime,
type(visitTime))
295 stmnt = sql.select([sql.func.max(self.
_schema.objects.c.diaObjectId)])
296 lastObjectId = conn.scalar(stmnt)
297 stmnt = sql.select([sql.func.max(self.
_schema.sources.c.diaSourceId)])
298 lastSourceId = conn.scalar(stmnt)
300 return Visit(visitId=visitId, visitTime=visitTime,
301 lastObjectId=lastObjectId, lastSourceId=lastSourceId)
304 """Store visit information. 306 This method is only used by ``ap_proto`` script from ``l1dbproto`` 307 and is not intended for production pipelines. 313 visitTime : `datetime.datetime` 317 ins = self.
_schema.visits.insert().values(visitId=visitId,
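    # Usage sketch for the visit bookkeeping above (ap_proto-style; values
    # are illustrative):
    #
    #     ppdb.saveVisit(42, datetime.utcnow())
    #     visit = ppdb.lastVisit()
    #     if visit is not None:
    #         _LOG.info("resuming after visit %s", visit.visitId)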
322 """Returns dictionary with the table names and row counts. 324 Used by ``ap_proto`` to keep track of the size of the database tables. 325 Depending on database technology this could be expensive operation. 330 Dict where key is a table name and value is a row count. 334 if self.
config.dia_object_index ==
'last_object_table':
335 tables.append(self.
_schema.objects_last)
337 stmt = sql.select([func.count()]).select_from(table)
338 count = self.
_engine.scalar(stmt)
339 res[table.name] = count
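    # Example of the returned mapping (counts are illustrative; table names
    # depend on config.prefix):
    #
    #     {"DiaObject": 1500, "DiaSource": 45000, "DiaForcedSource": 90000}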
344 """Returns catalog of DiaObject instances from given region. 346 Objects are searched based on pixelization index and region is 347 determined by the set of indices. There is no assumption on a 348 particular type of index, client is responsible for consistency 349 when calculating pixelization indices. 351 This method returns :doc:`/modules/lsst.afw.table/index` catalog with schema determined by 352 the schema of PPDB table. Re-mapping of the column names is done for 353 some columns (based on column map passed to constructor) but types 354 or units are not changed. 356 Returns only the last version of each DiaObject. 360 pixel_ranges : `list` of `tuple` 361 Sequence of ranges, range is a tuple (minPixelID, maxPixelID). 362 This defines set of pixel indices to be included in result. 363 return_pandas : `bool` 364 Return a `pandas.DataFrame` instead of 365 `lsst.afw.table.SourceCatalog`. 369 catalog : `lsst.afw.table.SourceCatalog` or `pandas.DataFrame` 370 Catalog containing DiaObject records. 374 if self.
config.dia_object_index ==
'last_object_table':
375 table = self.
_schema.objects_last
378 if not self.
config.dia_object_columns:
379 query = table.select()
381 columns = [table.c[col]
for col
in self.
config.dia_object_columns]
382 query = sql.select(columns)
384 if self.
config.diaobject_index_hint:
385 val = self.
config.diaobject_index_hint
386 query = query.with_hint(table,
'index_rs_asc(%(name)s "{}")'.
format(val))
387 if self.
config.dynamic_sampling_hint > 0:
388 val = self.
config.dynamic_sampling_hint
389 query = query.with_hint(table,
'dynamic_sampling(%(name)s {})'.
format(val))
390 if self.
config.cardinality_hint > 0:
391 val = self.
config.cardinality_hint
392 query = query.with_hint(table,
'FIRST_ROWS_1 cardinality(%(name)s {})'.
format(val))
396 for low, upper
in pixel_ranges:
399 exprlist.append(table.c.pixelId == low)
401 exprlist.append(sql.expression.between(table.c.pixelId, low, upper))
402 query = query.where(sql.expression.or_(*exprlist))
405 if self.
config.dia_object_index !=
'last_object_table':
406 query = query.where(table.c.validityEnd ==
None)
408 _LOG.debug(
"query: %s", query)
416 with self.
_engine.begin()
as conn:
418 objects = pandas.read_sql_query(query, conn)
420 res = conn.execute(query)
422 _LOG.debug(
"found %s DiaObjects", len(objects))
426 """Returns catalog of DiaSource instances from given region. 428 Sources are searched based on pixelization index and region is 429 determined by the set of indices. There is no assumption on a 430 particular type of index, client is responsible for consistency 431 when calculating pixelization indices. 433 This method returns :doc:`/modules/lsst.afw.table/index` catalog with schema determined by 434 the schema of PPDB table. Re-mapping of the column names is done for 435 some columns (based on column map passed to constructor) but types or 436 units are not changed. 440 pixel_ranges : `list` of `tuple` 441 Sequence of ranges, range is a tuple (minPixelID, maxPixelID). 442 This defines set of pixel indices to be included in result. 443 dt : `datetime.datetime` 444 Time of the current visit 445 return_pandas : `bool` 446 Return a `pandas.DataFrame` instead of 447 `lsst.afw.table.SourceCatalog`. 451 catalog : `lsst.afw.table.SourceCatalog`, `pandas.DataFrame`, or `None` 452 Catalog containing DiaSource records. `None` is returned if 453 ``read_sources_months`` configuration parameter is set to 0. 456 if self.
config.read_sources_months == 0:
457 _LOG.info(
"Skip DiaSources fetching")
461 query = table.select()
465 for low, upper
in pixel_ranges:
468 exprlist.append(table.c.pixelId == low)
470 exprlist.append(sql.expression.between(table.c.pixelId, low, upper))
471 query = query.where(sql.expression.or_(*exprlist))
475 with _ansi_session(self.
_engine)
as conn:
477 sources = pandas.read_sql_query(query, conn)
479 res = conn.execute(query)
481 _LOG.debug(
"found %s DiaSources", len(sources))
485 """Returns catalog of DiaSource instances given set of DiaObject IDs. 487 This method returns :doc:`/modules/lsst.afw.table/index` catalog with schema determined by 488 the schema of PPDB table. Re-mapping of the column names is done for 489 some columns (based on column map passed to constructor) but types or 490 units are not changed. 495 Collection of DiaObject IDs 496 dt : `datetime.datetime` 497 Time of the current visit 498 return_pandas : `bool` 499 Return a `pandas.DataFrame` instead of 500 `lsst.afw.table.SourceCatalog`. 505 catalog : `lsst.afw.table.SourceCatalog`, `pandas.DataFrame`, or `None` 506 Catalog contaning DiaSource records. `None` is returned if 507 ``read_sources_months`` configuration parameter is set to 0 or 508 when ``object_ids`` is empty. 511 if self.
config.read_sources_months == 0:
512 _LOG.info(
"Skip DiaSources fetching")
515 if len(object_ids) <= 0:
516 _LOG.info(
"Skip DiaSources fetching - no Objects")
523 with _ansi_session(self.
_engine)
as conn:
524 for ids
in _split(sorted(object_ids), 1000):
525 query =
'SELECT * FROM "' + table.name +
'" WHERE ' 528 ids =
",".join(
str(id)
for id
in ids)
529 query +=
'"diaObjectId" IN (' + ids +
') ' 533 df = pandas.read_sql_query(sql.text(query), conn)
537 sources = sources.append(df)
539 res = conn.execute(sql.text(query))
542 _LOG.debug(
"found %s DiaSources", len(sources))
546 """Returns catalog of DiaForcedSource instances matching given 549 This method returns :doc:`/modules/lsst.afw.table/index` catalog with schema determined by 550 the schema of L1 database table. Re-mapping of the column names may 551 be done for some columns (based on column map passed to constructor) 552 but types or units are not changed. 557 Collection of DiaObject IDs 558 dt : `datetime.datetime` 559 Time of the current visit 560 return_pandas : `bool` 561 Return a `pandas.DataFrame` instead of 562 `lsst.afw.table.SourceCatalog`. 566 catalog : `lsst.afw.table.SourceCatalog` or `None` 567 Catalog contaning DiaForcedSource records. `None` is returned if 568 ``read_sources_months`` configuration parameter is set to 0 or 569 when ``object_ids`` is empty. 572 if self.
config.read_forced_sources_months == 0:
573 _LOG.info(
"Skip DiaForceSources fetching")
577 _LOG.info(
"Skip DiaForceSources fetching - no Objects")
581 table = self.
_schema.forcedSources
584 with
Timer(
'DiaForcedSource select', self.
config.timer):
585 with _ansi_session(self.
_engine)
as conn:
586 for ids
in _split(sorted(object_ids), 1000):
588 query =
'SELECT * FROM "' + table.name +
'" WHERE ' 591 ids =
",".join(
str(id)
for id
in ids)
592 query +=
'"diaObjectId" IN (' + ids +
') ' 596 df = pandas.read_sql_query(sql.text(query), conn)
600 sources = sources.append(df)
602 res = conn.execute(sql.text(query))
605 _LOG.debug(
"found %s DiaForcedSources", len(sources))
609 """Store catalog of DiaObjects from current visit. 611 This methods takes :doc:`/modules/lsst.afw.table/index` catalog, its schema must be 612 compatible with the schema of PPDB table: 614 - column names must correspond to database table columns 615 - some columns names are re-mapped based on column map passed to 617 - types and units of the columns must match database definitions, 618 no unit conversion is performed presently 619 - columns that have default values in database schema can be 620 omitted from afw schema 621 - this method knows how to fill interval-related columns 622 (validityStart, validityEnd) they do not need to appear in 627 objs : `lsst.afw.table.BaseCatalog` or `pandas.DataFrame` 628 Catalog with DiaObject records 629 dt : `datetime.datetime` 633 if isinstance(objs, pandas.DataFrame):
634 ids = sorted(objs[
'diaObjectId'])
636 ids = sorted([obj[
'id']
for obj
in objs])
637 _LOG.debug(
"first object ID: %d", ids[0])
644 with _ansi_session(self.
_engine)
as conn:
646 ids =
",".join(
str(id)
for id
in ids)
648 if self.
config.dia_object_index ==
'last_object_table':
652 table = self.
_schema.objects_last
653 do_replace = self.
config.object_last_replace
657 if not do_replace
or isinstance(objs, pandas.DataFrame):
658 query =
'DELETE FROM "' + table.name +
'" ' 659 query +=
'WHERE "diaObjectId" IN (' + ids +
') ' 665 with
Timer(table.name +
' delete', self.
config.timer):
666 res = conn.execute(sql.text(query))
667 _LOG.debug(
"deleted %s objects", res.rowcount)
669 extra_columns = dict(lastNonForcedSource=dt)
670 if isinstance(objs, pandas.DataFrame):
671 with
Timer(
"DiaObjectLast insert", self.
config.timer):
672 for col, data
in extra_columns.items():
674 objs.to_sql(
"DiaObjectLast", conn, if_exists=
'append',
679 extra_columns=extra_columns)
685 query =
'UPDATE "' + table.name +
'" ' 686 query +=
"SET \"validityEnd\" = '" +
str(dt) +
"' " 687 query +=
'WHERE "diaObjectId" IN (' + ids +
') ' 688 query +=
'AND "validityEnd" IS NULL' 696 with
Timer(table.name +
' truncate', self.
config.timer):
697 res = conn.execute(sql.text(query))
698 _LOG.debug(
"truncated %s intervals", res.rowcount)
701 if self.
config.dia_object_nightly:
702 table = self.
_schema.objects_nightly
705 extra_columns = dict(lastNonForcedSource=dt, validityStart=dt,
707 if isinstance(objs, pandas.DataFrame):
709 for col, data
in extra_columns.items():
711 objs.to_sql(
"DiaObject", conn, if_exists=
'append',
715 extra_columns=extra_columns)
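    # Usage sketch (illustrative column subset; the real column set comes
    # from the YAML schema files):
    #
    #     objs = pandas.DataFrame({"diaObjectId": [1, 2], "pixelId": [10, 11]})
    #     ppdb.storeDiaObjects(objs, datetime.utcnow())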
718 """Store catalog of DIASources from current visit. 720 This methods takes :doc:`/modules/lsst.afw.table/index` catalog, its schema must be 721 compatible with the schema of L1 database table: 723 - column names must correspond to database table columns 724 - some columns names may be re-mapped based on column map passed to 726 - types and units of the columns must match database definitions, 727 no unit conversion is performed presently 728 - columns that have default values in database schema can be 729 omitted from afw schema 733 sources : `lsst.afw.table.BaseCatalog` or `pandas.DataFrame` 734 Catalog containing DiaSource records 738 with _ansi_session(self.
_engine)
as conn:
740 if isinstance(sources, pandas.DataFrame):
742 sources.to_sql(
"DiaSource", conn, if_exists=
'append',
749 """Store a set of DIAForcedSources from current visit. 751 This methods takes :doc:`/modules/lsst.afw.table/index` catalog, its schema must be 752 compatible with the schema of L1 database table: 754 - column names must correspond to database table columns 755 - some columns names may be re-mapped based on column map passed to 757 - types and units of the columns must match database definitions, 758 no unit conversion is performed presently 759 - columns that have default values in database schema can be 760 omitted from afw schema 764 sources : `lsst.afw.table.BaseCatalog` or `pandas.DataFrame` 765 Catalog containing DiaForcedSource records 769 with _ansi_session(self.
_engine)
as conn:
771 if isinstance(sources, pandas.DataFrame):
772 with
Timer(
"DiaForcedSource insert", self.
config.timer):
773 sources.to_sql(
"DiaForcedSource", conn, if_exists=
'append',
776 table = self.
_schema.forcedSources
780 """Implement daily activities like cleanup/vacuum. 782 What should be done during daily cleanup is determined by 783 configuration/schema. 787 if self.
config.dia_object_nightly:
788 with _ansi_session(self.
_engine)
as conn:
789 query =
'INSERT INTO "' + self.
_schema.objects.name +
'" ' 790 query +=
'SELECT * FROM "' + self.
_schema.objects_nightly.name +
'"' 791 with
Timer(
'DiaObjectNightly copy', self.
config.timer):
792 conn.execute(sql.text(query))
794 query =
'DELETE FROM "' + self.
_schema.objects_nightly.name +
'"' 795 with
Timer(
'DiaObjectNightly delete', self.
config.timer):
796 conn.execute(sql.text(query))
798 if self.
_engine.name ==
'postgresql':
801 _LOG.info(
"Running VACUUM on all tables")
802 connection = self.
_engine.raw_connection()
803 ISOLATION_LEVEL_AUTOCOMMIT = 0
804 connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
805 cursor = connection.cursor()
806 cursor.execute(
"VACUUM ANALYSE")
    def makeSchema(self, drop=False, mysql_engine='InnoDB', oracle_tablespace=None, oracle_iot=False):
        """Create or re-create all tables.

        Parameters
        ----------
        drop : `bool`, optional
            If True then drop tables before creating new ones.
        mysql_engine : `str`, optional
            Name of the MySQL engine to use for new tables.
        oracle_tablespace : `str`, optional
            Name of Oracle tablespace.
        oracle_iot : `bool`, optional
            Make Index-organized DiaObjectLast table.
        """
        self._schema.makeSchema(drop=drop, mysql_engine=mysql_engine,
                                oracle_tablespace=oracle_tablespace,
                                oracle_iot=oracle_iot)
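    # First-time setup sketch:
    #
    #     ppdb = Ppdb(config)
    #     ppdb.makeSchema(drop=True)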
    def _explain(self, query, conn):
        """Run the query with explain"""

        _LOG.info("explain for query: %s...", query[:64])

        if conn.engine.name == 'mysql':
            query = "EXPLAIN EXTENDED " + query
        else:
            query = "EXPLAIN " + query

        res = conn.execute(sql.text(query))
        if res.returns_rows:
            _LOG.info("explain: %s", res.keys())
            for row in res:
                _LOG.info("explain: %s", row)
        else:
            _LOG.info("EXPLAIN returned nothing")
    def _storeObjectsAfw(self, objects, conn, table, schema_table_name,
                         replace=False, extra_columns=None):
        """Generic store method.

        Takes catalog of records and stores a bunch of objects in a table.

        Parameters
        ----------
        objects : `lsst.afw.table.BaseCatalog`
            Catalog containing object records
        conn :
            Database connection
        table : `sqlalchemy.Table`
            Table to store data in
        schema_table_name : `str`
            Name of the table to be used for finding table schema.
        replace : `bool`
            If `True` then use replace instead of INSERT (should be more efficient)
        extra_columns : `dict`, optional
            Mapping (column_name, column_value) which gives column values to add
            to every row, only if column is missing in catalog records.
        """

        def quoteValue(v):
            """Quote and escape values"""
            if v is None:
                v = "NULL"
            elif isinstance(v, datetime):
                v = "'" + str(v) + "'"
            elif isinstance(v, str):
                # we do not expect nasty stuff in strings
                v = "'" + v + "'"
            elif isinstance(v, geom.Angle):
                v = v.asDegrees()
                v = str(v) if np.isfinite(v) else "NULL"
            else:
                v = str(v) if np.isfinite(v) else "NULL"
            return v

        def quoteId(columnName):
            """Smart quoting for column names.
            Lower-case names are not quoted.
            """
            if not columnName.islower():
                columnName = '"' + columnName + '"'
            return columnName

        if conn.engine.name == "oracle":
            return self._storeObjectsAfwOracle(objects, conn, table,
                                               schema_table_name, replace,
                                               extra_columns)

        schema = objects.getSchema()

        # use extra columns as overrides, always
        extra_fields = list((extra_columns or {}).keys())

        afw_fields = [field.getName() for key, field in schema
                      if field.getName() not in extra_fields]

        column_map = self._schema.getAfwColumns(schema_table_name)

        # list of columns (as in SQL table)
        fields = [column_map[field].name for field in afw_fields
                  if field in column_map]

        if replace and conn.engine.name in ('mysql', 'sqlite'):
            query = 'REPLACE INTO '
        else:
            query = 'INSERT INTO '
        qfields = [quoteId(field) for field in fields + extra_fields]
        query += quoteId(table.name) + ' (' + ','.join(qfields) + ') ' + 'VALUES '

        values = []
        for rec in objects:
            row = []
            for field in afw_fields:
                if field not in column_map:
                    continue
                value = rec[field]
                if column_map[field].type == "DATETIME" and \
                        not np.isnan(value):
                    # convert seconds into datetime
                    value = datetime.utcfromtimestamp(value)
                row.append(quoteValue(value))
            for field in extra_fields:
                row.append(quoteValue(extra_columns[field]))
            values.append('(' + ','.join(row) + ')')

        if self.config.explain:
            # run the same query with explain, only give it one row of data
            self._explain(query + values[0], conn)

        query += ','.join(values)

        if replace and conn.engine.name == 'postgresql':
            # use UPSERT, this assumes that "replace" is only used for
            # DiaObjectLast table which has (pixelId, diaObjectId) PK
            pks = ('pixelId', 'diaObjectId')
            query += " ON CONFLICT (\"{}\", \"{}\") DO UPDATE SET ".format(*pks)
            fields = [column_map[field].name for field in afw_fields
                      if field in column_map]
            fields = ['"{0}" = EXCLUDED."{0}"'.format(field)
                      for field in fields if field not in pks]
            query += ', '.join(fields)

        _LOG.info("%s: will store %d records", table.name, len(objects))
        with Timer(table.name + ' insert', self.config.timer):
            res = conn.execute(sql.text(query))
        _LOG.debug("inserted %s intervals", res.rowcount)
    def _storeObjectsAfwOracle(self, objects, conn, table, schema_table_name,
                               replace=False, extra_columns=None):
        """Store method for Oracle.

        Takes catalog of records and stores a bunch of objects in a table.

        Parameters
        ----------
        objects : `lsst.afw.table.BaseCatalog`
            Catalog containing object records
        conn :
            Database connection
        table : `sqlalchemy.Table`
            Table to store data in
        schema_table_name : `str`
            Name of the table to be used for finding table schema.
        replace : `bool`
            If `True` then use replace instead of INSERT (should be more efficient)
        extra_columns : `dict`, optional
            Mapping (column_name, column_value) which gives column values to add
            to every row, only if column is missing in catalog records.
        """

        def quoteId(columnName):
            """Smart quoting for column names.
            Lower-case names are not quoted (Oracle backend needs them unquoted).
            """
            if not columnName.islower():
                columnName = '"' + columnName + '"'
            return columnName

        schema = objects.getSchema()

        # use extra columns as overrides, always
        extra_fields = list((extra_columns or {}).keys())

        afw_fields = [field.getName() for key, field in schema
                      if field.getName() not in extra_fields]

        column_map = self._schema.getAfwColumns(schema_table_name)

        # list of columns (as in SQL table)
        fields = [column_map[field].name for field in afw_fields
                  if field in column_map]

        qfields = [quoteId(field) for field in fields + extra_fields]

        if not replace:
            vals = [":col{}".format(i) for i in range(len(fields))]
            vals += [":extcol{}".format(i) for i in range(len(extra_fields))]
            query = 'INSERT INTO ' + quoteId(table.name)
            query += ' (' + ','.join(qfields) + ') VALUES'
            query += ' (' + ','.join(vals) + ')'
        else:
            qvals = [":col{} {}".format(i, quoteId(field))
                     for i, field in enumerate(fields)]
            qvals += [":extcol{} {}".format(i, quoteId(field))
                      for i, field in enumerate(extra_fields)]
            pks = ('pixelId', 'diaObjectId')
            onexpr = ["SRC.{col} = DST.{col}".format(col=quoteId(col)) for col in pks]
            setexpr = ["DST.{col} = SRC.{col}".format(col=quoteId(col))
                       for col in fields + extra_fields if col not in pks]
            vals = ["SRC.{col}".format(col=quoteId(col)) for col in fields + extra_fields]
            query = "MERGE INTO {} DST ".format(quoteId(table.name))
            query += "USING (SELECT {} FROM DUAL) SRC ".format(", ".join(qvals))
            query += "ON ({}) ".format(" AND ".join(onexpr))
            query += "WHEN MATCHED THEN UPDATE SET {} ".format(" ,".join(setexpr))
            query += "WHEN NOT MATCHED THEN INSERT "
            query += "({}) VALUES ({})".format(','.join(qfields), ','.join(vals))

        values = []
        for rec in objects:
            row = {}
            col = 0
            for field in afw_fields:
                if field not in column_map:
                    continue
                value = rec[field]
                if column_map[field].type == "DATETIME" and not np.isnan(value):
                    # convert seconds into datetime
                    value = datetime.utcfromtimestamp(value)
                elif isinstance(value, geom.Angle):
                    value = str(value.asDegrees())
                elif not np.isfinite(value):
                    value = None
                row["col{}".format(col)] = value
                col += 1
            for i, field in enumerate(extra_fields):
                row["extcol{}".format(i)] = extra_columns[field]
            values.append(row)

        _LOG.info("%s: will store %d records", table.name, len(objects))
        with Timer(table.name + ' insert', self.config.timer):
            res = conn.execute(sql.text(query), values)
        _LOG.debug("inserted %s intervals", res.rowcount)
    def _convertResult(self, res, table_name, catalog=None):
        """Convert result set into output catalog.

        Parameters
        ----------
        res : `sqlalchemy.ResultProxy`
            SQLAlchemy result set returned by query.
        table_name : `str`
            Name of the table.
        catalog : `lsst.afw.table.BaseCatalog`
            If not None then extend existing catalog

        Returns
        -------
        catalog : `lsst.afw.table.SourceCatalog`
            If ``catalog`` is None then new instance is returned, otherwise
            ``catalog`` is updated and returned.
        """
        # make catalog schema
        columns = res.keys()
        schema, col_map = self._schema.getAfwSchema(table_name, columns)
        if catalog is None:
            _LOG.debug("_convertResult: schema: %s", schema)
            _LOG.debug("_convertResult: col_map: %s", col_map)
            catalog = afwTable.SourceCatalog(schema)

        # fill catalog
        for row in res:
            record = catalog.addNew()
            for col, value in row.items():
                # some columns may exist in database but not in afw schema
                col = col_map.get(col)
                if col is not None:
                    if isinstance(value, datetime):
                        # convert datetime to number of seconds
                        value = int((value - datetime.utcfromtimestamp(0)).total_seconds())
                    elif col.getTypeString() == 'Angle' and value is not None:
                        value = value * geom.degrees
                    if value is not None:
                        record.set(col, value)

        return catalog