4 from dateutil
import parser
7 from lsst.pex.config
import Config, Field, ListField, ConfigurableField
def _convertToDate(dateString):
    """Parse a date string and keep only its calendar-date part.

    @param dateString  Text to hand to dateutil's ``parser.parse``
    @return Result of calling ``.date()`` on the parsed value
    """
    parsed = parser.parse(dateString)
    return parsed.date()
18 """Task that will parse the filename and/or its contents to get the
19 required information to populate the calibration registry."""
22 """Return a known calibration dataset type using
23 the observation type in the header keyword OBSTYPE
25 @param filename: Input filename
28 if not md.exists(
"OBSTYPE"):
29 raise RuntimeError(
"Unable to find the required header keyword OBSTYPE in %s, hdu %d" %
30 (filename, self.
config.hdu))
31 obstype = md.getScalar(
"OBSTYPE").
strip().lower()
34 elif "zero" in obstype
or "bias" in obstype:
36 elif "dark" in obstype:
38 elif "fringe" in obstype:
40 elif "sky" in obstype:
42 elif "illumcor" in obstype:
44 elif "defects" in obstype:
46 elif "qe_curve" in obstype:
48 elif "linearizer" in obstype:
49 obstype =
"linearizer"
53 """Get destination for the file
55 @param butler Data butler
56 @param info File properties, used as dataId for the butler
57 @param filename Input filename
58 @return Destination filename
63 tempinfo = {k: v
for (k, v)
in info.items()
if v
is not None}
65 raw = butler.get(calibType +
"_filename", tempinfo)[0]
74 """Configuration for the CalibsRegisterTask"""
75 tables = ListField(dtype=str, default=[
"bias",
"dark",
"flat",
"fringe",
"sky",
"defects",
"qe_curve",
76 "linearizer"], doc=
"Names of tables")
77 calibDate = Field(dtype=str, default=
"calibDate", doc=
"Name of column for calibration date")
78 validStart = Field(dtype=str, default=
"validStart", doc=
"Name of column for validity start")
79 validEnd = Field(dtype=str, default=
"validEnd", doc=
"Name of column for validity stop")
80 detector = ListField(dtype=str, default=[
"filter",
"ccd"],
81 doc=
"Columns that identify individual detectors")
82 validityUntilSuperseded = ListField(dtype=str, default=[
"defects",
"qe_curve",
"linearizer"],
83 doc=
"Tables for which to set validity for a calib from when it is "
84 "taken until it is superseded by the next; validity in other tables "
85 "is calculated by applying the validity range.")
89 """Task that will generate the calibration registry for the Mapper"""
90 ConfigClass = CalibsRegisterConfig
def openRegistry(self, directory, create=False, dryrun=False, name="calibRegistry.sqlite3"):
    """Open the calibration registry by delegating to RegisterTask.

    All arguments are forwarded positionally, unchanged, to
    ``RegisterTask.openRegistry``; this override only supplies
    "calibRegistry.sqlite3" as the default for ``name``.

    @param directory  Forwarded to RegisterTask.openRegistry
    @param create  Forwarded to RegisterTask.openRegistry
    @param dryrun  Forwarded to RegisterTask.openRegistry
    @param name  Forwarded to RegisterTask.openRegistry
                 (presumably the registry file name -- confirm against the base class)
    @return Whatever RegisterTask.openRegistry returns (the connection handle,
            per the original docstring)
    """
    handle = RegisterTask.openRegistry(self, directory, create, dryrun, name)
    return handle
97 """Create the registry tables"""
98 for table
in self.
config.tables:
99 RegisterTask.createTable(self, conn, table=table, forceCreateTables=forceCreateTables)
def addRow(self, conn, info, *args, **kwargs):
    """Add a row to the file table with its validity columns blanked.

    The entries of ``info`` keyed by ``config.validStart`` and
    ``config.validEnd`` are set to None (modifying the caller's ``info``
    in place) before the row is handed to ``RegisterTask.addRow``.

    @param conn  Forwarded to RegisterTask.addRow
    @param info  Mapping of column values for the row; updated in place
    @param args  Extra positional arguments forwarded to RegisterTask.addRow
    @param kwargs  Extra keyword arguments forwarded to RegisterTask.addRow
    """
    # Validity limits are not known at insertion time, so both start as None.
    for column in (self.config.validStart, self.config.validEnd):
        info[column] = None
    RegisterTask.addRow(self, conn, info, *args, **kwargs)
