LSST Applications  21.0.0-172-gfb10e10a+18fedfabac,22.0.0+297cba6710,22.0.0+80564b0ff1,22.0.0+8d77f4f51a,22.0.0+a28f4c53b1,22.0.0+dcf3732eb2,22.0.1-1-g7d6de66+2a20fdde0d,22.0.1-1-g8e32f31+297cba6710,22.0.1-1-geca5380+7fa3b7d9b6,22.0.1-12-g44dc1dc+2a20fdde0d,22.0.1-15-g6a90155+515f58c32b,22.0.1-16-g9282f48+790f5f2caa,22.0.1-2-g92698f7+dcf3732eb2,22.0.1-2-ga9b0f51+7fa3b7d9b6,22.0.1-2-gd1925c9+bf4f0e694f,22.0.1-24-g1ad7a390+a9625a72a8,22.0.1-25-g5bf6245+3ad8ecd50b,22.0.1-25-gb120d7b+8b5510f75f,22.0.1-27-g97737f7+2a20fdde0d,22.0.1-32-gf62ce7b1+aa4237961e,22.0.1-4-g0b3f228+2a20fdde0d,22.0.1-4-g243d05b+871c1b8305,22.0.1-4-g3a563be+32dcf1063f,22.0.1-4-g44f2e3d+9e4ab0f4fa,22.0.1-42-gca6935d93+ba5e5ca3eb,22.0.1-5-g15c806e+85460ae5f3,22.0.1-5-g58711c4+611d128589,22.0.1-5-g75bb458+99c117b92f,22.0.1-6-g1c63a23+7fa3b7d9b6,22.0.1-6-g50866e6+84ff5a128b,22.0.1-6-g8d3140d+720564cf76,22.0.1-6-gd805d02+cc5644f571,22.0.1-8-ge5750ce+85460ae5f3,master-g6e05de7fdc+babf819c66,master-g99da0e417a+8d77f4f51a,w.2021.48
LSST Data Management Base Package
Public Member Functions | Public Attributes | List of all members
lsst.dax.apdb.apdbCassandraSchema.ApdbCassandraSchema Class Reference
Inheritance diagram for lsst.dax.apdb.apdbCassandraSchema.ApdbCassandraSchema:
lsst.dax.apdb.apdbSchema.ApdbSchema

Public Member Functions

def __init__ (self, cassandra.cluster.Session session, str schema_file, Optional[str] extra_schema_file=None, str prefix="", str packing="none", bool time_partition_tables=False)
 
str tableName (self, ApdbTables table_name)
 
Mapping[str, ColumnDef] getColumnMap (self, ApdbTables table_name)
 
List[str] partitionColumns (self, ApdbTables table_name)
 
List[str] clusteringColumns (self, ApdbTables table_name)
 
None makeSchema (self, bool drop=False, Optional[Tuple[int, int]] part_range=None)
 
List[ColumnDef] packedColumns (self, ApdbTables table_name)
 

Public Attributes

 tableSchemas
 

Detailed Description

Class for management of APDB schema.

Parameters
----------
session : `cassandra.cluster.Session`
    Cassandra session object
schema_file : `str`
    Name of the YAML schema file.
extra_schema_file : `str`, optional
    Name of the YAML schema file with extra column definitions.
prefix : `str`, optional
    Prefix to add to all schema elements.
packing : `str`, optional
    Type of packing to apply to columns; the string "none" disables
    packing, any other value enables it.
time_partition_tables : `bool`, optional
    If True then schema will have a separate table for each time partition.

Definition at line 39 of file apdbCassandraSchema.py.

Constructor & Destructor Documentation

◆ __init__()

def lsst.dax.apdb.apdbCassandraSchema.ApdbCassandraSchema.__init__ (   self,
cassandra.cluster.Session  session,
str  schema_file,
Optional[str]   extra_schema_file = None,
str   prefix = "",
str   packing = "none",
bool   time_partition_tables = False 
)

Definition at line 71 of file apdbCassandraSchema.py.

