LSSTApplications  17.0+42,17.0+72,17.0+8,18.0.0,18.0.0+1,18.0.0+2,18.0.0+4,18.0.0+5,18.0.0+7,18.0.0+9,18.0.0-1-g0001055+1,18.0.0-1-g1349e88+1,18.0.0-1-g2505f39,18.0.0-1-g5e4b7ea+1,18.0.0-1-g85f8cd4+1,18.0.0-1-g8d41266+4,18.0.0-1-g9a6769a+1,18.0.0-1-ge10677a,18.0.0-1-gf8bf62c+1,18.0.0-2-g000ad9a+1,18.0.0-2-g0ee56d7+4,18.0.0-2-g31c43f9,18.0.0-2-g9c63283,18.0.0-2-gdf0b915+1,18.0.0-2-gf03bb23,18.0.0-3-g230d21f,18.0.0-3-gb4677f3+7,18.0.0-4-g329782e+1,18.0.0-5-ga38416e7+6,18.0.0-5-gba33251+1,18.0.0-6-gc37ddf8+1,18.0.0-8-g01e196b65,18.0.0-8-g24ce6f0f+1,18.0.0-8-g697fdba,w.2019.29
LSSTDataManagementBasePackage
ingestCalibs.py
Go to the documentation of this file.
1 import collections
2 import datetime
3 import sqlite3
4 from dateutil import parser
5 
6 from lsst.afw.fits import readMetadata
7 from lsst.pex.config import Config, Field, ListField, ConfigurableField
8 from lsst.pipe.base import InputOnlyArgumentParser
9 from lsst.pipe.tasks.ingest import RegisterTask, ParseTask, RegisterConfig, IngestTask
10 
11 
def _convertToDate(dateString):
    """Convert a string into a date object.

    The string is parsed with ``dateutil.parser.parse`` and only the calendar
    date of the result is kept (any time-of-day part is dropped).

    @param dateString: Date or date-time string
    @return datetime.date
    """
    parsed = parser.parse(dateString)
    return parsed.date()
15 
16 
18  """Task that will parse the filename and/or its contents to get the
19  required information to populate the calibration registry."""
20 
21  def getCalibType(self, filename):
22  """Return a a known calibration dataset type using
23  the observation type in the header keyword OBSTYPE
24 
25  @param filename: Input filename
26  """
27  md = readMetadata(filename, self.config.hdu)
28  if not md.exists("OBSTYPE"):
29  raise RuntimeError("Unable to find the required header keyword OBSTYPE in %s, hdu %d" %
30  (filename, self.config.hdu))
31  obstype = md.getScalar("OBSTYPE").strip().lower()
32  if "flat" in obstype:
33  obstype = "flat"
34  elif "zero" in obstype or "bias" in obstype:
35  obstype = "bias"
36  elif "dark" in obstype:
37  obstype = "dark"
38  elif "fringe" in obstype:
39  obstype = "fringe"
40  elif "sky" in obstype:
41  obstype = "sky"
42  elif "illumcor" in obstype:
43  obstype = "illumcor"
44  elif "defects" in obstype:
45  obstype = "defects"
46  return obstype
47 
48  def getDestination(self, butler, info, filename):
49  """Get destination for the file
50 
51  @param butler Data butler
52  @param info File properties, used as dataId for the butler
53  @param filename Input filename
54  @return Destination filename
55  """
56  # 'tempinfo' was added as part of DM-5466 to strip Nones from info.
57  # The Butler should handle this behind-the-scenes in the future.
58  # Please reference DM-9873 and delete this comment once it is resolved.
59  tempinfo = {k: v for (k, v) in info.items() if v is not None}
60  calibType = self.getCalibType(filename)
61  raw = butler.get(calibType + "_filename", tempinfo)[0]
62  # Ensure filename is devoid of cfitsio directions about HDUs
63  c = raw.find("[")
64  if c > 0:
65  raw = raw[:c]
66  return raw
67 
68 
70  """Configuration for the CalibsRegisterTask"""
71  tables = ListField(dtype=str, default=["bias", "dark", "flat", "fringe", "sky", "defects"],
72  doc="Names of tables")
73  calibDate = Field(dtype=str, default="calibDate", doc="Name of column for calibration date")
74  validStart = Field(dtype=str, default="validStart", doc="Name of column for validity start")
75  validEnd = Field(dtype=str, default="validEnd", doc="Name of column for validity stop")
76  detector = ListField(dtype=str, default=["filter", "ccd"],
77  doc="Columns that identify individual detectors")
78  validityUntilSuperseded = ListField(dtype=str, default=["defects"],
79  doc="Tables for which to set validity for a calib from when it is "
80  "taken until it is superseded by the next; validity in other tables "
81  "is calculated by applying the validity range.")
82 
83 
85  """Task that will generate the calibration registry for the Mapper"""
86  ConfigClass = CalibsRegisterConfig
87 
88  def openRegistry(self, directory, create=False, dryrun=False, name="calibRegistry.sqlite3"):
89  """Open the registry and return the connection handle"""
90  return RegisterTask.openRegistry(self, directory, create, dryrun, name)
91 
92  def createTable(self, conn, forceCreateTables=False):
93  """Create the registry tables"""
94  for table in self.config.tables:
95  RegisterTask.createTable(self, conn, table=table, forceCreateTables=forceCreateTables)
96 
97  def addRow(self, conn, info, *args, **kwargs):
98  """Add a row to the file table"""
99  info[self.config.validStart] = None
100  info[self.config.validEnd] = None
101  RegisterTask.addRow(self, conn, info, *args, **kwargs)
102 
103  def updateValidityRanges(self, conn, validity, tables=None):
104  """Loop over all tables, filters, and ccdnums,
105  and update the validity ranges in the registry.
106 
107  @param conn: Database connection
108  @param validity: Validity range (days)
109  """
110  conn.row_factory = sqlite3.Row
111  cursor = conn.cursor()
112  if tables is None:
113  tables = self.config.tables
114  for table in tables:
115  sql = "SELECT DISTINCT %s FROM %s" % (", ".join(self.config.detector), table)
116  cursor.execute(sql)
117  rows = cursor.fetchall()
118  for row in rows:
119  self.fixSubsetValidity(conn, table, row, validity)
120 
121  def fixSubsetValidity(self, conn, table, detectorData, validity):
122  """Update the validity ranges among selected rows in the registry.
123 
124  For defects, the products are valid from their start date until
125  they are superseded by subsequent defect data.
126  For other calibration products, the validity ranges are checked and
127  if there are overlaps, a midpoint is used to fix the overlaps,
128  so that the calibration data with whose date is nearest the date
129  of the observation is used.
130 
131  @param conn: Database connection
132  @param table: Name of table to be selected
133  @param detectorData: Values identifying a detector (from columns in self.config.detector)
134  @param validity: Validity range (days)
135  """
136  columns = ", ".join([self.config.calibDate, self.config.validStart, self.config.validEnd])
137  sql = "SELECT id, %s FROM %s" % (columns, table)
138  sql += " WHERE " + " AND ".join(col + "=?" for col in self.config.detector)
139  sql += " ORDER BY " + self.config.calibDate
140  cursor = conn.cursor()
141  cursor.execute(sql, detectorData)
142  rows = cursor.fetchall()
143 
144  try:
145  valids = collections.OrderedDict([(_convertToDate(row[self.config.calibDate]), [None, None]) for
146  row in rows])
147  except Exception:
148  det = " ".join("%s=%s" % (k, v) for k, v in zip(self.config.detector, detectorData))
149  # Sqlite returns unicode strings, which cannot be passed through SWIG.
150  self.log.warn(str("Skipped setting the validity overlaps for %s %s: missing calibration dates" %
151  (table, det)))
152  return
153  dates = list(valids.keys())
154  if table in self.config.validityUntilSuperseded:
155  # A calib is valid until it is superseded
156  for thisDate, nextDate in zip(dates[:-1], dates[1:]):
157  valids[thisDate][0] = thisDate
158  valids[thisDate][1] = nextDate - datetime.timedelta(1)
159  valids[dates[-1]][0] = dates[-1]
160  valids[dates[-1]][1] = _convertToDate("2037-12-31") # End of UNIX time
161  else:
162  # A calib is valid within the validity range (in days) specified.
163  for dd in dates:
164  valids[dd] = [dd - datetime.timedelta(validity), dd + datetime.timedelta(validity)]
165  # Fix the dates so that they do not overlap, which can cause the butler to find a
166  # non-unique calib.
167  midpoints = [t1 + (t2 - t1)//2 for t1, t2 in zip(dates[:-1], dates[1:])]
168  for i, (date, midpoint) in enumerate(zip(dates[:-1], midpoints)):
169  if valids[date][1] > midpoint:
170  nextDate = dates[i + 1]
171  valids[nextDate][0] = midpoint + datetime.timedelta(1)
172  valids[date][1] = midpoint
173  del midpoints
174  del dates
175  # Update the validity data in the registry
176  for row in rows:
177  calibDate = _convertToDate(row[self.config.calibDate])
178  validStart = valids[calibDate][0].isoformat()
179  validEnd = valids[calibDate][1].isoformat()
180  sql = "UPDATE %s" % table
181  sql += " SET %s=?, %s=?" % (self.config.validStart, self.config.validEnd)
182  sql += " WHERE id=?"
183  conn.execute(sql, (validStart, validEnd, row["id"]))
184 
185 
187  """Argument parser to support ingesting calibration images into the repository"""
188 
189  def __init__(self, *args, **kwargs):
190  InputOnlyArgumentParser.__init__(self, *args, **kwargs)
191  self.add_argument("-n", "--dry-run", dest="dryrun", action="store_true",
192  default=False, help="Don't perform any action?")
193  self.add_argument("--mode", choices=["move", "copy", "link", "skip"], default="move",
194  help="Mode of delivering the files to their destination")
195  self.add_argument("--create", action="store_true", help="Create new registry?")
196  self.add_argument("--validity", type=int, required=True, help="Calibration validity period (days)")
197  self.add_argument("--ignore-ingested", dest="ignoreIngested", action="store_true",
198  help="Don't register files that have already been registered")
199  self.add_argument("files", nargs="+", help="Names of file")
200 
201 
203  """Configuration for IngestCalibsTask"""
204  parse = ConfigurableField(target=CalibsParseTask, doc="File parsing")
205  register = ConfigurableField(target=CalibsRegisterTask, doc="Registry entry")
206  allowError = Field(dtype=bool, default=False, doc="Allow error in ingestion?")
207  clobber = Field(dtype=bool, default=False, doc="Clobber existing file?")
208 
209 
211  """Task that generates registry for calibration images"""
212  ConfigClass = IngestCalibsConfig
213  ArgumentParser = IngestCalibsArgumentParser
214  _DefaultName = "ingestCalibs"
215 
216  def run(self, args):
217  """Ingest all specified files and add them to the registry"""
218  calibRoot = args.calib if args.calib is not None else args.output
219  filenameList = self.expandFiles(args.files)
220  with self.register.openRegistry(calibRoot, create=args.create, dryrun=args.dryrun) as registry:
221  calibTypes = set()
222  for infile in filenameList:
223  fileInfo, hduInfoList = self.parse.getInfo(infile)
224  calibType = self.parse.getCalibType(infile)
225  if calibType not in self.register.config.tables:
226  self.log.warn(str("Skipped adding %s of observation type '%s' to registry "
227  "(must be one of %s)" %
228  (infile, calibType, ", ".join(self.register.config.tables))))
229  continue
230  calibTypes.add(calibType)
231  if args.mode != 'skip':
232  outfile = self.parse.getDestination(args.butler, fileInfo, infile)
233  ingested = self.ingest(infile, outfile, mode=args.mode, dryrun=args.dryrun)
234  if not ingested:
235  self.log.warn(str("Failed to ingest %s of observation type '%s'" %
236  (infile, calibType)))
237  continue
238  if self.register.check(registry, fileInfo, table=calibType):
239  if args.ignoreIngested:
240  continue
241 
242  self.log.warn("%s: already ingested: %s" % (infile, fileInfo))
243  for info in hduInfoList:
244  self.register.addRow(registry, info, dryrun=args.dryrun,
245  create=args.create, table=calibType)
246  if not args.dryrun:
247  self.register.updateValidityRanges(registry, args.validity, tables=calibTypes)
248  else:
249  self.log.info("Would update validity ranges here, but dryrun")
def ingest(self, infile, outfile, mode="move", dryrun=False)
Definition: ingest.py:427
def createTable(self, conn, forceCreateTables=False)
Definition: ingestCalibs.py:92
def expandFiles(self, fileNameList)
Expand a set of filenames and globs, returning a list of filenames.
Definition: ingest.py:499
def addRow(self, conn, info, args, kwargs)
Definition: ingestCalibs.py:97
daf::base::PropertySet * set
Definition: fits.cc:884
def getDestination(self, butler, info, filename)
Definition: ingestCalibs.py:48
def updateValidityRanges(self, conn, validity, tables=None)
def openRegistry(self, directory, create=False, dryrun=False, name="calibRegistry.sqlite3")
Definition: ingestCalibs.py:88
def fixSubsetValidity(self, conn, table, detectorData, validity)
daf::base::PropertyList * list
Definition: fits.cc:885
bool strip
Definition: fits.cc:883