4 from dateutil
import parser
7 from lsst.pex.config
import Config, Field, ListField, ConfigurableField
def _convertToDate(dateString):
    """Turn a textual date into a date object.

    The string is handed to the dateutil ``parser.parse`` routine and the
    ``date()`` component of the parsed result is returned.

    @param dateString: Text to be parsed
    @return Result of calling ``date()`` on the parsed value
    """
    parsed = parser.parse(dateString)
    return parsed.date()
18 """Task that will parse the filename and/or its contents to get the 19 required information to populate the calibration registry.""" 22 """Return a a known calibration dataset type using 23 the observation type in the header keyword OBSTYPE 25 @param filename: Input filename 28 if not md.exists(
"OBSTYPE"):
29 raise RuntimeError(
"Unable to find the required header keyword OBSTYPE in %s, hdu %d" %
30 (filename, self.
config.hdu))
31 obstype = md.getScalar(
"OBSTYPE").
strip().lower()
34 elif "zero" in obstype
or "bias" in obstype:
36 elif "dark" in obstype:
38 elif "fringe" in obstype:
40 elif "sky" in obstype:
42 elif "illumcor" in obstype:
44 elif "defects" in obstype:
49 """Get destination for the file 51 @param butler Data butler 52 @param info File properties, used as dataId for the butler 53 @param filename Input filename 54 @return Destination filename 59 tempinfo = {k: v
for (k, v)
in info.items()
if v
is not None}
61 raw = butler.get(calibType +
"_filename", tempinfo)[0]
70 """Configuration for the CalibsRegisterTask""" 71 tables = ListField(dtype=str, default=[
"bias",
"dark",
"flat",
"fringe",
"sky",
"defects"],
72 doc=
"Names of tables")
73 calibDate = Field(dtype=str, default=
"calibDate", doc=
"Name of column for calibration date")
74 validStart = Field(dtype=str, default=
"validStart", doc=
"Name of column for validity start")
75 validEnd = Field(dtype=str, default=
"validEnd", doc=
"Name of column for validity stop")
76 detector = ListField(dtype=str, default=[
"filter",
"ccd"],
77 doc=
"Columns that identify individual detectors")
78 validityUntilSuperseded = ListField(dtype=str, default=[
"defects"],
79 doc=
"Tables for which to set validity for a calib from when it is " 80 "taken until it is superseded by the next; validity in other tables " 81 "is calculated by applying the validity range.")
85 """Task that will generate the calibration registry for the Mapper""" 86 ConfigClass = CalibsRegisterConfig
def openRegistry(self, directory, create=False, dryrun=False, name="calibRegistry.sqlite3"):
    """Open the registry by delegating to RegisterTask.openRegistry.

    The only thing this override contributes is its own default for
    ``name`` ("calibRegistry.sqlite3"); every argument is forwarded
    positionally and unchanged.

    @param directory: Passed through to RegisterTask.openRegistry
    @param create: Passed through to RegisterTask.openRegistry
    @param dryrun: Passed through to RegisterTask.openRegistry
    @param name: Passed through to RegisterTask.openRegistry
    @return Whatever RegisterTask.openRegistry returns (the connection handle)
    """
    handle = RegisterTask.openRegistry(self, directory, create, dryrun, name)
    return handle
93 """Create the registry tables""" 94 for table
in self.
config.tables:
95 RegisterTask.createTable(self, conn, table=table, forceCreateTables=forceCreateTables)
def addRow(self, conn, info, *args, **kwargs):
    """Add a row to the file table with both validity columns blanked.

    The entries of ``info`` keyed by the configured validity-start and
    validity-end column names are set to None (``info`` is modified in
    place), after which the row is handed to RegisterTask.addRow.

    @param conn: Passed through to RegisterTask.addRow
    @param info: Mapping describing the row; mutated as described above
    @param args: Extra positional arguments forwarded to RegisterTask.addRow
    @param kwargs: Extra keyword arguments forwarded to RegisterTask.addRow
    """
    # NOTE(review): presumably the real validity values are written later
    # by the validity-range update step -- confirm against the caller.
    for column in (self.config.validStart, self.config.validEnd):
        info[column] = None
    RegisterTask.addRow(self, conn, info, *args, **kwargs)
