22 """Module responsible for APDB schema operations.
25 __all__ = [
"ColumnDef",
"IndexDef",
"TableDef",
26 "make_minimal_dia_object_schema",
"make_minimal_dia_source_schema",
29 from collections
import namedtuple
35 from sqlalchemy
import (Column, Index, MetaData, PrimaryKeyConstraint,
36 UniqueConstraint, Table)
37 from sqlalchemy.schema
import CreateTable, CreateIndex
38 from sqlalchemy.ext.compiler
import compiles
42 _LOG = logging.getLogger(__name__.partition(
".")[2])
54 ColumnDef = namedtuple(
'ColumnDef',
'name type nullable default description unit ucd')
60 IndexDef = namedtuple(
'IndexDef',
'name type columns')
67 TableDef = namedtuple(
'TableDef',
'name description columns indices')
71 """Define and create the minimal schema required for a DIAObject.
75 schema : `lsst.afw.table.Schema`
76 Minimal schema for DIAObjects.
78 schema = afwTable.SourceTable.makeMinimalSchema()
79 schema.addField(
"pixelId", type=
'L',
80 doc=
'Unique spherical pixelization identifier.')
81 schema.addField(
"nDiaSources", type=
'L')
86 """ Define and create the minimal schema required for a DIASource.
90 schema : `lsst.afw.table.Schema`
91 Minimal schema for DIASources.
93 schema = afwTable.SourceTable.makeMinimalSchema()
94 schema.addField(
"diaObjectId", type=
'L',
95 doc=
'Unique identifier of the DIAObject this source is '
97 schema.addField(
"ccdVisitId", type=
'L',
98 doc=
'Id of the exposure and ccd this object was detected '
100 schema.addField(
"psFlux", type=
'D',
101 doc=
'Calibrated PSF flux of this source.')
102 schema.addField(
"psFluxErr", type=
'D',
103 doc=
'Calibrated PSF flux err of this source.')
104 schema.addField(
"flags", type=
'L',
105 doc=
'Quality flags for this DIASource.')
106 schema.addField(
"pixelId", type=
'L',
107 doc=
'Unique spherical pixelization identifier.')
111 @compiles(CreateTable, "oracle")
112 def _add_suffixes_tbl(element, compiler, **kw):
113 """Add all needed suffixes for Oracle CREATE TABLE statement.
115 This is a special compilation method for CreateTable clause which
116 registers itself with SQLAlchemy using @compiles decorator. Exact method
117 name does not matter. Client can pass a dict to ``info`` keyword argument
118 of Table constructor. If the dict has a key "oracle_tablespace" then its
119 value is used as tablespace name. If the dict has a key "oracle_iot" with
120 true value then IOT table is created. This method generates additional
121 clauses for CREATE TABLE statement which specify tablespace name and
122 "ORGANIZATION INDEX" for IOT.
124 .. seealso:: https://docs.sqlalchemy.org/en/latest/core/compiler.html
126 text = compiler.visit_create_table(element, **kw)
127 _LOG.debug(
"text: %r", text)
128 oracle_tablespace = element.element.info.get(
"oracle_tablespace")
129 oracle_iot = element.element.info.get(
"oracle_iot",
False)
130 _LOG.debug(
"oracle_tablespace: %r", oracle_tablespace)
132 text +=
" ORGANIZATION INDEX"
133 if oracle_tablespace:
134 text +=
" TABLESPACE " + oracle_tablespace
135 _LOG.debug(
"text: %r", text)
139 @compiles(CreateIndex, "oracle")
140 def _add_suffixes_idx(element, compiler, **kw):
141 """Add all needed suffixes for Oracle CREATE INDEX statement.
143 This is a special compilation method for CreateIndex clause which
144 registers itself with SQLAlchemy using @compiles decorator. Exact method
145 name does not matter. Client can pass a dict to ``info`` keyword argument
146 of Index constructor. If the dict has a key "oracle_tablespace" then its
147 value is used as tablespace name. This method generates additional
148 clause for CREATE INDEX statement which specifies tablespace name.
150 .. seealso:: https://docs.sqlalchemy.org/en/latest/core/compiler.html
152 text = compiler.visit_create_index(element, **kw)
153 _LOG.debug(
"text: %r", text)
154 oracle_tablespace = element.element.info.get(
"oracle_tablespace")
155 _LOG.debug(
"oracle_tablespace: %r", oracle_tablespace)
156 if oracle_tablespace:
157 text +=
" TABLESPACE " + oracle_tablespace
158 _LOG.debug(
"text: %r", text)
163 """Class for management of APDB schema.
167 objects : `sqlalchemy.Table`
168 DiaObject table instance
169 objects_nightly : `sqlalchemy.Table`
170 DiaObjectNightly table instance, may be None
171 objects_last : `sqlalchemy.Table`
172 DiaObjectLast table instance, may be None
173 sources : `sqlalchemy.Table`
174 DiaSource table instance
175 forcedSources : `sqlalchemy.Table`
176 DiaForcedSource table instance
177 visits : `sqlalchemy.Table`
178 ApdbProtoVisits table instance
182 engine : `sqlalchemy.engine.Engine`
183 SQLAlchemy engine instance
184 dia_object_index : `str`
185 Indexing mode for DiaObject table, see `ApdbConfig.dia_object_index`
187 dia_object_nightly : `bool`
188 If `True` then create per-night DiaObject table as well.
190 Name of the YAML schema file.
191 extra_schema_file : `str`, optional
192 Name of the YAML schema file with extra column definitions.
193 column_map : `str`, optional
194 Name of the YAML file with column mappings.
195 afw_schemas : `dict`, optional
196 Dictionary with table name for a key and `afw.table.Schema`
197 for a value. Columns in schema will be added to standard APDB
198 schema (only if standard schema does not have matching column).
199 prefix : `str`, optional
200 Prefix to add to all schema elements.
204 _afw_type_map = {
"I":
"INT",
211 _afw_type_map_reverse = {
"INT":
"I",
219 def __init__(self, engine, dia_object_index, dia_object_nightly,
220 schema_file, extra_schema_file=None, column_map=None,
221 afw_schemas=None, prefix=""):
238 column_map = os.path.expandvars(column_map)
239 _LOG.debug(
"Reading column map file %s", column_map)
240 with open(column_map)
as yaml_stream:
242 self.
_column_map_column_map = yaml.load(yaml_stream, Loader=yaml.SafeLoader)
243 _LOG.debug(
"column map: %s", self.
_column_map_column_map)
245 _LOG.debug(
"No column map file is given, initialize to empty")
259 FLOAT=sqlalchemy.types.Float,
260 DATETIME=sqlalchemy.types.TIMESTAMP,
261 BIGINT=sqlalchemy.types.BigInteger,
262 INTEGER=sqlalchemy.types.Integer,
263 INT=sqlalchemy.types.Integer,
264 TINYINT=sqlalchemy.types.Integer,
265 BLOB=sqlalchemy.types.LargeBinary,
266 CHAR=sqlalchemy.types.CHAR,
267 BOOL=sqlalchemy.types.Boolean)
272 def _makeTables(self, mysql_engine='InnoDB', oracle_tablespace=None, oracle_iot=False):
273 """Generate schema for all tables.
277 mysql_engine : `str`, optional
278 MySQL engine type to use for new tables.
279 oracle_tablespace : `str`, optional
280 Name of Oracle tablespace, only useful with Oracle.
281 oracle_iot : `bool`, optional
282 Make Index-organized DiaObjectLast table.
285 info = dict(oracle_tablespace=oracle_tablespace)
289 constraints = self.
_tableIndices_tableIndices(
'DiaObjectIndexHtmFirst', info)
291 constraints = self.
_tableIndices_tableIndices(
'DiaObject', info)
293 *(self.
_tableColumns_tableColumns(
'DiaObject') + constraints),
294 mysql_engine=mysql_engine,
300 table = Table(self.
_prefix_prefix+
'DiaObjectNightly', self.
_metadata_metadata,
302 mysql_engine=mysql_engine,
309 info2.update(oracle_iot=oracle_iot)
313 mysql_engine=mysql_engine,
318 for table_name
in (
'DiaSource',
'SSObject',
'DiaForcedSource',
'DiaObject_To_Object_Match'):
322 mysql_engine=mysql_engine,
324 if table_name ==
'DiaSource':
326 elif table_name ==
'DiaForcedSource':
330 table = Table(self.
_prefix_prefix+
'ApdbProtoVisits', self.
_metadata_metadata,
331 Column(
'visitId', sqlalchemy.types.BigInteger, nullable=
False),
332 Column(
'visitTime', sqlalchemy.types.TIMESTAMP, nullable=
False),
333 PrimaryKeyConstraint(
'visitId', name=self.
_prefix_prefix+
'PK_ApdbProtoVisits'),
334 Index(self.
_prefix_prefix+
'IDX_ApdbProtoVisits_vTime',
'visitTime', info=info),
335 mysql_engine=mysql_engine,
339 def makeSchema(self, drop=False, mysql_engine='InnoDB', oracle_tablespace=None, oracle_iot=False):
340 """Create or re-create all tables.
344 drop : `bool`, optional
345 If True then drop tables before creating new ones.
346 mysql_engine : `str`, optional
347 MySQL engine type to use for new tables.
348 oracle_tablespace : `str`, optional
349 Name of Oracle tablespace, only useful with Oracle.
350 oracle_iot : `bool`, optional
351 Make Index-organized DiaObjectLast table.
355 _LOG.debug(
"clear metadata")
357 _LOG.debug(
"re-do schema mysql_engine=%r oracle_tablespace=%r",
358 mysql_engine, oracle_tablespace)
359 self.
_makeTables_makeTables(mysql_engine=mysql_engine, oracle_tablespace=oracle_tablespace,
360 oracle_iot=oracle_iot)
364 _LOG.info(
'dropping all tables')
366 _LOG.info(
'creating all tables')
370 """Return afw schema for given table.
375 One of known APDB table names.
376 columns : `list` of `str`, optional
377 Include only given table columns in schema, by default all columns
382 schema : `lsst.afw.table.Schema`
384 Mapping of the table/result column names into schema key.
387 table = self.
_schemas_schemas[table_name]
388 col_map = self.
_column_map_column_map.get(table_name, {})
392 schema = afwTable.SourceTable.makeMinimalSchema()
393 for column
in table.columns:
394 if columns
and column.name
not in columns:
396 afw_col = col_map.get(column.name, column.name)
397 if afw_col
in schema.getNames():
399 key = schema.find(afw_col).getKey()
400 elif column.type
in (
"DOUBLE",
"FLOAT")
and column.unit ==
"deg":
405 key = schema.addField(afw_col,
407 doc=column.description
or "",
409 elif column.type ==
"BLOB":
413 units = column.unit
or ""
416 key = schema.addField(afw_col,
418 doc=column.description
or "",
420 parse_strict=
"silent",
423 key = schema.addField(afw_col,
425 doc=column.description
or "",
426 parse_strict=
"silent")
428 key = schema.addField(afw_col,
430 doc=column.description
or "",
432 parse_strict=
"silent")
433 col2afw[column.name] = key
435 return schema, col2afw
438 """Returns mapping of afw column names to Column definitions.
443 One of known APDB table names.
448 Mapping of afw column names to `ColumnDef` instances.
450 table = self.
_schemas_schemas[table_name]
451 col_map = self.
_column_map_column_map.get(table_name, {})
454 for column
in table.columns:
455 afw_name = col_map.get(column.name, column.name)
456 cmap[afw_name] = column
460 """Returns mapping of column names to Column definitions.
465 One of known APDB table names.
470 Mapping of column names to `ColumnDef` instances.
472 table = self.
_schemas_schemas[table_name]
473 cmap = {column.name: column
for column
in table.columns}
476 def _buildSchemas(self, schema_file, extra_schema_file=None, afw_schemas=None):
477 """Create schema definitions for all tables.
479 Reads YAML schemas and builds dictionary containing `TableDef`
480 instances for each table.
485 Name of YAML file with standard cat schema.
486 extra_schema_file : `str`, optional
487 Name of YAML file with extra table information or `None`.
488 afw_schemas : `dict`, optional
489 Dictionary with table name for a key and `afw.table.Schema`
490 for a value. Columns in schema will be added to standard APDB
491 schema (only if standard schema does not have matching column).
496 Mapping of table names to `TableDef` instances.
499 schema_file = os.path.expandvars(schema_file)
500 _LOG.debug(
"Reading schema file %s", schema_file)
501 with open(schema_file)
as yaml_stream:
502 tables =
list(yaml.load_all(yaml_stream, Loader=yaml.SafeLoader))
504 _LOG.debug(
"Read %d tables from schema", len(tables))
506 if extra_schema_file:
507 extra_schema_file = os.path.expandvars(extra_schema_file)
508 _LOG.debug(
"Reading extra schema file %s", extra_schema_file)
509 with open(extra_schema_file)
as yaml_stream:
510 extras =
list(yaml.load_all(yaml_stream, Loader=yaml.SafeLoader))
512 schemas_extra = {table[
'table']: table
for table
in extras}
518 table_name = table[
'table']
519 if table_name
in schemas_extra:
520 columns = table[
'columns']
521 extra_columns = schemas_extra[table_name].get(
'columns', [])
522 extra_columns = {col[
'name']: col
for col
in extra_columns}
523 _LOG.debug(
"Extra columns for table %s: %s", table_name, extra_columns.keys())
525 for col
in table[
'columns']:
526 if col[
'name']
in extra_columns:
527 columns.append(extra_columns.pop(col[
'name']))
531 table[
'columns'] = columns +
list(extra_columns.values())
533 if 'indices' in schemas_extra[table_name]:
534 raise RuntimeError(
"Extra table definition contains indices, "
535 "merging is not implemented")
537 del schemas_extra[table_name]
540 tables += schemas_extra.values()
546 columns = table.get(
'columns', [])
548 table_name = table[
'table']
549 afw_schema = afw_schemas
and afw_schemas.get(table_name)
552 column_names = {col[
'name']
for col
in columns}
553 column_names_lower = {col.lower()
for col
in column_names}
554 for _, field
in afw_schema:
555 column = self.
_field2dict_field2dict(field, table_name)
556 if column[
'name']
not in column_names:
558 if column[
'name'].lower()
in column_names_lower:
559 raise ValueError(
"afw.table column name case does not match schema column name")
560 columns.append(column)
565 if "default" not in col:
567 if col[
'type']
not in (
"BLOB",
"DATETIME"):
570 default = col[
"default"]
574 nullable=col.get(
"nullable"),
576 description=col.get(
"description"),
577 unit=col.get(
"unit"),
579 table_columns.append(column)
582 for idx
in table.get(
'indices', []):
583 index =
IndexDef(name=idx.get(
'name'),
584 type=idx.get(
'type'),
585 columns=idx.get(
'columns'))
586 table_indices.append(index)
588 schemas[table_name] =
TableDef(name=table_name,
589 description=table.get(
'description'),
590 columns=table_columns,
591 indices=table_indices)
595 def _tableColumns(self, table_name):
596 """Return list of columns in a table.
606 List of `Column` objects.
611 table_schema = self.
_schemas_schemas[table_name]
613 for index
in table_schema.indices:
614 if index.type ==
'PRIMARY':
615 pkey_columns =
set(index.columns)
620 for column
in table_schema.columns:
621 kwargs = dict(nullable=column.nullable)
622 if column.default
is not None:
623 kwargs.update(server_default=str(column.default))
624 if column.name
in pkey_columns:
625 kwargs.update(autoincrement=
False)
626 ctype = self.
_type_map_type_map[column.type]
627 column_defs.append(Column(column.name, ctype, **kwargs))
631 def _field2dict(self, field, table_name):
632 """Convert afw schema field definition into a dict format.
636 field : `lsst.afw.table.Field`
637 Field in afw table schema.
644 Field attributes for SQL schema:
646 - ``name`` : field name (`str`)
647 - ``type`` : type name in SQL, e.g. "INT", "FLOAT" (`str`)
648 - ``nullable`` : `True` if column can be ``NULL`` (`bool`)
650 column = field.getName()
652 ctype = self.
_afw_type_map_afw_type_map[field.getTypeString()]
653 return dict(name=column, type=ctype, nullable=
True)
655 def _tableIndices(self, table_name, info):
656 """Return list of constraints/indices in a table.
663 Additional options passed to SQLAlchemy index constructor.
668 List of SQLAlchemy index/constraint objects.
671 table_schema = self.
_schemas_schemas[table_name]
675 for index
in table_schema.indices:
676 if index.type ==
"INDEX":
677 index_defs.append(Index(self.
_prefix_prefix+index.name, *index.columns, info=info))
681 kwargs[
'name'] = self.
_prefix_prefix+index.name
682 if index.type ==
"PRIMARY":
683 index_defs.append(PrimaryKeyConstraint(*index.columns, **kwargs))
684 elif index.type ==
"UNIQUE":
685 index_defs.append(UniqueConstraint(*index.columns, **kwargs))
689 def _getDoubleType(self):
690 """DOUBLE type is database-specific, select one based on dialect.
694 type_object : `object`
695 Database-specific type definition.
697 if self.
_engine_engine.name ==
'mysql':
698 from sqlalchemy.dialects.mysql
import DOUBLE
699 return DOUBLE(asdecimal=
False)
700 elif self.
_engine_engine.name ==
'postgresql':
701 from sqlalchemy.dialects.postgresql
import DOUBLE_PRECISION
702 return DOUBLE_PRECISION
703 elif self.
_engine_engine.name ==
'oracle':
704 from sqlalchemy.dialects.oracle
import DOUBLE_PRECISION
705 return DOUBLE_PRECISION
706 elif self.
_engine_engine.name ==
'sqlite':
708 from sqlalchemy.dialects.sqlite
import REAL
711 raise TypeError(
'cannot determine DOUBLE type, unexpected dialect: ' + self.
_engine_engine.name)
std::vector< SchemaItem< Flag > > * items
def __init__(self, engine, dia_object_index, dia_object_nightly, schema_file, extra_schema_file=None, column_map=None, afw_schemas=None, prefix="")
def _tableIndices(self, table_name, info)
def getAfwSchema(self, table_name, columns=None)
def getColumnMap(self, table_name)
def getAfwColumns(self, table_name)
def _tableColumns(self, table_name)
def makeSchema(self, drop=False, mysql_engine='InnoDB', oracle_tablespace=None, oracle_iot=False)
def _field2dict(self, field, table_name)
def _makeTables(self, mysql_engine='InnoDB', oracle_tablespace=None, oracle_iot=False)
dictionary _afw_type_map_reverse
def _buildSchemas(self, schema_file, extra_schema_file=None, afw_schemas=None)
daf::base::PropertyList * list
daf::base::PropertySet * set
def make_minimal_dia_object_schema()
def make_minimal_dia_source_schema()