LSST Applications  21.0.0-172-gfb10e10a+18fedfabac,22.0.0+297cba6710,22.0.0+80564b0ff1,22.0.0+8d77f4f51a,22.0.0+a28f4c53b1,22.0.0+dcf3732eb2,22.0.1-1-g7d6de66+2a20fdde0d,22.0.1-1-g8e32f31+297cba6710,22.0.1-1-geca5380+7fa3b7d9b6,22.0.1-12-g44dc1dc+2a20fdde0d,22.0.1-15-g6a90155+515f58c32b,22.0.1-16-g9282f48+790f5f2caa,22.0.1-2-g92698f7+dcf3732eb2,22.0.1-2-ga9b0f51+7fa3b7d9b6,22.0.1-2-gd1925c9+bf4f0e694f,22.0.1-24-g1ad7a390+a9625a72a8,22.0.1-25-g5bf6245+3ad8ecd50b,22.0.1-25-gb120d7b+8b5510f75f,22.0.1-27-g97737f7+2a20fdde0d,22.0.1-32-gf62ce7b1+aa4237961e,22.0.1-4-g0b3f228+2a20fdde0d,22.0.1-4-g243d05b+871c1b8305,22.0.1-4-g3a563be+32dcf1063f,22.0.1-4-g44f2e3d+9e4ab0f4fa,22.0.1-42-gca6935d93+ba5e5ca3eb,22.0.1-5-g15c806e+85460ae5f3,22.0.1-5-g58711c4+611d128589,22.0.1-5-g75bb458+99c117b92f,22.0.1-6-g1c63a23+7fa3b7d9b6,22.0.1-6-g50866e6+84ff5a128b,22.0.1-6-g8d3140d+720564cf76,22.0.1-6-gd805d02+cc5644f571,22.0.1-8-ge5750ce+85460ae5f3,master-g6e05de7fdc+babf819c66,master-g99da0e417a+8d77f4f51a,w.2021.48
LSST Data Management Base Package
ingestCalibs.py
Go to the documentation of this file.
1 import collections
2 import datetime
3 import sqlite3
4 from dateutil import parser
5 
6 from lsst.afw.fits import readMetadata
7 from lsst.pex.config import Config, Field, ListField, ConfigurableField
8 from lsst.pipe.base import InputOnlyArgumentParser
9 from lsst.pipe.tasks.ingest import RegisterTask, ParseTask, RegisterConfig, IngestTask
10 
11 
def _convertToDate(dateString):
    """Parse a free-form date/time string and keep only its calendar date.

    @param dateString: Text accepted by ``dateutil.parser.parse``
    @return ``datetime.date`` taken from the parsed value
    """
    parsed = parser.parse(dateString)
    return parsed.date()
15 
16 
"""Task that will parse the filename and/or its contents to get the
required information to populate the calibration registry.

Adds ``getCalibType`` (calibration type from the OBSTYPE header keyword)
and ``getDestination`` (butler destination path for a calibration file)."""
20 
def getCalibType(self, filename):
    """Return a known calibration dataset type derived from the OBSTYPE header keyword.

    The OBSTYPE value is stripped and lower-cased, then matched against a
    list of keywords in priority order; the first keyword contained in the
    value wins (so e.g. "skyflat" maps to "flat"). A value matching no
    keyword is returned as the stripped, lower-cased OBSTYPE string.

    @param filename: Input filename
    @return Calibration type string (e.g. "flat", "bias", "bfk")
    @raise RuntimeError if OBSTYPE is missing from HDU ``self.config.hdu``
    """
    md = readMetadata(filename, self.config.hdu)
    if not md.exists("OBSTYPE"):
        raise RuntimeError("Unable to find the required header keyword OBSTYPE in %s, hdu %d" %
                           (filename, self.config.hdu))
    obstype = md.getScalar("OBSTYPE").strip().lower()
    # (keyword, calibType) pairs, checked in order: an OBSTYPE may contain
    # several keywords, and the earlier entry takes precedence.
    # Keywords must be lower-case because obstype has been lower-cased
    # (an upper-case "BFK" keyword could never match).
    knownTypes = (
        ("flat", "flat"),
        ("zero", "bias"),
        ("bias", "bias"),
        ("dark", "dark"),
        ("fringe", "fringe"),
        ("sky", "sky"),
        ("illumcor", "illumcor"),
        ("defects", "defects"),
        ("qe_curve", "qe_curve"),
        ("linearizer", "linearizer"),
        ("crosstalk", "crosstalk"),
        ("bfk", "bfk"),
        ("photodiode", "photodiode"),
    )
    for keyword, calibType in knownTypes:
        if keyword in obstype:
            return calibType
    return obstype
