22 """Module responsible for APDB schema operations.
25 __all__ = [
"ColumnDef",
"IndexDef",
"TableDef",
26 "make_minimal_dia_object_schema",
"make_minimal_dia_source_schema",
29 from collections
import namedtuple
34 from sqlalchemy
import (Column, Index, MetaData, PrimaryKeyConstraint,
35 UniqueConstraint, Table)
36 from sqlalchemy.schema
import CreateTable, CreateIndex
37 from sqlalchemy.ext.compiler
import compiles
41 _LOG = logging.getLogger(__name__.partition(
".")[2])
ColumnDef = namedtuple('ColumnDef', 'name type nullable default description unit ucd')
"""Attributes of a single table column as defined in the YAML schema."""

IndexDef = namedtuple('IndexDef', 'name type columns')
"""Attributes of a single index; ``type`` is "PRIMARY", "UNIQUE", or "INDEX"."""

TableDef = namedtuple('TableDef', 'name description columns indices')
"""Attributes of a single table; ``columns`` is a list of `ColumnDef` and
``indices`` is a list of `IndexDef`."""
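# An illustrative example (these values are made up, not taken from any
# schema file) of how the definitions above fit together:
#
#     col = ColumnDef(name="diaObjectId", type="BIGINT", nullable=False,
#                     default=None, description="Unique DiaObject id",
#                     unit=None, ucd="meta.id")
#     idx = IndexDef(name="PK_DiaObject", type="PRIMARY",
#                    columns=["diaObjectId"])
#     tbl = TableDef(name="DiaObject", description="DIAObject catalog",
#                    columns=[col], indices=[idx])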
70 """Define and create the minimal schema required for a DIAObject.
74 schema : `lsst.afw.table.Schema`
75 Minimal schema for DIAObjects.
77 schema = afwTable.SourceTable.makeMinimalSchema()
78 schema.addField(
"pixelId", type=
'L',
79 doc=
'Unique spherical pixelization identifier.')
80 schema.addField(
"nDiaSources", type=
'L')
85 """ Define and create the minimal schema required for a DIASource.
89 schema : `lsst.afw.table.Schema`
90 Minimal schema for DIASources.
92 schema = afwTable.SourceTable.makeMinimalSchema()
93 schema.addField(
"diaObjectId", type=
'L',
94 doc=
'Unique identifier of the DIAObject this source is '
96 schema.addField(
"ccdVisitId", type=
'L',
97 doc=
'Id of the exposure and ccd this object was detected '
99 schema.addField(
"psFlux", type=
'D',
100 doc=
'Calibrated PSF flux of this source.')
101 schema.addField(
"psFluxErr", type=
'D',
102 doc=
'Calibrated PSF flux err of this source.')
103 schema.addField(
"flags", type=
'L',
104 doc=
'Quality flags for this DIASource.')
105 schema.addField(
"pixelId", type=
'L',
106 doc=
'Unique spherical pixelization identifier.')
@compiles(CreateTable, "oracle")
def _add_suffixes_tbl(element, compiler, **kw):
    """Add all needed suffixes for Oracle CREATE TABLE statement.

    This is a special compilation method for the CreateTable clause which
    registers itself with SQLAlchemy using the @compiles decorator. The exact
    method name does not matter. A client can pass a dict to the ``info``
    keyword argument of the Table constructor. If the dict has a key
    "oracle_tablespace" then its value is used as the tablespace name. If the
    dict has a key "oracle_iot" with a true value then an IOT table is
    created. This method generates additional clauses for the CREATE TABLE
    statement which specify the tablespace name and "ORGANIZATION INDEX"
    for IOT.

    .. seealso:: https://docs.sqlalchemy.org/en/latest/core/compiler.html
    """
    text = compiler.visit_create_table(element, **kw)
    _LOG.debug("text: %r", text)
    oracle_tablespace = element.element.info.get("oracle_tablespace")
    oracle_iot = element.element.info.get("oracle_iot", False)
    _LOG.debug("oracle_tablespace: %r", oracle_tablespace)
    if oracle_iot:
        text += " ORGANIZATION INDEX"
    if oracle_tablespace:
        text += " TABLESPACE " + oracle_tablespace
    _LOG.debug("text: %r", text)
    return text
@compiles(CreateIndex, "oracle")
def _add_suffixes_idx(element, compiler, **kw):
    """Add all needed suffixes for Oracle CREATE INDEX statement.

    This is a special compilation method for the CreateIndex clause which
    registers itself with SQLAlchemy using the @compiles decorator. The exact
    method name does not matter. A client can pass a dict to the ``info``
    keyword argument of the Index constructor. If the dict has a key
    "oracle_tablespace" then its value is used as the tablespace name. This
    method generates an additional clause for the CREATE INDEX statement
    which specifies the tablespace name.

    .. seealso:: https://docs.sqlalchemy.org/en/latest/core/compiler.html
    """
    text = compiler.visit_create_index(element, **kw)
    _LOG.debug("text: %r", text)
    oracle_tablespace = element.element.info.get("oracle_tablespace")
    _LOG.debug("oracle_tablespace: %r", oracle_tablespace)
    if oracle_tablespace:
        text += " TABLESPACE " + oracle_tablespace
    _LOG.debug("text: %r", text)
    return text
162 """Class for management of APDB schema.
166 objects : `sqlalchemy.Table`
167 DiaObject table instance
168 objects_nightly : `sqlalchemy.Table`
169 DiaObjectNightly table instance, may be None
170 objects_last : `sqlalchemy.Table`
171 DiaObjectLast table instance, may be None
172 sources : `sqlalchemy.Table`
173 DiaSource table instance
174 forcedSources : `sqlalchemy.Table`
175 DiaForcedSource table instance
176 visits : `sqlalchemy.Table`
177 ApdbProtoVisits table instance
181 engine : `sqlalchemy.engine.Engine`
182 SQLAlchemy engine instance
183 dia_object_index : `str`
184 Indexing mode for DiaObject table, see `ApdbConfig.dia_object_index`
186 dia_object_nightly : `bool`
187 If `True` then create per-night DiaObject table as well.
189 Name of the YAML schema file.
190 extra_schema_file : `str`, optional
191 Name of the YAML schema file with extra column definitions.
192 column_map : `str`, optional
193 Name of the YAML file with column mappings.
194 afw_schemas : `dict`, optional
195 Dictionary with table name for a key and `afw.table.Schema`
196 for a value. Columns in schema will be added to standard APDB
197 schema (only if standard schema does not have matching column).
198 prefix : `str`, optional
199 Prefix to add to all scheam elements.
203 _afw_type_map = {
"I":
"INT",
210 _afw_type_map_reverse = {
"INT":
"I",
    def __init__(self, engine, dia_object_index, dia_object_nightly,
                 schema_file, extra_schema_file=None, column_map=None,
                 afw_schemas=None, prefix=""):
        self._engine = engine
        self._dia_object_index = dia_object_index
        self._dia_object_nightly = dia_object_nightly
        self._prefix = prefix
        self._metadata = MetaData(self._engine)

        # Tables that are only created in some configurations.
        self.objects_nightly = None
        self.objects_last = None

        if column_map:
            _LOG.debug("Reading column map file %s", column_map)
            with open(column_map) as yaml_stream:
                # Maps table name to a dict of table column name to afw
                # column name.
                self._column_map = yaml.load(yaml_stream, Loader=yaml.SafeLoader)
        else:
            _LOG.debug("No column map file is given, initialize to empty")
            self._column_map = {}
        # Reverse mapping, afw column name to table column name.
        self._column_map_reverse = {}
        for table, cmap in self._column_map.items():
            self._column_map_reverse[table] = {v: k for k, v in cmap.items()}

        # Build complete table schema.
        self._schemas = self._buildSchemas(schema_file, extra_schema_file,
                                           afw_schemas)

        # Map YAML column types to SQLAlchemy types.
        self._type_map = dict(DOUBLE=self._getDoubleType(),
                              FLOAT=sqlalchemy.types.Float,
                              DATETIME=sqlalchemy.types.TIMESTAMP,
                              BIGINT=sqlalchemy.types.BigInteger,
                              INTEGER=sqlalchemy.types.Integer,
                              INT=sqlalchemy.types.Integer,
                              TINYINT=sqlalchemy.types.Integer,
                              BLOB=sqlalchemy.types.LargeBinary,
                              CHAR=sqlalchemy.types.CHAR,
                              BOOL=sqlalchemy.types.Boolean)

        # Generate SQLAlchemy schema for all tables, must be called last.
        self._makeTables()
    def _makeTables(self, mysql_engine='InnoDB', oracle_tablespace=None, oracle_iot=False):
        """Generate schema for all tables.

        Parameters
        ----------
        mysql_engine : `str`, optional
            MySQL engine type to use for new tables.
        oracle_tablespace : `str`, optional
            Name of Oracle tablespace, only useful with Oracle.
        oracle_iot : `bool`, optional
            Make index-organized DiaObjectLast table.
        """
        info = dict(oracle_tablespace=oracle_tablespace)

        if self._dia_object_index == 'pix_id_iov':
            # Special PK with HTM column in first position.
            constraints = self._tableIndices('DiaObjectIndexHtmFirst', info)
        else:
            constraints = self._tableIndices('DiaObject', info)
        table = Table(self._prefix+'DiaObject', self._metadata,
                      *(self._tableColumns('DiaObject') + constraints),
                      mysql_engine=mysql_engine,
                      info=info)
        self.objects = table

        if self._dia_object_nightly:
            # Same as DiaObject but no index.
            table = Table(self._prefix+'DiaObjectNightly', self._metadata,
                          *self._tableColumns('DiaObject'),
                          mysql_engine=mysql_engine,
                          info=info)
            self.objects_nightly = table

        if self._dia_object_index == 'last_object_table':
            # Same as DiaObject but with special index.
            info2 = info.copy()
            info2.update(oracle_iot=oracle_iot)
            table = Table(self._prefix+'DiaObjectLast', self._metadata,
                          *(self._tableColumns('DiaObjectLast') +
                            self._tableIndices('DiaObjectLast', info)),
                          mysql_engine=mysql_engine,
                          info=info2)
            self.objects_last = table

        # For all other tables use index definitions in the schema.
        for table_name in ('DiaSource', 'SSObject', 'DiaForcedSource',
                           'DiaObject_To_Object_Match'):
            table = Table(self._prefix+table_name, self._metadata,
                          *(self._tableColumns(table_name) +
                            self._tableIndices(table_name, info)),
                          mysql_engine=mysql_engine,
                          info=info)
            if table_name == 'DiaSource':
                self.sources = table
            elif table_name == 'DiaForcedSource':
                self.forcedSources = table

        # Special table to track visits, only used by the prototype.
        table = Table(self._prefix+'ApdbProtoVisits', self._metadata,
                      Column('visitId', sqlalchemy.types.BigInteger, nullable=False),
                      Column('visitTime', sqlalchemy.types.TIMESTAMP, nullable=False),
                      PrimaryKeyConstraint('visitId', name=self._prefix+'PK_ApdbProtoVisits'),
                      Index(self._prefix+'IDX_ApdbProtoVisits_vTime', 'visitTime', info=info),
                      mysql_engine=mysql_engine,
                      info=info)
        self.visits = table
    def makeSchema(self, drop=False, mysql_engine='InnoDB', oracle_tablespace=None, oracle_iot=False):
        """Create or re-create all tables.

        Parameters
        ----------
        drop : `bool`, optional
            If `True` then drop tables before creating new ones.
        mysql_engine : `str`, optional
            MySQL engine type to use for new tables.
        oracle_tablespace : `str`, optional
            Name of Oracle tablespace, only useful with Oracle.
        oracle_iot : `bool`, optional
            Make index-organized DiaObjectLast table.
        """
        # Re-make the table schema with possibly different options.
        _LOG.debug("clear metadata")
        self._metadata.clear()
        _LOG.debug("re-do schema mysql_engine=%r oracle_tablespace=%r",
                   mysql_engine, oracle_tablespace)
        self._makeTables(mysql_engine=mysql_engine, oracle_tablespace=oracle_tablespace,
                         oracle_iot=oracle_iot)

        # Create all tables, optionally dropping them first.
        if drop:
            _LOG.info('dropping all tables')
            self._metadata.drop_all()
        _LOG.info('creating all tables')
        self._metadata.create_all()
368 """Return afw schema for given table.
373 One of known APDB table names.
374 columns : `list` of `str`, optional
375 Include only given table columns in schema, by default all columns
380 schema : `lsst.afw.table.Schema`
382 Mapping of the table/result column names into schema key.
390 schema = afwTable.SourceTable.makeMinimalSchema()
391 for column
in table.columns:
392 if columns
and column.name
not in columns:
394 afw_col = col_map.get(column.name, column.name)
395 if afw_col
in schema.getNames():
397 key = schema.find(afw_col).getKey()
398 elif column.type
in (
"DOUBLE",
"FLOAT")
and column.unit ==
"deg":
403 key = schema.addField(afw_col,
405 doc=column.description
or "",
407 elif column.type ==
"BLOB":
411 units = column.unit
or ""
414 key = schema.addField(afw_col,
416 doc=column.description
or "",
418 parse_strict=
"silent",
421 key = schema.addField(afw_col,
423 doc=column.description
or "",
424 parse_strict=
"silent")
426 key = schema.addField(afw_col,
428 doc=column.description
or "",
430 parse_strict=
"silent")
431 col2afw[column.name] = key
433 return schema, col2afw
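    # A sketch of how the result of getAfwSchema() might be used (assuming
    # ``apdb_schema`` is an instance of this class; not executed here):
    #
    #     schema, col2afw = apdb_schema.getAfwSchema("DiaSource",
    #                                                columns=["psFlux"])
    #     catalog = afwTable.SourceCatalog(schema)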
436 """Returns mapping of afw column names to Column definitions.
441 One of known APDB table names.
446 Mapping of afw column names to `ColumnDef` instances.
452 for column
in table.columns:
453 afw_name = col_map.get(column.name, column.name)
454 cmap[afw_name] = column
458 """Returns mapping of column names to Column definitions.
463 One of known APDB table names.
468 Mapping of column names to `ColumnDef` instances.
471 cmap = {column.name: column
for column
in table.columns}
    def _buildSchemas(self, schema_file, extra_schema_file=None, afw_schemas=None):
        """Create schema definitions for all tables.

        Reads YAML schemas and builds a dictionary containing `TableDef`
        instances for each table.

        Parameters
        ----------
        schema_file : `str`
            Name of YAML file with the standard cat schema.
        extra_schema_file : `str`, optional
            Name of YAML file with extra table information or `None`.
        afw_schemas : `dict`, optional
            Dictionary with table name for a key and `afw.table.Schema`
            for a value. Columns in schema will be added to standard APDB
            schema (only if standard schema does not have matching column).

        Returns
        -------
        schemas : `dict`
            Mapping of table names to `TableDef` instances.
        """
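        # The expected YAML layout, inferred from the parsing code below;
        # one document per table, the values here are illustrative only:
        #
        #     table: DiaObject
        #     description: Table with DIAObject records.
        #     columns:
        #     - name: diaObjectId
        #       type: BIGINT
        #       nullable: false
        #       description: Unique object id.
        #       ucd: meta.id
        #     indices:
        #     - name: PK_DiaObject
        #       type: PRIMARY
        #       columns: [diaObjectId]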
        _LOG.debug("Reading schema file %s", schema_file)
        with open(schema_file) as yaml_stream:
            tables = list(yaml.load_all(yaml_stream, Loader=yaml.SafeLoader))
        _LOG.debug("Read %d tables from schema", len(tables))

        if extra_schema_file:
            _LOG.debug("Reading extra schema file %s", extra_schema_file)
            with open(extra_schema_file) as yaml_stream:
                extras = list(yaml.load_all(yaml_stream, Loader=yaml.SafeLoader))
                # Index the extra tables by table name.
                schemas_extra = {table['table']: table for table in extras}
        else:
            schemas_extra = {}

        # Merge the extra schema into the regular schema, for now only
        # columns are merged.
        for table in tables:
            table_name = table['table']
            if table_name in schemas_extra:
                extra_columns = schemas_extra[table_name].get('columns', [])
                extra_columns = {col['name']: col for col in extra_columns}
                _LOG.debug("Extra columns for table %s: %s",
                           table_name, extra_columns.keys())
                # Replace standard column definitions with their extra
                # versions, preserving the original column order.
                columns = []
                for col in table['columns']:
                    if col['name'] in extra_columns:
                        columns.append(extra_columns.pop(col['name']))
                    else:
                        columns.append(col)
                # Add all remaining extra columns at the end.
                table['columns'] = columns + list(extra_columns.values())

                if 'indices' in schemas_extra[table_name]:
                    raise RuntimeError("Extra table definition contains indices, "
                                       "merging is not implemented")

                del schemas_extra[table_name]

        # Tables defined only in the extra file are used as is.
        tables += schemas_extra.values()
        # Convert all dicts into named tuples.
        schemas = {}
        for table in tables:
            columns = table.get('columns', [])

            table_name = table['table']
            afw_schema = afw_schemas and afw_schemas.get(table_name)
            if afw_schema:
                # Use afw schema to create extra columns.
                column_names = {col['name'] for col in columns}
                column_names_lower = {col.lower() for col in column_names}
                for _, field in afw_schema:
                    column = self._field2dict(field, table_name)
                    if column['name'] not in column_names:
                        # Check that there is no column with the same name in
                        # a different case.
                        if column['name'].lower() in column_names_lower:
                            raise ValueError("afw.table column name case does not match schema column name")
                        columns.append(column)

            table_columns = []
            for col in columns:
                # For the prototype set default to 0 even if the column
                # definition does not specify it.
                if "default" not in col:
                    default = None
                    if col['type'] not in ("BLOB", "DATETIME"):
                        default = 0
                else:
                    default = col["default"]

                column = ColumnDef(name=col['name'],
                                   type=col['type'],
                                   nullable=col.get("nullable"),
                                   default=default,
                                   description=col.get("description"),
                                   unit=col.get("unit"),
                                   ucd=col.get("ucd"))
                table_columns.append(column)

            table_indices = []
            for idx in table.get('indices', []):
                index = IndexDef(name=idx.get('name'),
                                 type=idx.get('type'),
                                 columns=idx.get('columns'))
                table_indices.append(index)

            schemas[table_name] = TableDef(name=table_name,
                                           description=table.get('description'),
                                           columns=table_columns,
                                           indices=table_indices)

        return schemas
    def _tableColumns(self, table_name):
        """Return set of columns in a table.

        Parameters
        ----------
        table_name : `str`
            Name of the table.

        Returns
        -------
        column_defs : `list`
            List of `Column` objects.
        """
        # Get the list of columns in the primary key, they are treated
        # specially below.
        table_schema = self._schemas[table_name]
        pkey_columns = set()
        for index in table_schema.indices:
            if index.type == 'PRIMARY':
                pkey_columns = set(index.columns)
                break

        # Convert all column dicts into SQLAlchemy Columns.
        column_defs = []
        for column in table_schema.columns:
            kwargs = dict(nullable=column.nullable)
            if column.default is not None:
                kwargs.update(server_default=str(column.default))
            if column.name in pkey_columns:
                kwargs.update(autoincrement=False)
            ctype = self._type_map[column.type]
            column_defs.append(Column(column.name, ctype, **kwargs))

        return column_defs
    def _field2dict(self, field, table_name):
        """Convert afw schema field definition into a dict format.

        Parameters
        ----------
        field : `lsst.afw.table.Field`
            Field in afw table schema.
        table_name : `str`
            Name of the table.

        Returns
        -------
        field_dict : `dict`
            Field attributes for SQL schema:

            - ``name`` : field name (`str`)
            - ``type`` : type name in SQL, e.g. "INT", "FLOAT" (`str`)
            - ``nullable`` : `True` if column can be ``NULL`` (`bool`)
        """
        column = field.getName()
        column = self._column_map_reverse[table_name].get(column, column)
        ctype = self._afw_type_map[field.getTypeString()]
        return dict(name=column, type=ctype, nullable=True)
    def _tableIndices(self, table_name, info):
        """Return set of constraints/indices in a table.

        Parameters
        ----------
        table_name : `str`
            Name of the table.
        info : `dict`
            Additional options passed to SQLAlchemy index constructor.

        Returns
        -------
        index_defs : `list`
            List of SQLAlchemy index/constraint objects.
        """
        table_schema = self._schemas[table_name]

        # Convert all index dicts into SQLAlchemy index/constraint objects.
        index_defs = []
        for index in table_schema.indices:
            if index.type == "INDEX":
                index_defs.append(Index(self._prefix+index.name, *index.columns,
                                        info=info))
            else:
                kwargs = {}
                if index.name:
                    kwargs['name'] = self._prefix+index.name
                if index.type == "PRIMARY":
                    index_defs.append(PrimaryKeyConstraint(*index.columns, **kwargs))
                elif index.type == "UNIQUE":
                    index_defs.append(UniqueConstraint(*index.columns, **kwargs))

        return index_defs
    def _getDoubleType(self):
        """DOUBLE type is database-specific, select one based on dialect.

        Returns
        -------
        type_object : `object`
            Database-specific type definition.
        """
        if self._engine.name == 'mysql':
            from sqlalchemy.dialects.mysql import DOUBLE
            return DOUBLE(asdecimal=False)
        elif self._engine.name == 'postgresql':
            from sqlalchemy.dialects.postgresql import DOUBLE_PRECISION
            return DOUBLE_PRECISION
        elif self._engine.name == 'oracle':
            from sqlalchemy.dialects.oracle import DOUBLE_PRECISION
            return DOUBLE_PRECISION
        elif self._engine.name == 'sqlite':
            # All floats in sqlite are 8-byte.
            from sqlalchemy.dialects.sqlite import REAL
            return REAL
        else:
            raise TypeError('cannot determine DOUBLE type, unexpected dialect: ' +
                            self._engine.name)
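

if __name__ == "__main__":
    # A minimal usage sketch, assuming an in-memory SQLite database. The
    # schema file name below is an illustration only, no such file ships
    # with this module; "baseline" stands for any indexing mode that gets
    # no special treatment in _makeTables().
    engine = sqlalchemy.create_engine("sqlite://")
    apdb_schema = ApdbSchema(engine,
                             dia_object_index="baseline",
                             dia_object_nightly=False,
                             schema_file="apdb-schema.yaml")
    # Create all tables, dropping any existing ones first.
    apdb_schema.makeSchema(drop=True)
    for name, column in apdb_schema.getColumnMap("DiaObject").items():
        print(name, column.type)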