LSSTApplications  19.0.0-14-gb0260a2+72efe9b372,20.0.0+7927753e06,20.0.0+8829bf0056,20.0.0+995114c5d2,20.0.0+b6f4b2abd1,20.0.0+bddc4f4cbe,20.0.0-1-g253301a+8829bf0056,20.0.0-1-g2b7511a+0d71a2d77f,20.0.0-1-g5b95a8c+7461dd0434,20.0.0-12-g321c96ea+23efe4bbff,20.0.0-16-gfab17e72e+fdf35455f6,20.0.0-2-g0070d88+ba3ffc8f0b,20.0.0-2-g4dae9ad+ee58a624b3,20.0.0-2-g61b8584+5d3db074ba,20.0.0-2-gb780d76+d529cf1a41,20.0.0-2-ged6426c+226a441f5f,20.0.0-2-gf072044+8829bf0056,20.0.0-2-gf1f7952+ee58a624b3,20.0.0-20-geae50cf+e37fec0aee,20.0.0-25-g3dcad98+544a109665,20.0.0-25-g5eafb0f+ee58a624b3,20.0.0-27-g64178ef+f1f297b00a,20.0.0-3-g4cc78c6+e0676b0dc8,20.0.0-3-g8f21e14+4fd2c12c9a,20.0.0-3-gbd60e8c+187b78b4b8,20.0.0-3-gbecbe05+48431fa087,20.0.0-38-ge4adf513+a12e1f8e37,20.0.0-4-g97dc21a+544a109665,20.0.0-4-gb4befbc+087873070b,20.0.0-4-gf910f65+5d3db074ba,20.0.0-5-gdfe0fee+199202a608,20.0.0-5-gfbfe500+d529cf1a41,20.0.0-6-g64f541c+d529cf1a41,20.0.0-6-g9a5b7a1+a1cd37312e,20.0.0-68-ga3f3dda+5fca18c6a4,20.0.0-9-g4aef684+e18322736b,w.2020.45
LSSTDataManagementBasePackage
calibRepoConverter.py
Go to the documentation of this file.
1 # This file is part of obs_base.
2 #
3 # Developed for the LSST Data Management System.
4 # This product includes software developed by the LSST Project
5 # (https://www.lsst.org).
6 # See the COPYRIGHT file at the top-level directory of this distribution
7 # for details of code ownership.
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation, either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the GNU General Public License
20 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 from __future__ import annotations
22 
23 __all__ = ["CalibRepoConverter"]
24 
25 from collections import defaultdict
26 import os
27 import sqlite3
28 from typing import TYPE_CHECKING, Dict, Iterator, List, Mapping, Tuple, Optional
29 
30 import astropy.time
31 import astropy.units as u
32 
33 from lsst.daf.butler import DataCoordinate, FileDataset, Timespan
34 from .repoConverter import RepoConverter
35 from .repoWalker import RepoWalker
36 
37 if TYPE_CHECKING:
38  from lsst.daf.butler import DatasetType, StorageClass, FormatterParameter
39  from .repoWalker.scanner import PathElementHandler
40  from ..cameraMapper import CameraMapper
41  from ..mapping import Mapping as CameraMapperMapping # disambiguate from collections.abc.Mapping
42 
43 
45  """A specialization of `RepoConverter` for calibration repositories.
46 
47  Parameters
48  ----------
49  mapper : `CameraMapper`
50  Gen2 mapper for the data repository. The root associated with the
51  mapper is ignored and need not match the root of the repository.
52  kwds
53  Additional keyword arguments are forwarded to (and required by)
54  `RepoConverter`.
55  """
56 
def __init__(self, *, mapper: CameraMapper, collection: str, **kwds):
    """Construct the converter.

    Parameters
    ----------
    mapper : `CameraMapper`
        Gen2 mapper for the data repository.
    collection : `str`
        Collection name stored on the instance; `_finish` passes it to
        ``registry.certify``.
    **kwds
        Forwarded unchanged to the base class constructor.
    """
    # The base class is always initialized with ``run=None``; everything
    # else it needs arrives through ``kwds``.
    super().__init__(run=None, **kwds)
    self.mapper, self.collection = mapper, collection
    # Dataset types of every walker target built by makeRepoWalkerTarget.
    self._datasetTypes = set()
62 
def isDatasetTypeSpecial(self, datasetTypeName: str) -> bool:
    # Docstring inherited from RepoConverter.
    # A dataset type is "special" here exactly when the instrument lists
    # it among its curated calibration names.
    curatedNames = self.instrument.getCuratedCalibrationNames()
    return datasetTypeName in curatedNames
66 
def iterMappings(self) -> Iterator[Tuple[str, CameraMapperMapping]]:
    # Docstring inherited from RepoConverter.
    # Only the mapper's calibration mappings are relevant for this
    # converter; each item is passed through untouched as a
    # (name, mapping) pair.
    calibrationMappings = self.mapper.calibrations
    for item in calibrationMappings.items():
        yield item
70 
def makeRepoWalkerTarget(self, datasetTypeName: str, template: str, keys: Dict[str, type],
                         storageClass: StorageClass, formatter: FormatterParameter = None,
                         targetHandler: Optional[PathElementHandler] = None,
                         ) -> RepoWalker.Target:
    # Docstring inherited from RepoConverter.
    # Collect the constructor arguments first; values that come from the
    # task are looked up in the same order the keywords are listed.
    targetArgs = dict(
        datasetTypeName=datasetTypeName,
        storageClass=storageClass,
        template=template,
        keys=keys,
        instrument=self.task.instrument.getName(),
        universe=self.task.registry.dimensions,
        formatter=formatter,
        targetHandler=targetHandler,
        translatorFactory=self.task.translatorFactory,
    )
    walkerTarget = RepoWalker.Target(**targetArgs)
    # Remember the dataset type of every target this converter creates.
    self._datasetTypes.add(walkerTarget.datasetType)
    return walkerTarget
89 
def _queryGen2CalibRegistry(self, db: sqlite3.Connection, datasetType: DatasetType, calibDate: str
                            ) -> Iterator[sqlite3.Row]:
    """Query the Gen2 calibration registry for validity ranges.

    Parameters
    ----------
    db : `sqlite3.Connection`
        Open connection to the Gen2 calibration registry database.
    datasetType : `DatasetType`
        Dataset type to query for.  Its name selects the Gen2 mapping
        (and hence the table) and its dimensions decide which columns
        are selected.
    calibDate : `str`
        Value the ``calibDate`` column must match.

    Yields
    ------
    row
        One distinct result row per match, with columns ``validStart``,
        ``validEnd``, the configured detector key
        (``self.task.config.ccdKey``) and ``filter``.  The last two are
        selected as ``NULL`` when the dataset type does not have the
        ``detector`` / ``physical_filter`` dimension.  The concrete row
        class is whatever ``db.row_factory`` produces.

    Notes
    -----
    If the Gen2 mapper lists no tables for this dataset type, or the query
    fails with `sqlite3.OperationalError`, a warning is logged and nothing
    is yielded.
    """
    dimensionNames = datasetType.dimensions.names
    ccdKey = self.task.config.ccdKey
    fields = ["validStart", "validEnd"]
    if "detector" in dimensionNames:
        fields.append(ccdKey)
    else:
        fields.append(f"NULL AS {ccdKey}")
    if "physical_filter" in dimensionNames:
        fields.append("filter")
    else:
        assert "band" not in dimensionNames
        fields.append("NULL AS filter")
    tables = self.mapper.mappings[datasetType.name].tables
    if not tables:
        # There is no table to name in this branch, so the message must
        # not index into ``tables`` (doing so raised instead of warning).
        self.task.log.warn("Could not extract calibration ranges for %s in %s; "
                           "no tables in Gen2 mapper.",
                           datasetType.name, self.root)
        return
    # Column and table names come from the mapper/config and cannot be bound
    # as SQL parameters; the user-supplied calibDate value is bound.
    query = f"SELECT DISTINCT {', '.join(fields)} FROM {tables[0]} WHERE calibDate = ?;"
    try:
        results = db.execute(query, (calibDate,))
    except sqlite3.OperationalError as e:
        self.task.log.warn("Could not extract calibration ranges for %s in %s from table %s: %r",
                           datasetType.name, self.root, tables[0], e)
        return
    yield from results
117 
def _finish(self, datasets: Mapping[DatasetType, Mapping[Optional[str], List[FileDataset]]]):
    """Certify ingested calibration datasets using Gen2 validity ranges.

    Reads ``calibRegistry.sqlite3`` from the repository root, looks up the
    validity ranges recorded for every calibration dataset type and
    ``calibDate`` present in ``datasets``, repairs small gaps or overlaps
    between consecutive ranges of the same data ID and dataset type, and
    certifies the resulting refs into ``self.collection``.

    Parameters
    ----------
    datasets : `Mapping`
        Nested mapping from dataset type to ``calibDate`` to the list of
        `FileDataset` objects ingested for that combination.  Dataset
        types for which ``isCalibration()`` is false are skipped.

    Raises
    ------
    RuntimeError
        Raised if ``calibRegistry.sqlite3`` does not exist in
        ``self.root``.
    """
    # Read Gen2 calibration repository and extract validity ranges for
    # all datasetType + calibDate combinations we ingested.
    calibFile = os.path.join(self.root, "calibRegistry.sqlite3")
    # If the registry file does not exist this indicates a problem.
    # We check explicitly because sqlite will try to create the
    # missing file if it can.
    if not os.path.exists(calibFile):
        raise RuntimeError("Attempting to convert calibrations but no registry database"
                           f" found in {self.root}")

    # Initially we collate timespans for each dataId + dataset type
    # combination. This allows us to check for small gaps or overlaps
    # inherent in the ambiguous usage of validity ranges in gen2
    timespansByDataId = defaultdict(list)

    db = sqlite3.connect(calibFile)
    try:
        db.row_factory = sqlite3.Row

        for datasetType, datasetsByCalibDate in datasets.items():
            if not datasetType.isCalibration():
                continue
            gen2keys = {}
            if "detector" in datasetType.dimensions.names:
                gen2keys[self.task.config.ccdKey] = int
            if "physical_filter" in datasetType.dimensions.names:
                gen2keys["filter"] = str
            translator = self.instrument.makeDataIdTranslatorFactory().makeMatching(
                datasetType.name,
                gen2keys,
                instrument=self.instrument.getName()
            )
            for calibDate, datasetsForCalibDate in datasetsByCalibDate.items():
                assert calibDate is not None, ("datasetType.isCalibration() is set by "
                                               "the presence of calibDate in the Gen2 template")
                # Build a mapping that lets us find DatasetRefs by data ID,
                # for this DatasetType and calibDate.  We know there is only
                # one ref for each data ID (given DatasetType and calibDate as
                # well).
                refsByDataId = {}
                for dataset in datasetsForCalibDate:
                    refsByDataId.update((ref.dataId, ref) for ref in dataset.refs)
                # Query the Gen2 calibration repo for the validity ranges for
                # this DatasetType and calibDate, and look up the appropriate
                # refs by data ID.
                for row in self._queryGen2CalibRegistry(db, datasetType, calibDate):
                    # For validity times we use TAI as some gen2 repos have
                    # validity dates very far in the past or future.
                    timespan = Timespan(
                        astropy.time.Time(row["validStart"], format="iso", scale="tai"),
                        astropy.time.Time(row["validEnd"], format="iso", scale="tai"),
                    )
                    # Make a Gen2 data ID from query results.
                    gen2id = {}
                    if "detector" in datasetType.dimensions.names:
                        gen2id[self.task.config.ccdKey] = row[self.task.config.ccdKey]
                    if "physical_filter" in datasetType.dimensions.names:
                        gen2id["filter"] = row["filter"]
                    # Translate that to Gen3.
                    gen3id, _ = translator(gen2id)
                    dataId = DataCoordinate.standardize(gen3id, graph=datasetType.dimensions)
                    ref = refsByDataId.get(dataId)
                    if ref is not None:
                        # Validity ranges must not overlap for the same dataID
                        # datasetType combination. Use that as a primary
                        # key and store the timespan and ref in a tuple
                        # as the value for later timespan validation.
                        timespansByDataId[(ref.dataId, ref.datasetType.name)].append((timespan, ref))
                    else:
                        # The Gen2 calib registry mentions this dataset, but it
                        # isn't included in what we've ingested.  This might
                        # sometimes be a problem, but it should usually
                        # represent someone just trying to convert a subset of
                        # the Gen2 repo, so I don't think it's appropriate to
                        # warn or even log at info, since in that case there
                        # may be a _lot_ of these messages.
                        self.task.log.debug(
                            "Gen2 calibration registry entry has no dataset: %s for calibDate=%s, %s.",
                            datasetType.name, calibDate, dataId
                        )
    finally:
        # Everything needed has been read (or reading failed); release the
        # database handle either way instead of leaving it open.
        db.close()

    # Analyze the timespans to check for overlap problems
    # Gaps of a day should be closed since we assume differing
    # conventions in gen2 repos.

    # We need to correct any validity range issues and store the
    # results in a dict-of-lists keyed by Timespan, since
    # Registry.certify operates on one Timespan and multiple refs at a
    # time.
    refsByTimespan = defaultdict(list)

    # A day with a bit of fuzz to indicate the largest gap we will close
    max_gap = astropy.time.TimeDelta(1.001, format="jd", scale="tai")

    # Since in many cases the validity ranges are relevant for multiple
    # dataset types and dataIds we don't want to over-report and so
    # cache the messages for later.
    info_messages = set()
    warn_messages = set()
    for timespans in timespansByDataId.values():
        # Sort all the timespans and check overlaps
        sorted_timespans = sorted(timespans, key=lambda x: x[0])
        timespan_prev, ref_prev = sorted_timespans.pop(0)
        for timespan, ref in sorted_timespans:
            # See if we have a suspicious gap
            delta = timespan.begin - timespan_prev.end
            abs_delta = abs(delta)
            if abs_delta > 0 and abs_delta < max_gap:
                if delta > 0:
                    # Gap between timespans
                    msg = f"Calibration validity gap closed from {timespan_prev.end} to {timespan.begin}"
                    info_messages.add(msg)
                else:
                    # Overlap of timespans
                    msg = f"Calibration validity overlap of {abs(delta).to(u.s)} removed for period " \
                        f"{timespan.begin} to {timespan_prev.end}"
                    warn_messages.add(msg)

                self.task.log.debug("Correcting validity range for %s with end %s",
                                    ref_prev, timespan_prev.end)

                # Assume this gap is down to convention in gen2.
                # We have to adjust the previous timespan to fit
                # since we always trust validStart.
                timespan_prev = Timespan(begin=timespan_prev.begin,
                                         end=timespan.begin)
            # Store the previous timespan and ref since it has now
            # been verified
            refsByTimespan[timespan_prev].append(ref_prev)

            # And update the previous values for the next iteration
            timespan_prev = timespan
            ref_prev = ref

        # Store the final timespan/ref pair
        refsByTimespan[timespan_prev].append(ref_prev)

    # Issue any pending log messages we have recorded
    for msg in sorted(info_messages):
        self.task.log.info(msg)
    for msg in sorted(warn_messages):
        self.task.log.warn(msg)

    # Done reading from Gen2, time to certify into Gen3.
    for timespan, refs in refsByTimespan.items():
        self.task.registry.certify(self.collection, refs, timespan)
264 
def getRun(self, datasetTypeName: str, calibDate: Optional[str] = None) -> str:
    """Return the name of the run that datasets should be written to.

    Parameters
    ----------
    datasetTypeName : `str`
        Name of the dataset type.
    calibDate : `str`, optional
        Gen2 ``calibDate`` of the dataset.  When given, the run name is
        built by the instrument from ``"calib"``, ``"gen2"`` and this
        value; when `None`, the base class implementation decides.

    Returns
    -------
    run : `str`
        Name of the run.
    """
    if calibDate is not None:
        return self.instrument.makeCollectionName("calib", "gen2", calibDate)
    return super().getRun(datasetTypeName)
270 
271  # Class attributes that will be shadowed by public instance attributes;
272  # defined here only for documentation purposes.
273 
274  mapper: CameraMapper
275  """Gen2 mapper associated with this repository.
276  """
lsst.obs.base.gen2to3.repoConverter.RepoConverter.instrument
instrument
Definition: repoConverter.py:212
lsst.obs.base.gen2to3.calibRepoConverter.CalibRepoConverter.getRun
str getRun(self, str datasetTypeName, Optional[str] calibDate=None)
Definition: calibRepoConverter.py:265
lsst.obs.base.gen2to3.repoConverter.RepoConverter.root
root
Definition: repoConverter.py:211
lsst::sphgeom::abs
Angle abs(Angle const &a)
Definition: Angle.h:106
ast::append
std::shared_ptr< FrameSet > append(FrameSet const &first, FrameSet const &second)
Construct a FrameSet that performs two transformations in series.
Definition: functional.cc:33
lsst.obs.base.gen2to3.calibRepoConverter.CalibRepoConverter._queryGen2CalibRegistry
Iterator[sqlite3.Row] _queryGen2CalibRegistry(self, sqlite3.Connection db, DatasetType datasetType, str calibDate)
Definition: calibRepoConverter.py:90
lsst.obs.base.gen2to3.calibRepoConverter.CalibRepoConverter.collection
collection
Definition: calibRepoConverter.py:60
lsst.obs.base.gen2to3.repoConverter.RepoConverter.task
task
Definition: repoConverter.py:210
lsst.obs.base.gen2to3.calibRepoConverter.CalibRepoConverter._datasetTypes
_datasetTypes
Definition: calibRepoConverter.py:61
lsst.obs.base.gen2to3.calibRepoConverter.CalibRepoConverter.iterMappings
Iterator[Tuple[str, CameraMapperMapping]] iterMappings(self)
Definition: calibRepoConverter.py:67
lsst.obs.base.gen2to3.calibRepoConverter.CalibRepoConverter.__init__
def __init__(self, *CameraMapper mapper, str collection, **kwds)
Definition: calibRepoConverter.py:57
lsst.obs.base.gen2to3.calibRepoConverter.CalibRepoConverter.isDatasetTypeSpecial
bool isDatasetTypeSpecial(self, str datasetTypeName)
Definition: calibRepoConverter.py:63
lsst.obs.base.gen2to3.calibRepoConverter.CalibRepoConverter
Definition: calibRepoConverter.py:44
lsst.obs.base.gen2to3.calibRepoConverter.CalibRepoConverter.mapper
mapper
Definition: calibRepoConverter.py:59
lsst.obs.base.gen2to3.repoConverter.RepoConverter
Definition: repoConverter.py:180
lsst.obs.base.gen2to3.calibRepoConverter.CalibRepoConverter.makeRepoWalkerTarget
RepoWalker.Target makeRepoWalkerTarget(self, str datasetTypeName, str template, Dict[str, type] keys, StorageClass storageClass, FormatterParameter formatter=None, Optional[PathElementHandler] targetHandler=None)
Definition: calibRepoConverter.py:71
set
daf::base::PropertySet * set
Definition: fits.cc:912