22 """Module responsible for PPDB schema operations.    25 __all__ = [
"ColumnDef", 
"IndexDef", 
"TableDef",
    26            "make_minimal_dia_object_schema", 
"make_minimal_dia_source_schema",
    29 from collections 
import namedtuple
    34 from sqlalchemy 
import (Column, Index, MetaData, PrimaryKeyConstraint,
    35                         UniqueConstraint, Table)
    36 from sqlalchemy.schema 
import CreateTable, CreateIndex
    37 from sqlalchemy.ext.compiler 
import compiles
    41 _LOG = logging.getLogger(__name__.partition(
".")[2])  
    53 ColumnDef = namedtuple(
'ColumnDef', 
'name type nullable default description unit ucd')
    59 IndexDef = namedtuple(
'IndexDef', 
'name type columns')
    66 TableDef = namedtuple(
'TableDef', 
'name description columns indices')
    70     """Define and create the minimal schema required for a DIAObject.    74     schema : `lsst.afw.table.Schema`    75         Minimal schema for DIAObjects.    77     schema = afwTable.SourceTable.makeMinimalSchema()
    78     schema.addField(
"pixelId", type=
'L',
    79                     doc=
'Unique spherical pixelization identifier.')
    80     schema.addField(
"nDiaSources", type=
'L')
    85     """ Define and create the minimal schema required for a DIASource.    89     schema : `lsst.afw.table.Schema`    90         Minimal schema for DIASources.    92     schema = afwTable.SourceTable.makeMinimalSchema()
    93     schema.addField(
"diaObjectId", type=
'L',
    94                     doc=
'Unique identifier of the DIAObject this source is '    96     schema.addField(
"ccdVisitId", type=
'L',
    97                     doc=
'Id of the exposure and ccd this object was detected '    99     schema.addField(
"psFlux", type=
'D',
   100                     doc=
'Calibrated PSF flux of this source.')
   101     schema.addField(
"psFluxErr", type=
'D',
   102                     doc=
'Calibrated PSF flux err of this source.')
   103     schema.addField(
"flags", type=
'L',
   104                     doc=
'Quality flags for this DIASource.')
   105     schema.addField(
"pixelId", type=
'L',
   106                     doc=
'Unique spherical pixelization identifier.')
   110 @compiles(CreateTable, 
"oracle")
   111 def _add_suffixes_tbl(element, compiler, **kw):
   112     """Add all needed suffixed for Oracle CREATE TABLE statement.   114     This is a special compilation method for CreateTable clause which   115     registers itself with SQLAlchemy using @compiles decotrator. Exact method   116     name does not matter. Client can pass a dict to ``info`` keyword argument   117     of Table constructor. If the dict has a key "oracle_tablespace" then its   118     value is used as tablespace name. If the dict has a key "oracle_iot" with   119     true value then IOT table is created. This method generates additional   120     clauses for CREATE TABLE statement which specify tablespace name and   121     "ORGANIZATION INDEX" for IOT.   123     .. seealso:: https://docs.sqlalchemy.org/en/latest/core/compiler.html   125     text = compiler.visit_create_table(element, **kw)
   126     _LOG.debug(
"text: %r", text)
   127     oracle_tablespace = element.element.info.get(
"oracle_tablespace")
   128     oracle_iot = element.element.info.get(
"oracle_iot", 
False)
   129     _LOG.debug(
"oracle_tablespace: %r", oracle_tablespace)
   131         text += 
" ORGANIZATION INDEX"   132     if oracle_tablespace:
   133         text += 
