4 from dateutil
import parser
def _convertToDate(dateString):
    """Turn a date/time string into a date object.

    The text is handed to ``dateutil``'s ``parser.parse``; only the
    calendar-date part of the parsed result is kept, so any time-of-day
    information in the string is discarded.

    @param dateString: Text to be parsed as a date
    @return The date portion of the parsed value
    """
    parsed = parser.parse(dateString)
    return parsed.date()
18 """Task that will parse the filename and/or its contents to get the
19 required information to populate the calibration registry."""
22 """Return a known calibration dataset type using
23 the observation type in the header keyword OBSTYPE
25 @param filename: Input filename
28 if not md.exists(
"OBSTYPE"):
29 raise RuntimeError(
"Unable to find the required header keyword OBSTYPE in %s, hdu %d" %
30 (filename, self.
config.hdu))
31 obstype = md.getScalar(
"OBSTYPE").
strip().lower()
34 elif "zero" in obstype
or "bias" in obstype:
36 elif "dark" in obstype:
38 elif "fringe" in obstype:
40 elif "sky" in obstype:
42 elif "illumcor" in obstype:
44 elif "defects" in obstype:
46 elif "qe_curve" in obstype:
48 elif "linearizer" in obstype:
49 obstype =
"linearizer"
50 elif "crosstalk" in obstype:
55 """Get destination for the file
57 @param butler Data butler
58 @param info File properties, used as dataId for the butler
59 @param filename Input filename
60 @return Destination filename
65 tempinfo = {k: v
for (k, v)
in info.items()
if v
is not None}
67 raw = butler.get(calibType +
"_filename", tempinfo)[0]
76 """Configuration for the CalibsRegisterTask"""
77 tables =
ListField(dtype=str, default=[
"bias",
"dark",
"flat",
"fringe",
"sky",
"defects",
"qe_curve",
78 "linearizer",
"crosstalk"], doc=
"Names of tables")
79 calibDate =
Field(dtype=str, default=
"calibDate", doc=
"Name of column for calibration date")
80 validStart =
Field(dtype=str, default=
"validStart", doc=
"Name of column for validity start")
81 validEnd =
Field(dtype=str, default=
"validEnd", doc=
"Name of column for validity stop")
82 detector =
ListField(dtype=str, default=[
"filter",
"ccd"],
83 doc=
"Columns that identify individual detectors")
84 validityUntilSuperseded =
ListField(dtype=str, default=[
"defects",
"qe_curve",
"linearizer",
"crosstalk"],
85 doc=
"Tables for which to set validity for a calib from when it is "
86 "taken until it is superseded by the next; validity in other tables "
87 "is calculated by applying the validity range.")
91 doc=
"Fix the off-by-one error by incrementing validEnd. See "
92 "fixSubsetValidity for more details.",
97 """Task that will generate the calibration registry for the Mapper"""
98 ConfigClass = CalibsRegisterConfig
def openRegistry(self, directory, create=False, dryrun=False, name="calibRegistry.sqlite3"):
    """Open the calibration registry and hand back its connection handle.

    This is a straight delegation to ``RegisterTask.openRegistry``; every
    argument is forwarded positionally and unchanged. The only thing this
    override contributes is the default registry file name,
    "calibRegistry.sqlite3".

    @param directory: Forwarded to RegisterTask.openRegistry
    @param create: Forwarded to RegisterTask.openRegistry
    @param dryrun: Forwarded to RegisterTask.openRegistry
    @param name: Registry file name forwarded to RegisterTask.openRegistry
    @return Whatever RegisterTask.openRegistry returns (the connection handle)
    """
    handle = RegisterTask.openRegistry(self, directory, create, dryrun, name)
    return handle
105 """Create the registry tables"""
106 for table
in self.
config.tables:
107 RegisterTask.createTable(self, conn, table=table, forceCreateTables=forceCreateTables)
def addRow(self, conn, info, *args, **kwargs):
    """Insert a row for a file, leaving its validity columns unset.

    The entries of ``info`` keyed by the configured validity start and
    validity end column names are overwritten with None (``info`` is
    modified in place), then the row is passed on to
    ``RegisterTask.addRow`` together with any extra arguments.

    @param conn: Database connection, forwarded to RegisterTask.addRow
    @param info: Mapping of column values for the row; mutated in place
    @param args: Extra positional arguments forwarded unchanged
    @param kwargs: Extra keyword arguments forwarded unchanged
    """
    # Both validity bounds start out empty.
    for column in (self.config.validStart, self.config.validEnd):
        info[column] = None
    RegisterTask.addRow(self, conn, info, *args, **kwargs)
