21 from __future__
import annotations
23 __all__ = [
"CalibRepoConverter"]
28 from typing
import TYPE_CHECKING, Dict, Iterator, Tuple, Optional
30 from .repoConverter
import RepoConverter
31 from .repoWalker
import RepoWalker
32 from .translators
import makeCalibrationLabel
35 from lsst.daf.butler
import StorageClass, FormatterParameter
36 from .repoWalker.scanner
import PathElementHandler
37 from ..cameraMapper
import CameraMapper
38 from ..mapping
import Mapping
as CameraMapperMapping
42 """A specialization of `RepoConverter` for calibration repositories.
46 mapper : `CameraMapper`
47 Gen2 mapper for the data repository. The root associated with the
48 mapper is ignored and need not match the root of the repository.
50 Additional keyword arguments are forwarded to (and required by)
54 def __init__(self, *, mapper: CameraMapper, **kwds):
61 return datasetTypeName
in self.
task.config.curatedCalibrations
63 def iterMappings(self) -> Iterator[Tuple[str, CameraMapperMapping]]:
65 yield from self.
mapper.calibrations.items()
68 storageClass: StorageClass, formatter: FormatterParameter =
None,
69 targetHandler: Optional[PathElementHandler] =
None,
70 ) -> RepoWalker.Target:
72 target = RepoWalker.Target(
73 datasetTypeName=datasetTypeName,
74 storageClass=storageClass,
77 instrument=self.
task.instrument.getName(),
78 universe=self.
task.registry.dimensions,
80 targetHandler=targetHandler,
81 translatorFactory=self.
task.translatorFactory,
92 calibFile = os.path.join(self.
root,
"calibRegistry.sqlite3")
97 if not os.path.exists(calibFile):
98 raise RuntimeError(
"Attempting to convert calibrations but no registry database"
99 f
" found in {self.root}")
100 db = sqlite3.connect(calibFile)
101 db.row_factory = sqlite3.Row
104 if "calibration_label" not in datasetType.dimensions:
106 fields = [
"validStart",
"validEnd",
"calibDate"]
107 if "detector" in datasetType.dimensions.names:
108 fields.append(self.
task.config.ccdKey)
110 fields.append(f
"NULL AS {self.task.config.ccdKey}")
111 if (
"physical_filter" in datasetType.dimensions.names
112 or "abstract_filter" in datasetType.dimensions.names):
113 fields.append(
"filter")
115 fields.append(
"NULL AS filter")
116 query = f
"SELECT DISTINCT {', '.join(fields)} FROM {datasetType.name};"
118 results = db.execute(query)
119 except sqlite3.OperationalError
as e:
120 if (self.
mapper.mappings[datasetType.name].tables
is None
121 or len(self.
mapper.mappings[datasetType.name].tables) == 0):
122 self.
task.log.warn(
"Could not extract calibration ranges for %s in %s: %r",
123 datasetType.name, self.
root, e)
126 name = self.
mapper.mappings[datasetType.name].tables[0]
127 query = f
"SELECT DISTINCT {', '.join(fields)} FROM {name};"
129 results = db.execute(query)
130 except sqlite3.OperationalError
as e:
131 self.
task.log.warn(
"Could not extract calibration ranges for %s in %s: %r",
132 datasetType.name, self.
root, e)
136 ccd=row[self.
task.config.ccdKey], filter=row[
"filter"])
139 day = astropy.time.TimeDelta(1, format=
"jd", scale=
"tai")
141 "instrument": self.
task.instrument.getName(),
143 "datetime_begin": astropy.time.Time(row[
"validStart"], format=
"iso", scale=
"tai"),
144 "datetime_end": astropy.time.Time(row[
"validEnd"], format=
"iso", scale=
"tai") + day
147 self.
task.registry.insertDimensionData(
"calibration_label", *records)
153 """Gen2 mapper associated with this repository.