" TABLESPACE " + oracle_tablespace
   134     _LOG.debug(
"text: %r", text)
   138 @compiles(CreateIndex, 
"oracle")
   139 def _add_suffixes_idx(element, compiler, **kw):
   140     """Add all needed suffixed for Oracle CREATE INDEX statement.   142     This is a special compilation method for CreateIndex clause which   143     registers itself with SQLAlchemy using @compiles decotrator. Exact method   144     name does not matter. Client can pass a dict to ``info`` keyword argument   145     of Index constructor. If the dict has a key "oracle_tablespace" then its   146     value is used as tablespace name. This method generates additional   147     clause for CREATE INDEX statement which specifies tablespace name.   149     .. seealso:: https://docs.sqlalchemy.org/en/latest/core/compiler.html   151     text = compiler.visit_create_index(element, **kw)
   152     _LOG.debug(
"text: %r", text)
   153     oracle_tablespace = element.element.info.get(
"oracle_tablespace")
   154     _LOG.debug(
"oracle_tablespace: %r", oracle_tablespace)
   155     if oracle_tablespace:
   156         text += 
" TABLESPACE " + oracle_tablespace
   157     _LOG.debug(
"text: %r", text)
   162     """Class for management of PPDB schema.   166     objects : `sqlalchemy.Table`   167         DiaObject table instance   168     objects_nightly : `sqlalchemy.Table`   169         DiaObjectNightly table instance, may be None   170     objects_last : `sqlalchemy.Table`   171         DiaObjectLast table instance, may be None   172     sources : `sqlalchemy.Table`   173         DiaSource table instance   174     forcedSources : `sqlalchemy.Table`   175         DiaForcedSource table instance   176     visits : `sqlalchemy.Table`   177         PpdbProtoVisits table instance   181     engine : `sqlalchemy.engine.Engine`   182         SQLAlchemy engine instance   183     dia_object_index : `str`   184         Indexing mode for DiaObject table, see `PpdbConfig.dia_object_index`   186     dia_object_nightly : `bool`   187         If `True` then create per-night DiaObject table as well.   189         Name of the YAML schema file.   190     extra_schema_file : `str`, optional   191         Name of the YAML schema file with extra column definitions.   192     column_map : `str`, optional   193         Name of the YAML file with column mappings.   194     afw_schemas : `dict`, optional   195         Dictionary with table name for a key and `afw.table.Schema`   196         for a value. Columns in schema will be added to standard PPDB   197         schema (only if standard schema does not have matching column).   198     prefix : `str`, optional   199         Prefix to add to all scheam elements.   203     _afw_type_map = {
    # NOTE(review): garbled extraction — this span is the PpdbSchema class
    # docstring fused with the first entries of the afw<->SQL type mapping
    # class attributes.  Only the "I" <-> "INT" pairs survive; the remaining
    # dict entries (and the `class PpdbSchema` header itself, original line
    # ~161) were lost and must be recovered from version control.
"I": 
"INT",
   209     _afw_type_map_reverse = {
"INT": 
"I",
   216     def __init__(self, engine, dia_object_index, dia_object_nightly,
   217                  schema_file, extra_schema_file=None, column_map=None,
   218                  afw_schemas=None, prefix=""):
        # NOTE(review): garbled extraction — original lines ~219-234,
        # 237-240 and 242-254 are missing (attribute assignments, the YAML
        # column-map load into a dict, and the start of the SQL type map).
        # What survives: an optional column-map YAML file is read (else an
        # empty mapping is logged), and a name->SQLAlchemy-type map is built
        # whose visible tail follows.  Recover the full body from VCS.
   235             _LOG.debug(
"Reading column map file %s", column_map)
   236             with open(column_map) 
as yaml_stream:
   241             _LOG.debug(
"No column map file is given, initialize to empty")
   255                               FLOAT=sqlalchemy.types.Float,
   256                               DATETIME=sqlalchemy.types.TIMESTAMP,
   257                               BIGINT=sqlalchemy.types.BigInteger,
   258                               INTEGER=sqlalchemy.types.Integer,
   259                               INT=sqlalchemy.types.Integer,
   260                               TINYINT=sqlalchemy.types.Integer,
   261                               BLOB=sqlalchemy.types.LargeBinary,
   262                               CHAR=sqlalchemy.types.CHAR)
   267     def _makeTables(self, mysql_engine='InnoDB', oracle_tablespace=None, oracle_iot=False):
        # NOTE(review): garbled extraction — the Table(...) constructor calls
        # for DiaObject/DiaObjectNightly/DiaObjectLast and the per-table loop
        # bodies are missing; only option plumbing and the PpdbProtoVisits
        # column/constraint definitions survive.  Recover from VCS.
   268         """Generate schema for all tables.   272         mysql_engine : `str`, optional   273             MySQL engine type to use for new tables.   274         oracle_tablespace : `str`, optional   275             Name of Oracle tablespace, only useful with oracle   276         oracle_iot : `bool`, optional   277             Make Index-organized DiaObjectLast table.   280         info = dict(oracle_tablespace=oracle_tablespace)
   284             constraints = self.
_tableIndices(
'DiaObjectIndexHtmFirst', info)
   289                       mysql_engine=mysql_engine,
   297                           mysql_engine=mysql_engine,
   304             info2.update(oracle_iot=oracle_iot)
   308                           mysql_engine=mysql_engine,
   313         for table_name 
in (
'DiaSource', 
'SSObject', 
'DiaForcedSource', 
'DiaObject_To_Object_Match'):
   317                           mysql_engine=mysql_engine,
   319             if table_name == 
'DiaSource':
   321             elif table_name == 
'DiaForcedSource':
        # PpdbProtoVisits table: one row per visit with a PK on visitId and
        # a secondary index on visitTime.
   326                       Column(
'visitId', sqlalchemy.types.BigInteger, nullable=
False),
   327                       Column(
'visitTime', sqlalchemy.types.TIMESTAMP, nullable=
False),
   328                       PrimaryKeyConstraint(
'visitId', name=self.
_prefix+
'PK_PpdbProtoVisits'),
   329                       Index(self.
_prefix+
'IDX_PpdbProtoVisits_vTime', 
'visitTime', info=info),
   330                       mysql_engine=mysql_engine,
   334     def makeSchema(self, drop=False, mysql_engine='InnoDB', oracle_tablespace=None, oracle_iot=False):
        # NOTE(review): garbled extraction — the statements that actually
        # clear the metadata and drop/create the tables (presumably
        # MetaData.drop_all/create_all, original lines ~351, 356-362) are
        # missing; only the logging and the _makeTables call survive.
   335         """Create or re-create all tables.   339         drop : `bool`, optional   340             If True then drop tables before creating new ones.   341         mysql_engine : `str`, optional   342             MySQL engine type to use for new tables.   343         oracle_tablespace : `str`, optional   344             Name of Oracle tablespace, only useful with oracle   345         oracle_iot : `bool`, optional   346             Make Index-organized DiaObjectLast table.   350         _LOG.debug(
"clear metadata")
   352         _LOG.debug(
"re-do schema mysql_engine=%r oracle_tablespace=%r",
   353                    mysql_engine, oracle_tablespace)
   354         self.
_makeTables(mysql_engine=mysql_engine, oracle_tablespace=oracle_tablespace,
   355                          oracle_iot=oracle_iot)
   359             _LOG.info(
'dropping all tables')
   361         _LOG.info(
'creating all tables')
   365         """Return afw schema for given table.   370             One of known PPDB table names.   371         columns : `list` of `str`, optional   372             Include only given table columns in schema, by default all columns   377         schema : `lsst.afw.table.Schema`   379             Mapping of the table/result column names into schema key.   387         schema = afwTable.SourceTable.makeMinimalSchema()
        # NOTE(review): garbled extraction — the `def getAfwSchema(...)` line,
        # the table/col_map lookups, and the keyword arguments of the
        # schema.addField calls (type=, units=, etc., original lines 401-426)
        # are missing.  Visible logic: for each table column, map its name
        # through col_map, reuse an existing schema key if present, otherwise
        # add a field whose handling branches on column type ("DOUBLE"/"FLOAT"
        # in degrees, "BLOB", and a general case); collect name->key in
        # col2afw and return (schema, col2afw).
   388         for column 
in table.columns:
   389             if columns 
and column.name 
not in columns:
   391             afw_col = col_map.get(column.name, column.name)
   392             if afw_col 
in schema.getNames():
   394                 key = schema.find(afw_col).getKey()
   395             elif column.type 
in (
"DOUBLE", 
"FLOAT") 
and column.unit == 
"deg":
   400                 key = schema.addField(afw_col,
   402                                       doc=column.description 
or "",
   404             elif column.type == 
"BLOB":
   408                 units = column.unit 
or ""   411                     key = schema.addField(afw_col,
   413                                           doc=column.description 
or "",
   415                                           parse_strict=
"silent",
   418                     key = schema.addField(afw_col,
   420                                           doc=column.description 
or "",
   421                                           parse_strict=
"silent")
   423                     key = schema.addField(afw_col,
   425                                           doc=column.description 
or "",
   427                                           parse_strict=
"silent")
   428             col2afw[column.name] = key
   430         return schema, col2afw
   433         """Returns mapping of afw column names to Column definitions.   438             One of known PPDB table names.   443             Mapping of afw column names to `ColumnDef` instances.   449         for column 
        # NOTE(review): garbled extraction — the `def getAfwColumns(...)` line,
        # the table/col_map/cmap initialisation, and the `return cmap` are
        # missing; only the loop that maps each column name through col_map
        # into cmap survives.  Recover from VCS.
in table.columns:
   450             afw_name = col_map.get(column.name, column.name)
   451             cmap[afw_name] = column
   455         """Returns mapping of column names to Column definitions.   460             One of known PPDB table names.   465             Mapping of column names to `ColumnDef` instances.   468         cmap = {column.name: column 
        # NOTE(review): garbled extraction — the `def getColumnMap(...)` line,
        # the table lookup, and the `return cmap` are missing; only the dict
        # comprehension keyed on column.name survives.
for column 
in table.columns}
   471     def _buildSchemas(self, schema_file, extra_schema_file=None, afw_schemas=None):
        # NOTE(review): garbled extraction — several connecting lines are
        # missing (the `for table in tables:` loop headers, the ColumnDef(...)
        # construction around original lines 564-571, the afw-schema
        # _field2dict conversion at ~548, and the final `return schemas`).
        # Visible logic: load the base YAML schema, optionally merge an extra
        # YAML schema (per-table column merge, indices merge unsupported),
        # optionally add columns from afw schemas (case-mismatch is an error),
        # then build TableDef/IndexDef records per table.
        # NOTE(review): yaml.load_all is used without an explicit Loader; that
        # is unsafe on untrusted input — presumably these schema files are
        # trusted config, but consider yaml.safe_load_all.  TODO confirm.
   472         """Create schema definitions for all tables.   474         Reads YAML schemas and builds dictionary containing `TableDef`   475         instances for each table.   480             Name of YAML file with standard cat schema.   481         extra_schema_file : `str`, optional   482             Name of YAML file with extra table information or `None`.   483         afw_schemas : `dict`, optional   484             Dictionary with table name for a key and `afw.table.Schema`   485             for a value. Columns in schema will be added to standard PPDB   486             schema (only if standard schema does not have matching column).   491             Mapping of table names to `TableDef` instances.   494         _LOG.debug(
"Reading schema file %s", schema_file)
   495         with open(schema_file) 
as yaml_stream:
   496             tables = 
list(yaml.load_all(yaml_stream))
   498         _LOG.debug(
"Read %d tables from schema", len(tables))
   500         if extra_schema_file:
   501             _LOG.debug(
"Reading extra schema file %s", extra_schema_file)
   502             with open(extra_schema_file) 
as yaml_stream:
   503                 extras = 
list(yaml.load_all(yaml_stream))
   505                 schemas_extra = {table[
'table']: table 
for table 
in extras}
   511             table_name = table[
'table']
   512             if table_name 
in schemas_extra:
   513                 columns = table[
'columns']
   514                 extra_columns = schemas_extra[table_name].get(
'columns', [])
   515                 extra_columns = {col[
'name']: col 
for col 
in extra_columns}
   516                 _LOG.debug(
"Extra columns for table %s: %s", table_name, extra_columns.keys())
   518                 for col 
in table[
'columns']:
   519                     if col[
'name'] 
in extra_columns:
   520                         columns.append(extra_columns.pop(col[
'name']))
   524                 table[
'columns'] = columns + 
list(extra_columns.values())
   526                 if 'indices' in schemas_extra[table_name]:
   527                     raise RuntimeError(
"Extra table definition contains indices, "   528                                        "merging is not implemented")
   530                 del schemas_extra[table_name]
   533         tables += schemas_extra.values()
   539             columns = table.get(
'columns', [])
   541             table_name = table[
'table']
   542         afw_schema = afw_schemas 
and afw_schemas.get(table_name)
   545                 column_names = {col[
'name'] 
for col 
in columns}
   546                 column_names_lower = {col.lower() 
for col 
in column_names}
   547                 for _, field 
in afw_schema:
   549                     if column[
'name'] 
not in column_names:
   551                         if column[
'name'].lower() 
in column_names_lower:
   552                             raise ValueError(
"afw.table column name case does not match schema column name")
   553                         columns.append(column)
   558                 if "default" not in col:
   560                     if col[
'type'] 
not in (
"BLOB", 
"DATETIME"):
   563                     default = col[
"default"]
   567                                    nullable=col.get(
"nullable"),
   569                                    description=col.get(
"description"),
   570                                    unit=col.get(
"unit"),
   572                 table_columns.append(column)
   575             for idx 
in table.get(
'indices', []):
   576                 index = 
IndexDef(name=idx.get(
'name'),
   577                                  type=idx.get(
'type'),
   578                                  columns=idx.get(
'columns'))
   579                 table_indices.append(index)
   581             schemas[table_name] = 
TableDef(name=table_name,
   582                                            description=table.get(
'description'),
   583                                            columns=table_columns,
   584                                            indices=table_indices)
   588     def _tableColumns(self, table_name):
        # NOTE(review): garbled extraction — the initialisation of
        # pkey_columns/column_defs, the derivation of `ctype` from the column
        # type (original lines ~609-612), and the `return column_defs` are
        # missing.  Visible logic: primary-key columns get
        # autoincrement=False, and a non-None default becomes server_default.
   589         """Return set of columns in a table   599             List of `Column` objects.   604         table_schema = self.
_schemas[table_name]
   606         for index 
in table_schema.indices:
   607             if index.type == 
'PRIMARY':
   608                 pkey_columns = 
set(index.columns)
   613         for column 
in table_schema.columns:
   614             kwargs = dict(nullable=column.nullable)
   615             if column.default 
is not None:
   616                 kwargs.update(server_default=
str(column.default))
   617             if column.name 
in pkey_columns:
   618                 kwargs.update(autoincrement=
False)
   620             column_defs.append(Column(column.name, ctype, **kwargs))
   624     def _field2dict(self, field, table_name):
        # NOTE(review): garbled extraction — the derivation of `ctype`
        # (original lines ~644-645, presumably via the reverse afw type map)
        # is missing; only the name lookup and the returned dict survive.
   625         """Convert afw schema field definition into a dict format.   629         field : `lsst.afw.table.Field`   630             Field in afw table schema.   637             Field attributes for SQL schema:   639             - ``name`` : field name (`str`)   640             - ``type`` : type name in SQL, e.g. "INT", "FLOAT" (`str`)   641             - ``nullable`` : `True` if column can be ``NULL`` (`bool`)   643         column = field.getName()
   646         return dict(name=column, type=ctype, nullable=
True)
   648     def _tableIndices(self, table_name, info):
        # NOTE(review): garbled extraction — the index_defs initialisation,
        # the else/kwargs setup around original lines 671-673, and the
        # `return index_defs` are missing.  Visible logic: "INDEX" entries
        # become prefixed Index objects; "PRIMARY"/"UNIQUE" entries become
        # PrimaryKeyConstraint/UniqueConstraint with an optional prefixed name.
   649         """Return set of constraints/indices in a table   656             Additional options passed to SQLAlchemy index constructor.   661             List of SQLAlchemy index/constraint objects.   664         table_schema = self.
_schemas[table_name]
   668         for index 
in table_schema.indices:
   669             if index.type == 
"INDEX":
   670                 index_defs.append(Index(self.
_prefix+index.name, *index.columns, info=info))
   674                     kwargs[
'name'] = self.
_prefix+index.name
   675                 if index.type == 
"PRIMARY":
   676                     index_defs.append(PrimaryKeyConstraint(*index.columns, **kwargs))
   677                 elif index.type == 
"UNIQUE":
   678                     index_defs.append(UniqueConstraint(*index.columns, **kwargs))
   682     def _getDoubleType(self):
   683         """DOUBLE type is database-specific, select one based on dialect.   687         type_object : `object`   688             Database-specific type definition.   690         if self.
_engine.name == 
'mysql':
   691             from sqlalchemy.dialects.mysql 
import DOUBLE
   692             return DOUBLE(asdecimal=
False)
   693         elif self.
_engine.name == 
'postgresql':
   694             from sqlalchemy.dialects.postgresql 
import DOUBLE_PRECISION
   695             return DOUBLE_PRECISION
   696         elif self.
_engine.name == 
'oracle':
   697             from sqlalchemy.dialects.oracle 
import DOUBLE_PRECISION
   698             return DOUBLE_PRECISION
   699         elif self.
_engine.name == 
'sqlite':
   701             from sqlalchemy.dialects.sqlite 
import REAL
   704             raise TypeError(
'cannot determine DOUBLE type, unexpected dialect: ' + self.
_engine.name)
 
def getAfwColumns(self, table_name)
def _tableIndices(self, table_name, info)
def getColumnMap(self, table_name)
def _tableColumns(self, table_name)
dictionary _afw_type_map_reverse
daf::base::PropertySet * set
def _buildSchemas(self, schema_file, extra_schema_file=None, afw_schemas=None)
def _makeTables(self, mysql_engine='InnoDB', oracle_tablespace=None, oracle_iot=False)
def makeSchema(self, drop=False, mysql_engine='InnoDB', oracle_tablespace=None, oracle_iot=False)
def __init__(self, engine, dia_object_index, dia_object_nightly, schema_file, extra_schema_file=None, column_map=None, afw_schemas=None, prefix="")
def make_minimal_dia_source_schema()
def getAfwSchema(self, table_name, columns=None)
std::vector< SchemaItem< Flag > > * items
def make_minimal_dia_object_schema()
daf::base::PropertyList * list
def _field2dict(self, field, table_name)