116 """Loop over all tables, filters, and ccdnums,
117 and update the validity ranges in the registry.
119 @param conn: Database connection
120 @param validity: Validity range (days)
122 conn.row_factory = sqlite3.Row
123 cursor = conn.cursor()
125 tables = self.
config.tables
127 sql =
"SELECT DISTINCT %s FROM %s" % (
", ".join(self.
config.detector), table)
129 rows = cursor.fetchall()
134 """Update the validity ranges among selected rows in the registry.
136 For tables listed in config.validityUntilSuperseded (by default defects, qe_curve,
137 linearizer and crosstalk), products are valid from their date until superseded by the next.
138 For other calibration products, the validity ranges are checked and
139 if there are overlaps, a midpoint is used to fix the overlaps,
140 so that the calibration data whose date is nearest the date
141 of the observation is used.
143 DM generated calibrations contain a CALIB_ID header
144 keyword. These calibrations likely require the
145 incrementValidEnd configuration option set to True. Other
146 calibrations generate the calibDate via the DATE-OBS header
147 keyword, and likely require incrementValidEnd=False.
149 @param conn: Database connection
150 @param table: Name of table to be selected
151 @param detectorData: Values identifying a detector (from columns in self.config.detector)
152 @param validity: Validity range (days)
154 columns =
", ".join([self.
config.calibDate, self.
config.validStart, self.
config.validEnd])
155 sql =
"SELECT id, %s FROM %s" % (columns, table)
156 sql +=
" WHERE " +
" AND ".join(col +
"=?" for col
in self.
config.detector)
157 sql +=
" ORDER BY " + self.
config.calibDate
158 cursor = conn.cursor()
159 cursor.execute(sql, detectorData)
160 rows = cursor.fetchall()
163 valids = collections.OrderedDict([(_convertToDate(row[self.
config.calibDate]), [
None,
None])
for
166 det =
" ".join(
"%s=%s" % (k, v)
for k, v
in zip(self.
config.detector, detectorData))
168 self.
log.
warn(
str(
"Skipped setting the validity overlaps for %s %s: missing calibration dates" %
171 dates =
list(valids.keys())
172 if table
in self.
config.validityUntilSuperseded:
174 for thisDate, nextDate
in zip(dates[:-1], dates[1:]):
175 valids[thisDate][0] = thisDate
176 valids[thisDate][1] = nextDate
177 valids[dates[-1]][0] = dates[-1]
178 valids[dates[-1]][1] = _convertToDate(
"2037-12-31")
182 valids[dd] = [dd - datetime.timedelta(validity), dd + datetime.timedelta(validity)]
185 midpoints = [t1 + (t2 - t1)//2
for t1, t2
in zip(dates[:-1], dates[1:])]
186 for i, (date, midpoint)
in enumerate(zip(dates[:-1], midpoints)):
187 if valids[date][1] > midpoint:
188 nextDate = dates[i + 1]
189 valids[nextDate][0] = midpoint + datetime.timedelta(1)
190 if self.
config.incrementValidEnd:
191 valids[date][1] = midpoint + datetime.timedelta(1)
193 valids[date][1] = midpoint
198 calibDate = _convertToDate(row[self.
config.calibDate])
199 validStart = valids[calibDate][0].isoformat()
200 validEnd = valids[calibDate][1].isoformat()
201 sql =
"UPDATE %s" % table
202 sql +=
" SET %s=?, %s=?" % (self.
config.validStart, self.
config.validEnd)
204 conn.execute(sql, (validStart, validEnd, row[
"id"]))
208 """Argument parser to support ingesting calibration images into the repository"""
211 InputOnlyArgumentParser.__init__(self, *args, **kwargs)
212 self.add_argument(
"-n",
"--dry-run", dest=
"dryrun", action=
"store_true",
213 default=
False, help=
"Don't perform any action?")
214 self.add_argument(
"--mode", choices=[
"move",
"copy",
"link",
"skip"], default=
"move",
215 help=
"Mode of delivering the files to their destination")
216 self.add_argument(
"--create", action=
"store_true", help=
"Create new registry?")
217 self.add_argument(
"--validity", type=int, required=
True, help=
"Calibration validity period (days)")
218 self.add_argument(
"--ignore-ingested", dest=
"ignoreIngested", action=
"store_true",
219 help=
"Don't register files that have already been registered")
220 self.add_argument(
"files", nargs=
"+", help=
"Names of file")
224 """Configuration for IngestCalibsTask"""
227 allowError =
Field(dtype=bool, default=
False, doc=
"Allow error in ingestion?")
228 clobber =
Field(dtype=bool, default=
False, doc=
"Clobber existing file?")
232 """Task that generates registry for calibration images"""
233 ConfigClass = IngestCalibsConfig
234 ArgumentParser = IngestCalibsArgumentParser
235 _DefaultName =
"ingestCalibs"
238 """Ingest all specified files and add them to the registry"""
239 calibRoot = args.calib
if args.calib
is not None else args.output
241 with self.register.openRegistry(calibRoot, create=args.create, dryrun=args.dryrun)
as registry:
243 for infile
in filenameList:
244 fileInfo, hduInfoList = self.parse.
getInfo(infile)
245 calibType = self.parse.getCalibType(infile)
246 if calibType
not in self.register.config.tables:
247 self.
log.
warn(
str(
"Skipped adding %s of observation type '%s' to registry "
248 "(must be one of %s)" %
249 (infile, calibType,
", ".join(self.register.config.tables))))
251 calibTypes.add(calibType)
252 if args.mode !=
'skip':
253 outfile = self.parse.getDestination(args.butler, fileInfo, infile)
254 ingested = self.
ingest(infile, outfile, mode=args.mode, dryrun=args.dryrun)
256 self.
log.
warn(
str(
"Failed to ingest %s of observation type '%s'" %
257 (infile, calibType)))
259 if self.register.check(registry, fileInfo, table=calibType):
260 if args.ignoreIngested:
263 self.
log.
warn(
"%s: already ingested: %s" % (infile, fileInfo))
264 for info
in hduInfoList:
265 self.register.addRow(registry, info, dryrun=args.dryrun,
266 create=args.create, table=calibType)
268 self.register.updateValidityRanges(registry, args.validity, tables=calibTypes)
270 self.
log.
info(
"Would update validity ranges here, but dryrun")