LSSTApplications  11.0-13-gbb96280,12.1.rc1,12.1.rc1+1,12.1.rc1+2,12.1.rc1+5,12.1.rc1+8,12.1.rc1-1-g06d7636+1,12.1.rc1-1-g253890b+5,12.1.rc1-1-g3d31b68+7,12.1.rc1-1-g3db6b75+1,12.1.rc1-1-g5c1385a+3,12.1.rc1-1-g83b2247,12.1.rc1-1-g90cb4cf+6,12.1.rc1-1-g91da24b+3,12.1.rc1-2-g3521f8a,12.1.rc1-2-g39433dd+4,12.1.rc1-2-g486411b+2,12.1.rc1-2-g4c2be76,12.1.rc1-2-gc9c0491,12.1.rc1-2-gda2cd4f+6,12.1.rc1-3-g3391c73+2,12.1.rc1-3-g8c1bd6c+1,12.1.rc1-3-gcf4b6cb+2,12.1.rc1-4-g057223e+1,12.1.rc1-4-g19ed13b+2,12.1.rc1-4-g30492a7
LSSTDataManagementBasePackage
ingestCalibs.py
Go to the documentation of this file.
1 import collections
2 import datetime
3 import itertools
4 import sqlite3
5 from glob import glob
6 import lsst.afw.image as afwImage
7 from lsst.pex.config import Config, Field, ListField, ConfigurableField
8 from lsst.pipe.base import InputOnlyArgumentParser
9 from lsst.pipe.tasks.ingest import RegisterTask, ParseTask, RegisterConfig, IngestTask
10 
11 
def _convertToDate(dateString):
    """Parse a "YYYY-MM-DD" string into a datetime.date

    @param dateString: Date text in the format "%Y-%m-%d"
    @return the corresponding datetime.date
    """
    parsed = datetime.datetime.strptime(dateString, "%Y-%m-%d")
    return parsed.date()
15 
16 
18  """Task that will parse the filename and/or its contents to get the
19  required information to populate the calibration registry."""
def getCalibType(self, filename):
    """Return a known calibration dataset type using
    the observation type in the header keyword OBSTYPE

    The keyword value is stripped and lower-cased, then searched for the
    substrings "flat", "zero"/"bias", "dark" and "fringe", in that order;
    the first one found selects the dataset type.  A value containing none
    of them is returned unchanged (stripped and lower-cased).

    @param filename: Input filename
    @return name of the calibration dataset type
    @throw RuntimeError if the header has no OBSTYPE keyword
    """
    header = afwImage.readMetadata(filename, self.config.hdu)
    if not header.exists("OBSTYPE"):
        raise RuntimeError("Unable to find the required header keyword OBSTYPE")
    rawType = header.get("OBSTYPE").strip().lower()
    # Ordered (substring, dataset type) pairs: the first substring found wins
    keywordToType = (
        ("flat", "flat"),
        ("zero", "bias"),
        ("bias", "bias"),
        ("dark", "dark"),
        ("fringe", "fringe"),
    )
    for keyword, calibType in keywordToType:
        if keyword in rawType:
            return calibType
    return rawType
39 
40 
42  """Configuration for the CalibsRegisterTask"""
# Registry tables, one per calibration dataset type.
# NOTE(review): "defect" appears in validityUntilSuperseded below but not in
# this default list -- confirm that is intended.
tables = ListField(
    doc="Names of tables",
    dtype=str,
    default=["bias", "dark", "flat", "fringe"],
)
# Column holding the date on which the calibration was taken
calibDate = Field(doc="Name of column for calibration date", dtype=str, default="calibDate")
# Columns holding the first and last date on which a calibration may be used
validStart = Field(doc="Name of column for validity start", dtype=str, default="validStart")
validEnd = Field(doc="Name of column for validity stop", dtype=str, default="validEnd")
# Columns whose combined values single out one detector within a table
detector = ListField(
    doc="Columns that identify individual detectors",
    dtype=str,
    default=["filter", "ccd"],
)
# Tables whose entries stay valid until the next entry replaces them
validityUntilSuperseded = ListField(
    doc="Tables for which to set validity for a calib from when it is "
        "taken until it is superseded by the next; validity in other tables "
        "is calculated by applying the validity range.",
    dtype=str,
    default=["defect"],
)
53 
55  """Task that will generate the calibration registry for the Mapper"""
56  ConfigClass = CalibsRegisterConfig
57 
def openRegistry(self, directory, create=False, dryrun=False, name="calibRegistry.sqlite3"):
    """Open the calibration registry and return the connection handle

    All arguments are forwarded positionally to RegisterTask.openRegistry;
    this override only supplies "calibRegistry.sqlite3" as the default name.

    @param directory: Forwarded to RegisterTask.openRegistry
    @param create: Forwarded to RegisterTask.openRegistry
    @param dryrun: Forwarded to RegisterTask.openRegistry
    @param name: Registry file name
    @return the value returned by RegisterTask.openRegistry
    """
    forwarded = (directory, create, dryrun, name)
    return RegisterTask.openRegistry(self, *forwarded)
61 
def createTable(self, conn):
    """Create one registry table for every name in self.config.tables

    Each table is created by RegisterTask.createTable.

    @param conn: Database connection
    """
    for tableName in self.config.tables:
        RegisterTask.createTable(self, conn, table=tableName)
66 
def addRow(self, conn, info, *args, **kwargs):
    """Add a row to the file table

    The validity start and end entries of info are set to None (modifying
    info in place) before the row is handed to RegisterTask.addRow.

    @param conn: Database connection
    @param info: Mapping of column names to values for the new row
    @param args, kwargs: Forwarded unchanged to RegisterTask.addRow
    """
    # Start with an unset validity range for the new row
    for column in (self.config.validStart, self.config.validEnd):
        info[column] = None
    RegisterTask.addRow(self, conn, info, *args, **kwargs)
72 
def updateValidityRanges(self, conn, validity):
    """Refresh the validity ranges of every detector in every table

    For each table in self.config.tables, the distinct combinations of the
    self.config.detector columns are selected and each combination is
    passed to fixSubsetValidity.  As a side effect the connection's
    row_factory is set to sqlite3.Row.

    @param conn: Database connection
    @param validity: Validity range (days)
    """
    conn.row_factory = sqlite3.Row
    detectorColumns = ", ".join(self.config.detector)
    cursor = conn.cursor()
    for tableName in self.config.tables:
        cursor.execute("SELECT DISTINCT %s FROM %s" % (detectorColumns, tableName))
        # fetchall() materialises the result before the rows are processed
        for detectorData in cursor.fetchall():
            self.fixSubsetValidity(conn, tableName, detectorData, validity)
88 
def fixSubsetValidity(self, conn, table, detectorData, validity):
    """Update the validity ranges among selected rows in the registry.

    For tables listed in self.config.validityUntilSuperseded (defects by
    default), each product is valid from its calibration date until the
    day before the next product's date; the latest product is valid
    until 2037-12-31.
    For other calibration products, each product is first given a range of
    +/- validity days around its calibration date.  Where the range of a
    product extends past the midpoint to the next product, both ranges are
    cut at that midpoint, so that the calibration data whose date is
    nearest the date of the observation is used.

    Rows are read by column name, so the connection must produce
    name-addressable rows (e.g. conn.row_factory = sqlite3.Row).
    If a selected row has a calibration date that cannot be parsed, a
    warning is logged and no row is changed.  Nothing is done when no rows
    match detectorData.

    @param conn: Database connection
    @param table: Name of table to be selected
    @param detectorData: Values identifying a detector (from columns in self.config.detector)
    @param validity: Validity range (days)
    """
    columns = ", ".join([self.config.calibDate, self.config.validStart, self.config.validEnd])
    sql = "SELECT id, %s FROM %s" % (columns, table)
    sql += " WHERE " + " AND ".join(col + "=?" for col in self.config.detector)
    sql += " ORDER BY " + self.config.calibDate
    cursor = conn.cursor()
    cursor.execute(sql, detectorData)
    rows = cursor.fetchall()
    if not rows:
        # Nothing to update; this also keeps dates[-1] below from failing.
        return

    try:
        valids = collections.OrderedDict((_convertToDate(row[self.config.calibDate]), [None, None])
                                         for row in rows)
    except (TypeError, ValueError):
        # strptime raises TypeError for a NULL date and ValueError for a malformed one.
        det = " ".join("%s=%s" % (k, v) for k, v in zip(self.config.detector, detectorData))
        # Sqlite returns unicode strings, which cannot be passed through SWIG.
        self.log.warn(str("Skipped setting the validity overlaps for %s %s: missing calibration dates" %
                          (table, det)))
        return

    # A real list is required: dict views cannot be sliced or indexed on Python 3.
    dates = list(valids.keys())
    oneDay = datetime.timedelta(1)
    if table in self.config.validityUntilSuperseded:
        # A calib is valid until it is superseded
        for thisDate, nextDate in zip(dates[:-1], dates[1:]):
            valids[thisDate][0] = thisDate
            valids[thisDate][1] = nextDate - oneDay
        valids[dates[-1]][0] = dates[-1]
        valids[dates[-1]][1] = _convertToDate("2037-12-31")  # End of UNIX time
    else:
        # A calib is valid within the validity range (in days) specified.
        halfWidth = datetime.timedelta(validity)
        for dd in dates:
            valids[dd] = [dd - halfWidth, dd + halfWidth]
        # Fix the dates so that they do not overlap, which can cause the butler to find a
        # non-unique calib.
        for thisDate, nextDate in zip(dates[:-1], dates[1:]):
            midpoint = thisDate + (nextDate - thisDate)//2
            if valids[thisDate][1] > midpoint:
                valids[nextDate][0] = midpoint + oneDay
                valids[thisDate][1] = midpoint

    # Update the validity data in the registry
    updateSql = "UPDATE %s SET %s=?, %s=? WHERE id=?" % (table, self.config.validStart,
                                                         self.config.validEnd)
    for row in rows:
        validStart, validEnd = valids[_convertToDate(row[self.config.calibDate])]
        conn.execute(updateSql, (validStart.isoformat(), validEnd.isoformat(), row["id"]))
152 
153 
class IngestCalibsArgumentParser(InputOnlyArgumentParser):
    """Argument parser for ingesting calibration images into the repository

    In addition to the arguments of InputOnlyArgumentParser it accepts
    -n/--dry-run, --create, --validity (required, integer number of days),
    --calibType and one or more positional file names.
    """
    def __init__(self, *args, **kwargs):
        InputOnlyArgumentParser.__init__(self, *args, **kwargs)
        calibTypes = ["bias", "dark", "flat", "fringe", "defect"]
        calibTypeHelp = ("Type of the calibration data to be ingested;"
                         " if omitted, the type is determined from"
                         " the file header information")
        self.add_argument("-n", "--dry-run", action="store_true", dest="dryrun", default=False,
                          help="Don't perform any action?")
        self.add_argument("--create", help="Create new registry?", action="store_true")
        self.add_argument("--validity", help="Calibration validity period (days)",
                          required=True, type=int)
        # None (option omitted) is listed among the choices next to the known types
        self.add_argument("--calibType", type=str, default=None,
                          choices=[None] + calibTypes, help=calibTypeHelp)
        self.add_argument("files", help="Names of file", nargs="+")
168 
169 
class IngestCalibsConfig(Config):
    """Configuration for IngestCalibsTask

    Holds the two configurable components used during ingestion.
    """
    # Component targeting CalibsParseTask (file parsing)
    parse = ConfigurableField(doc="File parsing", target=CalibsParseTask)
    # Component targeting CalibsRegisterTask (registry entries)
    register = ConfigurableField(doc="Registry entry", target=CalibsRegisterTask)
174 
175 
177  """Task that generates registry for calibration images"""
178  ConfigClass = IngestCalibsConfig
179  ArgumentParser = IngestCalibsArgumentParser
180  _DefaultName = "ingestCalibs"
181 
def run(self, args):
    """Ingest all specified files and add them to the registry

    Each entry of args.files is expanded with glob.  For every resulting
    file the calibration type is taken from args.calibType, or from the
    file itself (self.parse.getCalibType) when that is None.  Files whose
    type is not one of self.register.config.tables are skipped with a
    warning; otherwise one registry row is added per HDU info entry.
    After all files are processed the validity ranges are updated, unless
    args.dryrun is set.

    @param args: Parsed command-line arguments; the attributes read are
                 calib, files, create, dryrun, calibType and validity
    """
    if args.calib is not None:
        calibRoot = args.calib
    else:
        calibRoot = "."
    # Patterns that match nothing simply contribute no files
    filenameList = [path for pattern in args.files for path in glob(pattern)]
    with self.register.openRegistry(calibRoot, create=args.create, dryrun=args.dryrun) as registry:
        for infile in filenameList:
            _, hduInfoList = self.parse.getInfo(infile)
            calibType = self.parse.getCalibType(infile) if args.calibType is None else args.calibType
            if calibType in self.register.config.tables:
                for info in hduInfoList:
                    self.register.addRow(registry, info, dryrun=args.dryrun,
                                         create=args.create, table=calibType)
            else:
                self.log.warn(str("Skipped adding %s of observation type '%s' to registry" %
                                  (infile, calibType)))
        if args.dryrun:
            self.log.info("Would update validity ranges here, but dryrun")
        else:
            self.register.updateValidityRanges(registry, args.validity)
boost::shared_ptr< daf::base::PropertySet > readMetadata(std::string const &fileName, int hdu=0, bool strip=false)
Return the metadata (header entries) from a FITS file.