108 """Loop over all tables, filters, and ccdnums,
109 and update the validity ranges in the registry.
111 @param conn: Database connection
112 @param validity: Validity range (days)
114 conn.row_factory = sqlite3.Row
115 cursor = conn.cursor()
117 tables = self.
config.tables
119 sql =
"SELECT DISTINCT %s FROM %s" % (
", ".join(self.
config.detector), table)
121 rows = cursor.fetchall()
126 """Update the validity ranges among selected rows in the registry.
128 For tables listed in config.validityUntilSuperseded (defects, qe_curve and linearizer by default),
129 the products are valid from their start date until they are superseded by the next calibration.
130 For other calibration products, the validity ranges are checked and
131 if there are overlaps, a midpoint is used to fix the overlaps,
132 so that the calibration data whose date is nearest the date
133 of the observation is used.
135 @param conn: Database connection
136 @param table: Name of table to be selected
137 @param detectorData: Values identifying a detector (from columns in self.config.detector)
138 @param validity: Validity range (days)
140 columns =
", ".join([self.
config.calibDate, self.
config.validStart, self.
config.validEnd])
141 sql =
"SELECT id, %s FROM %s" % (columns, table)
142 sql +=
" WHERE " +
" AND ".join(col +
"=?" for col
in self.
config.detector)
143 sql +=
" ORDER BY " + self.
config.calibDate
144 cursor = conn.cursor()
145 cursor.execute(sql, detectorData)
146 rows = cursor.fetchall()
149 valids = collections.OrderedDict([(_convertToDate(row[self.
config.calibDate]), [
None,
None])
for
152 det =
" ".join(
"%s=%s" % (k, v)
for k, v
in zip(self.
config.detector, detectorData))
154 self.
log.
warn(str(
"Skipped setting the validity overlaps for %s %s: missing calibration dates" %
157 dates =
list(valids.keys())
158 if table
in self.
config.validityUntilSuperseded:
160 for thisDate, nextDate
in zip(dates[:-1], dates[1:]):
161 valids[thisDate][0] = thisDate
162 valids[thisDate][1] = nextDate - datetime.timedelta(1)
163 valids[dates[-1]][0] = dates[-1]
164 valids[dates[-1]][1] = _convertToDate(
"2037-12-31")
168 valids[dd] = [dd - datetime.timedelta(validity), dd + datetime.timedelta(validity)]
171 midpoints = [t1 + (t2 - t1)//2
for t1, t2
in zip(dates[:-1], dates[1:])]
172 for i, (date, midpoint)
in enumerate(zip(dates[:-1], midpoints)):
173 if valids[date][1] > midpoint:
174 nextDate = dates[i + 1]
175 valids[nextDate][0] = midpoint + datetime.timedelta(1)
176 valids[date][1] = midpoint
181 calibDate = _convertToDate(row[self.
config.calibDate])
182 validStart = valids[calibDate][0].isoformat()
183 validEnd = valids[calibDate][1].isoformat()
184 sql =
"UPDATE %s" % table
185 sql +=
" SET %s=?, %s=?" % (self.
config.validStart, self.
config.validEnd)
187 conn.execute(sql, (validStart, validEnd, row[
"id"]))
191 """Argument parser to support ingesting calibration images into the repository"""
194 InputOnlyArgumentParser.__init__(self, *args, **kwargs)
195 self.add_argument(
"-n",
"--dry-run", dest=
"dryrun", action=
"store_true",
196 default=
False, help=
"Don't perform any action?")
197 self.add_argument(
"--mode", choices=[
"move",
"copy",
"link",
"skip"], default=
"move",
198 help=
"Mode of delivering the files to their destination")
199 self.add_argument(
"--create", action=
"store_true", help=
"Create new registry?")
200 self.add_argument(
"--validity", type=int, required=
True, help=
"Calibration validity period (days)")
201 self.add_argument(
"--ignore-ingested", dest=
"ignoreIngested", action=
"store_true",
202 help=
"Don't register files that have already been registered")
203 self.add_argument(
"files", nargs=
"+", help=
"Names of file")
207 """Configuration for IngestCalibsTask"""
208 parse = ConfigurableField(target=CalibsParseTask, doc=
"File parsing")
209 register = ConfigurableField(target=CalibsRegisterTask, doc=
"Registry entry")
210 allowError = Field(dtype=bool, default=
False, doc=
"Allow error in ingestion?")
211 clobber = Field(dtype=bool, default=
False, doc=
"Clobber existing file?")
215 """Task that generates registry for calibration images"""
216 ConfigClass = IngestCalibsConfig
217 ArgumentParser = IngestCalibsArgumentParser
218 _DefaultName =
"ingestCalibs"
221 """Ingest all specified files and add them to the registry"""
222 calibRoot = args.calib
if args.calib
is not None else args.output
224 with self.register.openRegistry(calibRoot, create=args.create, dryrun=args.dryrun)
as registry:
226 for infile
in filenameList:
227 fileInfo, hduInfoList = self.parse.
getInfo(infile)
228 calibType = self.parse.getCalibType(infile)
229 if calibType
not in self.register.config.tables:
230 self.
log.
warn(str(
"Skipped adding %s of observation type '%s' to registry "
231 "(must be one of %s)" %
232 (infile, calibType,
", ".join(self.register.config.tables))))
234 calibTypes.add(calibType)
235 if args.mode !=
'skip':
236 outfile = self.parse.getDestination(args.butler, fileInfo, infile)
237 ingested = self.
ingest(infile, outfile, mode=args.mode, dryrun=args.dryrun)
239 self.
log.
warn(str(
"Failed to ingest %s of observation type '%s'" %
240 (infile, calibType)))
242 if self.register.check(registry, fileInfo, table=calibType):
243 if args.ignoreIngested:
246 self.
log.
warn(
"%s: already ingested: %s" % (infile, fileInfo))
247 for info
in hduInfoList:
248 self.register.addRow(registry, info, dryrun=args.dryrun,
249 create=args.create, table=calibType)
251 self.register.updateValidityRanges(registry, args.validity, tables=calibTypes)
253 self.
log.
info(
"Would update validity ranges here, but dryrun")