LSST Applications  21.0.0-172-gfb10e10a+18fedfabac,22.0.0+297cba6710,22.0.0+80564b0ff1,22.0.0+8d77f4f51a,22.0.0+a28f4c53b1,22.0.0+dcf3732eb2,22.0.1-1-g7d6de66+2a20fdde0d,22.0.1-1-g8e32f31+297cba6710,22.0.1-1-geca5380+7fa3b7d9b6,22.0.1-12-g44dc1dc+2a20fdde0d,22.0.1-15-g6a90155+515f58c32b,22.0.1-16-g9282f48+790f5f2caa,22.0.1-2-g92698f7+dcf3732eb2,22.0.1-2-ga9b0f51+7fa3b7d9b6,22.0.1-2-gd1925c9+bf4f0e694f,22.0.1-24-g1ad7a390+a9625a72a8,22.0.1-25-g5bf6245+3ad8ecd50b,22.0.1-25-gb120d7b+8b5510f75f,22.0.1-27-g97737f7+2a20fdde0d,22.0.1-32-gf62ce7b1+aa4237961e,22.0.1-4-g0b3f228+2a20fdde0d,22.0.1-4-g243d05b+871c1b8305,22.0.1-4-g3a563be+32dcf1063f,22.0.1-4-g44f2e3d+9e4ab0f4fa,22.0.1-42-gca6935d93+ba5e5ca3eb,22.0.1-5-g15c806e+85460ae5f3,22.0.1-5-g58711c4+611d128589,22.0.1-5-g75bb458+99c117b92f,22.0.1-6-g1c63a23+7fa3b7d9b6,22.0.1-6-g50866e6+84ff5a128b,22.0.1-6-g8d3140d+720564cf76,22.0.1-6-gd805d02+cc5644f571,22.0.1-8-ge5750ce+85460ae5f3,master-g6e05de7fdc+babf819c66,master-g99da0e417a+8d77f4f51a,w.2021.48
LSST Data Management Base Package
lsst.dax.apdb.apdbCassandra.ApdbCassandra Class Reference
Inherits lsst.dax.apdb.apdb.Apdb.

Public Member Functions

def __init__ (self, ApdbCassandraConfig config)
 
Optional[TableDef] tableDef (self, ApdbTables table)
 
None makeSchema (self, bool drop=False)
 
pandas.DataFrame getDiaObjects (self, sphgeom.Region region)
 
Optional[pandas.DataFrame] getDiaSources (self, sphgeom.Region region, Optional[Iterable[int]] object_ids, dafBase.DateTime visit_time)
 
Optional[pandas.DataFrame] getDiaForcedSources (self, sphgeom.Region region, Optional[Iterable[int]] object_ids, dafBase.DateTime visit_time)
 
None store (self, dafBase.DateTime visit_time, pandas.DataFrame objects, Optional[pandas.DataFrame] sources=None, Optional[pandas.DataFrame] forced_sources=None)
 
None dailyJob (self)
 
int countUnassociatedObjects (self)
 
pandas.DataFrame getDiaObjects (self, Region region)
 
Optional[pandas.DataFrame] getDiaSources (self, Region region, Optional[Iterable[int]] object_ids, dafBase.DateTime visit_time)
 
Optional[pandas.DataFrame] getDiaForcedSources (self, Region region, Optional[Iterable[int]] object_ids, dafBase.DateTime visit_time)
 
ConfigurableField makeField (cls, str doc)
 

Public Attributes

 config
 

Static Public Attributes

 partition_zero_epoch = dafBase.DateTime(1970, 1, 1, 0, 0, 0, dafBase.DateTime.TAI)
 
 ConfigClass = ApdbConfig
 

Detailed Description

Implementation of the APDB database on top of Apache Cassandra.

The implementation is configured via the standard ``pex_config`` mechanism
using the `ApdbCassandraConfig` configuration class. For examples of
different configurations, see the config/ folder.

Parameters
----------
config : `ApdbCassandraConfig`
    Configuration object.

Definition at line 365 of file apdbCassandra.py.

Constructor & Destructor Documentation

◆ __init__()

def lsst.dax.apdb.apdbCassandra.ApdbCassandra.__init__ (   self,
ApdbCassandraConfig  config 
)

Definition at line 381 of file apdbCassandra.py.

381     def __init__(self, config: ApdbCassandraConfig):
382
383         if not CASSANDRA_IMPORTED:
384             raise CassandraMissingError()
385
386         self.config = config
387
388         _LOG.debug("ApdbCassandra Configuration:")
389         _LOG.debug(" read_consistency: %s", self.config.read_consistency)
390         _LOG.debug(" write_consistency: %s", self.config.write_consistency)
391         _LOG.debug(" read_sources_months: %s", self.config.read_sources_months)
392         _LOG.debug(" read_forced_sources_months: %s", self.config.read_forced_sources_months)
393         _LOG.debug(" dia_object_columns: %s", self.config.dia_object_columns)
394         _LOG.debug(" schema_file: %s", self.config.schema_file)
395         _LOG.debug(" extra_schema_file: %s", self.config.extra_schema_file)
396         _LOG.debug(" schema prefix: %s", self.config.prefix)
397         _LOG.debug(" part_pixelization: %s", self.config.part_pixelization)
398         _LOG.debug(" part_pix_level: %s", self.config.part_pix_level)
399         _LOG.debug(" query_per_time_part: %s", self.config.query_per_time_part)
400         _LOG.debug(" query_per_spatial_part: %s", self.config.query_per_spatial_part)
401
402         self._partitioner = Partitioner(config)
403
404         addressTranslator: Optional[AddressTranslator] = None
405         if config.private_ips:
406             loadBalancePolicy = WhiteListRoundRobinPolicy(hosts=config.contact_points)
407             addressTranslator = _AddressTranslator(config.contact_points, config.private_ips)
408         else:
409             loadBalancePolicy = RoundRobinPolicy()
410
411         self._read_consistency = getattr(cassandra.ConsistencyLevel, config.read_consistency)
412         self._write_consistency = getattr(cassandra.ConsistencyLevel, config.write_consistency)
413
414         self._cluster = Cluster(contact_points=self.config.contact_points,
415                                 load_balancing_policy=loadBalancePolicy,
416                                 address_translator=addressTranslator,
417                                 protocol_version=self.config.protocol_version)
418         self._session = self._cluster.connect(keyspace=config.keyspace)
419         self._session.row_factory = cassandra.query.named_tuple_factory
420
421         self._schema = ApdbCassandraSchema(session=self._session,
422                                            schema_file=self.config.schema_file,
423                                            extra_schema_file=self.config.extra_schema_file,
424                                            prefix=self.config.prefix,
425                                            packing=self.config.packing,
426                                            time_partition_tables=self.config.time_partition_tables)
427         self._partition_zero_epoch_mjd = self.partition_zero_epoch.get(system=dafBase.DateTime.MJD)
428

Member Function Documentation

◆ countUnassociatedObjects()

int lsst.dax.apdb.apdbCassandra.ApdbCassandra.countUnassociatedObjects (   self)
Return the number of DiaObjects that have only one DiaSource
associated with them.

Used as part of ap_verify metrics.

Returns
-------
count : `int`
    Number of DiaObjects with exactly one associated DiaSource.

Notes
-----
This method can be very inefficient or slow in some implementations.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 705 of file apdbCassandra.py.

705     def countUnassociatedObjects(self) -> int:
706         # docstring is inherited from a base class
707         raise NotImplementedError()
708
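
The Cassandra implementation above does not provide this count. To illustrate what the documented return value means, here is a small sketch that operates on a plain list of `diaObjectId` values, one per DiaSource; the function name `count_unassociated` is hypothetical and the sketch does not touch any database.

```python
from collections import Counter


def count_unassociated(source_object_ids):
    """Count DiaObject IDs that are referenced by exactly one DiaSource."""
    per_object = Counter(source_object_ids)
    return sum(1 for n_sources in per_object.values() if n_sources == 1)


# Object 10 has two sources; objects 11 and 12 have one source each.
count = count_unassociated([10, 11, 10, 12])
```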

◆ dailyJob()

None lsst.dax.apdb.apdbCassandra.ApdbCassandra.dailyJob (   self)
Implement daily activities like cleanup/vacuum.

What should be done during daily activities is determined by
specific implementation.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 701 of file apdbCassandra.py.

701     def dailyJob(self) -> None:
702         # docstring is inherited from a base class
703         pass
704

◆ getDiaForcedSources() [1/2]

Optional[pandas.DataFrame] lsst.dax.apdb.apdb.Apdb.getDiaForcedSources (   self,
Region  region,
Optional[Iterable[int]]  object_ids,
dafBase.DateTime  visit_time 
)
inherited
Return catalog of DiaForcedSource instances from a given region.

Parameters
----------
region : `lsst.sphgeom.Region`
    Region to search for DIASources.
object_ids : iterable [ `int` ], optional
    List of DiaObject IDs to further constrain the set of returned
    sources. If list is empty then empty catalog is returned with a
    correct schema. If `None` then returned sources are not
    constrained. Some implementations may not support latter case.
visit_time : `lsst.daf.base.DateTime`
    Time of the current visit.

Returns
-------
catalog : `pandas.DataFrame`, or `None`
    Catalog containing DiaForcedSource records. `None` is returned if
    ``read_forced_sources_months`` configuration parameter is set to 0.

Raises
------
NotImplementedError
    May be raised by some implementations if ``object_ids`` is `None`.

Notes
-----
This method returns DiaForcedSource catalog for a region with additional
filtering based on DiaObject IDs. Only a subset of DiaForcedSource history
is returned limited by ``read_forced_sources_months`` config parameter,
w.r.t. ``visit_time``. If ``object_ids`` is empty then an empty catalog
is always returned with the correct schema (columns/types). If
``object_ids`` is `None` then no filtering is performed and some of the
returned records may be outside the specified region.
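
The three cases for ``object_ids`` described above can be sketched on plain Python records. This is only an illustration under stated assumptions: records are dictionaries with a ``diaObjectId`` key, the helper `filter_by_object_ids` is hypothetical, and a list of dictionaries carries no schema, unlike the empty `pandas.DataFrame` the real method returns.

```python
from typing import Iterable, List, Optional


def filter_by_object_ids(records: List[dict],
                         object_ids: Optional[Iterable[int]]) -> List[dict]:
    """Apply the documented object_ids semantics to a list of records."""
    if object_ids is None:
        # None means "do not constrain by DiaObject ID".
        return list(records)
    wanted = set(object_ids)
    # An empty collection selects nothing, so the result is empty.
    return [rec for rec in records if rec["diaObjectId"] in wanted]


records = [{"diaObjectId": 1}, {"diaObjectId": 2}, {"diaObjectId": 1}]
unfiltered = filter_by_object_ids(records, None)
empty = filter_by_object_ids(records, [])
only_one = filter_by_object_ids(records, [1])
```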

Reimplemented in lsst.dax.apdb.apdbSql.ApdbSql.

Definition at line 163 of file apdb.py.

165                             visit_time: dafBase.DateTime) -> Optional[pandas.DataFrame]:
166         """Return catalog of DiaForcedSource instances from a given region.
167
168         Parameters
169         ----------
170         region : `lsst.sphgeom.Region`
171             Region to search for DIASources.
172         object_ids : iterable [ `int` ], optional
173             List of DiaObject IDs to further constrain the set of returned
174             sources. If list is empty then empty catalog is returned with a
175             correct schema. If `None` then returned sources are not
176             constrained. Some implementations may not support latter case.
177         visit_time : `lsst.daf.base.DateTime`
178             Time of the current visit.
179
180         Returns
181         -------
182         catalog : `pandas.DataFrame`, or `None`
183             Catalog containing DiaForcedSource records. `None` is returned if
184             ``read_forced_sources_months`` configuration parameter is set to 0.
185
186         Raises
187         ------
188         NotImplementedError
189             May be raised by some implementations if ``object_ids`` is `None`.
190
191         Notes
192         -----
193         This method returns DiaForcedSource catalog for a region with additional
194         filtering based on DiaObject IDs. Only a subset of DiaForcedSource history
195         is returned limited by ``read_forced_sources_months`` config parameter,
196         w.r.t. ``visit_time``. If ``object_ids`` is empty then an empty catalog
197         is always returned with the correct schema (columns/types). If
198         ``object_ids`` is `None` then no filtering is performed and some of the
199         returned records may be outside the specified region.
200         """
201         raise NotImplementedError()
202

◆ getDiaForcedSources() [2/2]

Optional[pandas.DataFrame] lsst.dax.apdb.apdbCassandra.ApdbCassandra.getDiaForcedSources (   self,
sphgeom.Region  region,
Optional[Iterable[int]]  object_ids,
dafBase.DateTime  visit_time 
)

Definition at line 486 of file apdbCassandra.py.

488                             visit_time: dafBase.DateTime) -> Optional[pandas.DataFrame]:
489         return self._getSources(region, object_ids, visit_time, ApdbTables.DiaForcedSource,
490                                 self.config.read_forced_sources_months)
491

◆ getDiaObjects() [1/2]

pandas.DataFrame lsst.dax.apdb.apdb.Apdb.getDiaObjects (   self,
Region  region 
)
inherited
Returns catalog of DiaObject instances from a given region.

This method returns only the last version of each DiaObject. Some
records in a returned catalog may be outside the specified region; it
is up to a client to ignore those records or clean up the catalog before
further use.

Parameters
----------
region : `lsst.sphgeom.Region`
    Region to search for DIAObjects.

Returns
-------
catalog : `pandas.DataFrame`
    Catalog containing DiaObject records for a region that may be a
    superset of the specified region.
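
Because the returned catalog may be a superset of the requested region, a client typically clips it afterwards. The sketch below shows the idea with a simple RA/Dec box in degrees standing in for an `lsst.sphgeom.Region`; the column names ``ra`` and ``decl``, the helper `clip_to_box`, and the box itself are assumptions for illustration, and the box ignores RA wrap-around.

```python
def clip_to_box(records, ra_min, ra_max, dec_min, dec_max):
    """Keep only records whose position falls inside the RA/Dec box (degrees)."""
    return [
        rec for rec in records
        if ra_min <= rec["ra"] <= ra_max and dec_min <= rec["decl"] <= dec_max
    ]


catalog = [
    {"diaObjectId": 1, "ra": 10.2, "decl": -5.1},
    {"diaObjectId": 2, "ra": 10.9, "decl": -5.0},  # outside the box in RA
]
inside = clip_to_box(catalog, 10.0, 10.5, -5.5, -4.5)
```

A real client would test each position against the actual region object rather than a box.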

Reimplemented in lsst.dax.apdb.apdbSql.ApdbSql.

Definition at line 105 of file apdb.py.

105     def getDiaObjects(self, region: Region) -> pandas.DataFrame:
106         """Returns catalog of DiaObject instances from a given region.
107
108         This method returns only the last version of each DiaObject. Some
109         records in a returned catalog may be outside the specified region; it
110         is up to a client to ignore those records or clean up the catalog before
111         further use.
112
113         Parameters
114         ----------
115         region : `lsst.sphgeom.Region`
116             Region to search for DIAObjects.
117
118         Returns
119         -------
120         catalog : `pandas.DataFrame`
121             Catalog containing DiaObject records for a region that may be a
122             superset of the specified region.
123         """
124         raise NotImplementedError()
125

◆ getDiaObjects() [2/2]

pandas.DataFrame lsst.dax.apdb.apdbCassandra.ApdbCassandra.getDiaObjects (   self,
sphgeom.Region  region 
)

Definition at line 447 of file apdbCassandra.py.

447     def getDiaObjects(self, region: sphgeom.Region) -> pandas.DataFrame:
448         # docstring is inherited from a base class
449         packedColumns = self._schema.packedColumns(ApdbTables.DiaObjectLast)
450         self._session.row_factory = _PandasRowFactory(packedColumns)
451         self._session.default_fetch_size = None
452
453         pixels = self._partitioner.pixels(region)
454         _LOG.debug("getDiaObjects: #partitions: %s", len(pixels))
455         pixels_str = ",".join([str(pix) for pix in pixels])
456
457         queries: List[Tuple] = []
458         query = f'SELECT * from "DiaObjectLast" WHERE "apdb_part" IN ({pixels_str})'
459         queries += [(cassandra.query.SimpleStatement(query, consistency_level=self._read_consistency), {})]
460         _LOG.debug("getDiaObjects: #queries: %s", len(queries))
461         # _LOG.debug("getDiaObjects: queries: %s", queries)
462
463         objects = None
464         with Timer('DiaObject select', self.config.timer):
465             # submit all queries
466             futures = [self._session.execute_async(query, values, timeout=self.config.read_timeout)
467                        for query, values in queries]
468             # TODO: This orders result processing which is not very efficient
469             dataframes = [future.result()._current_rows for future in futures]
470             # concatenate all frames
471             if len(dataframes) == 1:
472                 objects = dataframes[0]
473             else:
474                 objects = pandas.concat(dataframes)
475
476         _LOG.debug("found %s DiaObjects", objects.shape[0])
477         return objects
478
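
The listing above selects all spatial partitions that cover the region with a single ``IN`` clause. The string construction can be reproduced in isolation as follows; the function name `dia_object_last_query` and the pixel indices are made up for illustration, and nothing here talks to Cassandra.

```python
def dia_object_last_query(pixels):
    """Build the CQL text that selects DiaObjectLast rows for the given partitions."""
    pixels_str = ",".join(str(pix) for pix in pixels)
    return f'SELECT * from "DiaObjectLast" WHERE "apdb_part" IN ({pixels_str})'


# Hypothetical partition (pixel) indices covering a region.
query = dia_object_last_query([1024, 1025, 1030])
```

The interpolated values are integers produced by the partitioner rather than user-supplied strings, which is presumably why the listing formats them directly into the statement.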

◆ getDiaSources() [1/2]

Optional[pandas.DataFrame] lsst.dax.apdb.apdb.Apdb.getDiaSources (   self,
Region  region,
Optional[Iterable[int]]  object_ids,
dafBase.DateTime  visit_time 
)
inherited
Return catalog of DiaSource instances from a given region.

Parameters
----------
region : `lsst.sphgeom.Region`
    Region to search for DIASources.
object_ids : iterable [ `int` ], optional
    List of DiaObject IDs to further constrain the set of returned
    sources. If `None` then returned sources are not constrained. If
    list is empty then empty catalog is returned with a correct
    schema.
visit_time : `lsst.daf.base.DateTime`
    Time of the current visit.

Returns
-------
catalog : `pandas.DataFrame`, or `None`
    Catalog containing DiaSource records. `None` is returned if
    ``read_sources_months`` configuration parameter is set to 0.

Notes
-----
This method returns DiaSource catalog for a region with additional
filtering based on DiaObject IDs. Only a subset of DiaSource history
is returned limited by ``read_sources_months`` config parameter, w.r.t.
``visit_time``. If ``object_ids`` is empty then an empty catalog is
always returned with the correct schema (columns/types). If
``object_ids`` is `None` then no filtering is performed and some of the
returned records may be outside the specified region.
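
The history limit described above amounts to a time window that ends at ``visit_time``. A rough sketch of that window in MJD days follows; the helper `source_window_mjd` is hypothetical, and treating a month as 30 days is an assumption of this sketch, not something this page states.

```python
def source_window_mjd(visit_mjd, months, days_per_month=30):
    """Return the (begin, end) MJD window, or None when reading history is disabled."""
    if months == 0:
        # Mirrors the documented behavior: a setting of 0 means no catalog is returned.
        return None
    return (visit_mjd - months * days_per_month, visit_mjd)


window = source_window_mjd(59000.0, 12)
disabled = source_window_mjd(59000.0, 0)
```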

Reimplemented in lsst.dax.apdb.apdbSql.ApdbSql.

Definition at line 127 of file apdb.py.

129                       visit_time: dafBase.DateTime) -> Optional[pandas.DataFrame]:
130         """Return catalog of DiaSource instances from a given region.
131
132         Parameters
133         ----------
134         region : `lsst.sphgeom.Region`
135             Region to search for DIASources.
136         object_ids : iterable [ `int` ], optional
137             List of DiaObject IDs to further constrain the set of returned
138             sources. If `None` then returned sources are not constrained. If
139             list is empty then empty catalog is returned with a correct
140             schema.
141         visit_time : `lsst.daf.base.DateTime`
142             Time of the current visit.
143
144         Returns
145         -------
146         catalog : `pandas.DataFrame`, or `None`
147             Catalog containing DiaSource records. `None` is returned if
148             ``read_sources_months`` configuration parameter is set to 0.
149
150         Notes
151         -----
152         This method returns DiaSource catalog for a region with additional
153         filtering based on DiaObject IDs. Only a subset of DiaSource history
154         is returned limited by ``read_sources_months`` config parameter, w.r.t.
155         ``visit_time``. If ``object_ids`` is empty then an empty catalog is
156         always returned with the correct schema (columns/types). If
157         ``object_ids`` is `None` then no filtering is performed and some of the
158         returned records may be outside the specified region.
159         """
160         raise NotImplementedError()
161

◆ getDiaSources() [2/2]

Optional[pandas.DataFrame] lsst.dax.apdb.apdbCassandra.ApdbCassandra.getDiaSources (   self,
sphgeom.Region  region,
Optional[Iterable[int]]  object_ids,
dafBase.DateTime  visit_time 
)

Definition at line 479 of file apdbCassandra.py.

481                       visit_time: dafBase.DateTime) -> Optional[pandas.DataFrame]:
482         # docstring is inherited from a base class
483         return self._getSources(region, object_ids, visit_time, ApdbTables.DiaSource,
484                                 self.config.read_sources_months)
485

◆ makeField()

ConfigurableField lsst.dax.apdb.apdb.Apdb.makeField (   cls,
str  doc 
)
inherited
Make a `~lsst.pex.config.ConfigurableField` for Apdb.

Parameters
----------
doc : `str`
    Help text for the field.

Returns
-------
configurableField : `lsst.pex.config.ConfigurableField`
    A `~lsst.pex.config.ConfigurableField` for Apdb.

Definition at line 268 of file apdb.py.

268     def makeField(cls, doc: str) -> ConfigurableField:
269         """Make a `~lsst.pex.config.ConfigurableField` for Apdb.
270
271         Parameters
272         ----------
273         doc : `str`
274             Help text for the field.
275
276         Returns
277         -------
278         configurableField : `lsst.pex.config.ConfigurableField`
279             A `~lsst.pex.config.ConfigurableField` for Apdb.
280         """
281         return ConfigurableField(doc=doc, target=cls)
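
Since `makeField` passes ``target=cls``, calling it on a subclass yields a field that targets that subclass. The pattern can be seen in this self-contained sketch, where `ConfigurableField`, `Apdb`, and `ApdbCassandra` are minimal stand-ins that only record their arguments, not the real LSST classes.

```python
class ConfigurableField:
    """Stand-in for lsst.pex.config.ConfigurableField; it only stores its arguments."""

    def __init__(self, doc, target):
        self.doc = doc
        self.target = target


class Apdb:
    """Stand-in base class carrying the same class-method factory."""

    @classmethod
    def makeField(cls, doc):
        # cls is whichever class the method was called on, not necessarily Apdb.
        return ConfigurableField(doc=doc, target=cls)


class ApdbCassandra(Apdb):
    """Stand-in subclass; inherits makeField unchanged."""


field = ApdbCassandra.makeField("APDB backend")
```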

◆ makeSchema()

None lsst.dax.apdb.apdbCassandra.ApdbCassandra.makeSchema (   self,
bool   drop = False 
)
Create or re-create the whole database schema.

Parameters
----------
drop : `bool`
    If True then drop all tables before creating new ones.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 433 of file apdbCassandra.py.

433     def makeSchema(self, drop: bool = False) -> None:
434         # docstring is inherited from a base class
435
436         if self.config.time_partition_tables:
437             time_partition_start = dafBase.DateTime(self.config.time_partition_start, dafBase.DateTime.TAI)
438             time_partition_end = dafBase.DateTime(self.config.time_partition_end, dafBase.DateTime.TAI)
439             part_range = (
440                 self._time_partition(time_partition_start),
441                 self._time_partition(time_partition_end) + 1
442             )
443             self._schema.makeSchema(drop=drop, part_range=part_range)
444         else:
445             self._schema.makeSchema(drop=drop)
446
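
In the listing above, ``part_range`` is a half-open range: the partition of the end time is included by adding 1. The body of `_time_partition` is not shown on this page, so the sketch below assumes a plausible definition (whole days since ``partition_zero_epoch`` divided by a partition length); the names `time_partition`, `partition_range`, and the 30-day default are hypothetical.

```python
# MJD of 1970-01-01T00:00:00, the date used by partition_zero_epoch.
PARTITION_ZERO_EPOCH_MJD = 40587.0


def time_partition(mjd, partition_days=30):
    """Assumed partition index: whole days since the epoch, in partition_days chunks."""
    days_since_epoch = mjd - PARTITION_ZERO_EPOCH_MJD
    return int(days_since_epoch) // partition_days


def partition_range(start_mjd, end_mjd, partition_days=30):
    """Half-open (first, last + 1) range of partitions covering both end points."""
    return (time_partition(start_mjd, partition_days),
            time_partition(end_mjd, partition_days) + 1)


part_range = partition_range(59000.0, 59365.0)
n_partitions = len(range(*part_range))
```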

◆ store()

None lsst.dax.apdb.apdbCassandra.ApdbCassandra.store (   self,
dafBase.DateTime  visit_time,
pandas.DataFrame  objects,
Optional[pandas.DataFrame]   sources = None,
Optional[pandas.DataFrame]   forced_sources = None 
)
Store all three types of catalogs in the database.

Parameters
----------
visit_time : `lsst.daf.base.DateTime`
    Time of the visit.
objects : `pandas.DataFrame`
    Catalog with DiaObject records.
sources : `pandas.DataFrame`, optional
    Catalog with DiaSource records.
forced_sources : `pandas.DataFrame`, optional
    Catalog with DiaForcedSource records.

Notes
-----
This method takes DataFrame catalogs; their schema must be
compatible with the schema of the APDB tables:

  - column names must correspond to database table columns
  - types and units of the columns must match database definitions,
    no unit conversion is performed presently
  - columns that have default values in database schema can be
    omitted from catalog
  - this method knows how to fill interval-related columns of DiaObject
    (validityStart, validityEnd); they do not need to appear in a
    catalog
  - source catalogs have ``diaObjectId`` column associating sources
    with objects

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 638 of file apdbCassandra.py.

642               forced_sources: Optional[pandas.DataFrame] = None) -> None:
643         # docstring is inherited from a base class
644
645         # fill region partition column for DiaObjects
646         objects = self._add_obj_part(objects)
647         self._storeDiaObjects(objects, visit_time)
648
649         if sources is not None:
650             # copy apdb_part column from DiaObjects to DiaSources
651             sources = self._add_src_part(sources, objects)
652             self._storeDiaSources(ApdbTables.DiaSource, sources, visit_time)
653
654         if forced_sources is not None:
655             forced_sources = self._add_fsrc_part(forced_sources, objects)
656             self._storeDiaSources(ApdbTables.DiaForcedSource, forced_sources, visit_time)
657

◆ tableDef()

Optional[TableDef] lsst.dax.apdb.apdbCassandra.ApdbCassandra.tableDef (   self,
ApdbTables  table 
)
Return table schema definition for a given table.

Parameters
----------
table : `ApdbTables`
    One of the known APDB tables.

Returns
-------
tableSchema : `TableDef` or `None`
    Table schema description, `None` is returned if table is not
    defined by this implementation.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 429 of file apdbCassandra.py.

429     def tableDef(self, table: ApdbTables) -> Optional[TableDef]:
430         # docstring is inherited from a base class
431         return self._schema.tableSchemas.get(table)
432

Member Data Documentation

◆ config

lsst.dax.apdb.apdbCassandra.ApdbCassandra.config

Definition at line 386 of file apdbCassandra.py.

◆ ConfigClass

lsst.dax.apdb.apdb.Apdb.ConfigClass = ApdbConfig
static, inherited

Definition at line 74 of file apdb.py.

◆ partition_zero_epoch

lsst.dax.apdb.apdbCassandra.ApdbCassandra.partition_zero_epoch = dafBase.DateTime(1970, 1, 1, 0, 0, 0, dafBase.DateTime.TAI)
static

Definition at line 378 of file apdbCassandra.py.


The documentation for this class was generated from the following file: