21 from __future__
import annotations
23 __all__ = [
"CalibRepoConverter"]
25 from collections
import defaultdict
28 from typing
import TYPE_CHECKING, Dict, Iterator, List, Mapping, Tuple, Optional
31 import astropy.units
as u
33 from lsst.daf.butler
import DataCoordinate, FileDataset, Timespan
34 from .repoConverter
import RepoConverter
35 from .repoWalker
import RepoWalker
38 from lsst.daf.butler
import DatasetType, StorageClass, FormatterParameter
39 from .repoWalker.scanner
import PathElementHandler
40 from ..cameraMapper
import CameraMapper
41 from ..mapping
import Mapping
as CameraMapperMapping
45 """A specialization of `RepoConverter` for calibration repositories.
49 mapper : `CameraMapper`
50 Gen2 mapper for the data repository. The root associated with the
51 mapper is ignored and need not match the root of the repository.
53 Additional keyword arguments are forwarded to (and required by)
57 def __init__(self, *, mapper: CameraMapper, collection: str, **kwds):
65 return datasetTypeName
in self.
instrument.getCuratedCalibrationNames()
def iterMappings(self) -> Iterator[Tuple[str, CameraMapperMapping]]:
    """Iterate over the calibration mappings of the Gen2 mapper.

    Yields
    ------
    item : `tuple`
        Each ``(key, value)`` pair of ``self.mapper.calibrations``, in the
        order produced by that mapping's ``items()``.
    """
    for item in self.mapper.calibrations.items():
        yield item
72 storageClass: StorageClass, formatter: FormatterParameter =
None,
73 targetHandler: Optional[PathElementHandler] =
None,
74 ) -> RepoWalker.Target:
76 target = RepoWalker.Target(
77 datasetTypeName=datasetTypeName,
78 storageClass=storageClass,
81 instrument=self.
task.instrument.getName(),
82 universe=self.
task.registry.dimensions,
84 targetHandler=targetHandler,
85 translatorFactory=self.
task.translatorFactory,
def _queryGen2CalibRegistry(self, db: sqlite3.Connection, datasetType: DatasetType, calibDate: str
                            ) -> Iterator[sqlite3.Row]:
    """Query the Gen2 calibration registry for validity ranges.

    Parameters
    ----------
    db : `sqlite3.Connection`
        Open connection to the Gen2 calibration registry database.
    datasetType : `DatasetType`
        Dataset type whose name selects the Gen2 mapping (and hence the
        table) and whose dimensions select the columns to query.
    calibDate : `str`
        Value the ``calibDate`` column must equal.

    Yields
    ------
    row : `sqlite3.Row`
        Distinct rows with ``validStart``, ``validEnd``, the configured
        ccd key column and ``filter``.  The last two are selected as
        ``NULL`` when the dataset type lacks the ``detector`` or
        ``physical_filter`` dimension, respectively.

    Notes
    -----
    Nothing is yielded, and a warning is logged, when the Gen2 mapping has
    no tables or when SQLite raises `sqlite3.OperationalError` for the
    query.
    """
    dimensionNames = datasetType.dimensions.names
    ccdKey = self.task.config.ccdKey
    fields = ["validStart", "validEnd"]
    if "detector" in dimensionNames:
        fields.append(ccdKey)
    else:
        # Keep the result columns uniform regardless of dimensions.
        fields.append(f"NULL AS {ccdKey}")
    if "physical_filter" in dimensionNames:
        fields.append("filter")
    else:
        assert "band" not in dimensionNames
        fields.append("NULL AS filter")
    tables = self.mapper.mappings[datasetType.name].tables
    if not tables:
        # Do not index ``tables`` here: it is None or empty, so doing so
        # would raise instead of emitting the warning.
        self.task.log.warn("Could not extract calibration ranges for %s in %s; "
                           "no tables in Gen2 mapper.",
                           datasetType.name, self.root)
        return
    table = tables[0]
    # Column and table names come from the mapper/config and cannot be bound
    # as parameters; only the calibDate value is bound.
    query = f"SELECT DISTINCT {', '.join(fields)} FROM {table} WHERE calibDate = ?;"
    try:
        results = db.execute(query, (calibDate,))
    except sqlite3.OperationalError as e:
        self.task.log.warn("Could not extract calibration ranges for %s in %s from table %s: %r",
                           datasetType.name, self.root, table, e)
        return
    yield from results
118 def _finish(self, datasets: Mapping[DatasetType, Mapping[Optional[str], List[FileDataset]]]):
121 calibFile = os.path.join(self.
root,
"calibRegistry.sqlite3")
125 if not os.path.exists(calibFile):
126 raise RuntimeError(
"Attempting to convert calibrations but no registry database"
127 f
" found in {self.root}")
132 timespansByDataId = defaultdict(list)
134 db = sqlite3.connect(calibFile)
135 db.row_factory = sqlite3.Row
137 for datasetType, datasetsByCalibDate
in datasets.items():
138 if not datasetType.isCalibration():
141 if "detector" in datasetType.dimensions.names:
142 gen2keys[self.
task.config.ccdKey] = int
143 if "physical_filter" in datasetType.dimensions.names:
144 gen2keys[
"filter"] = str
145 translator = self.
instrument.makeDataIdTranslatorFactory().makeMatching(
150 for calibDate, datasetsForCalibDate
in datasetsByCalibDate.items():
151 assert calibDate
is not None, (
"datasetType.isCalibration() is set by "
152 "the presence of calibDate in the Gen2 template")
158 for dataset
in datasetsForCalibDate:
159 refsByDataId.update((ref.dataId, ref)
for ref
in dataset.refs)
167 astropy.time.Time(row[
"validStart"], format=
"iso", scale=
"tai"),
168 astropy.time.Time(row[
"validEnd"], format=
"iso", scale=
"tai"),
172 if "detector" in datasetType.dimensions.names:
173 gen2id[self.
task.config.ccdKey] = row[self.
task.config.ccdKey]
174 if "physical_filter" in datasetType.dimensions.names:
175 gen2id[
"filter"] = row[
"filter"]
177 gen3id, _ = translator(gen2id)
178 dataId = DataCoordinate.standardize(gen3id, graph=datasetType.dimensions)
179 ref = refsByDataId.get(dataId)
185 timespansByDataId[(ref.dataId, ref.datasetType.name)].
append((timespan, ref))
195 "Gen2 calibration registry entry has no dataset: %s for calibDate=%s, %s.",
196 datasetType.name, calibDate, dataId
207 refsByTimespan = defaultdict(list)
210 max_gap = astropy.time.TimeDelta(1.001, format=
"jd", scale=
"tai")
215 info_messages =
set()
216 warn_messages =
set()
217 for timespans
in timespansByDataId.values():
219 sorted_timespans = sorted(timespans, key=
lambda x: x[0])
220 timespan_prev, ref_prev = sorted_timespans.pop(0)
221 for timespan, ref
in sorted_timespans:
223 delta = timespan.begin - timespan_prev.end
224 abs_delta =
abs(delta)
225 if abs_delta > 0
and abs_delta < max_gap:
228 msg = f
"Calibration validity gap closed from {timespan_prev.end} to {timespan.begin}"
229 info_messages.add(msg)
232 msg = f
"Calibration validity overlap of {abs(delta).to(u.s)} removed for period " \
233 f
"{timespan.begin} to {timespan_prev.end}"
234 warn_messages.add(msg)
236 self.
task.log.debug(
"Correcting validity range for %s with end %s",
237 ref_prev, timespan_prev.end)
242 timespan_prev = Timespan(begin=timespan_prev.begin,
246 refsByTimespan[timespan_prev].
append(ref_prev)
249 timespan_prev = timespan
253 refsByTimespan[timespan_prev].
append(ref_prev)
256 for msg
in sorted(info_messages):
257 self.
task.log.info(msg)
258 for msg
in sorted(warn_messages):
259 self.
task.log.warn(msg)
262 for timespan, refs
in refsByTimespan.items():
def getRun(self, datasetTypeName: str, calibDate: Optional[str] = None) -> str:
    """Return the name of the run for a dataset type and calibration date.

    Parameters
    ----------
    datasetTypeName : `str`
        Name of the dataset type; only used when ``calibDate`` is `None`.
    calibDate : `str`, optional
        Gen2 calibration date.  When given, it becomes the last component
        of the collection name.

    Returns
    -------
    run : `str`
        The base class's ``getRun(datasetTypeName)`` when ``calibDate`` is
        `None`; otherwise the result of
        ``self.instrument.makeCollectionName("calib", "gen2", calibDate)``.
    """
    if calibDate is not None:
        return self.instrument.makeCollectionName("calib", "gen2", calibDate)
    return super().getRun(datasetTypeName)
275 """Gen2 mapper associated with this repository.