57 
def getDestination(self, butler, info, filename):
    """Get the destination path for a calibration file.

    @param butler Data butler; ``<calibType>_filename`` is requested from it
    @param info File properties, used as dataId for the butler (None values are dropped)
    @param filename Input filename, used to determine the calibration type
    @return Destination filename, with any cfitsio HDU suffix (e.g. "[1]") removed
    """
    # 'tempinfo' was added as part of DM-5466 to strip Nones from info.
    # The Butler should handle this behind-the-scenes in the future.
    # Please reference DM-9873 and delete this comment once it is resolved.
    tempinfo = {k: v for (k, v) in info.items() if v is not None}
    calibType = self.getCalibType(filename)
    raw = butler.get(calibType + "_filename", tempinfo)[0]
    # Ensure filename is devoid of cfitsio directions about HDUs
    c = raw.find("[")
    if c > 0:
        raw = raw[:c]
    return raw
77 
78 
"""Configuration for the CalibsRegisterTask: registry tables, column names and validity rules."""
# One registry table is created for each entry here.
tables = ListField(
    dtype=str,
    default=["bias", "dark", "flat", "fringe", "sky", "defects", "qe_curve",
             "linearizer", "crosstalk", "bfk", "photodiode"],
    doc="Names of tables",
)
# Column names used when reading and writing calibration dates and validity ranges.
calibDate = Field(dtype=str, default="calibDate",
                  doc="Name of column for calibration date")
validStart = Field(dtype=str, default="validStart",
                   doc="Name of column for validity start")
validEnd = Field(dtype=str, default="validEnd",
                 doc="Name of column for validity stop")
# Columns whose distinct value combinations each get an independent validity sequence.
detector = ListField(
    dtype=str,
    default=["filter", "ccd"],
    doc="Columns that identify individual detectors",
)
validityUntilSuperseded = ListField(
    dtype=str,
    default=["defects", "qe_curve", "linearizer", "crosstalk",
             "bfk", "photodiode"],
    doc="Tables for which to set validity for a calib from when it is "
        "taken until it is superseded by the next; validity in other tables "
        "is calculated by applying the validity range.",
)
incrementValidEnd = Field(
    dtype=bool,
    default=True,
    doc="Fix the off-by-one error by incrementing validEnd. See "
        "fixSubsetValidity for more details.",
)
100 
101 
"""Task that builds the calibration registry used by the Mapper and maintains its validity ranges."""
# Configuration class for this task.
ConfigClass = CalibsRegisterConfig
105 
def openRegistry(self, directory, create=False, dryrun=False, name="calibRegistry.sqlite3"):
    """Open (optionally creating) the calibration registry and return its handle.

    Delegates to ``RegisterTask.openRegistry`` using this task's default
    registry file name.
    """
    delegateArgs = (directory, create, dryrun, name)
    return RegisterTask.openRegistry(self, *delegateArgs)
108  return RegisterTask.openRegistry(self, directory, create, dryrun, name)
109 
def createTable(self, conn, forceCreateTables=False):
    """Create one registry table for each name listed in ``self.config.tables``."""
    tableNames = self.config.tables
    for tableName in tableNames:
        RegisterTask.createTable(self, conn, table=tableName, forceCreateTables=forceCreateTables)
114 
def addRow(self, conn, info, *args, **kwargs):
    """Add a row to a calibration table with its validity columns left empty.

    The validity-start and validity-end entries of ``info`` are set to None
    (the caller's dict is modified in place); they are filled in later by
    the validity-range update.
    """
    for column in (self.config.validStart, self.config.validEnd):
        info[column] = None
    RegisterTask.addRow(self, conn, info, *args, **kwargs)
120 
def updateValidityRanges(self, conn, validity, tables=None):
    """Loop over tables and detectors and update the validity ranges in the registry.

    Sets ``conn.row_factory`` to ``sqlite3.Row`` (so rows can be addressed by
    column name), then calls ``fixSubsetValidity`` once for every distinct
    combination of the ``self.config.detector`` columns in each table.

    @param conn: Database connection
    @param validity: Validity range (days)
    @param tables: Names of tables to update; defaults to ``self.config.tables``
    """
    conn.row_factory = sqlite3.Row
    cursor = conn.cursor()
    if tables is None:
        tables = self.config.tables
    detectorColumns = ", ".join(self.config.detector)
    for table in tables:
        sql = "SELECT DISTINCT %s FROM %s" % (detectorColumns, table)
        cursor.execute(sql)
        # Materialize the detector list before fixSubsetValidity issues its own queries.
        for row in cursor.fetchall():
            self.fixSubsetValidity(conn, table, row, validity)
138 
def fixSubsetValidity(self, conn, table, detectorData, validity):
    """Update the validity ranges among selected rows in the registry.

    For tables listed in ``self.config.validityUntilSuperseded`` a product is
    valid from its calibration date until the next calibration date for the
    same detector; the most recent one is valid until 2037-12-31.
    For other calibration products each one is valid for ``validity`` days
    either side of its date; where neighbouring ranges overlap, the boundary
    is moved to the midpoint between the two dates, so that the calibration
    whose date is nearest the date of the observation is used.

    DM generated calibrations contain a CALIB_ID header
    keyword. These calibrations likely require the
    incrementValidEnd configuration option set to True. Other
    calibrations generate the calibDate via the DATE-OBS header
    keyword, and likely require incrementValidEnd=False.

    @param conn: Database connection; rows must be addressable by column
        name (``updateValidityRanges`` sets ``sqlite3.Row`` as row factory)
    @param table: Name of table to be selected
    @param detectorData: Values identifying a detector (from columns in
        self.config.detector); a None value matches a NULL column
    @param validity: Validity range (days)
    """
    columns = ", ".join([self.config.calibDate, self.config.validStart, self.config.validEnd])
    sql = "SELECT id, %s FROM %s" % (columns, table)
    # "IS ?" instead of "=?": in SQL "col = NULL" is never true, so a detector
    # with a NULL column (e.g. no filter) would select no rows at all.
    # For non-NULL values SQLite's IS behaves exactly like =.
    sql += " WHERE " + " AND ".join(col + " IS ?" for col in self.config.detector)
    sql += " ORDER BY " + self.config.calibDate
    cursor = conn.cursor()
    cursor.execute(sql, detectorData)
    rows = cursor.fetchall()
    if not rows:
        # Nothing to update (and the date logic below needs at least one date).
        return

    try:
        # Parse each row's calibration date once; reused when writing back below.
        calibDates = [_convertToDate(row[self.config.calibDate]) for row in rows]
    except Exception:
        det = " ".join("%s=%s" % (k, v) for k, v in zip(self.config.detector, detectorData))
        self.log.warning("Skipped setting the validity overlaps for %s %s: missing calibration dates",
                         table, det)
        return

    # [validStart, validEnd] for each distinct calibration date.
    valids = {date: [None, None] for date in calibDates}
    # Sort the parsed dates rather than trusting the SQL ORDER BY, which
    # compares the raw (possibly non-ISO) strings.
    dates = sorted(valids)
    if table in self.config.validityUntilSuperseded:
        # A calib is valid until it is superseded
        for thisDate, nextDate in zip(dates[:-1], dates[1:]):
            valids[thisDate] = [thisDate, nextDate]
        valids[dates[-1]] = [dates[-1], datetime.date(2037, 12, 31)]  # End of UNIX time
    else:
        # A calib is valid within the validity range (in days) specified.
        for date in dates:
            valids[date] = [date - datetime.timedelta(validity), date + datetime.timedelta(validity)]
        # Fix the dates so that they do not overlap, which can cause the butler to find a
        # non-unique calib.
        for date, nextDate in zip(dates[:-1], dates[1:]):
            midpoint = date + (nextDate - date)//2
            if valids[date][1] > midpoint:
                valids[nextDate][0] = midpoint + datetime.timedelta(1)
                if self.config.incrementValidEnd:
                    valids[date][1] = midpoint + datetime.timedelta(1)
                else:
                    valids[date][1] = midpoint

    # Update the validity data in the registry
    sql = "UPDATE %s SET %s=?, %s=? WHERE id=?" % (table, self.config.validStart, self.config.validEnd)
    updates = []
    for row, calibDate in zip(rows, calibDates):
        validStart, validEnd = valids[calibDate]
        updates.append((validStart.isoformat(), validEnd.isoformat(), row["id"]))
    conn.executemany(sql, updates)
