22 """Module defining Ppdb class and related methods. 25 __all__ = [
"PpdbConfig",
"Ppdb",
"Visit"]
from collections import namedtuple
from contextlib import contextmanager
from datetime import datetime
import logging
import numpy as np
import os
import pandas

import lsst.geom as geom
import lsst.afw.table as afwTable
import lsst.pex.config as pexConfig
from lsst.pex.config import Field, ChoiceField, ListField
from lsst.utils import getPackageDir
import sqlalchemy
from sqlalchemy import (func, sql)
from sqlalchemy.pool import NullPool
from . import timer, ppdbSchema

_LOG = logging.getLogger(__name__.partition(".")[2])

50 """Timer class defining context manager which tracks execution timing. 54 with Timer("timer_name"): 57 On exit from block it will print elapsed time. 59 See also :py:mod:`timer` module. 61 def __init__(self, name, do_logging=True, log_before_cursor_execute=False):
69 Enter context, start timer 78 Exit context, stop and dump timer 88 def _start_timer(self, conn, cursor, statement, parameters, context, executemany):
91 _LOG.info(
"before_cursor_execute")
94 def _stop_timer(self, conn, cursor, statement, parameters, context, executemany):
def _split(seq, nItems):
    """Split a sequence into smaller sequences"""
    seq = list(seq)
    while seq:
        yield seq[:nItems]
        del seq[:nItems]


# Information about single visit
Visit = namedtuple('Visit', 'visitId visitTime lastObjectId lastSourceId')

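# Example (illustrative, not part of the original module): _split() is what
# keeps the generated "IN (...)" lists in the read methods below at a
# manageable size, e.g.:
#
#     >>> [len(chunk) for chunk in _split(range(2500), 1000)]
#     [1000, 1000, 500]
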
@contextmanager
def _ansi_session(engine):
    """Returns a connection, makes sure that ANSI mode is set for MySQL
    """
    with engine.begin() as conn:
        if engine.name == 'mysql':
            conn.execute(sql.text("SET SESSION SQL_MODE = 'ANSI'"))
        yield conn
    return

def _data_file_name(basename):
    """Return path name of a data file.
    """
    return os.path.join(getPackageDir("dax_ppdb"), "data", basename)

class PpdbConfig(pexConfig.Config):

    db_url = Field(dtype=str, doc="SQLAlchemy database connection URI")
    isolation_level = ChoiceField(dtype=str,
                                  doc="Transaction isolation level",
                                  allowed={"READ_COMMITTED": "Read committed",
                                           "READ_UNCOMMITTED": "Read uncommitted",
                                           "REPEATABLE_READ": "Repeatable read",
                                           "SERIALIZABLE": "Serializable"},
                                  default="READ_COMMITTED",
                                  optional=True)
    connection_pool = Field(dtype=bool,
                            doc=("If False then disable SQLAlchemy connection pool. "
                                 "Do not use connection pool when forking."),
                            default=True)
    connection_timeout = Field(dtype=float,
                               doc="Maximum time to wait for database lock to be released before "
                                   "exiting. Defaults to sqlalchemy defaults if not set.",
                               default=None,
                               optional=True)
    sql_echo = Field(dtype=bool,
                     doc="If True then pass SQLAlchemy echo option.",
                     default=False)
    dia_object_index = ChoiceField(dtype=str,
                                   doc="Indexing mode for DiaObject table",
                                   allowed={'baseline': "Index defined in baseline schema",
                                            'pix_id_iov': "(pixelId, objectId, iovStart) PK",
                                            'last_object_table': "Separate DiaObjectLast table"},
                                   default='baseline')
    dia_object_nightly = Field(dtype=bool,
                               doc="Use separate nightly table for DiaObject",
                               default=False)
    read_sources_months = Field(dtype=int,
                                doc="Number of months of history to read from DiaSource",
                                default=12)
    read_forced_sources_months = Field(dtype=int,
                                       doc="Number of months of history to read from DiaForcedSource",
                                       default=12)
    dia_object_columns = ListField(dtype=str,
                                   doc="List of columns to read from DiaObject, by default read all columns",
                                   default=[])
    object_last_replace = Field(dtype=bool,
                                doc="If True (default) then use \"upsert\" for DiaObjectLast table",
                                default=True)
    schema_file = Field(dtype=str,
                        doc="Location of (YAML) configuration file with standard schema",
                        default=_data_file_name("ppdb-schema.yaml"))
    extra_schema_file = Field(dtype=str,
                              doc="Location of (YAML) configuration file with extra schema",
                              default=_data_file_name("ppdb-schema-extra.yaml"))
    column_map = Field(dtype=str,
                       doc="Location of (YAML) configuration file with column mapping",
                       default=_data_file_name("ppdb-afw-map.yaml"))
    prefix = Field(dtype=str,
                   doc="Prefix to add to table names and index names",
                   default="")
    explain = Field(dtype=bool,
                    doc="If True then run EXPLAIN SQL command on each executed query",
                    default=False)
    timer = Field(dtype=bool,
                  doc="If True then print/log timing information",
                  default=False)
    diaobject_index_hint = Field(dtype=str,
                                 doc="Name of the index to use with Oracle index hint",
                                 default=None,
                                 optional=True)
    dynamic_sampling_hint = Field(dtype=int,
                                  doc="If non-zero then use dynamic_sampling hint",
                                  default=0)
    cardinality_hint = Field(dtype=int,
                             doc="If non-zero then use cardinality hint",
                             default=0)

    def validate(self):
        super().validate()
        if self.isolation_level == "READ_COMMITTED" and self.db_url.startswith("sqlite"):
            raise ValueError("Attempting to run Ppdb with SQLITE and isolation level 'READ_COMMITTED.' "
                             "Use 'READ_UNCOMMITTED' instead.")

211 """Interface to L1 database, hides all database access details. 213 The implementation is configured via standard ``pex_config`` mechanism 214 using `PpdbConfig` configuration class. For an example of different 215 configurations check config/ folder. 219 config : `PpdbConfig` 220 afw_schemas : `dict`, optional 221 Dictionary with table name for a key and `afw.table.Schema` 222 for a value. Columns in schema will be added to standard 231 _LOG.debug(
"PPDB Configuration:")
232 _LOG.debug(
" dia_object_index: %s", self.
config.dia_object_index)
233 _LOG.debug(
" dia_object_nightly: %s", self.
config.dia_object_nightly)
234 _LOG.debug(
" read_sources_months: %s", self.
config.read_sources_months)
235 _LOG.debug(
" read_forced_sources_months: %s", self.
config.read_forced_sources_months)
236 _LOG.debug(
" dia_object_columns: %s", self.
config.dia_object_columns)
237 _LOG.debug(
" object_last_replace: %s", self.
config.object_last_replace)
238 _LOG.debug(
" schema_file: %s", self.
config.schema_file)
239 _LOG.debug(
" extra_schema_file: %s", self.
config.extra_schema_file)
240 _LOG.debug(
" column_map: %s", self.
config.column_map)
241 _LOG.debug(
" schema prefix: %s", self.
config.prefix)
        # engine is reused between multiple processes, make sure that we don't
        # share connections by disabling pool (by using NullPool class)
        kw = dict(echo=self.config.sql_echo)
        conn_args = dict()
        if not self.config.connection_pool:
            kw.update(poolclass=NullPool)
        if self.config.isolation_level is not None:
            kw.update(isolation_level=self.config.isolation_level)
        if self.config.connection_timeout is not None:
            if self.config.db_url.startswith("sqlite"):
                conn_args.update(timeout=self.config.connection_timeout)
            elif self.config.db_url.startswith(("postgresql", "mysql")):
                conn_args.update(connect_timeout=self.config.connection_timeout)
        kw.update(connect_args=conn_args)
        self._engine = sqlalchemy.create_engine(self.config.db_url, **kw)

        self._schema = ppdbSchema.PpdbSchema(engine=self._engine,
                                             dia_object_index=self.config.dia_object_index,
                                             dia_object_nightly=self.config.dia_object_nightly,
                                             schema_file=self.config.schema_file,
                                             extra_schema_file=self.config.extra_schema_file,
                                             column_map=self.config.column_map,
                                             afw_schemas=afw_schemas,
                                             prefix=self.config.prefix)

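    # Example (illustrative): with a config in hand, the database is opened
    # and its tables created like this (makeSchema() is defined below):
    #
    #     ppdb = Ppdb(config)
    #     ppdb.makeSchema(drop=True)
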
269 """Returns last visit information or `None` if visits table is empty. 271 Visits table is used by ap_proto to track visit information, it is 272 not a part of the regular PPDB schema. 276 visit : `Visit` or `None` 277 Last stored visit info or `None` if there was nothing stored yet. 280 with self.
_engine.begin()
as conn:
282 stmnt = sql.select([sql.func.max(self.
_schema.visits.c.visitId),
283 sql.func.max(self.
_schema.visits.c.visitTime)])
284 res = conn.execute(stmnt)
291 _LOG.info(
"lastVisit: visitId: %s visitTime: %s (%s)", visitId,
292 visitTime,
type(visitTime))
295 stmnt = sql.select([sql.func.max(self.
_schema.objects.c.diaObjectId)])
296 lastObjectId = conn.scalar(stmnt)
297 stmnt = sql.select([sql.func.max(self.
_schema.sources.c.diaSourceId)])
298 lastSourceId = conn.scalar(stmnt)
300 return Visit(visitId=visitId, visitTime=visitTime,
301 lastObjectId=lastObjectId, lastSourceId=lastSourceId)
304 """Store visit information. 306 This method is only used by ``ap_proto`` script from ``l1dbproto`` 307 and is not intended for production pipelines. 313 visitTime : `datetime.datetime` 317 ins = self.
_schema.visits.insert().values(visitId=visitId,
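    # Example (illustrative sketch of the ap_proto bookkeeping cycle built
    # from the two methods above; `visit_id` and `visit_time` are assumed to
    # come from the caller):
    #
    #     visit = ppdb.lastVisit()
    #     next_object_id = 1 if visit is None else visit.lastObjectId + 1
    #     ppdb.saveVisit(visit_id, visit_time)
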
322 """Returns dictionary with the table names and row counts. 324 Used by ``ap_proto`` to keep track of the size of the database tables. 325 Depending on database technology this could be expensive operation. 330 Dict where key is a table name and value is a row count. 334 if self.
config.dia_object_index ==
'last_object_table':
335 tables.append(self.
_schema.objects_last)
337 stmt = sql.select([func.count()]).select_from(table)
338 count = self.
_engine.scalar(stmt)
339 res[table.name] = count
344 """Returns catalog of DiaObject instances from given region. 346 Objects are searched based on pixelization index and region is 347 determined by the set of indices. There is no assumption on a 348 particular type of index, client is responsible for consistency 349 when calculating pixelization indices. 351 This method returns :doc:`/modules/lsst.afw.table/index` catalog with schema determined by 352 the schema of PPDB table. Re-mapping of the column names is done for 353 some columns (based on column map passed to constructor) but types 354 or units are not changed. 356 Returns only the last version of each DiaObject. 360 pixel_ranges : `list` of `tuple` 361 Sequence of ranges, range is a tuple (minPixelID, maxPixelID). 362 This defines set of pixel indices to be included in result. 363 return_pandas : `bool` 364 Return a `pandas.DataFrame` instead of 365 `lsst.afw.table.SourceCatalog`. 369 catalog : `lsst.afw.table.SourceCatalog` or `pandas.DataFrame` 370 Catalog containing DiaObject records. 374 if self.
config.dia_object_index ==
'last_object_table':
375 table = self.
_schema.objects_last
378 if not self.
config.dia_object_columns:
379 query = table.select()
381 columns = [table.c[col]
for col
in self.
config.dia_object_columns]
382 query = sql.select(columns)
384 if self.
config.diaobject_index_hint:
385 val = self.
config.diaobject_index_hint
386 query = query.with_hint(table,
'index_rs_asc(%(name)s "{}")'.
format(val))
387 if self.
config.dynamic_sampling_hint > 0:
388 val = self.
config.dynamic_sampling_hint
389 query = query.with_hint(table,
'dynamic_sampling(%(name)s {})'.
format(val))
390 if self.
config.cardinality_hint > 0:
391 val = self.
config.cardinality_hint
392 query = query.with_hint(table,
'FIRST_ROWS_1 cardinality(%(name)s {})'.
format(val))
396 for low, upper
in pixel_ranges:
399 exprlist.append(table.c.pixelId == low)
401 exprlist.append(sql.expression.between(table.c.pixelId, low, upper))
402 query = query.where(sql.expression.or_(*exprlist))
405 if self.
config.dia_object_index !=
'last_object_table':
406 query = query.where(table.c.validityEnd ==
None)
408 _LOG.debug(
"query: %s", query)
416 with self.
_engine.begin()
as conn:
418 objects = pandas.read_sql_query(query, conn)
420 res = conn.execute(query)
422 _LOG.debug(
"found %s DiaObjects", len(objects))
426 """Returns catalog of DiaSource instances from given region. 428 Sources are searched based on pixelization index and region is 429 determined by the set of indices. There is no assumption on a 430 particular type of index, client is responsible for consistency 431 when calculating pixelization indices. 433 This method returns :doc:`/modules/lsst.afw.table/index` catalog with schema determined by 434 the schema of PPDB table. Re-mapping of the column names is done for 435 some columns (based on column map passed to constructor) but types or 436 units are not changed. 440 pixel_ranges : `list` of `tuple` 441 Sequence of ranges, range is a tuple (minPixelID, maxPixelID). 442 This defines set of pixel indices to be included in result. 443 dt : `datetime.datetime` 444 Time of the current visit 445 return_pandas : `bool` 446 Return a `pandas.DataFrame` instead of 447 `lsst.afw.table.SourceCatalog`. 451 catalog : `lsst.afw.table.SourceCatalog`, `pandas.DataFrame`, or `None` 452 Catalog containing DiaSource records. `None` is returned if 453 ``read_sources_months`` configuration parameter is set to 0. 456 if self.
config.read_sources_months == 0:
457 _LOG.info(
"Skip DiaSources fetching")
461 query = table.select()
465 for low, upper
in pixel_ranges:
468 exprlist.append(table.c.pixelId == low)
470 exprlist.append(sql.expression.between(table.c.pixelId, low, upper))
471 query = query.where(sql.expression.or_(*exprlist))
475 with _ansi_session(self.
_engine)
as conn:
477 sources = pandas.read_sql_query(query, conn)
479 res = conn.execute(query)
481 _LOG.debug(
"found %s DiaSources", len(sources))
485 """Returns catalog of DiaSource instances given set of DiaObject IDs. 487 This method returns :doc:`/modules/lsst.afw.table/index` catalog with schema determined by 488 the schema of PPDB table. Re-mapping of the column names is done for 489 some columns (based on column map passed to constructor) but types or 490 units are not changed. 495 Collection of DiaObject IDs 496 dt : `datetime.datetime` 497 Time of the current visit 498 return_pandas : `bool` 499 Return a `pandas.DataFrame` instead of 500 `lsst.afw.table.SourceCatalog`. 505 catalog : `lsst.afw.table.SourceCatalog`, `pandas.DataFrame`, or `None` 506 Catalog contaning DiaSource records. `None` is returned if 507 ``read_sources_months`` configuration parameter is set to 0 or 508 when ``object_ids`` is empty. 511 if self.
config.read_sources_months == 0:
512 _LOG.info(
"Skip DiaSources fetching")
515 if len(object_ids) <= 0:
516 _LOG.info(
"Skip DiaSources fetching - no Objects")
523 with _ansi_session(self.
_engine)
as conn:
524 for ids
in _split(sorted(object_ids), 1000):
525 query =
'SELECT * FROM "' + table.name +
'" WHERE ' 528 ids =
",".join(str(id)
for id
in ids)
529 query +=
'"diaObjectId" IN (' + ids +
') ' 533 df = pandas.read_sql_query(sql.text(query), conn)
537 sources = sources.append(df)
539 res = conn.execute(sql.text(query))
542 _LOG.debug(
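    # Example (illustrative sketch; `object_ids` and `visit_time` are assumed
    # to come from the caller): reading the DiaSource history for objects
    # found by getDiaObjects(); `None` means history reads are disabled:
    #
    #     sources = ppdb.getDiaSources(object_ids, dt=visit_time)
    #     if sources is not None:
    #         _LOG.info("history depth: %d", len(sources))
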
"found %s DiaSources", len(sources))
546 """Returns catalog of DiaForcedSource instances matching given 549 This method returns :doc:`/modules/lsst.afw.table/index` catalog with schema determined by 550 the schema of L1 database table. Re-mapping of the column names may 551 be done for some columns (based on column map passed to constructor) 552 but types or units are not changed. 557 Collection of DiaObject IDs 558 dt : `datetime.datetime` 559 Time of the current visit 560 return_pandas : `bool` 561 Return a `pandas.DataFrame` instead of 562 `lsst.afw.table.SourceCatalog`. 566 catalog : `lsst.afw.table.SourceCatalog` or `None` 567 Catalog contaning DiaForcedSource records. `None` is returned if 568 ``read_sources_months`` configuration parameter is set to 0 or 569 when ``object_ids`` is empty. 572 if self.
config.read_forced_sources_months == 0:
573 _LOG.info(
"Skip DiaForceSources fetching")
577 _LOG.info(
"Skip DiaForceSources fetching - no Objects")
581 table = self.
_schema.forcedSources
584 with
Timer(
'DiaForcedSource select', self.
config.timer):
585 with _ansi_session(self.
_engine)
as conn:
586 for ids
in _split(sorted(object_ids), 1000):
588 query =
'SELECT * FROM "' + table.name +
'" WHERE ' 591 ids =
",".join(str(id)
for id
in ids)
592 query +=
'"diaObjectId" IN (' + ids +
') ' 596 df = pandas.read_sql_query(sql.text(query), conn)
600 sources = sources.append(df)
602 res = conn.execute(sql.text(query))
605 _LOG.debug(
"found %s DiaForcedSources", len(sources))
609 """Store catalog of DiaObjects from current visit. 611 This methods takes :doc:`/modules/lsst.afw.table/index` catalog, its schema must be 612 compatible with the schema of PPDB table: 614 - column names must correspond to database table columns 615 - some columns names are re-mapped based on column map passed to 617 - types and units of the columns must match database definitions, 618 no unit conversion is performed presently 619 - columns that have default values in database schema can be 620 omitted from afw schema 621 - this method knows how to fill interval-related columns 622 (validityStart, validityEnd) they do not need to appear in 627 objs : `lsst.afw.table.BaseCatalog` or `pandas.DataFrame` 628 Catalog with DiaObject records 629 dt : `datetime.datetime` 633 if isinstance(objs, pandas.DataFrame):
634 ids = sorted(objs[
'diaObjectId'])
636 ids = sorted([obj[
'id']
for obj
in objs])
637 _LOG.debug(
"first object ID: %d", ids[0])
644 with _ansi_session(self.
_engine)
as conn:
646 ids =
",".join(str(id)
for id
in ids)
648 if self.
config.dia_object_index ==
'last_object_table':
652 table = self.
_schema.objects_last
653 do_replace = self.
config.object_last_replace
657 if not do_replace
or isinstance(objs, pandas.DataFrame):
658 query =
'DELETE FROM "' + table.name +
'" ' 659 query +=
'WHERE "diaObjectId" IN (' + ids +
') ' 665 with
Timer(table.name +
' delete', self.
config.timer):
666 res = conn.execute(sql.text(query))
667 _LOG.debug(
"deleted %s objects", res.rowcount)
669 extra_columns = dict(lastNonForcedSource=dt)
670 if isinstance(objs, pandas.DataFrame):
671 with
Timer(
"DiaObjectLast insert", self.
config.timer):
672 for col, data
in extra_columns.items():
674 objs.to_sql(
"DiaObjectLast", conn, if_exists=
'append',
679 extra_columns=extra_columns)
685 query =
'UPDATE "' + table.name +
'" ' 686 query +=
"SET \"validityEnd\" = '" + str(dt) +
"' " 687 query +=
'WHERE "diaObjectId" IN (' + ids +
') ' 688 query +=
'AND "validityEnd" IS NULL' 696 with
Timer(table.name +
' truncate', self.
config.timer):
697 res = conn.execute(sql.text(query))
698 _LOG.debug(
"truncated %s intervals", res.rowcount)
701 if self.
config.dia_object_nightly:
702 table = self.
_schema.objects_nightly
705 extra_columns = dict(lastNonForcedSource=dt, validityStart=dt,
707 if isinstance(objs, pandas.DataFrame):
709 for col, data
in extra_columns.items():
711 objs.to_sql(
"DiaObject", conn, if_exists=
'append',
715 extra_columns=extra_columns)
718 """Store catalog of DIASources from current visit. 720 This methods takes :doc:`/modules/lsst.afw.table/index` catalog, its schema must be 721 compatible with the schema of L1 database table: 723 - column names must correspond to database table columns 724 - some columns names may be re-mapped based on column map passed to 726 - types and units of the columns must match database definitions, 727 no unit conversion is performed presently 728 - columns that have default values in database schema can be 729 omitted from afw schema 733 sources : `lsst.afw.table.BaseCatalog` or `pandas.DataFrame` 734 Catalog containing DiaSource records 738 with _ansi_session(self.
_engine)
as conn:
740 if isinstance(sources, pandas.DataFrame):
742 sources.to_sql(
"DiaSource", conn, if_exists=
'append',
749 """Store a set of DIAForcedSources from current visit. 751 This methods takes :doc:`/modules/lsst.afw.table/index` catalog, its schema must be 752 compatible with the schema of L1 database table: 754 - column names must correspond to database table columns 755 - some columns names may be re-mapped based on column map passed to 757 - types and units of the columns must match database definitions, 758 no unit conversion is performed presently 759 - columns that have default values in database schema can be 760 omitted from afw schema 764 sources : `lsst.afw.table.BaseCatalog` or `pandas.DataFrame` 765 Catalog containing DiaForcedSource records 769 with _ansi_session(self.
_engine)
as conn:
771 if isinstance(sources, pandas.DataFrame):
772 with
Timer(
"DiaForcedSource insert", self.
config.timer):
773 sources.to_sql(
"DiaForcedSource", conn, if_exists=
'append',
776 table = self.
_schema.forcedSources
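    # Example (illustrative): the per-visit store sequence as driven by
    # ap_proto; the same `dt` both truncates the old DiaObject validity
    # intervals and starts the new ones:
    #
    #     ppdb.storeDiaObjects(objects, dt=visit_time)
    #     ppdb.storeDiaSources(sources)
    #     ppdb.storeDiaForcedSources(forced_sources)
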
780 """Return the number of DiaObjects that have only one DiaSource associated 783 Used as part of ap_verify metrics. 788 Number of DiaObjects with exactly one associated DiaSource. 794 stmt = sql.select([func.count()]).select_from(table).where(table.c.nDiaSources == 1)
795 stmt = stmt.where(table.c.validityEnd ==
None)
798 count = self.
_engine.scalar(stmt)
803 """Test whether data from an image has been loaded into the database. 805 Used as part of ap_verify metrics. 809 visitInfo : `lsst.afw.image.VisitInfo` 810 The metadata for the image of interest. 815 `True` if the data are present, `False` otherwise. 817 id = visitInfo.getExposureId()
819 idField = table.c.ccdVisitId
822 query = sql.select([idField]).select_from(table) \
823 .where(idField == id).limit(1)
825 return self.
_engine.scalar(query)
is not None 828 """Implement daily activities like cleanup/vacuum. 830 What should be done during daily cleanup is determined by 831 configuration/schema. 835 if self.
config.dia_object_nightly:
836 with _ansi_session(self.
_engine)
as conn:
837 query =
'INSERT INTO "' + self.
_schema.objects.name +
'" ' 838 query +=
'SELECT * FROM "' + self.
_schema.objects_nightly.name +
'"' 839 with
Timer(
'DiaObjectNightly copy', self.
config.timer):
840 conn.execute(sql.text(query))
842 query =
'DELETE FROM "' + self.
_schema.objects_nightly.name +
'"' 843 with
Timer(
'DiaObjectNightly delete', self.
config.timer):
844 conn.execute(sql.text(query))
846 if self.
_engine.name ==
'postgresql':
849 _LOG.info(
"Running VACUUM on all tables")
850 connection = self.
_engine.raw_connection()
851 ISOLATION_LEVEL_AUTOCOMMIT = 0
852 connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
853 cursor = connection.cursor()
854 cursor.execute(
"VACUUM ANALYSE")
    def makeSchema(self, drop=False, mysql_engine='InnoDB', oracle_tablespace=None, oracle_iot=False):
        """Create or re-create all tables.

        Parameters
        ----------
        drop : `bool`, optional
            If True then drop tables before creating new ones.
        mysql_engine : `str`, optional
            Name of the MySQL engine to use for new tables.
        oracle_tablespace : `str`, optional
            Name of Oracle tablespace.
        oracle_iot : `bool`, optional
            Make Index-organized DiaObjectLast table.
        """
        self._schema.makeSchema(drop=drop, mysql_engine=mysql_engine,
                                oracle_tablespace=oracle_tablespace,
                                oracle_iot=oracle_iot)

    def _explain(self, query, conn):
        """Run the query with explain
        """

        _LOG.info("explain for query: %s...", query[:64])

        if conn.engine.name == 'mysql':
            query = "EXPLAIN EXTENDED " + query
        else:
            query = "EXPLAIN " + query

        res = conn.execute(sql.text(query))
        if res.returns_rows:
            _LOG.info("explain: %s", res.keys())
            for row in res:
                _LOG.info("explain: %s", row)
        else:
            _LOG.info("EXPLAIN returned nothing")

    def _storeObjectsAfw(self, objects, conn, table, schema_table_name,
                         replace=False, extra_columns=None):
        """Generic store method.

        Takes catalog of records and stores a bunch of objects in a table.

        Parameters
        ----------
        objects : `lsst.afw.table.BaseCatalog`
            Catalog containing object records
        conn :
            Database connection
        table : `sqlalchemy.Table`
            Database table
        schema_table_name : `str`
            Name of the table to be used for finding table schema.
        replace : `bool`
            If `True` then use replace instead of INSERT (should be more efficient)
        extra_columns : `dict`, optional
            Mapping (column_name, column_value) which gives column values to add
            to every row, only if column is missing in catalog records.
        """

        def quoteValue(v):
            """Quote and escape values"""
            if v is None:
                v = "NULL"
            elif isinstance(v, datetime):
                v = "'" + str(v) + "'"
            elif isinstance(v, str):
                # we don't expect nasty stuff in strings
                v = "'" + v + "'"
            elif isinstance(v, geom.Angle):
                v = v.asDegrees()
                if np.isfinite(v):
                    v = str(v)
                else:
                    v = "NULL"
            else:
                if np.isfinite(v):
                    v = str(v)
                else:
                    v = "NULL"
            return v

        def quoteId(columnName):
            """Smart quoting for column names.
            Lower-case names are not quoted.
            """
            if not columnName.islower():
                columnName = '"' + columnName + '"'
            return columnName

        if conn.engine.name == "oracle":
            return self._storeObjectsAfwOracle(objects, conn, table,
                                               schema_table_name, replace,
                                               extra_columns)

        schema = objects.getSchema()
        # use extra columns if specified
        extra_fields = list((extra_columns or {}).keys())

        afw_fields = [field.getName() for key, field in schema
                      if field.getName() not in extra_fields]

        column_map = self._schema.getAfwColumns(schema_table_name)

        # list of columns (as in database schema)
        fields = [column_map[field].name for field in afw_fields if field in column_map]

        if replace and conn.engine.name in ('mysql', 'sqlite'):
            query = 'REPLACE INTO '
        else:
            query = 'INSERT INTO '
        qfields = [quoteId(field) for field in fields + extra_fields]
        query += quoteId(table.name) + ' (' + ','.join(qfields) + ') ' + 'VALUES '

        values = []
        for rec in objects:
            row = []
            for field in afw_fields:
                if field not in column_map:
                    continue
                value = rec[field]
                if column_map[field].type == "DATETIME" and \
                        np.isfinite(value):
                    # convert seconds into datetime
                    value = datetime.utcfromtimestamp(value)
                row.append(quoteValue(value))
            for field in extra_fields:
                row.append(quoteValue(extra_columns[field]))
            values.append('(' + ','.join(row) + ')')

        if self.config.explain:
            # run the same query with explain, only give it one row of data
            self._explain(query + values[0], conn)

        query += ','.join(values)

        if replace and conn.engine.name == 'postgresql':
            # This depends on "replace" being true only for DiaObjectLast table
            pks = ('pixelId', 'diaObjectId')
            query += " ON CONFLICT (\"{}\", \"{}\") DO UPDATE SET ".format(*pks)
            fields = [column_map[field].name for field in afw_fields if field in column_map]
            fields = ['"{0}" = EXCLUDED."{0}"'.format(field)
                      for field in fields if field not in pks]
            query += ', '.join(fields)

        _LOG.info("%s: will store %d records", table.name, len(objects))
        with Timer(table.name + ' insert', self.config.timer):
            res = conn.execute(sql.text(query))
        _LOG.debug("inserted %s intervals", res.rowcount)

    def _storeObjectsAfwOracle(self, objects, conn, table, schema_table_name,
                               replace=False, extra_columns=None):
        """Store method for Oracle.

        Takes catalog of records and stores a bunch of objects in a table.

        Parameters
        ----------
        objects : `lsst.afw.table.BaseCatalog`
            Catalog containing object records
        conn :
            Database connection
        table : `sqlalchemy.Table`
            Database table
        schema_table_name : `str`
            Name of the table to be used for finding table schema.
        replace : `bool`
            If `True` then use replace instead of INSERT (should be more efficient)
        extra_columns : `dict`, optional
            Mapping (column_name, column_value) which gives column values to add
            to every row, only if column is missing in catalog records.
        """

        def quoteId(columnName):
            """Smart quoting for column names.
            Lower-case names are not quoted (Oracle backend needs them unquoted).
            """
            if not columnName.islower():
                columnName = '"' + columnName + '"'
            return columnName

        schema = objects.getSchema()

        # use extra columns if specified
        extra_fields = list((extra_columns or {}).keys())

        afw_fields = [field.getName() for key, field in schema
                      if field.getName() not in extra_fields]

        # map afw column names to database column names
        column_map = self._schema.getAfwColumns(schema_table_name)

        # list of columns (as in database schema)
        fields = [column_map[field].name for field in afw_fields
                  if field in column_map]

        qfields = [quoteId(field) for field in fields + extra_fields]

        if not replace:
            vals = [":col{}".format(i) for i in range(len(fields))]
            vals += [":extcol{}".format(i) for i in range(len(extra_fields))]
            query = 'INSERT INTO ' + quoteId(table.name)
            query += ' (' + ','.join(qfields) + ') VALUES'
            query += ' (' + ','.join(vals) + ')'
        else:
            qvals = [":col{} {}".format(i, quoteId(field)) for i, field in enumerate(fields)]
            qvals += [":extcol{} {}".format(i, quoteId(field)) for i, field in enumerate(extra_fields)]
            pks = ('pixelId', 'diaObjectId')
            onexpr = ["SRC.{col} = DST.{col}".format(col=quoteId(col)) for col in pks]
            setexpr = ["DST.{col} = SRC.{col}".format(col=quoteId(col))
                       for col in fields + extra_fields if col not in pks]
            vals = ["SRC.{col}".format(col=quoteId(col)) for col in fields + extra_fields]
            query = "MERGE INTO {} DST ".format(quoteId(table.name))
            query += "USING (SELECT {} FROM DUAL) SRC ".format(", ".join(qvals))
            query += "ON ({}) ".format(" AND ".join(onexpr))
            query += "WHEN MATCHED THEN UPDATE SET {} ".format(" ,".join(setexpr))
            query += "WHEN NOT MATCHED THEN INSERT "
            query += "({}) VALUES ({})".format(','.join(qfields), ','.join(vals))

        values = []
        for rec in objects:
            row = {}
            col = 0
            for field in afw_fields:
                if field not in column_map:
                    continue
                value = rec[field]
                if column_map[field].type == "DATETIME" and not np.isnan(value):
                    # convert seconds into datetime
                    value = datetime.utcfromtimestamp(value)
                elif isinstance(value, geom.Angle):
                    value = str(value.asDegrees())
                elif not np.isfinite(value):
                    value = None
                row["col{}".format(col)] = value
                col += 1
            for i, field in enumerate(extra_fields):
                row["extcol{}".format(i)] = extra_columns[field]
            values.append(row)

        _LOG.info("%s: will store %d records", table.name, len(objects))
        with Timer(table.name + ' insert', self.config.timer):
            res = conn.execute(sql.text(query), values)
        _LOG.debug("inserted %s intervals", res.rowcount)

    def _convertResult(self, res, table_name, catalog=None):
        """Convert result set into output catalog.

        Parameters
        ----------
        res : `sqlalchemy.ResultProxy`
            SQLAlchemy result set returned by query.
        table_name : `str`
            Name of the table.
        catalog : `lsst.afw.table.BaseCatalog`
            If not None then extend existing catalog

        Returns
        -------
        catalog : `lsst.afw.table.SourceCatalog`
            If ``catalog`` is None then new instance is returned, otherwise
            ``catalog`` is updated and returned.
        """
        # make catalog schema
        columns = res.keys()
        schema, col_map = self._schema.getAfwSchema(table_name, columns)
        if catalog is None:
            _LOG.debug("_convertResult: schema: %s", schema)
            _LOG.debug("_convertResult: col_map: %s", col_map)
            catalog = afwTable.SourceCatalog(schema)

        # fill catalog
        for row in res:
            record = catalog.addNew()
            for col, value in row.items():
                # some columns may exist in database but not in afw schema
                col = col_map.get(col)
                if col is not None:
                    if isinstance(value, datetime):
                        # convert datetime to number of seconds
                        value = int((value - datetime.utcfromtimestamp(0)).total_seconds())
                    elif col.getTypeString() == 'Angle' and value is not None:
                        value = value * geom.degrees
                    if value is not None:
                        record.set(col, value)

        return catalog