4 from dateutil
import parser
def _convertToDate(dateString):
    """Convert a string into a date object.

    @param dateString: Text describing a date (optionally with a time) in a
                       form accepted by dateutil's ``parser.parse``
    @return The parsed value reduced to its calendar date via ``.date()``;
            any time-of-day component in the input is dropped
    """
    # ``parser`` is the module-level ``dateutil.parser`` import.
    return parser.parse(dateString).date()
18 """Task that will parse the filename and/or its contents to get the
19 required information to populate the calibration registry."""
22 """Return a known calibration dataset type using
23 the observation type in the header keyword OBSTYPE
25 @param filename: Input filename
28 if not md.exists(
"OBSTYPE"):
29 raise RuntimeError(
"Unable to find the required header keyword OBSTYPE in %s, hdu %d" %
30 (filename, self.config.hdu))
31 obstype = md.getScalar(
"OBSTYPE").
strip().lower()
34 elif "zero" in obstype
or "bias" in obstype:
36 elif "dark" in obstype:
38 elif "fringe" in obstype:
40 elif "sky" in obstype:
42 elif "illumcor" in obstype:
44 elif "defects" in obstype:
46 elif "qe_curve" in obstype:
48 elif "linearizer" in obstype:
49 obstype =
"linearizer"
50 elif "crosstalk" in obstype:
52 elif "BFK" in obstype:
54 elif "photodiode" in obstype:
55 obstype =
'photodiode'
59 """Get destination for the file
61 @param butler Data butler
62 @param info File properties, used as dataId for the butler
63 @param filename Input filename
64 @return Destination filename
69 tempinfo = {k: v
for (k, v)
in info.items()
if v
is not None}
71 raw = butler.get(calibType +
"_filename", tempinfo)[0]
80 """Configuration for the CalibsRegisterTask"""
81 tables =
ListField(dtype=str, default=[
"bias",
"dark",
"flat",
"fringe",
"sky",
"defects",
"qe_curve",
82 "linearizer",
"crosstalk",
"bfk",
"photodiode"],
83 doc=
"Names of tables")
84 calibDate =
Field(dtype=str, default=
"calibDate", doc=
"Name of column for calibration date")
85 validStart =
Field(dtype=str, default=
"validStart", doc=
"Name of column for validity start")
86 validEnd =
Field(dtype=str, default=
"validEnd", doc=
"Name of column for validity stop")
87 detector =
ListField(dtype=str, default=[
"filter",
"ccd"],
88 doc=
"Columns that identify individual detectors")
89 validityUntilSuperseded =
ListField(dtype=str, default=[
"defects",
"qe_curve",
"linearizer",
"crosstalk",
91 doc=
"Tables for which to set validity for a calib from when it is "
92 "taken until it is superseded by the next; validity in other tables "
93 "is calculated by applying the validity range.")
97 doc=
"Fix the off-by-one error by incrementing validEnd. See "
98 "fixSubsetValidity for more details.",
103 """Task that will generate the calibration registry for the Mapper"""
104 ConfigClass = CalibsRegisterConfig
def openRegistry(self, directory, create=False, dryrun=False, name="calibRegistry.sqlite3"):
    """Open the registry and return the connection handle.

    This is a thin wrapper over ``RegisterTask.openRegistry``; all arguments
    are forwarded unchanged and positionally. The only thing this override
    contributes is the default registry file name, "calibRegistry.sqlite3".

    @param directory: Forwarded to RegisterTask.openRegistry
                      (presumably the directory holding the registry file
                      -- confirm against RegisterTask)
    @param create: Forwarded to RegisterTask.openRegistry
    @param dryrun: Forwarded to RegisterTask.openRegistry
    @param name: Registry file name, forwarded to RegisterTask.openRegistry
    @return Whatever RegisterTask.openRegistry returns (the connection handle)
    """
    return RegisterTask.openRegistry(self, directory, create, dryrun, name)
111 """Create the registry tables"""
112 for table
in self.config.tables:
113 RegisterTask.createTable(self, conn, table=table, forceCreateTables=forceCreateTables)
def addRow(self, conn, info, *args, **kwargs):
    """Add a row to the file table.

    Before delegating to ``RegisterTask.addRow``, the validity columns named
    by ``self.config.validStart`` and ``self.config.validEnd`` are set to
    None in ``info``.

    @param conn: Database connection, forwarded to RegisterTask.addRow
    @param info: Mapping of column name to value for the new row; NOTE that
                 it is modified in place (the two validity entries are
                 overwritten with None)
    @param args: Extra positional arguments forwarded to RegisterTask.addRow
    @param kwargs: Extra keyword arguments forwarded to RegisterTask.addRow
    """
    # Insert the row with empty validity bounds.
    # NOTE(review): presumably these are filled in afterwards by the
    # validity-range update step -- confirm against the caller.
    info[self.config.validStart] = None
    info[self.config.validEnd] = None
    RegisterTask.addRow(self, conn, info, *args, **kwargs)
122 """Loop over all tables, filters, and ccdnums,
123 and update the validity ranges in the registry.
125 @param conn: Database connection
126 @param validity: Validity range (days)
128 conn.row_factory = sqlite3.Row
129 cursor = conn.cursor()
131 tables = self.config.tables
133 sql =
"SELECT DISTINCT %s FROM %s" % (
", ".join(self.config.detector), table)
135 rows = cursor.fetchall()
140 """Update the validity ranges among selected rows in the registry.
142 For defects and qe_curve, the products are valid from their start date until
143 they are superseded by subsequent defect data.
144 For other calibration products, the validity ranges are checked and
145 if there are overlaps, a midpoint is used to fix the overlaps,
146 so that the calibration data whose date is nearest the date
147 of the observation is used.
149 DM generated calibrations contain a CALIB_ID header
150 keyword. These calibrations likely require the
151 incrementValidEnd configuration option set to True. Other
152 calibrations generate the calibDate via the DATE-OBS header
153 keyword, and likely require incrementValidEnd=False.
155 @param conn: Database connection
156 @param table: Name of table to be selected
157 @param detectorData: Values identifying a detector (from columns in self.config.detector)
158 @param validity: Validity range (days)
160 columns =
", ".join([self.config.calibDate, self.config.validStart, self.config.validEnd])
161 sql =
"SELECT id, %s FROM %s" % (columns, table)
162 sql +=
" WHERE " +
" AND ".join(col +
"=?" for col
in self.config.detector)
163 sql +=
" ORDER BY " + self.config.calibDate
164 cursor = conn.cursor()
165 cursor.execute(sql, detectorData)
166 rows = cursor.fetchall()
169 valids = collections.OrderedDict([(_convertToDate(row[self.config.calibDate]), [
None,
None])
for
172 det =
" ".join(
"%s=%s" % (k, v)
for k, v
in zip(self.config.detector, detectorData))
173 self.log.
warning(
"Skipped setting the validity overlaps for %s %s: missing calibration dates",
176 dates =
list(valids.keys())
177 if table
in self.config.validityUntilSuperseded:
179 for thisDate, nextDate
in zip(dates[:-1], dates[1:]):
180 valids[thisDate][0] = thisDate
181 valids[thisDate][1] = nextDate
182 valids[dates[-1]][0] = dates[-1]
183 valids[dates[-1]][1] = _convertToDate(
"2037-12-31")
187 valids[dd] = [dd - datetime.timedelta(validity), dd + datetime.timedelta(validity)]
190 midpoints = [t1 + (t2 - t1)//2
for t1, t2
in zip(dates[:-1], dates[1:])]
191 for i, (date, midpoint)
in enumerate(zip(dates[:-1], midpoints)):
192 if valids[date][1] > midpoint:
193 nextDate = dates[i + 1]
194 valids[nextDate][0] = midpoint + datetime.timedelta(1)
195 if self.config.incrementValidEnd:
196 valids[date][1] = midpoint + datetime.timedelta(1)
198 valids[date][1] = midpoint
203 calibDate = _convertToDate(row[self.config.calibDate])
204 validStart = valids[calibDate][0].isoformat()
205 validEnd = valids[calibDate][1].isoformat()
206 sql =
"UPDATE %s" % table
207 sql +=
" SET %s=?, %s=?" % (self.config.validStart, self.config.validEnd)
209 conn.execute(sql, (validStart, validEnd, row[
"id"]))
213 """Argument parser to support ingesting calibration images into the repository"""
216 InputOnlyArgumentParser.__init__(self, *args, **kwargs)
217 self.add_argument(
"-n",
"--dry-run", dest=
"dryrun", action=
"store_true",
218 default=
False, help=
"Don't perform any action?")
219 self.add_argument(
"--mode", choices=[
"move",
"copy",
"link",
"skip"], default=
"move",
220 help=
"Mode of delivering the files to their destination")
221 self.add_argument(
"--create", action=
"store_true", help=
"Create new registry?")
222 self.add_argument(
"--validity", type=int, required=
True, help=
"Calibration validity period (days)")
223 self.add_argument(
"--ignore-ingested", dest=
"ignoreIngested", action=
"store_true",
224 help=
"Don't register files that have already been registered")
225 self.add_argument(
"files", nargs=
"+", help=
"Names of file")
229 """Configuration for IngestCalibsTask"""
232 allowError =
Field(dtype=bool, default=
False, doc=
"Allow error in ingestion?")
233 clobber =
Field(dtype=bool, default=
False, doc=
"Clobber existing file?")
237 """Task that generates registry for calibration images"""
238 ConfigClass = IngestCalibsConfig
239 ArgumentParser = IngestCalibsArgumentParser
240 _DefaultName =
"ingestCalibs"
243 """Ingest all specified files and add them to the registry"""
244 calibRoot = args.calib
if args.calib
is not None else args.output
245 filenameList = self.
expandFilesexpandFiles(args.files)
246 with self.register.openRegistry(calibRoot, create=args.create, dryrun=args.dryrun)
as registry:
248 for infile
in filenameList:
249 fileInfo, hduInfoList = self.parse.
getInfo(infile)
250 calibType = self.parse.getCalibType(infile)
251 if calibType
not in self.register.config.tables:
252 self.log.
warning(
"Skipped adding %s of observation type '%s' to registry "
253 "(must be one of %s)",
254 infile, calibType,
", ".join(self.register.config.tables))
256 calibTypes.add(calibType)
257 if args.mode !=
'skip':
258 outfile = self.parse.getDestination(args.butler, fileInfo, infile)
259 ingested = self.
ingestingest(infile, outfile, mode=args.mode, dryrun=args.dryrun)
261 self.log.
warning(
"Failed to ingest %s of observation type '%s'",
264 if self.register.check(registry, fileInfo, table=calibType):
265 if args.ignoreIngested:
268 self.log.
warning(
"%s: already ingested: %s", infile, fileInfo)
269 for info
in hduInfoList:
270 self.register.addRow(registry, info, dryrun=args.dryrun,
271 create=args.create, table=calibType)
273 self.register.updateValidityRanges(registry, args.validity, tables=calibTypes)
275 self.log.
info(
"Would update validity ranges here, but dryrun")
def expandFiles(self, fileNameList)
Expand a set of filenames and globs, returning a list of filenames.
def ingest(self, infile, outfile, mode="move", dryrun=False)
def getCalibType(self, filename)
def getDestination(self, butler, info, filename)
def fixSubsetValidity(self, conn, table, detectorData, validity)
def addRow(self, conn, info, *args, **kwargs)
def openRegistry(self, directory, create=False, dryrun=False, name="calibRegistry.sqlite3")
def createTable(self, conn, forceCreateTables=False)
def updateValidityRanges(self, conn, validity, tables=None)
def __init__(self, *args, **kwargs)
daf::base::PropertyList * list
daf::base::PropertySet * set
std::shared_ptr< daf::base::PropertyList > readMetadata(std::string const &fileName, int hdu=DEFAULT_HDU, bool strip=false)
Read FITS header.