def __init__(self, session: cassandra.cluster.Session, schema_file: str,
             extra_schema_file: Optional[str] = None, prefix: str = "",
             packing: str = "none", time_partition_tables: bool = False):
    """Load table schemas and add Cassandra partitioning columns to them.

    Each known table gets its partitioning columns prepended to its column
    list and a matching ``IndexType.PARTITION`` index. Tables whose
    partitioning is not defined yet are recorded in ``_ignore_tables``.
    When packing is enabled, the columns not used by any index are
    recorded per table in ``_packed_columns``.
    """
    super().__init__(schema_file, extra_schema_file)

    self._session = session
    self._prefix = prefix
    self._packing = packing

    # Tables without a partitioning scheme; makeSchema() skips them.
    self._ignore_tables = []
    spatio_temporal = (ApdbTables.DiaObject, ApdbTables.DiaSource, ApdbTables.DiaForcedSource)

    for table, tableDef in self.tableSchemas.items():
        if table is ApdbTables.DiaObjectLast:
            # DiaObjectLast does not need temporal partitioning.
            part_columns = ["apdb_part"]
        elif table in spatio_temporal:
            # With a separate table per time partition only the spatial
            # column is needed; otherwise time joins the partition key.
            part_columns = ["apdb_part"] if time_partition_tables else ["apdb_part", "apdb_time_part"]
        else:
            # TODO: Do not know yet how other tables can be partitioned
            part_columns = []
            self._ignore_tables.append(table)

        # Prepend the partitioning columns to the table's own columns.
        part_column_defs = [
            ColumnDef(name=column_name, type="BIGINT", nullable=False, default=None,
                      description="", unit=None, ucd=None)
            for column_name in part_columns
        ]
        tableDef.columns = part_column_defs + tableDef.columns

        # Register them as the table's partition index (empty for ignored
        # tables).
        tableDef.indices.append(IndexDef(name=f"Part_{tableDef.name}",
                                         type=IndexType.PARTITION,
                                         columns=part_columns))

    # With packing enabled, every column not referenced by any index is
    # reported by packedColumns() as packed.
    self._packed_columns = {}
    if self._packing != "none":
        for table, tableDef in self.tableSchemas.items():
            indexed_names = {name for index in tableDef.indices for name in index.columns}
            self._packed_columns[table] = [column for column in tableDef.columns
                                           if column.name not in indexed_names]

std::vector< SchemaItem< Flag > > * items
daf::base::PropertySet * set
Definition: fits.cc:912
std::shared_ptr< FrameSet > append(FrameSet const &first, FrameSet const &second)
Construct a FrameSet that performs two transformations in series.
Definition: functional.cc:33

Member Function Documentation

◆ clusteringColumns()

List[str] lsst.dax.apdb.apdbCassandraSchema.ApdbCassandraSchema.clusteringColumns (   self,
ApdbTables  table_name 
)
Return a list of columns used for clustering.

Parameters
----------
table_name : `ApdbTables`
    Table name in APDB schema

Returns
-------
columns : `list` of `str`
    Names of columns used for clustering.

Definition at line 163 of file apdbCassandraSchema.py.

def clusteringColumns(self, table_name: ApdbTables) -> List[str]:
    """Return a list of columns used for clustering.

    Clustering columns are the columns of the table's PRIMARY index.

    Parameters
    ----------
    table_name : `ApdbTables`
        Table name in APDB schema

    Returns
    -------
    columns : `list` of `str`
        Names of columns used for clustering, taken from the first index
        of type `IndexType.PRIMARY`; empty list if the table has no such
        index.
    """
    table_schema = self.tableSchemas[table_name]
    for index in table_schema.indices:
        if index.type is IndexType.PRIMARY:
            return index.columns
    return []


◆ getColumnMap()

Mapping[str, ColumnDef] lsst.dax.apdb.apdbCassandraSchema.ApdbCassandraSchema.getColumnMap (   self,
ApdbTables  table_name 
)
Returns mapping of column names to Column definitions.

Parameters
----------
table_name : `ApdbTables`
    One of known APDB table names.

Returns
-------
column_map : `dict`
    Mapping of column names to `ColumnDef` instances.

Definition at line 126 of file apdbCassandraSchema.py.

def getColumnMap(self, table_name: ApdbTables) -> Mapping[str, ColumnDef]:
    """Return a mapping from column name to column definition.

    Parameters
    ----------
    table_name : `ApdbTables`
        One of known APDB table names.

    Returns
    -------
    column_map : `dict`
        Mapping of column names to `ColumnDef` instances, built from the
        table's column list (a later duplicate name overrides an earlier
        one).
    """
    column_defs = self.tableSchemas[table_name].columns
    return {column_def.name: column_def for column_def in column_defs}


◆ makeSchema()

None lsst.dax.apdb.apdbCassandraSchema.ApdbCassandraSchema.makeSchema (   self,
bool   drop = False,
Optional[Tuple[int, int]]   part_range = None 
)
Create or re-create all tables.

Parameters
----------
drop : `bool`
    If True then drop tables before creating new ones.
part_range : `tuple` [ `int`, `int` ] or `None`
    Start and end partition number for time partitions, end is not
    inclusive. Used to create per-partition DiaObject, DiaSource, and
    DiaForcedSource tables. If `None` then per-partition tables are
    not created.

Definition at line 182 of file apdbCassandraSchema.py.

