LSSTApplications  21.0.0+1b62c9342b,21.0.0+45a059f35e,21.0.0-1-ga51b5d4+ceb9cf20a3,21.0.0-2-g103fe59+4d13aff7ba,21.0.0-2-g1367e85+571a348718,21.0.0-2-g2909d54+45a059f35e,21.0.0-2-g45278ab+1b62c9342b,21.0.0-2-g4bc9b9f+ebfe466dad,21.0.0-2-g5242d73+571a348718,21.0.0-2-g54e2caa+07cebfb09d,21.0.0-2-g66bcc37+0b2c5d3971,21.0.0-2-g7f82c8f+08f1f55c36,21.0.0-2-g8dde007+5d1b9cb3f5,21.0.0-2-g8f08a60+73884b2cf5,21.0.0-2-ga326454+08f1f55c36,21.0.0-2-ga63a54e+458e82fbcd,21.0.0-2-gc738bc1+8c4731df06,21.0.0-2-gde069b7+5a8f2956b8,21.0.0-2-ge17e5af+571a348718,21.0.0-2-ge712728+cfa36ee5f9,21.0.0-2-gecfae73+e597808034,21.0.0-2-gfc62afb+571a348718,21.0.0-20-g4449a12+6d1341e0f3,21.0.0-22-gf0532904+1cd928f0c5,21.0.0-3-g4c5b185+c3794955c6,21.0.0-3-g6d51c4a+0b2c5d3971,21.0.0-3-g8076721+5adeb471db,21.0.0-3-gaa929c8+01f4b7cfca,21.0.0-3-gd222c45+afc8332dbe,21.0.0-4-g1383c07+0b2c5d3971,21.0.0-4-g3300ddd+1b62c9342b,21.0.0-4-g5873dc9+9a92674037,21.0.0-4-g8a80011+bd904b6426,21.0.0-5-gcff38f6+844b7f7b93,21.0.0-6-g463d161+18af5fb57b,21.0.0-6-gd3283ba+01f4b7cfca,21.0.0-8-g19111d86+8234efb485,21.0.0-9-g7bed000b9+c7d3cce47e,w.2021.04
LSSTDataManagementBasePackage
ingestCalibs.py
Go to the documentation of this file.
1 import collections
2 import datetime
3 import sqlite3
4 from dateutil import parser
5 
6 from lsst.afw.fits import readMetadata
7 from lsst.pex.config import Config, Field, ListField, ConfigurableField
8 from lsst.pipe.base import InputOnlyArgumentParser
9 from lsst.pipe.tasks.ingest import RegisterTask, ParseTask, RegisterConfig, IngestTask
10 
11 
def _convertToDate(dateString):
    """Parse a date/time string and return only its calendar date.

    The string is interpreted by ``dateutil.parser.parse``; any time-of-day
    component of the parsed value is discarded.

    @param dateString: String to parse
    @return datetime.date of the parsed value
    """
    parsedValue = parser.parse(dateString)
    return parsedValue.date()
15 
16 
    """Task that will parse the filename and/or its contents to get the
    required information to populate the calibration registry.

    In addition to parsing, it classifies each file's calibration type from
    the OBSTYPE header keyword (getCalibType) and asks the butler for the
    destination filename of that calibration dataset (getDestination).
    """
20 
21  def getCalibType(self, filename):
22  """Return a a known calibration dataset type using
23  the observation type in the header keyword OBSTYPE
24 
25  @param filename: Input filename
26  """
27  md = readMetadata(filename, self.config.hdu)
28  if not md.exists("OBSTYPE"):
29  raise RuntimeError("Unable to find the required header keyword OBSTYPE in %s, hdu %d" %
30  (filename, self.config.hdu))
31  obstype = md.getScalar("OBSTYPE").strip().lower()
32  if "flat" in obstype:
33  obstype = "flat"
34  elif "zero" in obstype or "bias" in obstype:
35  obstype = "bias"
36  elif "dark" in obstype:
37  obstype = "dark"
38  elif "fringe" in obstype:
39  obstype = "fringe"
40  elif "sky" in obstype:
41  obstype = "sky"
42  elif "illumcor" in obstype:
43  obstype = "illumcor"
44  elif "defects" in obstype:
45  obstype = "defects"
46  elif "qe_curve" in obstype:
47  obstype = "qe_curve"
48  elif "linearizer" in obstype:
49  obstype = "linearizer"
50  elif "crosstalk" in obstype:
51  obstype = "crosstalk"
52  return obstype
53 
54  def getDestination(self, butler, info, filename):
55  """Get destination for the file
56 
57  @param butler Data butler
58  @param info File properties, used as dataId for the butler
59  @param filename Input filename
60  @return Destination filename
61  """
62  # 'tempinfo' was added as part of DM-5466 to strip Nones from info.
63  # The Butler should handle this behind-the-scenes in the future.
64  # Please reference DM-9873 and delete this comment once it is resolved.
65  tempinfo = {k: v for (k, v) in info.items() if v is not None}
66  calibType = self.getCalibType(filename)
67  raw = butler.get(calibType + "_filename", tempinfo)[0]
68  # Ensure filename is devoid of cfitsio directions about HDUs
69  c = raw.find("[")
70  if c > 0:
71  raw = raw[:c]
72  return raw
73 
74 
    """Configuration for the CalibsRegisterTask.

    Names the calibration tables and the columns used to compute and store
    the validity range of each calibration product.
    """
    # One registry table is created per entry (CalibsRegisterTask.createTable);
    # IngestCalibsTask.run skips files whose calibration type is not listed here.
    tables = ListField(dtype=str, default=["bias", "dark", "flat", "fringe", "sky", "defects", "qe_curve",
                                           "linearizer", "crosstalk"], doc="Names of tables")
    calibDate = Field(dtype=str, default="calibDate", doc="Name of column for calibration date")
    validStart = Field(dtype=str, default="validStart", doc="Name of column for validity start")
    validEnd = Field(dtype=str, default="validEnd", doc="Name of column for validity stop")
    # Validity ranges are computed separately for each distinct combination
    # of these column values (see updateValidityRanges / fixSubsetValidity).
    detector = ListField(dtype=str, default=["filter", "ccd"],
                         doc="Columns that identify individual detectors")
    validityUntilSuperseded = ListField(dtype=str, default=["defects", "qe_curve", "linearizer", "crosstalk"],
                                        doc="Tables for which to set validity for a calib from when it is "
                                        "taken until it is superseded by the next; validity in other tables "
                                        "is calculated by applying the validity range.")
    # Only affects tables not in validityUntilSuperseded, and only where two
    # neighbouring ranges overlap. When True, the earlier calib's validEnd is
    # set to midpoint + 1 day (equal to the next calib's validStart); when
    # False, it is set to the midpoint itself.
    incrementValidEnd = Field(
        dtype=bool,
        default=True,
        doc="Fix the off-by-one error by incrementing validEnd. See "
        "fixSubsetValidity for more details.",
    )
94 
95 
    """Task that will generate the calibration registry for the Mapper.

    One table is kept per calibration type (config.tables). Rows are added
    with empty validity columns (addRow), which are filled in afterwards by
    updateValidityRanges / fixSubsetValidity.
    """
    # Configuration class for this task.
    ConfigClass = CalibsRegisterConfig
99 
100  def openRegistry(self, directory, create=False, dryrun=False, name="calibRegistry.sqlite3"):
101  """Open the registry and return the connection handle"""
102  return RegisterTask.openRegistry(self, directory, create, dryrun, name)
103 
104  def createTable(self, conn, forceCreateTables=False):
105  """Create the registry tables"""
106  for table in self.config.tables:
107  RegisterTask.createTable(self, conn, table=table, forceCreateTables=forceCreateTables)
108 
109  def addRow(self, conn, info, *args, **kwargs):
110  """Add a row to the file table"""
111  info[self.config.validStart] = None
112  info[self.config.validEnd] = None
113  RegisterTask.addRow(self, conn, info, *args, **kwargs)
114 
115  def updateValidityRanges(self, conn, validity, tables=None):
116  """Loop over all tables, filters, and ccdnums,
117  and update the validity ranges in the registry.
118 
119  @param conn: Database connection
120  @param validity: Validity range (days)
121  """
122  conn.row_factory = sqlite3.Row
123  cursor = conn.cursor()
124  if tables is None:
125  tables = self.config.tables
126  for table in tables:
127  sql = "SELECT DISTINCT %s FROM %s" % (", ".join(self.config.detector), table)
128  cursor.execute(sql)
129  rows = cursor.fetchall()
130  for row in rows:
131  self.fixSubsetValidity(conn, table, row, validity)
132 
133  def fixSubsetValidity(self, conn, table, detectorData, validity):
134  """Update the validity ranges among selected rows in the registry.
135 
136  For defects and qe_curve, the products are valid from their start date until
137  they are superseded by subsequent defect data.
138  For other calibration products, the validity ranges are checked and
139  if there are overlaps, a midpoint is used to fix the overlaps,
140  so that the calibration data with whose date is nearest the date
141  of the observation is used.
142 
143  DM generated calibrations contain a CALIB_ID header
144  keyword. These calibrations likely require the
145  incrementValidEnd configuration option set to True. Other
146  calibrations generate the calibDate via the DATE-OBS header
147  keyword, and likely require incrementValidEnd=False.
148 
149  @param conn: Database connection
150  @param table: Name of table to be selected
151  @param detectorData: Values identifying a detector (from columns in self.config.detector)
152  @param validity: Validity range (days)
153  """
154  columns = ", ".join([self.config.calibDate, self.config.validStart, self.config.validEnd])
155  sql = "SELECT id, %s FROM %s" % (columns, table)
156  sql += " WHERE " + " AND ".join(col + "=?" for col in self.config.detector)
157  sql += " ORDER BY " + self.config.calibDate
158  cursor = conn.cursor()
159  cursor.execute(sql, detectorData)
160  rows = cursor.fetchall()
161 
162  try:
163  valids = collections.OrderedDict([(_convertToDate(row[self.config.calibDate]), [None, None]) for
164  row in rows])
165  except Exception:
166  det = " ".join("%s=%s" % (k, v) for k, v in zip(self.config.detector, detectorData))
167  # Sqlite returns unicode strings, which cannot be passed through SWIG.
168  self.log.warn(str("Skipped setting the validity overlaps for %s %s: missing calibration dates" %
169  (table, det)))
170  return
171  dates = list(valids.keys())
172  if table in self.config.validityUntilSuperseded:
173  # A calib is valid until it is superseded
174  for thisDate, nextDate in zip(dates[:-1], dates[1:]):
175  valids[thisDate][0] = thisDate
176  valids[thisDate][1] = nextDate
177  valids[dates[-1]][0] = dates[-1]
178  valids[dates[-1]][1] = _convertToDate("2037-12-31") # End of UNIX time
179  else:
180  # A calib is valid within the validity range (in days) specified.
181  for dd in dates:
182  valids[dd] = [dd - datetime.timedelta(validity), dd + datetime.timedelta(validity)]
183  # Fix the dates so that they do not overlap, which can cause the butler to find a
184  # non-unique calib.
185  midpoints = [t1 + (t2 - t1)//2 for t1, t2 in zip(dates[:-1], dates[1:])]
186  for i, (date, midpoint) in enumerate(zip(dates[:-1], midpoints)):
187  if valids[date][1] > midpoint:
188  nextDate = dates[i + 1]
189  valids[nextDate][0] = midpoint + datetime.timedelta(1)
190  if self.config.incrementValidEnd:
191  valids[date][1] = midpoint + datetime.timedelta(1)
192  else:
193  valids[date][1] = midpoint
194  del midpoints
195  del dates
196  # Update the validity data in the registry
197  for row in rows:
198  calibDate = _convertToDate(row[self.config.calibDate])
199  validStart = valids[calibDate][0].isoformat()
200  validEnd = valids[calibDate][1].isoformat()
201  sql = "UPDATE %s" % table
202  sql += " SET %s=?, %s=?" % (self.config.validStart, self.config.validEnd)
203  sql += " WHERE id=?"
204  conn.execute(sql, (validStart, validEnd, row["id"]))
205 
206 
    """Argument parser to support ingesting calibration images into the repository.

    Adds the calibration-ingest options (--dry-run, --mode, --create,
    --validity, --ignore-ingested) and the positional list of input files.
    """
209 
210  def __init__(self, *args, **kwargs):
211  InputOnlyArgumentParser.__init__(self, *args, **kwargs)
212  self.add_argument("-n", "--dry-run", dest="dryrun", action="store_true",
213  default=False, help="Don't perform any action?")
214  self.add_argument("--mode", choices=["move", "copy", "link", "skip"], default="move",
215  help="Mode of delivering the files to their destination")
216  self.add_argument("--create", action="store_true", help="Create new registry?")
217  self.add_argument("--validity", type=int, required=True, help="Calibration validity period (days)")
218  self.add_argument("--ignore-ingested", dest="ignoreIngested", action="store_true",
219  help="Don't register files that have already been registered")
220  self.add_argument("files", nargs="+", help="Names of file")
221 
222 
    """Configuration for IngestCalibsTask"""
    # Subtask that parses files and determines their calibration type and destination.
    parse = ConfigurableField(target=CalibsParseTask, doc="File parsing")
    # Subtask that manages the calibration registry database.
    register = ConfigurableField(target=CalibsRegisterTask, doc="Registry entry")
    # NOTE(review): allowError and clobber are not read in this file; presumably
    # they are consulted by the inherited ingest() when delivering files -- confirm.
    allowError = Field(dtype=bool, default=False, doc="Allow error in ingestion?")
    clobber = Field(dtype=bool, default=False, doc="Clobber existing file?")
229 
230 
    """Task that generates registry for calibration images.

    Each input file is classified by calibration type. Unless --mode is
    "skip", it is delivered to its butler destination. It is then registered
    in the calibration registry, after which the validity ranges of the
    affected tables are recomputed (see run).
    """
    ConfigClass = IngestCalibsConfig
    # Command-line parser adding --mode, --validity, --create, --ignore-ingested, etc.
    ArgumentParser = IngestCalibsArgumentParser
    _DefaultName = "ingestCalibs"
236 
237  def run(self, args):
238  """Ingest all specified files and add them to the registry"""
239  calibRoot = args.calib if args.calib is not None else args.output
240  filenameList = self.expandFiles(args.files)
241  with self.register.openRegistry(calibRoot, create=args.create, dryrun=args.dryrun) as registry:
242  calibTypes = set()
243  for infile in filenameList:
244  fileInfo, hduInfoList = self.parse.getInfo(infile)
245  calibType = self.parse.getCalibType(infile)
246  if calibType not in self.register.config.tables:
247  self.log.warn(str("Skipped adding %s of observation type '%s' to registry "
248  "(must be one of %s)" %
249  (infile, calibType, ", ".join(self.register.config.tables))))
250  continue
251  calibTypes.add(calibType)
252  if args.mode != 'skip':
253  outfile = self.parse.getDestination(args.butler, fileInfo, infile)
254  ingested = self.ingest(infile, outfile, mode=args.mode, dryrun=args.dryrun)
255  if not ingested:
256  self.log.warn(str("Failed to ingest %s of observation type '%s'" %
257  (infile, calibType)))
258  continue
259  if self.register.check(registry, fileInfo, table=calibType):
260  if args.ignoreIngested:
261  continue
262 
263  self.log.warn("%s: already ingested: %s" % (infile, fileInfo))
264  for info in hduInfoList:
265  self.register.addRow(registry, info, dryrun=args.dryrun,
266  create=args.create, table=calibType)
267  if not args.dryrun:
268  self.register.updateValidityRanges(registry, args.validity, tables=calibTypes)
269  else:
270  self.log.info("Would update validity ranges here, but dryrun")
lsst::afw::fits::readMetadata
std::shared_ptr< daf::base::PropertyList > readMetadata(std::string const &fileName, int hdu=DEFAULT_HDU, bool strip=false)
Read FITS header.
Definition: fits.cc:1657
lsst.pipe.tasks.ingestCalibs.IngestCalibsTask
Definition: ingestCalibs.py:231
lsst::log.log.logContinued.warn
def warn(fmt, *args)
Definition: logContinued.py:205
lsst::log.log.logContinued.info
def info(fmt, *args)
Definition: logContinued.py:201
lsst.pipe.tasks.ingestCalibs.CalibsRegisterTask
Definition: ingestCalibs.py:96
lsst.pipe.tasks.ingestCalibs.CalibsParseTask.getDestination
def getDestination(self, butler, info, filename)
Definition: ingestCalibs.py:54
lsst.pex.config.listField.ListField
Definition: listField.py:216
lsst.pipe.tasks.ingestCalibs.CalibsRegisterTask.createTable
def createTable(self, conn, forceCreateTables=False)
Definition: ingestCalibs.py:104
lsst.pipe.tasks.ingest.IngestTask.expandFiles
def expandFiles(self, fileNameList)
Expand a set of filenames and globs, returning a list of filenames.
Definition: ingest.py:557
lsst.pipe.base.argumentParser.InputOnlyArgumentParser
Definition: argumentParser.py:917
lsst.pex.config.configurableField.ConfigurableField
Definition: configurableField.py:170
lsst.pipe.tasks.ingest.IngestTask
Definition: ingest.py:406
strip
bool strip
Definition: fits.cc:911
lsst.pipe.tasks.ingest.ParseTask
Definition: ingest.py:67
lsst.pipe.tasks.ingestCalibs.CalibsRegisterConfig
Definition: ingestCalibs.py:75
lsst.pipe.tasks.ingest
Definition: ingest.py:1
lsst.pex.config
Definition: __init__.py:1
lsst.pipe.tasks.ingestCalibs.CalibsRegisterTask.fixSubsetValidity
def fixSubsetValidity(self, conn, table, detectorData, validity)
Definition: ingestCalibs.py:133
lsst.pipe.base.task.Task.config
config
Definition: task.py:162
lsst.pipe.tasks.ingestCalibs.IngestCalibsArgumentParser
Definition: ingestCalibs.py:207
lsstDebug.getInfo
getInfo
Definition: lsstDebug.py:87
lsst.pipe.base.task.Task.log
log
Definition: task.py:161
lsst.pipe.tasks.ingest.RegisterTask
Definition: ingest.py:280
lsst.pipe.tasks.cli.cmd.commands.str
str
Definition: commands.py:50
lsst.pipe.tasks.ingestCalibs.IngestCalibsArgumentParser.__init__
def __init__(self, *args, **kwargs)
Definition: ingestCalibs.py:210
lsst.pipe.tasks.ingestCalibs.CalibsRegisterTask.addRow
def addRow(self, conn, info, *args, **kwargs)
Definition: ingestCalibs.py:109
lsst.pipe.tasks.ingest.IngestTask.ingest
def ingest(self, infile, outfile, mode="move", dryrun=False)
Definition: ingest.py:478
lsst.pipe.tasks.ingestCalibs.IngestCalibsConfig
Definition: ingestCalibs.py:223
lsst.pipe.tasks.ingestCalibs.CalibsRegisterTask.updateValidityRanges
def updateValidityRanges(self, conn, validity, tables=None)
Definition: ingestCalibs.py:115
lsst::afw::fits
Definition: fits.h:31
list
daf::base::PropertyList * list
Definition: fits.cc:913
lsst.pex.config.config.Config
Definition: config.py:736
lsst.pipe.tasks.ingestCalibs.CalibsParseTask
Definition: ingestCalibs.py:17
lsst.pipe.tasks.ingestCalibs.CalibsRegisterTask.openRegistry
def openRegistry(self, directory, create=False, dryrun=False, name="calibRegistry.sqlite3")
Definition: ingestCalibs.py:100
lsst.pex.config.config.Field
Definition: config.py:247
lsst.pipe.base
Definition: __init__.py:1
lsst.pipe.tasks.ingest.RegisterConfig
Definition: ingest.py:223
lsst.pipe.tasks.ingestCalibs.CalibsParseTask.getCalibType
def getCalibType(self, filename)
Definition: ingestCalibs.py:21
lsst.pipe.tasks.ingestCalibs.IngestCalibsTask.run
def run(self, args)
Definition: ingestCalibs.py:237
set
daf::base::PropertySet * set
Definition: fits.cc:912