22 """Module responsible for PPDB schema operations. 25 __all__ = [
"ColumnDef",
"IndexDef",
"TableDef",
26 "make_minimal_dia_object_schema",
"make_minimal_dia_source_schema",
29 from collections
import namedtuple
34 from sqlalchemy
import (Column, Index, MetaData, PrimaryKeyConstraint,
35 UniqueConstraint, Table)
36 from sqlalchemy.schema
import CreateTable, CreateIndex
37 from sqlalchemy.ext.compiler
import compiles
41 _LOG = logging.getLogger(__name__.partition(
".")[2])
# Definition of one table column: column name, SQL type name, NULL-ability
# flag, default value, human-readable description, unit string, and UCD.
ColumnDef = namedtuple(
    "ColumnDef",
    ["name", "type", "nullable", "default", "description", "unit", "ucd"],
)
# Definition of one table index: index name, index type (e.g. "INDEX",
# "PRIMARY", "UNIQUE"), and the list of column names it covers.
IndexDef = namedtuple("IndexDef", ["name", "type", "columns"])
# Definition of one table: table name, description text, list of `ColumnDef`
# instances, and list of `IndexDef` instances.
TableDef = namedtuple("TableDef", ["name", "description", "columns", "indices"])
70 """Define and create the minimal schema required for a DIAObject. 74 schema : `lsst.afw.table.Schema` 75 Minimal schema for DIAObjects. 77 schema = afwTable.SourceTable.makeMinimalSchema()
78 schema.addField(
"pixelId", type=
'L',
79 doc=
'Unique spherical pixelization identifier.')
80 schema.addField(
"nDiaSources", type=
'L')
85 """ Define and create the minimal schema required for a DIASource. 89 schema : `lsst.afw.table.Schema` 90 Minimal schema for DIASources. 92 schema = afwTable.SourceTable.makeMinimalSchema()
93 schema.addField(
"diaObjectId", type=
'L',
94 doc=
'Unique identifier of the DIAObject this source is ' 96 schema.addField(
"ccdVisitId", type=
'L',
97 doc=
'Id of the exposure and ccd this object was detected ' 99 schema.addField(
"psFlux", type=
'D',
100 doc=
'Calibrated PSF flux of this source.')
101 schema.addField(
"psFluxErr", type=
'D',
102 doc=
'Calibrated PSF flux err of this source.')
103 schema.addField(
"flags", type=
'L',
104 doc=
'Quality flags for this DIASource.')
105 schema.addField(
"pixelId", type=
'L',
106 doc=
'Unique spherical pixelization identifier.')
@compiles(CreateTable, "oracle")
def _add_suffixes_tbl(element, compiler, **kw):
    """Add all needed suffixes for Oracle CREATE TABLE statement.

    This is a special compilation method for CreateTable clause which
    registers itself with SQLAlchemy using @compiles decorator. Exact method
    name does not matter. Client can pass a dict to ``info`` keyword argument
    of Table constructor. If the dict has a key "oracle_tablespace" then its
    value is used as tablespace name. If the dict has a key "oracle_iot" with
    true value then IOT table is created. This method generates additional
    clauses for CREATE TABLE statement which specify tablespace name and
    "ORGANIZATION INDEX" for IOT.

    .. seealso:: https://docs.sqlalchemy.org/en/latest/core/compiler.html
    """
    # Start from the default rendering and append Oracle-specific clauses.
    text = compiler.visit_create_table(element, **kw)
    _LOG.debug("text: %r", text)
    oracle_tablespace = element.element.info.get("oracle_tablespace")
    oracle_iot = element.element.info.get("oracle_iot", False)
    _LOG.debug("oracle_tablespace: %r", oracle_tablespace)
    if oracle_iot:
        # Index-organized table.
        text += " ORGANIZATION INDEX"
    if oracle_tablespace:
        text += " TABLESPACE " + oracle_tablespace
    _LOG.debug("text: %r", text)
    return text
@compiles(CreateIndex, "oracle")
def _add_suffixes_idx(element, compiler, **kw):
    """Add all needed suffixes for Oracle CREATE INDEX statement.

    This is a special compilation method for CreateIndex clause which
    registers itself with SQLAlchemy using @compiles decorator. Exact method
    name does not matter. Client can pass a dict to ``info`` keyword argument
    of Index constructor. If the dict has a key "oracle_tablespace" then its
    value is used as tablespace name. This method generates additional
    clause for CREATE INDEX statement which specifies tablespace name.

    .. seealso:: https://docs.sqlalchemy.org/en/latest/core/compiler.html
    """
    # Start from the default rendering and append the tablespace clause.
    text = compiler.visit_create_index(element, **kw)
    _LOG.debug("text: %r", text)
    oracle_tablespace = element.element.info.get("oracle_tablespace")
    _LOG.debug("oracle_tablespace: %r", oracle_tablespace)
    if oracle_tablespace:
        text += " TABLESPACE " + oracle_tablespace
    _LOG.debug("text: %r", text)
    return text
162 """Class for management of PPDB schema. 166 objects : `sqlalchemy.Table` 167 DiaObject table instance 168 objects_nightly : `sqlalchemy.Table` 169 DiaObjectNightly table instance, may be None 170 objects_last : `sqlalchemy.Table` 171 DiaObjectLast table instance, may be None 172 sources : `sqlalchemy.Table` 173 DiaSource table instance 174 forcedSources : `sqlalchemy.Table` 175 DiaForcedSource table instance 176 visits : `sqlalchemy.Table` 177 PpdbProtoVisits table instance 181 engine : `sqlalchemy.engine.Engine` 182 SQLAlchemy engine instance 183 dia_object_index : `str` 184 Indexing mode for DiaObject table, see `PpdbConfig.dia_object_index` 186 dia_object_nightly : `bool` 187 If `True` then create per-night DiaObject table as well. 189 Name of the YAML schema file. 190 extra_schema_file : `str`, optional 191 Name of the YAML schema file with extra column definitions. 192 column_map : `str`, optional 193 Name of the YAML file with column mappings. 194 afw_schemas : `dict`, optional 195 Dictionary with table name for a key and `afw.table.Schema` 196 for a value. Columns in schema will be added to standard PPDB 197 schema (only if standard schema does not have matching column). 198 prefix : `str`, optional 199 Prefix to add to all scheam elements. 203 _afw_type_map = {
"I":
"INT",
209 _afw_type_map_reverse = {
"INT":
"I",
# NOTE(review): extraction-garbled __init__. Visible fragments show: reading
# an optional YAML column-map file (or initializing it to empty), and the
# tail of a SQL type-name -> SQLAlchemy type mapping. Original lines 219-234,
# 237-240 and 242-254 (attribute assignments, yaml parsing, _buildSchemas
# call, start of the type map) were lost in extraction. TODO: restore from
# upstream source before use.
216 def __init__(self, engine, dia_object_index, dia_object_nightly,
217 schema_file, extra_schema_file=None, column_map=None,
218 afw_schemas=None, prefix=""):
235 _LOG.debug(
"Reading column map file %s", column_map)
236 with open(column_map)
as yaml_stream:
241 _LOG.debug(
"No column map file is given, initialize to empty")
255 FLOAT=sqlalchemy.types.Float,
256 DATETIME=sqlalchemy.types.TIMESTAMP,
257 BIGINT=sqlalchemy.types.BigInteger,
258 INTEGER=sqlalchemy.types.Integer,
259 INT=sqlalchemy.types.Integer,
260 TINYINT=sqlalchemy.types.Integer,
261 BLOB=sqlalchemy.types.LargeBinary,
262 CHAR=sqlalchemy.types.CHAR)
# NOTE(review): extraction-garbled _makeTables. Visible fragments show it
# builds sqlalchemy Table objects for DiaObject (with index-mode-dependent
# constraints), DiaObjectNightly/DiaObjectLast, the per-name loop over
# DiaSource/SSObject/DiaForcedSource/DiaObject_To_Object_Match, and the
# PpdbProtoVisits table with its PK and visitTime index. Most Table(...)
# constructor calls and attribute assignments were lost in extraction.
# TODO: restore from upstream source before use.
267 def _makeTables(self, mysql_engine='InnoDB', oracle_tablespace=None, oracle_iot=False):
268 """Generate schema for all tables. 272 mysql_engine : `str`, optional 273 MySQL engine type to use for new tables. 274 oracle_tablespace : `str`, optional 275 Name of Oracle tablespace, only useful with oracle 276 oracle_iot : `bool`, optional 277 Make Index-organized DiaObjectLast table. 280 info = dict(oracle_tablespace=oracle_tablespace)
284 constraints = self.
_tableIndices(
'DiaObjectIndexHtmFirst', info)
289 mysql_engine=mysql_engine,
297 mysql_engine=mysql_engine,
304 info2.update(oracle_iot=oracle_iot)
308 mysql_engine=mysql_engine,
313 for table_name
in (
'DiaSource',
'SSObject',
'DiaForcedSource',
'DiaObject_To_Object_Match'):
317 mysql_engine=mysql_engine,
319 if table_name ==
'DiaSource':
321 elif table_name ==
'DiaForcedSource':
326 Column(
'visitId', sqlalchemy.types.BigInteger, nullable=
False),
327 Column(
'visitTime', sqlalchemy.types.TIMESTAMP, nullable=
False),
328 PrimaryKeyConstraint(
'visitId', name=self.
_prefix+
'PK_PpdbProtoVisits'),
329 Index(self.
_prefix+
'IDX_PpdbProtoVisits_vTime',
'visitTime', info=info),
330 mysql_engine=mysql_engine,
# NOTE(review): extraction-garbled makeSchema. Visible fragments show it
# regenerates the table definitions via self._makeTables(...) and then
# (presumably via sqlalchemy MetaData.drop_all/create_all — lines lost in
# extraction, TODO confirm) drops and/or creates all tables. The `drop`
# conditional and the metadata calls themselves were lost.
334 def makeSchema(self, drop=False, mysql_engine='InnoDB', oracle_tablespace=None, oracle_iot=False):
335 """Create or re-create all tables. 339 drop : `bool`, optional 340 If True then drop tables before creating new ones. 341 mysql_engine : `str`, optional 342 MySQL engine type to use for new tables. 343 oracle_tablespace : `str`, optional 344 Name of Oracle tablespace, only useful with oracle 345 oracle_iot : `bool`, optional 346 Make Index-organized DiaObjectLast table. 350 _LOG.debug(
"clear metadata")
352 _LOG.debug(
"re-do schema mysql_engine=%r oracle_tablespace=%r",
353 mysql_engine, oracle_tablespace)
354 self.
_makeTables(mysql_engine=mysql_engine, oracle_tablespace=oracle_tablespace,
355 oracle_iot=oracle_iot)
359 _LOG.info(
'dropping all tables')
361 _LOG.info(
'creating all tables')
365 """Return afw schema for given table. 370 One of known PPDB table names. 371 columns : `list` of `str`, optional 372 Include only given table columns in schema, by default all columns 377 schema : `lsst.afw.table.Schema` 379 Mapping of the table/result column names into schema key. 387 schema = afwTable.SourceTable.makeMinimalSchema()
388 for column
in table.columns:
389 if columns
and column.name
not in columns:
391 afw_col = col_map.get(column.name, column.name)
392 if afw_col
in schema.getNames():
394 key = schema.find(afw_col).getKey()
395 elif column.type
in (
"DOUBLE",
"FLOAT")
and column.unit ==
"deg":
400 key = schema.addField(afw_col,
402 doc=column.description
or "",
404 elif column.type ==
"BLOB":
408 units = column.unit
or "" 411 key = schema.addField(afw_col,
413 doc=column.description
or "",
415 parse_strict=
"silent",
418 key = schema.addField(afw_col,
420 doc=column.description
or "",
421 parse_strict=
"silent")
423 key = schema.addField(afw_col,
425 doc=column.description
or "",
427 parse_strict=
"silent")
428 col2afw[column.name] = key
430 return schema, col2afw
433 """Returns mapping of afw column names to Column definitions. 438 One of known PPDB table names. 443 Mapping of afw column names to `ColumnDef` instances. 449 for column
in table.columns:
450 afw_name = col_map.get(column.name, column.name)
451 cmap[afw_name] = column
455 """Returns mapping of column names to Column definitions. 460 One of known PPDB table names. 465 Mapping of column names to `ColumnDef` instances. 468 cmap = {column.name: column
for column
in table.columns}
# NOTE(review): extraction-garbled _buildSchemas. Visible fragments show the
# overall flow: load all YAML documents from schema_file; if an
# extra_schema_file is given, load it, merge per-table extra columns into
# the standard tables (overriding same-named columns, rejecting extra
# 'indices' definitions), and append wholly-new extra tables; then for each
# table optionally merge columns from the matching afw schema (rejecting
# case-only name mismatches), convert column dicts into ColumnDef instances
# (defaulting non-BLOB/DATETIME columns — exact default handling lost) and
# index dicts into IndexDef instances, and collect everything into a
# table-name -> TableDef dict. Numerous connective lines (loop headers,
# ColumnDef construction, the final return) were lost in extraction.
# TODO: restore from upstream source before use.
471 def _buildSchemas(self, schema_file, extra_schema_file=None, afw_schemas=None):
472 """Create schema definitions for all tables. 474 Reads YAML schemas and builds dictionary containing `TableDef` 475 instances for each table. 480 Name of YAML file with standard cat schema. 481 extra_schema_file : `str`, optional 482 Name of YAML file with extra table information or `None`. 483 afw_schemas : `dict`, optional 484 Dictionary with table name for a key and `afw.table.Schema` 485 for a value. Columns in schema will be added to standard PPDB 486 schema (only if standard schema does not have matching column). 491 Mapping of table names to `TableDef` instances. 494 _LOG.debug(
"Reading schema file %s", schema_file)
495 with open(schema_file)
as yaml_stream:
496 tables =
list(yaml.load_all(yaml_stream))
498 _LOG.debug(
"Read %d tables from schema", len(tables))
500 if extra_schema_file:
501 _LOG.debug(
"Reading extra schema file %s", extra_schema_file)
502 with open(extra_schema_file)
as yaml_stream:
503 extras =
list(yaml.load_all(yaml_stream))
505 schemas_extra = {table[
'table']: table
for table
in extras}
511 table_name = table[
'table']
512 if table_name
in schemas_extra:
513 columns = table[
'columns']
514 extra_columns = schemas_extra[table_name].get(
'columns', [])
515 extra_columns = {col[
'name']: col
for col
in extra_columns}
516 _LOG.debug(
"Extra columns for table %s: %s", table_name, extra_columns.keys())
518 for col
in table[
'columns']:
519 if col[
'name']
in extra_columns:
520 columns.append(extra_columns.pop(col[
'name']))
524 table[
'columns'] = columns +
list(extra_columns.values())
526 if 'indices' in schemas_extra[table_name]:
527 raise RuntimeError(
"Extra table definition contains indices, " 528 "merging is not implemented")
530 del schemas_extra[table_name]
533 tables += schemas_extra.values()
539 columns = table.get(
'columns', [])
541 table_name = table[
'table']
542 afw_schema = afw_schemas
and afw_schemas.get(table_name)
545 column_names = {col[
'name']
for col
in columns}
546 column_names_lower = {col.lower()
for col
in column_names}
547 for _, field
in afw_schema:
549 if column[
'name']
not in column_names:
551 if column[
'name'].lower()
in column_names_lower:
552 raise ValueError(
"afw.table column name case does not match schema column name")
553 columns.append(column)
558 if "default" not in col:
560 if col[
'type']
not in (
"BLOB",
"DATETIME"):
563 default = col[
"default"]
567 nullable=col.get(
"nullable"),
569 description=col.get(
"description"),
570 unit=col.get(
"unit"),
572 table_columns.append(column)
575 for idx
in table.get(
'indices', []):
576 index =
IndexDef(name=idx.get(
'name'),
577 type=idx.get(
'type'),
578 columns=idx.get(
'columns'))
579 table_indices.append(index)
581 schemas[table_name] =
TableDef(name=table_name,
582 description=table.get(
'description'),
583 columns=table_columns,
584 indices=table_indices)
# NOTE(review): extraction-garbled _tableColumns. Visible fragments show it
# looks up the table's schema, collects the PRIMARY-index column names into
# pkey_columns, then for each column builds SQLAlchemy Column kwargs
# (nullable, server_default from the ColumnDef default, autoincrement=False
# for PK columns) and appends Column(name, ctype, **kwargs). The computation
# of `ctype` (presumably via the SQL-type -> SQLAlchemy-type map built in
# __init__ — TODO confirm), the column_defs initialization, and the return
# statement were lost in extraction.
588 def _tableColumns(self, table_name):
589 """Return set of columns in a table 599 List of `Column` objects. 604 table_schema = self.
_schemas[table_name]
606 for index
in table_schema.indices:
607 if index.type ==
'PRIMARY':
608 pkey_columns =
set(index.columns)
613 for column
in table_schema.columns:
614 kwargs = dict(nullable=column.nullable)
615 if column.default
is not None:
616 kwargs.update(server_default=
str(column.default))
617 if column.name
in pkey_columns:
618 kwargs.update(autoincrement=
False)
620 column_defs.append(Column(column.name, ctype, **kwargs))
# NOTE(review): extraction-garbled _field2dict. Visible fragments show it
# takes the afw field's name and returns a dict with name/type/nullable=True.
# The lines computing `ctype` (presumably from _afw_type_map and the field's
# type string — TODO confirm) and any reverse column-name mapping were lost
# in extraction.
624 def _field2dict(self, field, table_name):
625 """Convert afw schema field definition into a dict format. 629 field : `lsst.afw.table.Field` 630 Field in afw table schema. 637 Field attributes for SQL schema: 639 - ``name`` : field name (`str`) 640 - ``type`` : type name in SQL, e.g. "INT", "FLOAT" (`str`) 641 - ``nullable`` : `True` if column can be ``NULL`` (`bool`) 643 column = field.getName()
646 return dict(name=column, type=ctype, nullable=
True)
# NOTE(review): extraction-garbled _tableIndices. Visible fragments show it
# iterates the table's IndexDef list and builds SQLAlchemy objects: Index
# for type "INDEX" (prefixed name, info dict passed through),
# PrimaryKeyConstraint for "PRIMARY", UniqueConstraint for "UNIQUE", with a
# prefixed constraint name via kwargs. The `else` branch header and kwargs
# initialization (original lines 671-673), the index_defs initialization,
# and the return statement were lost in extraction. TODO: restore from
# upstream source before use.
648 def _tableIndices(self, table_name, info):
649 """Return set of constraints/indices in a table 656 Additional options passed to SQLAlchemy index constructor. 661 List of SQLAlchemy index/constraint objects. 664 table_schema = self.
_schemas[table_name]
668 for index
in table_schema.indices:
669 if index.type ==
"INDEX":
670 index_defs.append(Index(self.
_prefix+index.name, *index.columns, info=info))
674 kwargs[
'name'] = self.
_prefix+index.name
675 if index.type ==
"PRIMARY":
676 index_defs.append(PrimaryKeyConstraint(*index.columns, **kwargs))
677 elif index.type ==
"UNIQUE":
678 index_defs.append(UniqueConstraint(*index.columns, **kwargs))
682 def _getDoubleType(self):
683 """DOUBLE type is database-specific, select one based on dialect. 687 type_object : `object` 688 Database-specific type definition. 690 if self.
_engine.name ==
'mysql':
691 from sqlalchemy.dialects.mysql
import DOUBLE
692 return DOUBLE(asdecimal=
False)
693 elif self.
_engine.name ==
'postgresql':
694 from sqlalchemy.dialects.postgresql
import DOUBLE_PRECISION
695 return DOUBLE_PRECISION
696 elif self.
_engine.name ==
'oracle':
697 from sqlalchemy.dialects.oracle
import DOUBLE_PRECISION
698 return DOUBLE_PRECISION
699 elif self.
_engine.name ==
'sqlite':
701 from sqlalchemy.dialects.sqlite
import REAL
704 raise TypeError(
'cannot determine DOUBLE type, unexpected dialect: ' + self.
_engine.name)
def getAfwColumns(self, table_name)
def _tableIndices(self, table_name, info)
def getColumnMap(self, table_name)
def _tableColumns(self, table_name)
dictionary _afw_type_map_reverse
daf::base::PropertySet * set
def _buildSchemas(self, schema_file, extra_schema_file=None, afw_schemas=None)
def _makeTables(self, mysql_engine='InnoDB', oracle_tablespace=None, oracle_iot=False)
def makeSchema(self, drop=False, mysql_engine='InnoDB', oracle_tablespace=None, oracle_iot=False)
def __init__(self, engine, dia_object_index, dia_object_nightly, schema_file, extra_schema_file=None, column_map=None, afw_schemas=None, prefix="")
def make_minimal_dia_source_schema()
def getAfwSchema(self, table_name, columns=None)
std::vector< SchemaItem< Flag > > * items
def make_minimal_dia_object_schema()
daf::base::PropertyList * list
def _field2dict(self, field, table_name)