def makeSchema(self, drop: bool = False, part_range: Optional[Tuple[int, int]] = None) -> None:
    """Create or re-create all tables.

    Tables listed in ``_ignore_tables`` are skipped.

    Parameters
    ----------
    drop : `bool`
        If True then drop tables before creating new ones.
    part_range : `tuple` [ `int`, `int` ] or `None`
        Start and end partition number for time partitions, end is not
        inclusive. Used to create per-partition DiaObject, DiaSource, and
        DiaForcedSource tables. If `None` then per-partition tables are
        not created.
    """

    def _run_all(queries):
        # Submit every query first and only then wait for the results, so
        # statements for the individual tables are in flight together.
        futures = [self._session.execute_async(query, timeout=None) for query in queries]
        for future in futures:
            _LOG.debug("wait for query: %s", future.query)
            future.result()
            _LOG.debug("query finished: %s", future.query)

    # Tables that were just dropped cannot exist; otherwise tolerate
    # existing ones.
    if_not_exists = "" if drop else "IF NOT EXISTS"
    per_partition_tables = (ApdbTables.DiaSource, ApdbTables.DiaForcedSource, ApdbTables.DiaObject)

    for table in self.tableSchemas:
        if table in self._ignore_tables:
            _LOG.debug("Skipping schema for table %s", table)
            continue
        _LOG.debug("Making table %s", table)

        fullTable = table.table_name(self._prefix)

        table_list = [fullTable]
        if part_range is not None and table in per_partition_tables:
            # One table per time partition replaces the single table.
            table_list = [f"{fullTable}_{part}" for part in range(*part_range)]

        if drop:
            _run_all([f'DROP TABLE IF EXISTS "{table_name}"' for table_name in table_list])

        # The column spec depends only on `table` (presumably
        # _tableColumns() has no side effects), so build it once per table.
        columns = ", ".join(self._tableColumns(table))
        queries = []
        for table_name in table_list:
            query = f'CREATE TABLE {if_not_exists} "{table_name}" ({columns})'
            _LOG.debug("query: %s", query)
            queries.append(query)
        _run_all(queries)


◆ packedColumns()

List[ColumnDef] lsst.dax.apdb.apdbCassandraSchema.ApdbCassandraSchema.packedColumns (   self,
ApdbTables  table_name 
)
Return the list of columns that are packed into a BLOB.

Parameters
----------
table_name : `ApdbTables`
    Name of the table.

Returns
-------
columns : `list` [ `ColumnDef` ]
    List of column definitions. Empty list is returned if packing is
    not configured.

Definition at line 288 of file apdbCassandraSchema.py.

def packedColumns(self, table_name: ApdbTables) -> List[ColumnDef]:
    """Return the list of columns that are packed into a BLOB.

    Parameters
    ----------
    table_name : `ApdbTables`
        Name of the table.

    Returns
    -------
    columns : `list` [ `ColumnDef` ]
        List of column definitions. An empty list is returned if packing
        is not configured (or the table has no packed-column entry).
    """
    try:
        return self._packed_columns[table_name]
    except KeyError:
        return []

◆ partitionColumns()

List[str] lsst.dax.apdb.apdbCassandraSchema.ApdbCassandraSchema.partitionColumns (   self,
ApdbTables  table_name 
)
Return a list of columns used for table partitioning.

Parameters
----------
table_name : `ApdbTables`
    Table name in APDB schema

Returns
-------
columns : `list` of `str`
    Names of columns used for partitioning.

Definition at line 143 of file apdbCassandraSchema.py.

def partitionColumns(self, table_name: ApdbTables) -> List[str]:
    """Return a list of columns used for table partitioning.

    Parameters
    ----------
    table_name : `ApdbTables`
        Table name in APDB schema

    Returns
    -------
    columns : `list` of `str`
        Names of columns used for partitioning, taken from the first index
        of type `IndexType.PARTITION`; empty list if there is none.
    """
    indices = self.tableSchemas[table_name].indices
    # There can be just one partitioning index (possibly with several
    # columns), so the first match is the answer.
    return next((index.columns for index in indices if index.type is IndexType.PARTITION), [])


◆ tableName()

str lsst.dax.apdb.apdbCassandraSchema.ApdbCassandraSchema.tableName (   self,
ApdbTables  table_name 
)
Return Cassandra table name for APDB table.

Definition at line 121 of file apdbCassandraSchema.py.

def tableName(self, table_name: ApdbTables) -> str:
    """Return Cassandra table name for APDB table.

    The name comes from ``ApdbTables.table_name`` applied to this schema's
    prefix.
    """
    prefix = self._prefix
    return table_name.table_name(prefix)


Member Data Documentation

◆ tableSchemas

lsst.dax.apdb.apdbSchema.ApdbSchema.tableSchemas
inherited

Definition at line 178 of file apdbSchema.py.


The documentation for this class was generated from the following file: