LSST Applications  21.0.0+75b29a8a7f,21.0.0+e70536a077,21.0.0-1-ga51b5d4+62c747d40b,21.0.0-10-gbfb87ad6+3307648ee3,21.0.0-15-gedb9d5423+47cba9fc36,21.0.0-2-g103fe59+fdf0863a2a,21.0.0-2-g1367e85+d38a93257c,21.0.0-2-g45278ab+e70536a077,21.0.0-2-g5242d73+d38a93257c,21.0.0-2-g7f82c8f+e682ffb718,21.0.0-2-g8dde007+d179fbfa6a,21.0.0-2-g8f08a60+9402881886,21.0.0-2-ga326454+e682ffb718,21.0.0-2-ga63a54e+08647d4b1b,21.0.0-2-gde069b7+26c92b3210,21.0.0-2-gecfae73+0445ed2f95,21.0.0-2-gfc62afb+d38a93257c,21.0.0-27-gbbd0d29+ae871e0f33,21.0.0-28-g5fc5e037+feb0e9397b,21.0.0-3-g21c7a62+f4b9c0ff5c,21.0.0-3-g357aad2+57b0bddf0b,21.0.0-3-g4be5c26+d38a93257c,21.0.0-3-g65f322c+3f454acf5d,21.0.0-3-g7d9da8d+75b29a8a7f,21.0.0-3-gaa929c8+9e4ef6332c,21.0.0-3-ge02ed75+4b120a55c4,21.0.0-4-g3300ddd+e70536a077,21.0.0-4-g591bb35+4b120a55c4,21.0.0-4-gc004bbf+4911b9cd27,21.0.0-4-gccdca77+f94adcd104,21.0.0-4-ge8fba5a+2b3a696ff9,21.0.0-5-gb155db7+2c5429117a,21.0.0-5-gdf36809+637e4641ee,21.0.0-6-g00874e7+c9fd7f7160,21.0.0-6-g4e60332+4b120a55c4,21.0.0-7-gc8ca178+40eb9cf840,21.0.0-8-gfbe0b4b+9e4ef6332c,21.0.0-9-g2fd488a+d83b7cd606,w.2021.05
LSST Data Management Base Package
calibRepoConverter.py
Go to the documentation of this file.
1 # This file is part of obs_base.
2 #
3 # Developed for the LSST Data Management System.
4 # This product includes software developed by the LSST Project
5 # (https://www.lsst.org).
6 # See the COPYRIGHT file at the top-level directory of this distribution
7 # for details of code ownership.
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation, either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the GNU General Public License
20 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 from __future__ import annotations
22 
23 __all__ = ["CalibRepoConverter"]
24 
25 from collections import defaultdict
26 import os
27 import sqlite3
28 from typing import TYPE_CHECKING, Dict, Iterator, List, Mapping, Sequence, Tuple, Optional
29 
30 import astropy.time
31 import astropy.units as u
32 
33 from lsst.daf.butler import DataCoordinate, FileDataset, Timespan
34 from .repoConverter import RepoConverter
35 from .repoWalker import RepoWalker
36 
37 if TYPE_CHECKING:
38  from lsst.daf.butler import DatasetType, StorageClass, FormatterParameter
39  from .repoWalker.scanner import PathElementHandler
40  from ..cameraMapper import CameraMapper
41  from ..mapping import Mapping as CameraMapperMapping # disambiguate from collections.abc.Mapping
42 
43 
45  """A specialization of `RepoConverter` for calibration repositories.
46 
47  Parameters
48  ----------
49  mapper : `CameraMapper`
50  Gen2 mapper for the data repository. The root associated with the
51  mapper is ignored and need not match the root of the repository.
52  labels : `Sequence` [ `str` ]
53  Strings injected into the names of the collections that calibration
54  datasets are written and certified into (forwarded as the ``extra``
55  argument to `Instrument` methods that generate collection names and
56  write curated calibrations).
57  **kwargs
58  Additional keyword arguments are forwarded to (and required by)
59  `RepoConverter`.
60  """
61 
def __init__(self, *, mapper: CameraMapper, labels: Sequence[str] = (), **kwargs):
    """Construct a converter for a Gen2 calibration repository.

    Parameters
    ----------
    mapper : `CameraMapper`
        Gen2 mapper for the data repository; stored as ``self.mapper``.
    labels : `Sequence` [ `str` ], optional
        Strings passed to ``makeCalibrationCollectionName`` when building
        the calibration collection name; also kept (as a tuple) for
        building per-``calibDate`` run names in `getRun`.
    **kwargs
        Additional keyword arguments forwarded to the base-class
        constructor.
    """
    # The base class is always constructed with ``run=None``.
    super().__init__(run=None, **kwargs)
    self.mapper = mapper
    self.collection = self.task.instrument.makeCalibrationCollectionName(*labels)
    # Freeze the labels so later mutation of the caller's sequence cannot
    # change the run names generated by `getRun`.
    self._labels = tuple(labels)
    # Filled in by `makeRepoWalkerTarget` as targets are created.
    self._datasetTypes = set()
68 
def isDatasetTypeSpecial(self, datasetTypeName: str) -> bool:
    # Docstring inherited from RepoConverter.
    # A dataset type is "special" here exactly when the instrument lists
    # it among its curated calibration names.
    curatedNames = self.instrument.getCuratedCalibrationNames()
    return datasetTypeName in curatedNames
72 
def iterMappings(self) -> Iterator[Tuple[str, CameraMapperMapping]]:
    # Docstring inherited from RepoConverter.
    # Only the mapper's calibration mappings are yielded, as
    # (dataset type name, mapping) pairs.
    yield from self.mapper.calibrations.items()
76 
def makeRepoWalkerTarget(self, datasetTypeName: str, template: str, keys: Dict[str, type],
                         storageClass: StorageClass, formatter: FormatterParameter = None,
                         targetHandler: Optional[PathElementHandler] = None,
                         ) -> RepoWalker.Target:
    # Docstring inherited from RepoConverter.
    task = self.task
    target = RepoWalker.Target(
        datasetTypeName=datasetTypeName,
        storageClass=storageClass,
        template=template,
        keys=keys,
        instrument=task.instrument.getName(),
        universe=task.registry.dimensions,
        formatter=formatter,
        targetHandler=targetHandler,
        translatorFactory=task.translatorFactory,
    )
    # Record the dataset type of every target built by this converter.
    self._datasetTypes.add(target.datasetType)
    return target
95 
def _queryGen2CalibRegistry(self, db: sqlite3.Connection, datasetType: DatasetType, calibDate: str
                            ) -> Iterator[sqlite3.Row]:
    """Query the Gen2 calibration registry for the validity ranges and
    optionally detectors and filters associated with the given dataset type
    and ``calibDate``.

    Parameters
    ----------
    db : `sqlite3.Connection`
        DBAPI connection to the Gen2 ``calibRegistry.sqlite3`` file.
    datasetType : `DatasetType`
        Gen3 dataset type being queried.
    calibDate : `str`
        String extracted from the ``calibDate`` template entry in Gen2
        filenames.

    Yields
    ------
    row : `sqlite3.Row`
        SQLite result object; will have ``validStart`` and ``validEnd``
        columns, plus a detector column (named
        ``self.task.config.ccdKey``) and a ``filter`` column.  Those two
        columns are selected as ``NULL`` when ``datasetType.dimensions``
        does not include ``detector`` or ``physical_filter``,
        respectively.

    Notes
    -----
    Nothing is yielded (and a warning is logged) when the Gen2 mapper lists
    no tables for the dataset type, or when SQLite raises
    `sqlite3.OperationalError` for the query.
    """
    dimensionNames = datasetType.dimensions.names
    ccdKey = self.task.config.ccdKey
    fields = ["validStart", "validEnd"]
    if "detector" in dimensionNames:
        fields.append(ccdKey)
    else:
        fields.append(f"NULL AS {ccdKey}")
    if "physical_filter" in dimensionNames:
        fields.append("filter")
    else:
        assert "band" not in dimensionNames
        fields.append("NULL AS filter")
    tables = self.mapper.mappings[datasetType.name].tables
    if not tables:
        # ``tables`` is None or empty here, so it must not be indexed; the
        # message has exactly two placeholders.
        self.task.log.warn("Could not extract calibration ranges for %s in %s; "
                           "no tables in Gen2 mapper.",
                           datasetType.name, self.root)
        return
    # Only the first table listed by the mapper is queried.  Column and
    # table names come from the mapper/config; the user-derived calibDate
    # value is bound as a parameter.
    query = f"SELECT DISTINCT {', '.join(fields)} FROM {tables[0]} WHERE calibDate = ?;"
    try:
        results = db.execute(query, (calibDate,))
    except sqlite3.OperationalError as e:
        self.task.log.warn("Could not extract calibration ranges for %s in %s from table %s: %r",
                           datasetType.name, self.root, tables[0], e)
        return
    yield from results
145 
def _finish(self, datasets: Mapping[DatasetType, Mapping[Optional[str], List[FileDataset]]]):
    # Docstring inherited from RepoConverter.
    # Read Gen2 calibration repository and extract validity ranges for
    # all datasetType + calibDate combinations we ingested.
    calibFile = os.path.join(self.root, "calibRegistry.sqlite3")
    # If the registry file does not exist this indicates a problem.
    # We check explicitly because sqlite will try to create the
    # missing file if it can.
    if not os.path.exists(calibFile):
        raise RuntimeError("Attempting to convert calibrations but no registry database"
                           f" found in {self.root}")

    # Initially we collate timespans for each dataId + dataset type
    # combination.  This allows us to check for small gaps or overlaps
    # inherent in the ambiguous usage of validity ranges in gen2.
    timespansByDataId = defaultdict(list)

    ccdKey = self.task.config.ccdKey
    db = sqlite3.connect(calibFile)
    try:
        db.row_factory = sqlite3.Row

        for datasetType, datasetsByCalibDate in datasets.items():
            if not datasetType.isCalibration():
                continue
            hasDetector = "detector" in datasetType.dimensions.names
            hasFilter = "physical_filter" in datasetType.dimensions.names
            gen2keys = {}
            if hasDetector:
                gen2keys[ccdKey] = int
            if hasFilter:
                gen2keys["filter"] = str
            translator = self.instrument.makeDataIdTranslatorFactory().makeMatching(
                datasetType.name,
                gen2keys,
                instrument=self.instrument.getName()
            )
            for calibDate, datasetsForCalibDate in datasetsByCalibDate.items():
                assert calibDate is not None, ("datasetType.isCalibration() is set by "
                                               "the presence of calibDate in the Gen2 template")
                # Build a mapping that lets us find DatasetRefs by data ID,
                # for this DatasetType and calibDate.  We know there is only
                # one ref for each data ID (given DatasetType and calibDate
                # as well).
                refsByDataId = {}
                for dataset in datasetsForCalibDate:
                    refsByDataId.update((ref.dataId, ref) for ref in dataset.refs)
                # Query the Gen2 calibration repo for the validity ranges
                # for this DatasetType and calibDate, and look up the
                # appropriate refs by data ID.
                for row in self._queryGen2CalibRegistry(db, datasetType, calibDate):
                    # For validity times we use TAI as some gen2 repos have
                    # validity dates very far in the past or future.
                    timespan = Timespan(
                        astropy.time.Time(row["validStart"], format="iso", scale="tai"),
                        astropy.time.Time(row["validEnd"], format="iso", scale="tai"),
                    )
                    # Make a Gen2 data ID from query results.
                    gen2id = {}
                    if hasDetector:
                        gen2id[ccdKey] = row[ccdKey]
                    if hasFilter:
                        gen2id["filter"] = row["filter"]
                    # Translate that to Gen3.
                    gen3id, _ = translator(gen2id)
                    dataId = DataCoordinate.standardize(gen3id, graph=datasetType.dimensions)
                    ref = refsByDataId.get(dataId)
                    if ref is not None:
                        # Validity ranges must not overlap for the same
                        # dataID + datasetType combination.  Use that as a
                        # primary key and store the timespan and ref in a
                        # tuple as the value for later timespan validation.
                        timespansByDataId[(ref.dataId, ref.datasetType.name)].append((timespan, ref))
                    else:
                        # The Gen2 calib registry mentions this dataset, but
                        # it isn't included in what we've ingested.  This
                        # might sometimes be a problem, but it should
                        # usually represent someone just trying to convert a
                        # subset of the Gen2 repo, so I don't think it's
                        # appropriate to warn or even log at info, since in
                        # that case there may be a _lot_ of these messages.
                        self.task.log.debug(
                            "Gen2 calibration registry entry has no dataset: %s for calibDate=%s, %s.",
                            datasetType.name, calibDate, dataId
                        )
    finally:
        # All rows have been consumed above; release the Gen2 registry
        # file handle even if translation or querying raised.
        db.close()

    # Analyze the timespans to check for overlap problems.
    # Gaps of a day should be closed since we assume differing
    # conventions in gen2 repos.

    # We need to correct any validity range issues and store the
    # results in a dict-of-lists keyed by Timespan, since
    # Registry.certify operates on one Timespan and multiple refs at a
    # time.
    refsByTimespan = defaultdict(list)

    # A day with a bit of fuzz to indicate the largest gap we will close.
    max_gap = astropy.time.TimeDelta(1.001, format="jd", scale="tai")

    # Since in many cases the validity ranges are relevant for multiple
    # dataset types and dataIds we don't want to over-report and so
    # cache the messages for later.
    info_messages = set()
    warn_messages = set()
    for timespans in timespansByDataId.values():
        # Sort all the timespans and check overlaps.  Every list here has
        # at least one entry because entries are only created by append.
        sorted_timespans = sorted(timespans, key=lambda x: x[0])
        timespan_prev, ref_prev = sorted_timespans.pop(0)
        for timespan, ref in sorted_timespans:
            # See if we have a suspicious gap.
            delta = timespan.begin - timespan_prev.end
            abs_delta = abs(delta)
            if abs_delta > 0 and abs_delta < max_gap:
                if delta > 0:
                    # Gap between timespans.
                    msg = f"Calibration validity gap closed from {timespan_prev.end} to {timespan.begin}"
                    info_messages.add(msg)
                else:
                    # Overlap of timespans.
                    msg = f"Calibration validity overlap of {abs_delta.to(u.s)} removed for period " \
                        f"{timespan.begin} to {timespan_prev.end}"
                    warn_messages.add(msg)

                self.task.log.debug("Correcting validity range for %s with end %s",
                                    ref_prev, timespan_prev.end)

                # Assume this gap is down to convention in gen2.
                # We have to adjust the previous timespan to fit
                # since we always trust validStart.
                timespan_prev = Timespan(begin=timespan_prev.begin,
                                         end=timespan.begin)
            # Store the previous timespan and ref since it has now
            # been verified.
            refsByTimespan[timespan_prev].append(ref_prev)

            # And update the previous values for the next iteration.
            timespan_prev = timespan
            ref_prev = ref

        # Store the final timespan/ref pair.
        refsByTimespan[timespan_prev].append(ref_prev)

    # Issue any pending log messages we have recorded.
    for msg in sorted(info_messages):
        self.task.log.info(msg)
    for msg in sorted(warn_messages):
        self.task.log.warn(msg)

    # Done reading from Gen2, time to certify into Gen3.
    for timespan, refs in refsByTimespan.items():
        self.task.registry.certify(self.collection, refs, timespan)
293 
def getRun(self, datasetTypeName: str, calibDate: Optional[str] = None) -> str:
    # Docstring inherited from RepoConverter.
    if calibDate is None:
        # No calibDate: defer to the base-class run selection.
        return super().getRun(datasetTypeName)
    # With a calibDate, the run name is the calibration collection name
    # built from the configured labels plus the formatted calibDate.
    instrument = self.instrument
    return instrument.makeCalibrationCollectionName(
        *self._labels,
        instrument.formatCollectionTimestamp(calibDate),
    )
303 
304  # Class attributes that will be shadowed by public instance attributes;
305  # defined here only for documentation purposes.
306 
307  mapper: CameraMapper
308  """Gen2 mapper associated with this repository.
309  """
str getRun(self, str datasetTypeName, Optional[str] calibDate=None)
Iterator[sqlite3.Row] _queryGen2CalibRegistry(self, sqlite3.Connection db, DatasetType datasetType, str calibDate)
def __init__(self, *CameraMapper mapper, Sequence[str] labels=(), **kwargs)
RepoWalker.Target makeRepoWalkerTarget(self, str datasetTypeName, str template, Dict[str, type] keys, StorageClass storageClass, FormatterParameter formatter=None, Optional[PathElementHandler] targetHandler=None)
Iterator[Tuple[str, CameraMapperMapping]] iterMappings(self)
std::shared_ptr< FrameSet > append(FrameSet const &first, FrameSet const &second)
Construct a FrameSet that performs two transformations in series.
Definition: functional.cc:33
std::string const & getName() const noexcept
Return a filter's name.
Definition: Filter.h:78
Angle abs(Angle const &a)
Definition: Angle.h:106
daf::base::PropertySet * set
Definition: fits.cc:912