apdb.py
# This file is part of dax_apdb.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Module defining Apdb class and related methods.
"""

__all__ = ["ApdbConfig", "Apdb", "Visit"]

from collections import namedtuple
from contextlib import contextmanager
from datetime import datetime
import logging
import numpy as np
import os
import pandas

import lsst.geom as geom
import lsst.afw.table as afwTable
import lsst.pex.config as pexConfig
from lsst.pex.config import Field, ChoiceField, ListField
import sqlalchemy
from sqlalchemy import (func, sql)
from sqlalchemy.pool import NullPool
from . import timer, apdbSchema


_LOG = logging.getLogger(__name__.partition(".")[2])  # strip leading "lsst."


class Timer(object):
    """Timer class defining context manager which tracks execution timing.

    Typical use:

        with Timer("timer_name"):
            do_something

    On exit from the block it will print the elapsed time.

    See also :py:mod:`timer` module.
    """
    def __init__(self, name, do_logging=True, log_before_cursor_execute=False):
        self._log_before_cursor_execute = log_before_cursor_execute
        self._do_logging = do_logging
        self._timer1 = timer.Timer(name)
        self._timer2 = timer.Timer(name + " (before/after cursor)")

    def __enter__(self):
        """Enter context, start timer.
        """
        # event.listen(engine.Engine, "before_cursor_execute", self._start_timer)
        # event.listen(engine.Engine, "after_cursor_execute", self._stop_timer)
        self._timer1.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit context, stop and dump timer.
        """
        if exc_type is None:
            self._timer1.stop()
            if self._do_logging:
                self._timer1.dump()
        # event.remove(engine.Engine, "before_cursor_execute", self._start_timer)
        # event.remove(engine.Engine, "after_cursor_execute", self._stop_timer)
        return False

    def _start_timer(self, conn, cursor, statement, parameters, context, executemany):
        """Start counting.
        """
        if self._log_before_cursor_execute:
            _LOG.info("before_cursor_execute")
        self._timer2.start()

    def _stop_timer(self, conn, cursor, statement, parameters, context, executemany):
        """Stop counting.
        """
        self._timer2.stop()
        if self._do_logging:
            self._timer2.dump()


def _split(seq, nItems):
    """Split a sequence into smaller sequences of at most ``nItems`` items."""
    seq = list(seq)
    while seq:
        yield seq[:nItems]
        del seq[:nItems]
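# For example, list(_split(range(5), 2)) yields [[0, 1], [2, 3], [4]].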


def _coerce_uint64(df: pandas.DataFrame) -> pandas.DataFrame:
    """Change type of the uint64 columns to int64, return copy of data frame.
    """
    names = [c[0] for c in df.dtypes.items() if c[1] == np.uint64]
    return df.astype({name: np.int64 for name in names})
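# For example, a frame with dtypes {"diaObjectId": uint64, "ra": float64}
# comes back with "diaObjectId" as int64 and "ra" unchanged (column names
# here are illustrative).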


# Information about single visit
Visit = namedtuple('Visit', 'visitId visitTime lastObjectId lastSourceId')


@contextmanager
def _ansi_session(engine):
    """Returns a connection, makes sure that ANSI mode is set for MySQL.
    """
    with engine.begin() as conn:
        if engine.name == 'mysql':
            conn.execute(sql.text("SET SESSION SQL_MODE = 'ANSI'"))
        yield conn
    return


def _data_file_name(basename):
    """Return path name of a data file.
    """
    return os.path.join("${DAX_APDB_DIR}", "data", basename)


class ApdbConfig(pexConfig.Config):

    db_url = Field(dtype=str, doc="SQLAlchemy database connection URI")
    isolation_level = ChoiceField(dtype=str,
                                  doc="Transaction isolation level",
                                  allowed={"READ_COMMITTED": "Read committed",
                                           "READ_UNCOMMITTED": "Read uncommitted",
                                           "REPEATABLE_READ": "Repeatable read",
                                           "SERIALIZABLE": "Serializable"},
                                  default="READ_COMMITTED",
                                  optional=True)
    connection_pool = Field(dtype=bool,
                            doc=("If False then disable SQLAlchemy connection pool. "
                                 "Do not use connection pool when forking."),
                            default=True)
    connection_timeout = Field(dtype=float,
                               doc="Maximum time to wait for a database lock to be released before "
                                   "exiting. Defaults to SQLAlchemy's default if not set.",
                               default=None,
                               optional=True)
    sql_echo = Field(dtype=bool,
                     doc="If True then pass SQLAlchemy echo option.",
                     default=False)
    dia_object_index = ChoiceField(dtype=str,
                                   doc="Indexing mode for DiaObject table",
                                   allowed={'baseline': "Index defined in baseline schema",
                                            'pix_id_iov': "(pixelId, objectId, iovStart) PK",
                                            'last_object_table': "Separate DiaObjectLast table"},
                                   default='baseline')
    dia_object_nightly = Field(dtype=bool,
                               doc="Use separate nightly table for DiaObject",
                               default=False)
    read_sources_months = Field(dtype=int,
                                doc="Number of months of history to read from DiaSource",
                                default=12)
    read_forced_sources_months = Field(dtype=int,
                                       doc="Number of months of history to read from DiaForcedSource",
                                       default=12)
    dia_object_columns = ListField(dtype=str,
                                   doc="List of columns to read from DiaObject, by default read all columns",
                                   default=[])
    object_last_replace = Field(dtype=bool,
                                doc="If True (default) then use \"upsert\" for DiaObjectsLast table",
                                default=True)
    schema_file = Field(dtype=str,
                        doc="Location of (YAML) configuration file with standard schema",
                        default=_data_file_name("apdb-schema.yaml"))
    extra_schema_file = Field(dtype=str,
                              doc="Location of (YAML) configuration file with extra schema",
                              default=_data_file_name("apdb-schema-extra.yaml"))
    column_map = Field(dtype=str,
                       doc="Location of (YAML) configuration file with column mapping",
                       default=_data_file_name("apdb-afw-map.yaml"))
    prefix = Field(dtype=str,
                   doc="Prefix to add to table names and index names",
                   default="")
    explain = Field(dtype=bool,
                    doc="If True then run EXPLAIN SQL command on each executed query",
                    default=False)
    timer = Field(dtype=bool,
                  doc="If True then print/log timing information",
                  default=False)
    diaobject_index_hint = Field(dtype=str,
                                 doc="Name of the index to use with Oracle index hint",
                                 default=None,
                                 optional=True)
    dynamic_sampling_hint = Field(dtype=int,
                                  doc="If non-zero then use dynamic_sampling hint",
                                  default=0)
    cardinality_hint = Field(dtype=int,
                             doc="If non-zero then use cardinality hint",
                             default=0)

    def validate(self):
        super().validate()
        if self.isolation_level == "READ_COMMITTED" and self.db_url.startswith("sqlite"):
            raise ValueError("Attempting to run Apdb with SQLITE and isolation level 'READ_COMMITTED.' "
                             "Use 'READ_UNCOMMITTED' instead.")
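

# A minimal configuration sketch (illustrative; any writable SQLite file
# works, and READ_UNCOMMITTED is what validate() requires with SQLite):
#
#     config = ApdbConfig()
#     config.db_url = "sqlite:///apdb.db"
#     config.isolation_level = "READ_UNCOMMITTED"
#     config.validate()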


class Apdb(object):
    """Interface to L1 database, hides all database access details.

    The implementation is configured via standard ``pex_config`` mechanism
    using `ApdbConfig` configuration class. For an example of different
    configurations check config/ folder.

    Parameters
    ----------
    config : `ApdbConfig`
    afw_schemas : `dict`, optional
        Dictionary with table name for a key and `afw.table.Schema`
        for a value. Columns in schema will be added to standard
        APDB schema.
    """

    def __init__(self, config, afw_schemas=None):

        self.config = config

        # logging.getLogger('sqlalchemy').setLevel(logging.INFO)
        _LOG.debug("APDB Configuration:")
        _LOG.debug("    dia_object_index: %s", self.config.dia_object_index)
        _LOG.debug("    dia_object_nightly: %s", self.config.dia_object_nightly)
        _LOG.debug("    read_sources_months: %s", self.config.read_sources_months)
        _LOG.debug("    read_forced_sources_months: %s", self.config.read_forced_sources_months)
        _LOG.debug("    dia_object_columns: %s", self.config.dia_object_columns)
        _LOG.debug("    object_last_replace: %s", self.config.object_last_replace)
        _LOG.debug("    schema_file: %s", self.config.schema_file)
        _LOG.debug("    extra_schema_file: %s", self.config.extra_schema_file)
        _LOG.debug("    column_map: %s", self.config.column_map)
        _LOG.debug("    schema prefix: %s", self.config.prefix)

        # engine is reused between multiple processes, make sure that we don't
        # share connections by disabling pool (by using NullPool class)
        kw = dict(echo=self.config.sql_echo)
        conn_args = dict()
        if not self.config.connection_pool:
            kw.update(poolclass=NullPool)
        if self.config.isolation_level is not None:
            kw.update(isolation_level=self.config.isolation_level)
        if self.config.connection_timeout is not None:
            if self.config.db_url.startswith("sqlite"):
                conn_args.update(timeout=self.config.connection_timeout)
            elif self.config.db_url.startswith(("postgresql", "mysql")):
                conn_args.update(connect_timeout=self.config.connection_timeout)
        kw.update(connect_args=conn_args)
        self._engine = sqlalchemy.create_engine(self.config.db_url, **kw)

        self._schema = apdbSchema.ApdbSchema(engine=self._engine,
                                             dia_object_index=self.config.dia_object_index,
                                             dia_object_nightly=self.config.dia_object_nightly,
                                             schema_file=self.config.schema_file,
                                             extra_schema_file=self.config.extra_schema_file,
                                             column_map=self.config.column_map,
                                             afw_schemas=afw_schemas,
                                             prefix=self.config.prefix)
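
    # Typical construction (illustrative): the engine and schema are created
    # once here, so a single Apdb instance can serve many calls:
    #
    #     apdb = Apdb(config)          # config: a validated ApdbConfig
    #     apdb.makeSchema(drop=False)  # create tables on first use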

    def lastVisit(self):
        """Returns last visit information or `None` if visits table is empty.

        Visits table is used by ap_proto to track visit information, it is
        not a part of the regular APDB schema.

        Returns
        -------
        visit : `Visit` or `None`
            Last stored visit info or `None` if there was nothing stored yet.
        """

        with self._engine.begin() as conn:

            stmnt = sql.select([sql.func.max(self._schema.visits.c.visitId),
                                sql.func.max(self._schema.visits.c.visitTime)])
            res = conn.execute(stmnt)
            row = res.fetchone()
            if row[0] is None:
                return None

            visitId = row[0]
            visitTime = row[1]
            _LOG.info("lastVisit: visitId: %s visitTime: %s (%s)", visitId,
                      visitTime, type(visitTime))

            # get max IDs from corresponding tables
            stmnt = sql.select([sql.func.max(self._schema.objects.c.diaObjectId)])
            lastObjectId = conn.scalar(stmnt)
            stmnt = sql.select([sql.func.max(self._schema.sources.c.diaSourceId)])
            lastSourceId = conn.scalar(stmnt)

            return Visit(visitId=visitId, visitTime=visitTime,
                         lastObjectId=lastObjectId, lastSourceId=lastSourceId)
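
    # Illustrative round trip: saveVisit() below records a visit which
    # lastVisit() then reports:
    #
    #     apdb.saveVisit(42, datetime.now())
    #     visit = apdb.lastVisit()  # Visit(visitId=42, visitTime=..., ...)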

    def saveVisit(self, visitId, visitTime):
        """Store visit information.

        This method is only used by ``ap_proto`` script from ``l1dbproto``
        and is not intended for production pipelines.

        Parameters
        ----------
        visitId : `int`
            Visit identifier
        visitTime : `datetime.datetime`
            Visit timestamp.
        """

        ins = self._schema.visits.insert().values(visitId=visitId,
                                                  visitTime=visitTime)
        self._engine.execute(ins)

    def tableRowCount(self):
        """Returns dictionary with the table names and row counts.

        Used by ``ap_proto`` to keep track of the size of the database tables.
        Depending on database technology this could be an expensive operation.

        Returns
        -------
        row_counts : `dict`
            Dict where key is a table name and value is a row count.
        """
        res = {}
        tables = [self._schema.objects, self._schema.sources, self._schema.forcedSources]
        if self.config.dia_object_index == 'last_object_table':
            tables.append(self._schema.objects_last)
        for table in tables:
            stmt = sql.select([func.count()]).select_from(table)
            count = self._engine.scalar(stmt)
            res[table.name] = count

        return res

    def getDiaObjects(self, pixel_ranges, return_pandas=False):
        """Returns catalog of DiaObject instances from given region.

        Objects are searched based on pixelization index and region is
        determined by the set of indices. There is no assumption on a
        particular type of index, client is responsible for consistency
        when calculating pixelization indices.

        This method returns :doc:`/modules/lsst.afw.table/index` catalog with schema determined by
        the schema of APDB table. Re-mapping of the column names is done for
        some columns (based on column map passed to constructor) but types
        or units are not changed.

        Returns only the last version of each DiaObject.

        Parameters
        ----------
        pixel_ranges : `list` of `tuple`
            Sequence of ranges, range is a tuple (minPixelID, maxPixelID).
            This defines set of pixel indices to be included in result.
        return_pandas : `bool`
            Return a `pandas.DataFrame` instead of
            `lsst.afw.table.SourceCatalog`.

        Returns
        -------
        catalog : `lsst.afw.table.SourceCatalog` or `pandas.DataFrame`
            Catalog containing DiaObject records.
        """

        # decide what columns we need
        if self.config.dia_object_index == 'last_object_table':
            table = self._schema.objects_last
        else:
            table = self._schema.objects
        if not self.config.dia_object_columns:
            query = table.select()
        else:
            columns = [table.c[col] for col in self.config.dia_object_columns]
            query = sql.select(columns)

        if self.config.diaobject_index_hint:
            val = self.config.diaobject_index_hint
            query = query.with_hint(table, 'index_rs_asc(%(name)s "{}")'.format(val))
        if self.config.dynamic_sampling_hint > 0:
            val = self.config.dynamic_sampling_hint
            query = query.with_hint(table, 'dynamic_sampling(%(name)s {})'.format(val))
        if self.config.cardinality_hint > 0:
            val = self.config.cardinality_hint
            query = query.with_hint(table, 'FIRST_ROWS_1 cardinality(%(name)s {})'.format(val))

        # build selection
        exprlist = []
        for low, upper in pixel_ranges:
            upper -= 1
            if low == upper:
                exprlist.append(table.c.pixelId == low)
            else:
                exprlist.append(sql.expression.between(table.c.pixelId, low, upper))
        query = query.where(sql.expression.or_(*exprlist))

        # select latest version of objects
        if self.config.dia_object_index != 'last_object_table':
            query = query.where(table.c.validityEnd == None)  # noqa: E711

        _LOG.debug("query: %s", query)

        if self.config.explain:
            # run the same query with explain
            self._explain(query, self._engine)

        # execute select
        with Timer('DiaObject select', self.config.timer):
            with self._engine.begin() as conn:
                if return_pandas:
                    objects = pandas.read_sql_query(query, conn)
                else:
                    res = conn.execute(query)
                    objects = self._convertResult(res, "DiaObject")
        _LOG.debug("found %s DiaObjects", len(objects))
        return objects
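
    # Illustrative region query; pixelization indices are computed by the
    # caller, and each (min, max) range is treated as half-open by the
    # "upper -= 1" logic above:
    #
    #     ranges = [(10000, 10010), (10020, 10021)]
    #     objects = apdb.getDiaObjects(ranges, return_pandas=True)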

    def getDiaSourcesInRegion(self, pixel_ranges, dt, return_pandas=False):
        """Returns catalog of DiaSource instances from given region.

        Sources are searched based on pixelization index and region is
        determined by the set of indices. There is no assumption on a
        particular type of index, client is responsible for consistency
        when calculating pixelization indices.

        This method returns :doc:`/modules/lsst.afw.table/index` catalog with schema determined by
        the schema of APDB table. Re-mapping of the column names is done for
        some columns (based on column map passed to constructor) but types or
        units are not changed.

        Parameters
        ----------
        pixel_ranges : `list` of `tuple`
            Sequence of ranges, range is a tuple (minPixelID, maxPixelID).
            This defines set of pixel indices to be included in result.
        dt : `datetime.datetime`
            Time of the current visit
        return_pandas : `bool`
            Return a `pandas.DataFrame` instead of
            `lsst.afw.table.SourceCatalog`.

        Returns
        -------
        catalog : `lsst.afw.table.SourceCatalog`, `pandas.DataFrame`, or `None`
            Catalog containing DiaSource records. `None` is returned if
            ``read_sources_months`` configuration parameter is set to 0.
        """

        if self.config.read_sources_months == 0:
            _LOG.info("Skip DiaSources fetching")
            return None

        table = self._schema.sources
        query = table.select()

        # build selection
        exprlist = []
        for low, upper in pixel_ranges:
            upper -= 1
            if low == upper:
                exprlist.append(table.c.pixelId == low)
            else:
                exprlist.append(sql.expression.between(table.c.pixelId, low, upper))
        query = query.where(sql.expression.or_(*exprlist))

        # execute select
        with Timer('DiaSource select', self.config.timer):
            with _ansi_session(self._engine) as conn:
                if return_pandas:
                    sources = pandas.read_sql_query(query, conn)
                else:
                    res = conn.execute(query)
                    sources = self._convertResult(res, "DiaSource")
        _LOG.debug("found %s DiaSources", len(sources))
        return sources

    def getDiaSources(self, object_ids, dt, return_pandas=False):
        """Returns catalog of DiaSource instances given set of DiaObject IDs.

        This method returns :doc:`/modules/lsst.afw.table/index` catalog with schema determined by
        the schema of APDB table. Re-mapping of the column names is done for
        some columns (based on column map passed to constructor) but types or
        units are not changed.

        Parameters
        ----------
        object_ids :
            Collection of DiaObject IDs
        dt : `datetime.datetime`
            Time of the current visit
        return_pandas : `bool`
            Return a `pandas.DataFrame` instead of
            `lsst.afw.table.SourceCatalog`.

        Returns
        -------
        catalog : `lsst.afw.table.SourceCatalog`, `pandas.DataFrame`, or `None`
            Catalog containing DiaSource records. `None` is returned if
            ``read_sources_months`` configuration parameter is set to 0 or
            when ``object_ids`` is empty.
        """

        if self.config.read_sources_months == 0:
            _LOG.info("Skip DiaSources fetching")
            return None

        if len(object_ids) <= 0:
            _LOG.info("Skip DiaSources fetching - no Objects")
            # this should create a catalog, but the list of columns may be empty
            return None

        table = self._schema.sources
        sources = None
        with Timer('DiaSource select', self.config.timer):
            with _ansi_session(self._engine) as conn:
                for ids in _split(sorted(object_ids), 1000):
                    query = 'SELECT * FROM "' + table.name + '" WHERE '

                    # select by object id
                    ids = ",".join(str(id) for id in ids)
                    query += '"diaObjectId" IN (' + ids + ') '

                    # execute select
                    if return_pandas:
                        df = pandas.read_sql_query(sql.text(query), conn)
                        if sources is None:
                            sources = df
                        else:
                            sources = sources.append(df)
                    else:
                        res = conn.execute(sql.text(query))
                        sources = self._convertResult(res, "DiaSource", sources)

        _LOG.debug("found %s DiaSources", len(sources))
        return sources
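
    # Illustrative call; IDs are looked up in chunks of 1000 via _split() so
    # the SQL IN-list stays bounded:
    #
    #     sources = apdb.getDiaSources([5001, 5002, 5003], datetime.now())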

    def getDiaForcedSources(self, object_ids, dt, return_pandas=False):
        """Returns catalog of DiaForcedSource instances matching given
        DiaObjects.

        This method returns :doc:`/modules/lsst.afw.table/index` catalog with schema determined by
        the schema of L1 database table. Re-mapping of the column names may
        be done for some columns (based on column map passed to constructor)
        but types or units are not changed.

        Parameters
        ----------
        object_ids :
            Collection of DiaObject IDs
        dt : `datetime.datetime`
            Time of the current visit
        return_pandas : `bool`
            Return a `pandas.DataFrame` instead of
            `lsst.afw.table.SourceCatalog`.

        Returns
        -------
        catalog : `lsst.afw.table.SourceCatalog` or `None`
            Catalog containing DiaForcedSource records. `None` is returned if
            ``read_sources_months`` configuration parameter is set to 0 or
            when ``object_ids`` is empty.
        """

        if self.config.read_forced_sources_months == 0:
            _LOG.info("Skip DiaForcedSources fetching")
            return None

        if len(object_ids) <= 0:
            _LOG.info("Skip DiaForcedSources fetching - no Objects")
            # this should create a catalog, but the list of columns may be empty
            return None

        table = self._schema.forcedSources
        sources = None

        with Timer('DiaForcedSource select', self.config.timer):
            with _ansi_session(self._engine) as conn:
                for ids in _split(sorted(object_ids), 1000):

                    query = 'SELECT * FROM "' + table.name + '" WHERE '

                    # select by object id
                    ids = ",".join(str(id) for id in ids)
                    query += '"diaObjectId" IN (' + ids + ') '

                    # execute select
                    if return_pandas:
                        df = pandas.read_sql_query(sql.text(query), conn)
                        if sources is None:
                            sources = df
                        else:
                            sources = sources.append(df)
                    else:
                        res = conn.execute(sql.text(query))
                        sources = self._convertResult(res, "DiaForcedSource", sources)

        _LOG.debug("found %s DiaForcedSources", len(sources))
        return sources

    def storeDiaObjects(self, objs, dt):
        """Store catalog of DiaObjects from current visit.

        This method takes :doc:`/modules/lsst.afw.table/index` catalog, its schema must be
        compatible with the schema of APDB table:

          - column names must correspond to database table columns
          - some column names are re-mapped based on column map passed to
            constructor
          - types and units of the columns must match database definitions,
            no unit conversion is performed presently
          - columns that have default values in database schema can be
            omitted from afw schema
          - this method knows how to fill interval-related columns
            (validityStart, validityEnd); they do not need to appear in
            afw schema

        Parameters
        ----------
        objs : `lsst.afw.table.BaseCatalog` or `pandas.DataFrame`
            Catalog with DiaObject records
        dt : `datetime.datetime`
            Time of the visit
        """

        if isinstance(objs, pandas.DataFrame):
            ids = sorted(objs['diaObjectId'])
        else:
            ids = sorted([obj['id'] for obj in objs])
        _LOG.debug("first object ID: %d", ids[0])

        # NOTE: workaround for sqlite, need this here to avoid
        # "database is locked" error.
        table = self._schema.objects

        # everything to be done in single transaction
        with _ansi_session(self._engine) as conn:

            ids = ",".join(str(id) for id in ids)

            if self.config.dia_object_index == 'last_object_table':

                # insert and replace all records in LAST table, mysql and postgres have
                # non-standard features (handled in _storeObjectsAfw)
                table = self._schema.objects_last
                do_replace = self.config.object_last_replace
                # If the input data is of type Pandas, we drop the previous
                # objects regardless of the do_replace setting due to how
                # Pandas inserts objects.
                if not do_replace or isinstance(objs, pandas.DataFrame):
                    query = 'DELETE FROM "' + table.name + '" '
                    query += 'WHERE "diaObjectId" IN (' + ids + ') '

                    if self.config.explain:
                        # run the same query with explain
                        self._explain(query, conn)

                    with Timer(table.name + ' delete', self.config.timer):
                        res = conn.execute(sql.text(query))
                        _LOG.debug("deleted %s objects", res.rowcount)

                extra_columns = dict(lastNonForcedSource=dt)
                if isinstance(objs, pandas.DataFrame):
                    with Timer("DiaObjectLast insert", self.config.timer):
                        objs = _coerce_uint64(objs)
                        for col, data in extra_columns.items():
                            objs[col] = data
                        objs.to_sql("DiaObjectLast", conn, if_exists='append',
                                    index=False)
                else:
                    self._storeObjectsAfw(objs, conn, table, "DiaObjectLast",
                                          replace=do_replace,
                                          extra_columns=extra_columns)

            else:

                # truncate existing validity intervals
                table = self._schema.objects
                query = 'UPDATE "' + table.name + '" '
                query += "SET \"validityEnd\" = '" + str(dt) + "' "
                query += 'WHERE "diaObjectId" IN (' + ids + ') '
                query += 'AND "validityEnd" IS NULL'

                # _LOG.debug("query: %s", query)

                if self.config.explain:
                    # run the same query with explain
                    self._explain(query, conn)

                with Timer(table.name + ' truncate', self.config.timer):
                    res = conn.execute(sql.text(query))
                    _LOG.debug("truncated %s intervals", res.rowcount)

            # insert new versions
            if self.config.dia_object_nightly:
                table = self._schema.objects_nightly
            else:
                table = self._schema.objects
            extra_columns = dict(lastNonForcedSource=dt, validityStart=dt,
                                 validityEnd=None)
            if isinstance(objs, pandas.DataFrame):
                with Timer("DiaObject insert", self.config.timer):
                    objs = _coerce_uint64(objs)
                    for col, data in extra_columns.items():
                        objs[col] = data
                    objs.to_sql("DiaObject", conn, if_exists='append',
                                index=False)
            else:
                self._storeObjectsAfw(objs, conn, table, "DiaObject",
                                      extra_columns=extra_columns)

    def storeDiaSources(self, sources):
        """Store catalog of DIASources from current visit.

        This method takes :doc:`/modules/lsst.afw.table/index` catalog, its schema must be
        compatible with the schema of L1 database table:

          - column names must correspond to database table columns
          - some column names may be re-mapped based on column map passed to
            constructor
          - types and units of the columns must match database definitions,
            no unit conversion is performed presently
          - columns that have default values in database schema can be
            omitted from afw schema

        Parameters
        ----------
        sources : `lsst.afw.table.BaseCatalog` or `pandas.DataFrame`
            Catalog containing DiaSource records
        """

        # everything to be done in single transaction
        with _ansi_session(self._engine) as conn:

            if isinstance(sources, pandas.DataFrame):
                with Timer("DiaSource insert", self.config.timer):
                    sources = _coerce_uint64(sources)
                    sources.to_sql("DiaSource", conn, if_exists='append',
                                   index=False)
            else:
                table = self._schema.sources
                self._storeObjectsAfw(sources, conn, table, "DiaSource")

    def storeDiaForcedSources(self, sources):
        """Store a set of DIAForcedSources from current visit.

        This method takes :doc:`/modules/lsst.afw.table/index` catalog, its schema must be
        compatible with the schema of L1 database table:

          - column names must correspond to database table columns
          - some column names may be re-mapped based on column map passed to
            constructor
          - types and units of the columns must match database definitions,
            no unit conversion is performed presently
          - columns that have default values in database schema can be
            omitted from afw schema

        Parameters
        ----------
        sources : `lsst.afw.table.BaseCatalog` or `pandas.DataFrame`
            Catalog containing DiaForcedSource records
        """

        # everything to be done in single transaction
        with _ansi_session(self._engine) as conn:

            if isinstance(sources, pandas.DataFrame):
                with Timer("DiaForcedSource insert", self.config.timer):
                    sources = _coerce_uint64(sources)
                    sources.to_sql("DiaForcedSource", conn, if_exists='append',
                                   index=False)
            else:
                table = self._schema.forcedSources
                self._storeObjectsAfw(sources, conn, table, "DiaForcedSource")

    def countUnassociatedObjects(self):
        """Return the number of DiaObjects that have only one DiaSource
        associated with them.

        Used as part of ap_verify metrics.

        Returns
        -------
        count : `int`
            Number of DiaObjects with exactly one associated DiaSource.
        """
        # Retrieve the DiaObject table.
        table = self._schema.objects

        # Construct the sql statement.
        stmt = sql.select([func.count()]).select_from(table).where(table.c.nDiaSources == 1)
        stmt = stmt.where(table.c.validityEnd == None)  # noqa: E711

        # Return the count.
        count = self._engine.scalar(stmt)

        return count

    def isVisitProcessed(self, visitInfo):
        """Test whether data from an image has been loaded into the database.

        Used as part of ap_verify metrics.

        Parameters
        ----------
        visitInfo : `lsst.afw.image.VisitInfo`
            The metadata for the image of interest.

        Returns
        -------
        isProcessed : `bool`
            `True` if the data are present, `False` otherwise.
        """
        id = visitInfo.getExposureId()
        table = self._schema.sources
        idField = table.c.ccdVisitId

        # Hopefully faster than SELECT DISTINCT
        query = sql.select([idField]).select_from(table) \
            .where(idField == id).limit(1)

        return self._engine.scalar(query) is not None

    def dailyJob(self):
        """Implement daily activities like cleanup/vacuum.

        What should be done during daily cleanup is determined by
        configuration/schema.
        """

        # move data from DiaObjectNightly into DiaObject
        if self.config.dia_object_nightly:
            with _ansi_session(self._engine) as conn:
                query = 'INSERT INTO "' + self._schema.objects.name + '" '
                query += 'SELECT * FROM "' + self._schema.objects_nightly.name + '"'
                with Timer('DiaObjectNightly copy', self.config.timer):
                    conn.execute(sql.text(query))

                query = 'DELETE FROM "' + self._schema.objects_nightly.name + '"'
                with Timer('DiaObjectNightly delete', self.config.timer):
                    conn.execute(sql.text(query))

        if self._engine.name == 'postgresql':

            # do VACUUM on all tables
            _LOG.info("Running VACUUM on all tables")
            connection = self._engine.raw_connection()
            ISOLATION_LEVEL_AUTOCOMMIT = 0
            connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
            cursor = connection.cursor()
            cursor.execute("VACUUM ANALYSE")

    def makeSchema(self, drop=False, mysql_engine='InnoDB', oracle_tablespace=None, oracle_iot=False):
        """Create or re-create all tables.

        Parameters
        ----------
        drop : `bool`
            If True then drop tables before creating new ones.
        mysql_engine : `str`, optional
            Name of the MySQL engine to use for new tables.
        oracle_tablespace : `str`, optional
            Name of Oracle tablespace.
        oracle_iot : `bool`, optional
            Make Index-organized DiaObjectLast table.
        """
        self._schema.makeSchema(drop=drop, mysql_engine=mysql_engine,
                                oracle_tablespace=oracle_tablespace,
                                oracle_iot=oracle_iot)

    def _explain(self, query, conn):
        """Run the query with explain.
        """

        _LOG.info("explain for query: %s...", query[:64])

        if conn.engine.name == 'mysql':
            query = "EXPLAIN EXTENDED " + query
        else:
            query = "EXPLAIN " + query

        res = conn.execute(sql.text(query))
        if res.returns_rows:
            _LOG.info("explain: %s", res.keys())
            for row in res:
                _LOG.info("explain: %s", row)
        else:
            _LOG.info("EXPLAIN returned nothing")

    def _storeObjectsAfw(self, objects, conn, table, schema_table_name,
                         replace=False, extra_columns=None):
        """Generic store method.

        Takes catalog of records and stores a bunch of objects in a table.

        Parameters
        ----------
        objects : `lsst.afw.table.BaseCatalog`
            Catalog containing object records
        conn :
            Database connection
        table : `sqlalchemy.Table`
            Database table
        schema_table_name : `str`
            Name of the table to be used for finding table schema.
        replace : `boolean`
            If `True` then use replace instead of INSERT (should be more efficient)
        extra_columns : `dict`, optional
            Mapping (column_name, column_value) which gives column values to add
            to every row, only if column is missing in catalog records.
        """

        def quoteValue(v):
            """Quote and escape values"""
            if v is None:
                v = "NULL"
            elif isinstance(v, datetime):
                v = "'" + str(v) + "'"
            elif isinstance(v, str):
                # we don't expect nasty stuff in strings
                v = "'" + v + "'"
            elif isinstance(v, geom.Angle):
                v = v.asDegrees()
                if np.isfinite(v):
                    v = str(v)
                else:
                    v = "NULL"
            else:
                if np.isfinite(v):
                    v = str(v)
                else:
                    v = "NULL"
            return v

        def quoteId(columnName):
            """Smart quoting for column names.
            Lower-case names are not quoted.
            """
            if not columnName.islower():
                columnName = '"' + columnName + '"'
            return columnName

        if conn.engine.name == "oracle":
            return self._storeObjectsAfwOracle(objects, conn, table,
                                               schema_table_name, replace,
                                               extra_columns)

        schema = objects.getSchema()
        # use extra columns if specified
        extra_fields = list((extra_columns or {}).keys())

        afw_fields = [field.getName() for key, field in schema
                      if field.getName() not in extra_fields]

        column_map = self._schema.getAfwColumns(schema_table_name)
        # list of columns (as in cat schema)
        fields = [column_map[field].name for field in afw_fields if field in column_map]

        if replace and conn.engine.name in ('mysql', 'sqlite'):
            query = 'REPLACE INTO '
        else:
            query = 'INSERT INTO '
        qfields = [quoteId(field) for field in fields + extra_fields]
        query += quoteId(table.name) + ' (' + ','.join(qfields) + ') ' + 'VALUES '

        values = []
        for rec in objects:
            row = []
            for field in afw_fields:
                if field not in column_map:
                    continue
                value = rec[field]
                if column_map[field].type == "DATETIME" and \
                        np.isfinite(value):
                    # convert seconds into datetime
                    value = datetime.utcfromtimestamp(value)
                row.append(quoteValue(value))
            for field in extra_fields:
                row.append(quoteValue(extra_columns[field]))
            values.append('(' + ','.join(row) + ')')

        if self.config.explain:
            # run the same query with explain, only give it one row of data
            self._explain(query + values[0], conn)

        query += ','.join(values)

        if replace and conn.engine.name == 'postgresql':
            # This depends on the fact that "replace" can only be true
            # for the DiaObjectLast table.
            pks = ('pixelId', 'diaObjectId')
            query += " ON CONFLICT (\"{}\", \"{}\") DO UPDATE SET ".format(*pks)
            fields = [column_map[field].name for field in afw_fields if field in column_map]
            fields = ['"{0}" = EXCLUDED."{0}"'.format(field)
                      for field in fields if field not in pks]
            query += ', '.join(fields)

        # _LOG.debug("query: %s", query)
        _LOG.info("%s: will store %d records", table.name, len(objects))
        with Timer(table.name + ' insert', self.config.timer):
            res = conn.execute(sql.text(query))
            _LOG.debug("inserted %s intervals", res.rowcount)

    def _storeObjectsAfwOracle(self, objects, conn, table, schema_table_name,
                               replace=False, extra_columns=None):
        """Store method for Oracle.

        Takes catalog of records and stores a bunch of objects in a table.

        Parameters
        ----------
        objects : `lsst.afw.table.BaseCatalog`
            Catalog containing object records
        conn :
            Database connection
        table : `sqlalchemy.Table`
            Database table
        schema_table_name : `str`
            Name of the table to be used for finding table schema.
        replace : `boolean`
            If `True` then use replace instead of INSERT (should be more efficient)
        extra_columns : `dict`, optional
            Mapping (column_name, column_value) which gives column values to add
            to every row, only if column is missing in catalog records.
        """

        def quoteId(columnName):
            """Smart quoting for column names.
            Lower-case names are not quoted (Oracle backend needs them unquoted).
            """
            if not columnName.islower():
                columnName = '"' + columnName + '"'
            return columnName

        schema = objects.getSchema()

        # use extra columns as overrides always.
        extra_fields = list((extra_columns or {}).keys())

        afw_fields = [field.getName() for key, field in schema
                      if field.getName() not in extra_fields]
        # _LOG.info("afw_fields: %s", afw_fields)

        column_map = self._schema.getAfwColumns(schema_table_name)
        # _LOG.info("column_map: %s", column_map)

        # list of columns (as in cat schema)
        fields = [column_map[field].name for field in afw_fields
                  if field in column_map]
        # _LOG.info("fields: %s", fields)

        qfields = [quoteId(field) for field in fields + extra_fields]

        if not replace:
            vals = [":col{}".format(i) for i in range(len(fields))]
            vals += [":extcol{}".format(i) for i in range(len(extra_fields))]
            query = 'INSERT INTO ' + quoteId(table.name)
            query += ' (' + ','.join(qfields) + ') VALUES'
            query += ' (' + ','.join(vals) + ')'
        else:
            qvals = [":col{} {}".format(i, quoteId(field)) for i, field in enumerate(fields)]
            qvals += [":extcol{} {}".format(i, quoteId(field)) for i, field in enumerate(extra_fields)]
            pks = ('pixelId', 'diaObjectId')
            onexpr = ["SRC.{col} = DST.{col}".format(col=quoteId(col)) for col in pks]
            setexpr = ["DST.{col} = SRC.{col}".format(col=quoteId(col))
                       for col in fields + extra_fields if col not in pks]
            vals = ["SRC.{col}".format(col=quoteId(col)) for col in fields + extra_fields]
            query = "MERGE INTO {} DST ".format(quoteId(table.name))
            query += "USING (SELECT {} FROM DUAL) SRC ".format(", ".join(qvals))
            query += "ON ({}) ".format(" AND ".join(onexpr))
            query += "WHEN MATCHED THEN UPDATE SET {} ".format(" ,".join(setexpr))
            query += "WHEN NOT MATCHED THEN INSERT "
            query += "({}) VALUES ({})".format(','.join(qfields), ','.join(vals))
        # _LOG.info("query: %s", query)

        values = []
        for rec in objects:
            row = {}
            col = 0
            for field in afw_fields:
                if field not in column_map:
                    continue
                value = rec[field]
                if column_map[field].type == "DATETIME" and not np.isnan(value):
                    # convert seconds into datetime
                    value = datetime.utcfromtimestamp(value)
                elif isinstance(value, geom.Angle):
                    value = str(value.asDegrees())
                elif not np.isfinite(value):
                    value = None
                row["col{}".format(col)] = value
                col += 1
            for i, field in enumerate(extra_fields):
                row["extcol{}".format(i)] = extra_columns[field]
            values.append(row)

        # _LOG.debug("query: %s", query)
        _LOG.info("%s: will store %d records", table.name, len(objects))
        with Timer(table.name + ' insert', self.config.timer):
            res = conn.execute(sql.text(query), values)
            _LOG.debug("inserted %s intervals", res.rowcount)

    def _convertResult(self, res, table_name, catalog=None):
        """Convert result set into output catalog.

        Parameters
        ----------
        res : `sqlalchemy.ResultProxy`
            SQLAlchemy result set returned by query.
        table_name : `str`
            Name of the table.
        catalog : `lsst.afw.table.BaseCatalog`
            If not None then extend existing catalog

        Returns
        -------
        catalog : `lsst.afw.table.SourceCatalog`
            If ``catalog`` is None then new instance is returned, otherwise
            ``catalog`` is updated and returned.
        """
        # make catalog schema
        columns = res.keys()
        schema, col_map = self._schema.getAfwSchema(table_name, columns)
        if catalog is None:
            _LOG.debug("_convertResult: schema: %s", schema)
            _LOG.debug("_convertResult: col_map: %s", col_map)
            catalog = afwTable.SourceCatalog(schema)

        # fill catalog
        for row in res:
            record = catalog.addNew()
            for col, value in row.items():
                # some columns may exist in database but not included in afw schema
                col = col_map.get(col)
                if col is not None:
                    if isinstance(value, datetime):
                        # convert datetime to number of seconds
                        value = int((value - datetime.utcfromtimestamp(0)).total_seconds())
                    elif col.getTypeString() == 'Angle' and value is not None:
                        value = value * geom.degrees
                    if value is not None:
                        record.set(col, value)

        return catalog
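

# A minimal end-to-end sketch (illustrative; assumes a writable SQLite file
# and the default schema files shipped with this package):
#
#     from datetime import datetime
#
#     config = ApdbConfig()
#     config.db_url = "sqlite:///apdb.db"
#     config.isolation_level = "READ_UNCOMMITTED"
#     apdb = Apdb(config)
#     apdb.makeSchema(drop=True)
#     apdb.saveVisit(1, datetime.now())
#     print(apdb.tableRowCount())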