210 
211 
class IngestCalibsArgumentParser(InputOnlyArgumentParser):
    """Command-line parser for ingesting calibration images into a repository.

    On top of the options inherited from InputOnlyArgumentParser it adds a
    dry-run switch, the file delivery mode, registry creation, the required
    validity period, duplicate handling, and the positional list of files.
    """

    def __init__(self, *args, **kwargs):
        InputOnlyArgumentParser.__init__(self, *args, **kwargs)
        # (positional flags, keyword options) for each argument, in registration order.
        argumentSpecs = (
            (("-n", "--dry-run"),
             dict(dest="dryrun", action="store_true", default=False,
                  help="Don't perform any action?")),
            (("--mode",),
             dict(choices=["move", "copy", "link", "skip"], default="move",
                  help="Mode of delivering the files to their destination")),
            (("--create",),
             dict(action="store_true", help="Create new registry?")),
            (("--validity",),
             dict(type=int, required=True, help="Calibration validity period (days)")),
            (("--ignore-ingested",),
             dict(dest="ignoreIngested", action="store_true",
                  help="Don't register files that have already been registered")),
            (("files",),
             dict(nargs="+", help="Names of file")),
        )
        for flags, options in argumentSpecs:
            self.add_argument(*flags, **options)
226 
227 
"""Configuration for IngestCalibsTask: its parsing/registry subtasks and error handling."""
# Subtask that extracts registry information from each input file.
parse = ConfigurableField(
    target=CalibsParseTask,
    doc="File parsing",
)
# Subtask that writes entries into the calibration registry.
register = ConfigurableField(
    target=CalibsRegisterTask,
    doc="Registry entry",
)
allowError = Field(dtype=bool, default=False,
                   doc="Allow error in ingestion?")
clobber = Field(dtype=bool, default=False,
                doc="Clobber existing file?")
234 
235 
237  """Task that generates registry for calibration images"""
238  ConfigClass = IngestCalibsConfig
239  ArgumentParser = IngestCalibsArgumentParser
240  _DefaultName = "ingestCalibs"
241 
242  def run(self, args):
243  """Ingest all specified files and add them to the registry"""
244  calibRoot = args.calib if args.calib is not None else args.output
245  filenameList = self.expandFilesexpandFiles(args.files)
246  with self.register.openRegistry(calibRoot, create=args.create, dryrun=args.dryrun) as registry:
247  calibTypes = set()
248  for infile in filenameList:
249  fileInfo, hduInfoList = self.parse.getInfo(infile)
250  calibType = self.parse.getCalibType(infile)
251  if calibType not in self.register.config.tables:
252  self.log.warning("Skipped adding %s of observation type '%s' to registry "
253  "(must be one of %s)",
254  infile, calibType, ", ".join(self.register.config.tables))
255  continue
256  calibTypes.add(calibType)
257  if args.mode != 'skip':
258  outfile = self.parse.getDestination(args.butler, fileInfo, infile)
259  ingested = self.ingestingest(infile, outfile, mode=args.mode, dryrun=args.dryrun)
260  if not ingested:
261  self.log.warning("Failed to ingest %s of observation type '%s'",
262  infile, calibType)
263  continue
264  if self.register.check(registry, fileInfo, table=calibType):
265  if args.ignoreIngested:
266  continue
267 
268  self.log.warning("%s: already ingested: %s", infile, fileInfo)
269  for info in hduInfoList:
270  self.register.addRow(registry, info, dryrun=args.dryrun,
271  create=args.create, table=calibType)
272  if not args.dryrun:
273  self.register.updateValidityRanges(registry, args.validity, tables=calibTypes)
274  else:
275  self.log.info("Would update validity ranges here, but dryrun")
def expandFiles(self, fileNameList)
Expand a set of filenames and globs, returning a list of filenames.
Definition: ingest.py:557
def ingest(self, infile, outfile, mode="move", dryrun=False)
Definition: ingest.py:478
def getDestination(self, butler, info, filename)
Definition: ingestCalibs.py:58
def fixSubsetValidity(self, conn, table, detectorData, validity)
def addRow(self, conn, info, *args, **kwargs)
def openRegistry(self, directory, create=False, dryrun=False, name="calibRegistry.sqlite3")
def createTable(self, conn, forceCreateTables=False)
def updateValidityRanges(self, conn, validity, tables=None)
daf::base::PropertyList * list
Definition: fits.cc:913
daf::base::PropertySet * set
Definition: fits.cc:912
bool strip
Definition: fits.cc:911
std::shared_ptr< daf::base::PropertyList > readMetadata(std::string const &fileName, int hdu=DEFAULT_HDU, bool strip=false)
Read FITS header.
Definition: fits.cc:1657