LSST Applications g04e9c324dd+8c5ae1fdc5,g134cb467dc+b203dec576,g18429d2f64+358861cd2c,g199a45376c+0ba108daf9,g1fd858c14a+dd066899e3,g262e1987ae+ebfced1d55,g29ae962dfc+72fd90588e,g2cef7863aa+aef1011c0b,g35bb328faa+8c5ae1fdc5,g3fd5ace14f+b668f15bc5,g4595892280+3897dae354,g47891489e3+abcf9c3559,g4d44eb3520+fb4ddce128,g53246c7159+8c5ae1fdc5,g67b6fd64d1+abcf9c3559,g67fd3c3899+1f72b5a9f7,g74acd417e5+cb6b47f07b,g786e29fd12+668abc6043,g87389fa792+8856018cbb,g89139ef638+abcf9c3559,g8d7436a09f+bcf525d20c,g8ea07a8fe4+9f5ccc88ac,g90f42f885a+6054cc57f1,g97be763408+06f794da49,g9dd6db0277+1f72b5a9f7,ga681d05dcb+7e36ad54cd,gabf8522325+735880ea63,gac2eed3f23+abcf9c3559,gb89ab40317+abcf9c3559,gbf99507273+8c5ae1fdc5,gd8ff7fe66e+1f72b5a9f7,gdab6d2f7ff+cb6b47f07b,gdc713202bf+1f72b5a9f7,gdfd2d52018+8225f2b331,ge365c994fd+375fc21c71,ge410e46f29+abcf9c3559,geaed405ab2+562b3308c0,gf9a733ac38+8c5ae1fdc5,w.2025.35
LSST Data Management Base Package
Loading...
Searching...
No Matches
partitioner.py
Go to the documentation of this file.
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
21
22from __future__ import annotations
23
24__all__ = ["Partitioner"]
25
26
27import astropy.time
28
29from lsst import sphgeom
30
31from ..apdbSchema import ApdbTables
32from ..pixelization import Pixelization
33from .config import ApdbCassandraConfig, ApdbCassandraTimePartitionRange
34
35
37 """Logic for temporal and spatial partitioning of APDB tables.
38
39 Parameters
40 ----------
41 config : `ApdbCassandraConfig`
42 Configuration object.
43 """
44
45 partition_zero_epoch = astropy.time.Time(0, format="unix_tai")
46 """Start time for partition 0, this should never be changed."""
47
def __init__(self, config: ApdbCassandraConfig):
    """Store the configuration and prepare pixelization and epoch values.

    Parameters
    ----------
    config : `ApdbCassandraConfig`
        Configuration object. Its ``partitioning`` section supplies the
        pixelization name, pixelization level and maximum number of pixel
        ranges.
    """
    self._config = config
    # Pixelization instance used for all spatial partition calculations.
    self.pixelization = Pixelization(
        config.partitioning.part_pixelization,
        config.partitioning.part_pix_level,
        config.partitioning.part_pix_max_ranges,
    )
    # MJD value of the start of partition 0, kept as a plain float so that
    # partition arithmetic does not need astropy objects.
    self._epoch = float(self.partition_zero_epoch.mjd)
56
def pixel(self, direction: sphgeom.UnitVector3d) -> int:
    """Return the index of the pixel that contains a direction.

    Parameters
    ----------
    direction : `lsst.sphgeom.UnitVector3d`
        Spatial position.

    Returns
    -------
    pixel : `int`
        Pixel index, as computed by the configured pixelization.
    """
    pixel_index = self.pixelization.pixel(direction)
    return pixel_index
71
def time_partition(self, time: float | astropy.time.Time) -> int:
    """Return the number of the time partition that a time falls into.

    Parameters
    ----------
    time : `float` or `astropy.time.Time`
        Time for which to calculate the partition number. A `float` is
        interpreted as MJD.

    Returns
    -------
    partition : `int`
        Partition number for the given time.

    Notes
    -----
    The number of days since the partition epoch is truncated to an
    integer with `int` before it is floor-divided by the configured
    partition length in days.
    """
    # Reduce both accepted input types to a plain MJD value.
    mjd = float(time.mjd) if isinstance(time, astropy.time.Time) else time
    whole_days = int(mjd - self._epoch)
    return whole_days // self._config.partitioning.time_partition_days
93
def partition_period(self, time_partition: int) -> tuple[astropy.time.Time, astropy.time.Time]:
    """Return the time period covered by a time partition.

    Parameters
    ----------
    time_partition : `int`
        Time partition number.

    Returns
    -------
    start : `astropy.time.Time`
        Start of the period, inclusive boundary.
    end : `astropy.time.Time`
        End of the period, exclusive boundary.
    """
    period_days = self._config.partitioning.time_partition_days
    # The period ends where the next partition starts, so both boundaries
    # come from the same formula applied to consecutive partition numbers.
    start, end = (
        astropy.time.Time(self._epoch + period_days * index, format="mjd", scale="tai")
        for index in (time_partition, time_partition + 1)
    )
    return (start, end)
115
def spatial_where(
    self, region: sphgeom.Region | None, *, use_ranges: bool = False, for_prepare: bool = False
) -> tuple[list[tuple[str, tuple]], int]:
    """Generate expressions for spatial part of WHERE clause.

    Parameters
    ----------
    region : `sphgeom.Region` or `None`
        Spatial region for query results. If `None` then no spatial
        constraint is generated.
    use_ranges : `bool`, optional
        If True then use pixel ranges ("apdb_part >= p1 AND apdb_part <=
        p2") instead of exact list of pixels. Should be set to True for
        large regions covering very many pixels.
    for_prepare : `bool`, optional
        If True then use placeholders for prepared statement (?), otherwise
        produce regular statement placeholders (%s).

    Returns
    -------
    expressions : `list` [ `tuple` ]
        Empty list is returned if ``region`` is `None`, otherwise a list
        of one or more ``(expression: str, parameters: tuple)`` tuples.
    partition_count : `int`
        Number of spatial partitions in the result.
    """
    if region is None:
        return [], 0

    token = "?" if for_prepare else "%s"

    count = 0
    expressions: list[tuple[str, tuple]] = []
    if use_ranges:
        pixel_ranges = self.pixelization.envelope(region)
        for lower, upper in pixel_ranges:
            # The upper bound is treated as exclusive here; convert it to
            # an inclusive bound before building expressions.
            upper -= 1
            if lower == upper:
                expressions.append((f'"apdb_part" = {token}', (lower,)))
                count += 1
            elif lower + 1 == upper:
                # A two-pixel range is emitted as two equality expressions
                # instead of a range expression.
                expressions.append((f'"apdb_part" = {token}', (lower,)))
                expressions.append((f'"apdb_part" = {token}', (upper,)))
                count += 2
            else:
                count += upper - lower + 1
                expressions.append((f'"apdb_part" >= {token} AND "apdb_part" <= {token}', (lower, upper)))
    else:
        pixels = self.pixelization.pixels(region)
        count = len(pixels)
        if self._config.partitioning.query_per_spatial_part:
            # One equality expression per pixel.
            expressions.extend((f'"apdb_part" = {token}', (pixel,)) for pixel in pixels)
        else:
            # Single IN expression with pixel indices rendered inline, so
            # it carries no bound parameters.
            pixels_str = ",".join([str(pix) for pix in pixels])
            expressions.append((f'"apdb_part" IN ({pixels_str})', ()))

    return expressions, count
172
def temporal_where(
    self,
    table: ApdbTables,
    start_time: float | astropy.time.Time,
    end_time: float | astropy.time.Time,
    *,
    query_per_time_part: bool | None = None,
    for_prepare: bool = False,
    partitons_range: ApdbCassandraTimePartitionRange | None = None,
) -> tuple[list[str], list[tuple[str, tuple]]]:
    """Generate table names and expressions for temporal part of WHERE
    clauses.

    Parameters
    ----------
    table : `ApdbTables`
        Table to select from.
    start_time : `astropy.time.Time` or `float`
        Start of the time range, either a time object or an MJD value.
    end_time : `astropy.time.Time` or `float`
        End of the time range, either a time object or an MJD value.
    query_per_time_part : `bool`, optional
        If None then use ``query_per_time_part`` from configuration.
    for_prepare : `bool`, optional
        If True then use placeholders for prepared statement (?), otherwise
        produce regular statement placeholders (%s).
    partitons_range : `ApdbCassandraTimePartitionRange` or `None`
        Partitions range to further restrict time range.

    Returns
    -------
    tables : `list` [ `str` ]
        List of the table names to query. Empty list is returned when time
        range does not overlap ``partitons_range``.
    expressions : `list` [ `tuple` ]
        A list of zero or more ``(expression: str, parameters: tuple)``
        tuples.
    """
    tables: list[str]
    temporal_where: list[tuple[str, tuple]] = []
    # First and last partition.
    time_part_start = self.time_partition(start_time)
    time_part_end = self.time_partition(end_time)
    if partitons_range:
        # Check for non-overlapping ranges.
        if time_part_start > partitons_range.end or time_part_end < partitons_range.start:
            return [], []
        # Clip the requested partitions to the allowed range.
        if time_part_start < partitons_range.start:
            time_part_start = partitons_range.start
        if time_part_end > partitons_range.end:
            time_part_end = partitons_range.end
    # Inclusive range.
    time_parts = list(range(time_part_start, time_part_end + 1))
    if self._config.partitioning.time_partition_tables:
        # One table per time partition; no WHERE expressions are produced.
        tables = [table.table_name(self._config.prefix, part) for part in time_parts]
    else:
        # Single table; the time partition is selected via WHERE expressions.
        token = "?" if for_prepare else "%s"
        tables = [table.table_name(self._config.prefix)]
        if query_per_time_part is None:
            query_per_time_part = self._config.partitioning.query_per_time_part
        if query_per_time_part:
            temporal_where = [(f'"apdb_time_part" = {token}', (time_part,)) for time_part in time_parts]
        else:
            # Partition numbers are rendered inline, so the expression
            # carries no bound parameters.
            time_part_list = ",".join([str(part) for part in time_parts])
            temporal_where = [(f'"apdb_time_part" IN ({time_part_list})', ())]

    return tables, temporal_where
tuple[list[str], list[tuple[str, tuple]]] temporal_where(self, ApdbTables table, float|astropy.time.Time start_time, float|astropy.time.Time end_time, *, bool|None query_per_time_part=None, bool for_prepare=False, ApdbCassandraTimePartitionRange|None partitons_range=None)
tuple[list[tuple[str, tuple]], int] spatial_where(self, sphgeom.Region|None region, *, bool use_ranges=False, bool for_prepare=False)
int time_partition(self, float|astropy.time.Time time)
int pixel(self, sphgeom.UnitVector3d direction)
__init__(self, ApdbCassandraConfig config)
tuple[astropy.time.Time, astropy.time.Time] partition_period(self, int time_partition)