104 """Loop over all tables, filters, and ccdnums, 105 and update the validity ranges in the registry. 107 @param conn: Database connection 108 @param validity: Validity range (days) 110 conn.row_factory = sqlite3.Row
111 cursor = conn.cursor()
113 tables = self.
config.tables
115 sql =
"SELECT DISTINCT %s FROM %s" % (
", ".join(self.
config.detector), table)
117 rows = cursor.fetchall()
122 """Update the validity ranges among selected rows in the registry. 124 For defects, the products are valid from their start date until 125 they are superseded by subsequent defect data. 126 For other calibration products, the validity ranges are checked and 127 if there are overlaps, a midpoint is used to fix the overlaps, 128 so that the calibration data with whose date is nearest the date 129 of the observation is used. 131 @param conn: Database connection 132 @param table: Name of table to be selected 133 @param detectorData: Values identifying a detector (from columns in self.config.detector) 134 @param validity: Validity range (days) 136 columns =
", ".join([self.
config.calibDate, self.
config.validStart, self.
config.validEnd])
137 sql =
"SELECT id, %s FROM %s" % (columns, table)
138 sql +=
" WHERE " +
" AND ".join(col +
"=?" for col
in self.
config.detector)
139 sql +=
" ORDER BY " + self.
config.calibDate
140 cursor = conn.cursor()
141 cursor.execute(sql, detectorData)
142 rows = cursor.fetchall()
145 valids = collections.OrderedDict([(_convertToDate(row[self.
config.calibDate]), [
None,
None])
for 148 det =
" ".join(
"%s=%s" % (k, v)
for k, v
in zip(self.
config.detector, detectorData))
150 self.
log.
warn(str(
"Skipped setting the validity overlaps for %s %s: missing calibration dates" %
153 dates =
list(valids.keys())
154 if table
in self.
config.validityUntilSuperseded:
156 for thisDate, nextDate
in zip(dates[:-1], dates[1:]):
157 valids[thisDate][0] = thisDate
158 valids[thisDate][1] = nextDate - datetime.timedelta(1)
159 valids[dates[-1]][0] = dates[-1]
160 valids[dates[-1]][1] = _convertToDate(
"2037-12-31")
164 valids[dd] = [dd - datetime.timedelta(validity), dd + datetime.timedelta(validity)]
167 midpoints = [t1 + (t2 - t1)//2
for t1, t2
in zip(dates[:-1], dates[1:])]
168 for i, (date, midpoint)
in enumerate(zip(dates[:-1], midpoints)):
169 if valids[date][1] > midpoint:
170 nextDate = dates[i + 1]
171 valids[nextDate][0] = midpoint + datetime.timedelta(1)
172 valids[date][1] = midpoint
177 calibDate = _convertToDate(row[self.
config.calibDate])
178 validStart = valids[calibDate][0].isoformat()
179 validEnd = valids[calibDate][1].isoformat()
180 sql =
"UPDATE %s" % table
181 sql +=
" SET %s=?, %s=?" % (self.
config.validStart, self.
config.validEnd)
183 conn.execute(sql, (validStart, validEnd, row[
"id"]))
187 """Argument parser to support ingesting calibration images into the repository""" 190 InputOnlyArgumentParser.__init__(self, *args, **kwargs)
191 self.add_argument(
"-n",
"--dry-run", dest=
"dryrun", action=
"store_true",
192 default=
False, help=
"Don't perform any action?")
193 self.add_argument(
"--mode", choices=[
"move",
"copy",
"link",
"skip"], default=
"move",
194 help=
"Mode of delivering the files to their destination")
195 self.add_argument(
"--create", action=
"store_true", help=
"Create new registry?")
196 self.add_argument(
"--validity", type=int, required=
True, help=
"Calibration validity period (days)")
197 self.add_argument(
"--ignore-ingested", dest=
"ignoreIngested", action=
"store_true",
198 help=
"Don't register files that have already been registered")
199 self.add_argument(
"files", nargs=
"+", help=
"Names of file")
203 """Configuration for IngestCalibsTask""" 204 parse = ConfigurableField(target=CalibsParseTask, doc=
"File parsing")
205 register = ConfigurableField(target=CalibsRegisterTask, doc=
"Registry entry")
206 allowError = Field(dtype=bool, default=
False, doc=
"Allow error in ingestion?")
207 clobber = Field(dtype=bool, default=
False, doc=
"Clobber existing file?")
211 """Task that generates registry for calibration images""" 212 ConfigClass = IngestCalibsConfig
213 ArgumentParser = IngestCalibsArgumentParser
214 _DefaultName =
"ingestCalibs" 217 """Ingest all specified files and add them to the registry""" 218 calibRoot = args.calib
if args.calib
is not None else args.output
220 with self.register.openRegistry(calibRoot, create=args.create, dryrun=args.dryrun)
as registry:
222 for infile
in filenameList:
223 fileInfo, hduInfoList = self.parse.
getInfo(infile)
224 calibType = self.parse.getCalibType(infile)
225 if calibType
not in self.register.config.tables:
226 self.
log.
warn(str(
"Skipped adding %s of observation type '%s' to registry " 227 "(must be one of %s)" %
228 (infile, calibType,
", ".join(self.register.config.tables))))
230 calibTypes.add(calibType)
231 if args.mode !=
'skip':
232 outfile = self.parse.getDestination(args.butler, fileInfo, infile)
233 ingested = self.
ingest(infile, outfile, mode=args.mode, dryrun=args.dryrun)
235 self.
log.
warn(str(
"Failed to ingest %s of observation type '%s'" %
236 (infile, calibType)))
238 if self.register.check(registry, fileInfo, table=calibType):
239 if args.ignoreIngested:
242 self.
log.
warn(
"%s: already ingested: %s" % (infile, fileInfo))
243 for info
in hduInfoList:
244 self.register.addRow(registry, info, dryrun=args.dryrun,
245 create=args.create, table=calibType)
247 self.register.updateValidityRanges(registry, args.validity, tables=calibTypes)
249 self.
log.
info(
"Would update validity ranges here, but dryrun")
def __init__(self, args, kwargs)
def ingest(self, infile, outfile, mode="move", dryrun=False)
def createTable(self, conn, forceCreateTables=False)
def expandFiles(self, fileNameList)
Expand a set of filenames and globs, returning a list of filenames.
def addRow(self, conn, info, args, kwargs)
daf::base::PropertySet * set
def getCalibType(self, filename)
def getDestination(self, butler, info, filename)
def updateValidityRanges(self, conn, validity, tables=None)
def openRegistry(self, directory, create=False, dryrun=False, name="calibRegistry.sqlite3")
def fixSubsetValidity(self, conn, table, detectorData, validity)
daf::base::PropertyList * list