21 from __future__
import annotations
23 __all__ = [
"CalibRepoConverter"]
25 from collections
import defaultdict
28 from typing
import TYPE_CHECKING, Dict, Iterator, List, Mapping, Sequence, Tuple, Optional
31 import astropy.units
as u
33 from lsst.daf.butler
import DataCoordinate, FileDataset, Timespan
34 from .repoConverter
import RepoConverter
35 from .repoWalker
import RepoWalker
38 from lsst.daf.butler
import DatasetType, StorageClass, FormatterParameter
39 from .repoWalker.scanner
import PathElementHandler
40 from ..cameraMapper
import CameraMapper
41 from ..mapping
import Mapping
as CameraMapperMapping
45 """A specialization of `RepoConverter` for calibration repositories.
49 mapper : `CameraMapper`
50 Gen2 mapper for the data repository. The root associated with the
51 mapper is ignored and need not match the root of the repository.
52 labels : `Sequence` [ `str` ]
53 Strings injected into the names of the collections that calibration
54 datasets are written and certified into (forwarded as the ``extra``
55 argument to `Instrument` methods that generate collection names and
56 write curated calibrations).
58 Additional keyword arguments are forwarded to (and required by)
62 def __init__(self, *, mapper: CameraMapper, labels: Sequence[str] = (), **kwargs):
65 self.
collectioncollection = self.
tasktask.instrument.makeCalibrationCollectionName(*labels)
66 self.
_labels_labels = tuple(labels)
71 return datasetTypeName
in self.
instrumentinstrument.getCuratedCalibrationNames()
73 def iterMappings(self) -> Iterator[Tuple[str, CameraMapperMapping]]:
75 yield from self.
mappermapper.calibrations.items()
78 storageClass: StorageClass, formatter: FormatterParameter =
None,
79 targetHandler: Optional[PathElementHandler] =
None,
80 ) -> RepoWalker.Target:
82 target = RepoWalker.Target(
83 datasetTypeName=datasetTypeName,
84 storageClass=storageClass,
87 instrument=self.
tasktask.instrument.getName(),
88 universe=self.
tasktask.registry.dimensions,
90 targetHandler=targetHandler,
91 translatorFactory=self.
tasktask.translatorFactory,
96 def _queryGen2CalibRegistry(self, db: sqlite3.Connection, datasetType: DatasetType, calibDate: str
97 ) -> Iterator[sqlite3.Row]:
98 """Query the Gen2 calibration registry for the validity ranges and
99 optionally detectors and filters associated with the given dataset type
104 db : `sqlite3.Connection`
105 DBAPI connection to the Gen2 ``calibRegistry.sqlite3`` file.
106 datasetType : `DatasetType`
107 Gen3 dataset type being queried.
109 String extracted from the ``calibDate`` template entry in Gen2
115 SQLite result object; will have ``validStart`` and ``validEnd``
116 columns, may have a detector column (named
117 ``self.task.config.ccdKey``) and/or a ``filter`` column, depending
118 on whether ``datasetType.dimensions`` includes ``detector`` and
119 ``physical_filter``, respectively.
121 fields = [
"validStart",
"validEnd"]
122 if "detector" in datasetType.dimensions.names:
123 fields.append(self.
tasktask.config.ccdKey)
125 fields.append(f
"NULL AS {self.task.config.ccdKey}")
126 if "physical_filter" in datasetType.dimensions.names:
127 fields.append(
"filter")
129 assert "band" not in datasetType.dimensions.names
130 fields.append(
"NULL AS filter")
131 tables = self.
mappermapper.mappings[datasetType.name].tables
132 if tables
is None or len(tables) == 0:
133 self.
tasktask.log.warn(
"Could not extract calibration ranges for %s in %s; "
134 "no tables in Gen2 mapper.",
135 datasetType.name, self.
rootroot, tables[0])
137 query = f
"SELECT DISTINCT {', '.join(fields)} FROM {tables[0]} WHERE calibDate = ?;"
139 results = db.execute(query, (calibDate,))
140 except sqlite3.OperationalError
as e:
141 self.
tasktask.log.warn(
"Could not extract calibration ranges for %s in %s from table %s: %r",
142 datasetType.name, self.
rootroot, tables[0], e)
146 def _finish(self, datasets: Mapping[DatasetType, Mapping[Optional[str], List[FileDataset]]]):
150 calibFile = os.path.join(self.
rootroot,
"calibRegistry.sqlite3")
154 if not os.path.exists(calibFile):
155 raise RuntimeError(
"Attempting to convert calibrations but no registry database"
156 f
" found in {self.root}")
161 timespansByDataId = defaultdict(list)
163 db = sqlite3.connect(calibFile)
164 db.row_factory = sqlite3.Row
166 for datasetType, datasetsByCalibDate
in datasets.items():
167 if not datasetType.isCalibration():
170 if "detector" in datasetType.dimensions.names:
171 gen2keys[self.
tasktask.config.ccdKey] = int
172 if "physical_filter" in datasetType.dimensions.names:
173 gen2keys[
"filter"] = str
174 translator = self.
instrumentinstrument.makeDataIdTranslatorFactory().makeMatching(
179 for calibDate, datasetsForCalibDate
in datasetsByCalibDate.items():
180 assert calibDate
is not None, (
"datasetType.isCalibration() is set by "
181 "the presence of calibDate in the Gen2 template")
187 for dataset
in datasetsForCalibDate:
188 refsByDataId.update((ref.dataId, ref)
for ref
in dataset.refs)
196 astropy.time.Time(row[
"validStart"], format=
"iso", scale=
"tai"),
197 astropy.time.Time(row[
"validEnd"], format=
"iso", scale=
"tai"),
201 if "detector" in datasetType.dimensions.names:
202 gen2id[self.
tasktask.config.ccdKey] = row[self.
tasktask.config.ccdKey]
203 if "physical_filter" in datasetType.dimensions.names:
204 gen2id[
"filter"] = row[
"filter"]
206 gen3id, _ = translator(gen2id)
207 dataId = DataCoordinate.standardize(gen3id, graph=datasetType.dimensions)
208 ref = refsByDataId.get(dataId)
214 timespansByDataId[(ref.dataId, ref.datasetType.name)].
append((timespan, ref))
223 self.
tasktask.log.debug(
224 "Gen2 calibration registry entry has no dataset: %s for calibDate=%s, %s.",
225 datasetType.name, calibDate, dataId
236 refsByTimespan = defaultdict(list)
239 max_gap = astropy.time.TimeDelta(1.001, format=
"jd", scale=
"tai")
244 info_messages =
set()
245 warn_messages =
set()
246 for timespans
in timespansByDataId.values():
248 sorted_timespans = sorted(timespans, key=
lambda x: x[0])
249 timespan_prev, ref_prev = sorted_timespans.pop(0)
250 for timespan, ref
in sorted_timespans:
252 delta = timespan.begin - timespan_prev.end
253 abs_delta =
abs(delta)
254 if abs_delta > 0
and abs_delta < max_gap:
257 msg = f
"Calibration validity gap closed from {timespan_prev.end} to {timespan.begin}"
258 info_messages.add(msg)
261 msg = f
"Calibration validity overlap of {abs(delta).to(u.s)} removed for period " \
262 f
"{timespan.begin} to {timespan_prev.end}"
263 warn_messages.add(msg)
265 self.
tasktask.log.debug(
"Correcting validity range for %s with end %s",
266 ref_prev, timespan_prev.end)
271 timespan_prev = Timespan(begin=timespan_prev.begin,
275 refsByTimespan[timespan_prev].
append(ref_prev)
278 timespan_prev = timespan
282 refsByTimespan[timespan_prev].
append(ref_prev)
285 for msg
in sorted(info_messages):
286 self.
tasktask.log.info(msg)
287 for msg
in sorted(warn_messages):
288 self.
tasktask.log.warn(msg)
291 for timespan, refs
in refsByTimespan.items():
292 self.
tasktask.registry.certify(self.
collectioncollection, refs, timespan)
294 def getRun(self, datasetTypeName: str, calibDate: Optional[str] =
None) -> str:
296 if calibDate
is None:
297 return super().
getRun(datasetTypeName)
299 return self.
instrumentinstrument.makeCalibrationCollectionName(
301 self.
instrumentinstrument.formatCollectionTimestamp(calibDate),
308 """Gen2 mapper associated with this repository.
str getRun(self, str datasetTypeName, Optional[str] calibDate=None)
Iterator[sqlite3.Row] _queryGen2CalibRegistry(self, sqlite3.Connection db, DatasetType datasetType, str calibDate)
def __init__(self, *CameraMapper mapper, Sequence[str] labels=(), **kwargs)
bool isDatasetTypeSpecial(self, str datasetTypeName)
RepoWalker.Target makeRepoWalkerTarget(self, str datasetTypeName, str template, Dict[str, type] keys, StorageClass storageClass, FormatterParameter formatter=None, Optional[PathElementHandler] targetHandler=None)
Iterator[Tuple[str, CameraMapperMapping]] iterMappings(self)
std::shared_ptr< FrameSet > append(FrameSet const &first, FrameSet const &second)
Construct a FrameSet that performs two transformations in series.
std::string const & getName() const noexcept
Return a filter's name.
Angle abs(Angle const &a)
daf::base::PropertySet * set