LSST Applications g00274db5b6+edbf708997,g00d0e8bbd7+edbf708997,g199a45376c+5137f08352,g1fd858c14a+1d4b6db739,g262e1987ae+f4d9505c4f,g29ae962dfc+7156fb1a53,g2cef7863aa+73c82f25e4,g35bb328faa+edbf708997,g3e17d7035e+5b3adc59f5,g3fd5ace14f+852fa6fbcb,g47891489e3+6dc8069a4c,g53246c7159+edbf708997,g64539dfbff+9f17e571f4,g67b6fd64d1+6dc8069a4c,g74acd417e5+ae494d68d9,g786e29fd12+af89c03590,g7ae74a0b1c+a25e60b391,g7aefaa3e3d+536efcc10a,g7cc15d900a+d121454f8d,g87389fa792+a4172ec7da,g89139ef638+6dc8069a4c,g8d7436a09f+28c28d8d6d,g8ea07a8fe4+db21c37724,g92c671f44c+9f17e571f4,g98df359435+b2e6376b13,g99af87f6a8+b0f4ad7b8d,gac66b60396+966efe6077,gb88ae4c679+7dec8f19df,gbaa8f7a6c5+38b34f4976,gbf99507273+edbf708997,gc24b5d6ed1+9f17e571f4,gca7fc764a6+6dc8069a4c,gcc769fe2a4+97d0256649,gd7ef33dd92+6dc8069a4c,gdab6d2f7ff+ae494d68d9,gdbb4c4dda9+9f17e571f4,ge410e46f29+6dc8069a4c,geaed405ab2+e194be0d2b,w.2025.47
LSST Data Management Base Package
Loading...
Searching...
No Matches
calibType.py
Go to the documentation of this file.
1# This file is part of ip_isr.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21__all__ = ["IsrCalib", "IsrProvenance"]
22
23import abc
24import datetime
25import logging
26import os.path
27import warnings
28import yaml
29import json
30import numpy as np
31
32from astro_metadata_translator import merge_headers
33from astropy.table import Table, Column
34from astropy.io import fits
35
36from lsst.daf.base import PropertyList
37from lsst.utils.introspection import get_full_type_name
38from lsst.utils import doImport
39
40
41class IsrCalib(abc.ABC):
42 """Generic calibration type.
43
44 Subclasses must implement the toDict, fromDict, toTable, fromTable
45 methods that allow the calibration information to be converted
46 from dictionaries and afw tables. This will allow the calibration
47 to be persisted using the base class read/write methods.
48
49 The validate method is intended to provide a common way to check
50 that the calibration is valid (internally consistent) and
51 appropriate (usable with the intended data). The apply method is
52 intended to allow the calibration to be applied in a consistent
53 manner.
54
55 Parameters
56 ----------
57 camera : `lsst.afw.cameraGeom.Camera`, optional
58 Camera to extract metadata from.
59 detector : `lsst.afw.cameraGeom.Detector`, optional
60 Detector to extract metadata from.
61 log : `logging.Logger`, optional
62 Log for messages.
63 """
64 _OBSTYPE = "generic"
65 _SCHEMA = "NO SCHEMA"
66 _VERSION = 0
67
68 def __init__(self, camera=None, detector=None, log=None, **kwargs):
69 self._instrument = None
70 self._raftName = None
71 self._slotName = None
72 self._detectorName = None
73 self._detectorSerial = None
74 self._detectorId = None
75 self._filter = None
76 self._calibId = None
77 self._seqfile = None
78 self._seqname = None
79 self._seqcksum = None
82 self.calibInfoFromDict(kwargs)
83
84 # Define the required attributes for this calibration. These
85 # are entries that are automatically filled and propagated
86 # from the metadata. The existence of the attribute is
87 # required (they're checked for equivalence), but they do not
88 # necessarily need to have a value (None == None in this
89 # case).
90 self.requiredAttributes = set(["_OBSTYPE", "_SCHEMA", "_VERSION"])
91 self.requiredAttributes.update(["_instrument", "_raftName", "_slotName",
92 "_detectorName", "_detectorSerial", "_detectorId",
93 "_filter", "_calibId", "_seqfile", "_seqname", "_seqcksum",
94 "_metadata"])
95
96 self.log = log if log else logging.getLogger(__name__)
97
98 if detector:
99 self.fromDetector(detector)
100 self.updateMetadata(camera=camera, detector=detector)
101
102 def __str__(self):
103 return f"{self.__class__.__name__}(obstype={self._OBSTYPE}, detector={self._detectorName}, )"
104
105 def __eq__(self, other):
106 """Calibration equivalence.
107
108 Running ``calib.log.setLevel(0)`` enables debug statements to
109 identify problematic fields.
110 """
111 if not isinstance(other, self.__class__):
112 self.log.debug("Incorrect class type: %s %s", self.__class__, other.__class__)
113 return False
114
115 for attr in self._requiredAttributes:
116 attrSelf = getattr(self, attr)
117 attrOther = getattr(other, attr)
118
119 if isinstance(attrSelf, dict):
120 # Dictionary of arrays.
121 if attrSelf.keys() != attrOther.keys():
122 self.log.debug("Dict Key Failure: %s %s %s", attr, attrSelf.keys(), attrOther.keys())
123 return False
124 for key in attrSelf:
125 try:
126 if not np.allclose(attrSelf[key], attrOther[key], equal_nan=True):
127 self.log.debug("Array Failure: %s %s %s", key, attrSelf[key], attrOther[key])
128 return False
129 except TypeError:
130 # If it is not something numpy can handle
131 # (it's not a number or array of numbers),
132 # then it needs to have its own equivalence
133 # operator.
134 if np.all(attrSelf[key] != attrOther[key]):
135 return False
136 elif (isinstance(attrSelf, np.ndarray) or isinstance(attrSelf, Column)
137 or isinstance(attrOther, np.ndarray) or isinstance(attrOther, Column)):
138 # Bare array.
139 if isinstance(attrSelf[0], (str, np.str_, np.bytes_)):
140 if not np.all(attrSelf == attrOther):
141 self.log.debug("Array Failure: %s %s %s", attr, attrSelf, attrOther)
142 return False
143 else:
144 if not np.allclose(attrSelf, attrOther, equal_nan=True):
145 self.log.debug("Array Failure: %s %s %s", attr, attrSelf, attrOther)
146 return False
147 elif type(attrSelf) is not type(attrOther):
148 if set([attrSelf, attrOther]) == set([None, ""]):
149 # Fits converts None to "", but None is not "".
150 continue
151 self.log.debug("Type Failure: %s %s %s %s %s", attr, type(attrSelf), type(attrOther),
152 attrSelf, attrOther)
153 return False
154 else:
155 if attrSelf != attrOther:
156 self.log.debug("Value Failure: %s %s %s", attr, attrSelf, attrOther)
157 return False
158
159 return True
160
161 @property
163 return self._requiredAttributes
164
165 @requiredAttributes.setter
166 def requiredAttributes(self, value):
167 self._requiredAttributes = value
168
169 # Property accessor associated with getMetadata().
170 @property
171 def metadata(self):
172 return self._metadata
173
174 def getMetadata(self):
175 """Retrieve metadata associated with this calibration.
176
177 Returns
178 -------
179 meta : `lsst.daf.base.PropertyList`
180 Metadata. The returned `~lsst.daf.base.PropertyList` can be
181 modified by the caller and the changes will be written to
182 external files.
183 """
184 return self._metadata
185
186 def setMetadata(self, metadata):
187 """Store a copy of the supplied metadata with this calibration.
188
189 Parameters
190 ----------
191 metadata : `lsst.daf.base.PropertyList`
192 Metadata to associate with the calibration. Will be copied and
193 overwrite existing metadata.
194 """
195 if metadata is not None:
196 self._metadata.update(metadata)
197
198 # Ensure that we have the obs type required by calibration ingest
199 self._metadata["OBSTYPE"] = self._OBSTYPE
200 self._metadata[self._OBSTYPE + "_SCHEMA"] = self._SCHEMA
201 self._metadata[self._OBSTYPE + "_VERSION"] = self._VERSION
202
203 if isinstance(metadata, dict):
204 self.calibInfoFromDict(metadata)
205 elif isinstance(metadata, PropertyList):
206 self.calibInfoFromDict(metadata.toDict())
207
208 def updateMetadata(self, camera=None, detector=None, filterName=None,
209 setCalibId=False, setCalibInfo=False, setDate=False,
210 **kwargs):
211 """Update metadata keywords with new values.
212
213 Parameters
214 ----------
215 camera : `lsst.afw.cameraGeom.Camera`, optional
216 Reference camera to use to set ``_instrument`` field.
217 detector : `lsst.afw.cameraGeom.Detector`, optional
218 Reference detector to use to set ``_detector*`` fields.
219 filterName : `str`, optional
220 Filter name to assign to this calibration.
221 setCalibId : `bool`, optional
222 Construct the ``_calibId`` field from other fields.
223 setCalibInfo : `bool`, optional
224 Set calibration parameters from metadata.
225 setDate : `bool`, optional
226 Ensure the metadata ``CALIBDATE`` fields are set to the current
227 datetime.
228 kwargs : `dict` or `collections.abc.Mapping`, optional
229 Set of ``key=value`` pairs to assign to the metadata.
230 """
231 mdOriginal = self.getMetadata()
232 mdSupplemental = dict()
233
234 for k, v in kwargs.items():
235 if isinstance(v, fits.card.Undefined):
236 kwargs[k] = None
237
238 if setCalibInfo:
239 self.calibInfoFromDict(kwargs)
240
241 if camera:
242 self._instrument = camera.getName()
243
244 if detector:
245 self._detectorName = detector.getName()
246 self._detectorSerial = detector.getSerial()
247 self._detectorId = detector.getId()
248 if "_" in self._detectorName:
249 (self._raftName, self._slotName) = self._detectorName.split("_")
250
251 if filterName:
252 # TOD0 DM-28093: I think this whole comment can go away, if we
253 # always use physicalLabel everywhere in ip_isr.
254 # If set via:
255 # exposure.getInfo().getFilter().getName()
256 # then this will hold the abstract filter.
257 self._filter = filterName
258
259 if setDate:
260 date = datetime.datetime.now()
261 mdSupplemental["CALIBDATE"] = date.isoformat()
262 mdSupplemental["CALIB_CREATION_DATE"] = date.date().isoformat()
263 mdSupplemental["CALIB_CREATION_TIME"] = date.time().isoformat()
264
265 if setCalibId:
266 values = []
267 values.append(f"instrument={self._instrument}") if self._instrument else None
268 values.append(f"raftName={self._raftName}") if self._raftName else None
269 values.append(f"detectorName={self._detectorName}") if self._detectorName else None
270 values.append(f"detector={self._detectorId}") if self._detectorId else None
271 values.append(f"filter={self._filter}") if self._filter else None
272
273 calibDate = mdOriginal.get("CALIBDATE", mdSupplemental.get("CALIBDATE", None))
274 values.append(f"calibDate={calibDate}") if calibDate else None
275
276 self._calibId = " ".join(values)
277
278 self._metadata["INSTRUME"] = self._instrument if self._instrument else None
279 self._metadata["RAFTNAME"] = self._raftName if self._raftName else None
280 self._metadata["SLOTNAME"] = self._slotName if self._slotName else None
281 self._metadata["DETECTOR"] = self._detectorId
282 self._metadata["DET_NAME"] = self._detectorName if self._detectorName else None
283 self._metadata["DET_SER"] = self._detectorSerial if self._detectorSerial else None
284 self._metadata["FILTER"] = self._filter if self._filter else None
285 self._metadata["CALIB_ID"] = self._calibId if self._calibId else None
286 self._metadata["SEQFILE"] = self._seqfile if self._seqfile else None
287 self._metadata["SEQNAME"] = self._seqname if self._seqname else None
288 self._metadata["SEQCKSUM"] = self._seqcksum if self._seqcksum else None
289 self._metadata["CALIBCLS"] = get_full_type_name(self)
290
291 mdSupplemental.update(kwargs)
292 mdOriginal.update(mdSupplemental)
293
294 def updateMetadataFromExposures(self, exposures):
295 """Extract and unify metadata information.
296
297 Parameters
298 ----------
299 exposures : `list`
300 Exposures or other calibrations to scan.
301 """
302 # This list of keywords is the set of header entries that
303 # should be checked and propagated. Not having an entry is
304 # not a failure, as they may not be defined for the exposures
305 # being used.
306 keywords = ["SEQNAME", "SEQFILE", "SEQCKSUM", "ODP", "AP0_RC"]
307 metadata = {}
308
309 for exp in exposures:
310 try:
311 expMeta = exp.getMetadata()
312 except AttributeError:
313 continue
314 for key in keywords:
315 if key in expMeta:
316 if key in metadata:
317 if metadata[key] != expMeta[key]:
318 self.log.warning("Metadata mismatch! Have: %s Found %s",
319 metadata[key], expMeta[key])
320 else:
321 metadata[key] = expMeta[key]
322
323 self.updateMetadata(**metadata, setCalibInfo=True)
324
325 def calibInfoFromDict(self, dictionary):
326 """Handle common keywords.
327
328 This isn't an ideal solution, but until all calibrations
329 expect to find everything in the metadata, they still need to
330 search through dictionaries.
331
332 Parameters
333 ----------
334 dictionary : `dict` or `lsst.daf.base.PropertyList`
335 Source for the common keywords.
336
337 Raises
338 ------
339 RuntimeError
340 Raised if the dictionary does not match the expected OBSTYPE.
341 """
342
343 def search(haystack, needles):
344 """Search dictionary 'haystack' for an entry in 'needles'
345 """
346 test = [haystack.get(x) for x in needles]
347 test = set([x for x in test if x is not None])
348 if len(test) == 0:
349 if "metadata" in haystack:
350 return search(haystack["metadata"], needles)
351 else:
352 return None
353 elif len(test) == 1:
354 value = list(test)[0]
355 if value == "":
356 return None
357 else:
358 return value
359 else:
360 raise ValueError(f"Too many values found: {len(test)} {test} {needles}")
361
362 if "metadata" in dictionary:
363 metadata = dictionary["metadata"]
364
365 if self._OBSTYPE != metadata["OBSTYPE"]:
366 raise RuntimeError(f"Incorrect calibration supplied. Expected {self._OBSTYPE}, "
367 f"found {metadata['OBSTYPE']}")
368
369 if (value := search(dictionary, ["INSTRUME", "instrument"])) is not None:
370 self._instrument = value
371 if (value := search(dictionary, ["RAFTNAME"])) is not None:
372 self._slotName = value
373 if (value := search(dictionary, ["DETECTOR", "detectorId"])) is not None:
374 self._detectorId = value
375 if (value := search(dictionary, ["DET_NAME", "DETECTOR_NAME", "detectorName"])) is not None:
376 self._detectorName = value
377 if (value := search(dictionary, ["DET_SER", "DETECTOR_SERIAL", "detectorSerial"])) is not None:
378 self._detectorSerial = value
379 if (value := search(dictionary, ["FILTER", "filterName"])) is not None:
380 self._filter = value
381 if (value := search(dictionary, ["CALIB_ID"])) is not None:
382 self._calibId = value
383 if (value := search(dictionary, ["SEQFILE"])) is not None:
384 self._seqfile = value
385 if (value := search(dictionary, ["SEQNAME"])) is not None:
386 self._seqname = value
387 if (value := search(dictionary, ["SEQCKSUM"])) is not None:
388 self._seqcksum = value
389
390 @classmethod
391 def determineCalibClass(cls, metadata, message):
392 """Attempt to find calibration class in metadata.
393
394 Parameters
395 ----------
396 metadata : `dict` or `lsst.daf.base.PropertyList`
397 Metadata possibly containing a calibration class entry.
398 message : `str`
399 Message to include in any errors.
400
401 Returns
402 -------
403 calibClass : `object`
404 The class to use to read the file contents. Should be an
405 `lsst.ip.isr.IsrCalib` subclass.
406
407 Raises
408 ------
409 ValueError
410 Raised if the resulting calibClass is the base
411 `lsst.ip.isr.IsrClass` (which does not implement the
412 content methods).
413 """
414 calibClassName = metadata.get("CALIBCLS")
415 if calibClassName is None:
416 calibClassName = metadata.get("fileType")
417 if calibClassName == "shutterMotionProfile":
418 calibClassName = "lsst.ip.isr.ShutterMotionProfile"
419 calibClass = doImport(calibClassName) if calibClassName is not None else cls
420 if calibClass is IsrCalib:
421 raise ValueError(f"Cannot use base class to read calibration data: {message}")
422 return calibClass
423
424 @classmethod
425 def readText(cls, filename, **kwargs):
426 """Read calibration representation from a yaml/ecsv file.
427
428 Parameters
429 ----------
430 filename : `str`
431 Name of the file containing the calibration definition.
432 kwargs : `dict` or collections.abc.Mapping`, optional
433 Set of key=value pairs to pass to the ``fromDict`` or
434 ``fromTable`` methods.
435
436 Returns
437 -------
438 calib : `~lsst.ip.isr.IsrCalibType`
439 Calibration class.
440
441 Raises
442 ------
443 RuntimeError
444 Raised if the filename does not end in ".ecsv" or ".yaml".
445 """
446 if filename.endswith((".ecsv", ".ECSV")):
447 data = Table.read(filename, format="ascii.ecsv")
448 calibClass = cls.determineCalibClass(data.meta, "readText/ECSV")
449 return calibClass.fromTable([data], **kwargs)
450 elif filename.endswith((".yaml", ".YAML")):
451 with open(filename, "r") as f:
452 data = yaml.load(f, Loader=yaml.CLoader)
453 calibClass = cls.determineCalibClass(data["metadata"], "readText/YAML")
454 return calibClass.fromDict(data, **kwargs)
455 elif filename.endswith((".json", ".JSON")):
456 with open(filename, "r") as f:
457 data = json.load(f)
458 calibClass = cls.determineCalibClass(data, "readText/JSON")
459 return calibClass.fromDict(data, **kwargs)
460 else:
461 raise RuntimeError(f"Unknown filename extension: {filename}")
462
463 def writeText(self, filename, format="auto"):
464 """Write the calibration data to a text file.
465
466 Parameters
467 ----------
468 filename : `str`
469 Name of the file to write.
470 format : `str`
471 Format to write the file as. Supported values are:
472 ``"auto"`` : Determine filetype from filename.
473 ``"yaml"`` : Write as yaml.
474 ``"ecsv"`` : Write as ecsv.
475
476 Returns
477 -------
478 used : `str`
479 The name of the file used to write the data. This may
480 differ from the input if the format is explicitly chosen.
481
482 Raises
483 ------
484 RuntimeError
485 Raised if filename does not end in a known extension, or
486 if all information cannot be written.
487
488 Notes
489 -----
490 The file is written to YAML/ECSV format and will include any
491 associated metadata.
492 """
493 if format == "yaml" or (format == "auto" and filename.lower().endswith((".yaml", ".YAML"))):
494 outDict = self.toDict()
495 path, ext = os.path.splitext(filename)
496 filename = path + ".yaml"
497 with open(filename, "w") as f:
498 yaml.dump(outDict, f)
499 elif format == "ecsv" or (format == "auto" and filename.lower().endswith((".ecsv", ".ECSV"))):
500 tableList = self.toTable()
501 if len(tableList) > 1:
502 # ECSV doesn't support multiple tables per file, so we
503 # can only write the first table.
504 raise RuntimeError(f"Unable to persist {len(tableList)}tables in ECSV format.")
505
506 table = tableList[0]
507 path, ext = os.path.splitext(filename)
508 filename = path + ".ecsv"
509 table.write(filename, format="ascii.ecsv")
510 else:
511 raise RuntimeError(f"Attempt to write to a file {filename} "
512 "that does not end in '.yaml' or '.ecsv'")
513
514 return filename
515
516 @classmethod
517 def readFits(cls, filename, **kwargs):
518 """Read calibration data from a FITS file.
519
520 Parameters
521 ----------
522 filename : `str`
523 Filename to read data from.
524 kwargs : `dict` or collections.abc.Mapping`, optional
525 Set of key=value pairs to pass to the ``fromTable``
526 method.
527
528 Returns
529 -------
530 calib : `lsst.ip.isr.IsrCalib`
531 Calibration contained within the file.
532 """
533 tableList = []
534 tableList.append(Table.read(filename, hdu=1, mask_invalid=False))
535 extNum = 2 # Fits indices start at 1, we've read one already.
536 keepTrying = True
537
538 while keepTrying:
539 with warnings.catch_warnings():
540 warnings.simplefilter("error")
541 try:
542 newTable = Table.read(filename, hdu=extNum, mask_invalid=False)
543 tableList.append(newTable)
544 extNum += 1
545 except Exception:
546 keepTrying = False
547
548 calibClass = cls.determineCalibClass(tableList[0].meta, "readFits")
549 for table in tableList:
550 if calibClass._OBSTYPE == "BF_DISTORTION_MATRIX":
551 table.convert_bytestring_to_unicode()
552 for k, v in table.meta.items():
553 if isinstance(v, fits.card.Undefined):
554 table.meta[k] = None
555
556 if calibClass._OBSTYPE in ("PHOTODIODE", ):
557 # Merge primary header, as these types store information
558 # there.
559 with fits.open(filename) as hdul:
560 primaryHeader = hdul[0].header
561 tableList[0].meta = merge_headers([tableList[0].meta, primaryHeader], mode="first")
562
563 return calibClass.fromTable(tableList, **kwargs)
564
565 def writeFits(self, filename):
566 """Write calibration data to a FITS file.
567
568 Parameters
569 ----------
570 filename : `str`
571 Filename to write data to.
572
573 Returns
574 -------
575 used : `str`
576 The name of the file used to write the data.
577 """
578 tableList = self.toTable()
579 with warnings.catch_warnings():
580 warnings.filterwarnings("ignore", category=Warning, module="astropy.io")
581 astropyList = [fits.table_to_hdu(table) for table in tableList]
582 astropyList.insert(0, fits.PrimaryHDU())
583
584 writer = fits.HDUList(astropyList)
585 writer.writeto(filename, overwrite=True)
586 return filename
587
588 def fromDetector(self, detector):
589 """Modify the calibration parameters to match the supplied detector.
590
591 Parameters
592 ----------
593 detector : `lsst.afw.cameraGeom.Detector`
594 Detector to use to set parameters from.
595
596 Raises
597 ------
598 NotImplementedError
599 Raised if not implemented by a subclass.
600 This needs to be implemented by subclasses for each
601 calibration type.
602 """
603 raise NotImplementedError("Must be implemented by subclass.")
604
605 @classmethod
606 def fromDict(cls, dictionary, **kwargs):
607 """Construct a calibration from a dictionary of properties.
608
609 Must be implemented by the specific calibration subclasses.
610
611 Parameters
612 ----------
613 dictionary : `dict`
614 Dictionary of properties.
615 kwargs : `dict` or collections.abc.Mapping`, optional
616 Set of key=value options.
617
618 Returns
619 -------
620 calib : `lsst.ip.isr.CalibType`
621 Constructed calibration.
622
623 Raises
624 ------
625 NotImplementedError
626 Raised if not implemented.
627 """
628 raise NotImplementedError("Must be implemented by subclass.")
629
630 def toDict(self):
631 """Return a dictionary containing the calibration properties.
632
633 The dictionary should be able to be round-tripped through
634 `fromDict`.
635
636 Returns
637 -------
638 dictionary : `dict`
639 Dictionary of properties.
640
641 Raises
642 ------
643 NotImplementedError
644 Raised if not implemented.
645 """
646 raise NotImplementedError("Must be implemented by subclass.")
647
648 @classmethod
649 def fromTable(cls, tableList, **kwargs):
650 """Construct a calibration from a dictionary of properties.
651
652 Must be implemented by the specific calibration subclasses.
653
654 Parameters
655 ----------
656 tableList : `list` [`lsst.afw.table.Table`]
657 List of tables of properties.
658 kwargs : `dict` or collections.abc.Mapping`, optional
659 Set of key=value options.
660
661 Returns
662 -------
663 calib : `lsst.ip.isr.CalibType`
664 Constructed calibration.
665
666 Raises
667 ------
668 NotImplementedError
669 Raised if not implemented.
670 """
671 raise NotImplementedError("Must be implemented by subclass.")
672
673 def toTable(self):
674 """Return a list of tables containing the calibration properties.
675
676 The table list should be able to be round-tripped through
677 `fromDict`.
678
679 Returns
680 -------
681 tableList : `list` [`lsst.afw.table.Table`]
682 List of tables of properties.
683
684 Raises
685 ------
686 NotImplementedError
687 Raised if not implemented.
688 """
689 raise NotImplementedError("Must be implemented by subclass.")
690
691 def validate(self, other=None):
692 """Validate that this calibration is defined and can be used.
693
694 Parameters
695 ----------
696 other : `object`, optional
697 Thing to validate against.
698
699 Returns
700 -------
701 valid : `bool`
702 Returns true if the calibration is valid and appropriate.
703 """
704 return False
705
706 def apply(self, target):
707 """Method to apply the calibration to the target object.
708
709 Parameters
710 ----------
711 target : `object`
712 Thing to validate against.
713
714 Returns
715 -------
716 valid : `bool`
717 Returns true if the calibration was applied correctly.
718
719 Raises
720 ------
721 NotImplementedError
722 Raised if not implemented.
723 """
724 raise NotImplementedError("Must be implemented by subclass.")
725
726
728 """Class for the provenance of data used to construct calibration.
729
730 Provenance is not really a calibration, but we would like to
731 record this when constructing the calibration, and it provides an
732 example of the base calibration class.
733
734 Parameters
735 ----------
736 instrument : `str`, optional
737 Name of the instrument the data was taken with.
738 calibType : `str`, optional
739 Type of calibration this provenance was generated for.
740 detectorName : `str`, optional
741 Name of the detector this calibration is for.
742 detectorSerial : `str`, optional
743 Identifier for the detector.
744
745 """
746 _OBSTYPE = "IsrProvenance"
747
748 def __init__(self, calibType="unknown",
749 **kwargs):
750 self.calibType = calibType
751 self.dimensions = set()
752 self.dataIdList = list()
753
754 super().__init__(**kwargs)
755
756 self.requiredAttributes.update(["calibType", "dimensions", "dataIdList"])
757
758 def __str__(self):
759 return f"{self.__class__.__name__}(obstype={self._OBSTYPE}, calibType={self.calibType}, )"
760
761 def __eq__(self, other):
762 return super().__eq__(other)
763
764 def updateMetadata(self, setDate=False, **kwargs):
765 """Update calibration metadata.
766
767 Parameters
768 ----------
769 setDate : `bool`, optional
770 Update the ``CALIBDATE`` fields in the metadata to the current
771 time. Defaults to False.
772 kwargs : `dict` or `collections.abc.Mapping`, optional
773 Other keyword parameters to set in the metadata.
774 """
775 kwargs["calibType"] = self.calibType
776 super().updateMetadata(setDate=setDate, **kwargs)
777
778 def fromDataIds(self, dataIdList):
779 """Update provenance from dataId List.
780
781 Parameters
782 ----------
783 dataIdList : `list` [`lsst.daf.butler.DataId`]
784 List of dataIds used in generating this calibration.
785 """
786 for dataId in dataIdList:
787 for key in dataId:
788 if key not in self.dimensions:
789 self.dimensions.add(key)
790 self.dataIdList.append(dataId)
791
792 @classmethod
793 def fromTable(cls, tableList):
794 """Construct provenance from table list.
795
796 Parameters
797 ----------
798 tableList : `list` [`lsst.afw.table.Table`]
799 List of tables to construct the provenance from.
800
801 Returns
802 -------
803 provenance : `lsst.ip.isr.IsrProvenance`
804 The provenance defined in the tables.
805 """
806 table = tableList[0]
807 metadata = table.meta
808 inDict = dict()
809 inDict["metadata"] = metadata
810 inDict["calibType"] = metadata["calibType"]
811 inDict["dimensions"] = set()
812 inDict["dataIdList"] = list()
813
814 schema = dict()
815 for colName in table.columns:
816 schema[colName.lower()] = colName
817 inDict["dimensions"].add(colName.lower())
818 inDict["dimensions"] = sorted(inDict["dimensions"])
819
820 for row in table:
821 entry = dict()
822 for dim in sorted(inDict["dimensions"]):
823 entry[dim] = row[schema[dim]]
824 inDict["dataIdList"].append(entry)
825
826 return cls.fromDict(inDict)
827
828 @classmethod
829 def fromDict(cls, dictionary):
830 """Construct provenance from a dictionary.
831
832 Parameters
833 ----------
834 dictionary : `dict`
835 Dictionary of provenance parameters.
836
837 Returns
838 -------
839 provenance : `lsst.ip.isr.IsrProvenance`
840 The provenance defined in the tables.
841 """
842 calib = cls()
843 if calib._OBSTYPE != dictionary["metadata"]["OBSTYPE"]:
844 raise RuntimeError(f"Incorrect calibration supplied. Expected {calib._OBSTYPE}, "
845 f"found {dictionary['metadata']['OBSTYPE']}")
846 calib.updateMetadata(setDate=False, setCalibInfo=True, **dictionary["metadata"])
847
848 # These properties should be in the metadata, but occasionally
849 # are found in the dictionary itself. Check both places,
850 # ending with `None` if neither contains the information.
851 calib.calibType = dictionary["calibType"]
852 calib.dimensions = set(dictionary["dimensions"])
853 calib.dataIdList = dictionary["dataIdList"]
854
855 calib.updateMetadata()
856 return calib
857
858 def toDict(self):
859 """Return a dictionary containing the provenance information.
860
861 Returns
862 -------
863 dictionary : `dict`
864 Dictionary of provenance.
865 """
866 self.updateMetadata()
867
868 outDict = {}
869
870 metadata = self.getMetadata()
871 outDict["metadata"] = metadata
872 outDict["detectorName"] = self._detectorName
873 outDict["detectorSerial"] = self._detectorSerial
874 outDict["detectorId"] = self._detectorId
875 outDict["instrument"] = self._instrument
876 outDict["calibType"] = self.calibType
877 outDict["dimensions"] = list(self.dimensions)
878 outDict["dataIdList"] = self.dataIdList
879
880 return outDict
881
882 def toTable(self):
883 """Return a list of tables containing the provenance.
884
885 This seems inefficient and slow, so this may not be the best
886 way to store the data.
887
888 Returns
889 -------
890 tableList : `list` [`lsst.afw.table.Table`]
891 List of tables containing the provenance information
892 """
893 tableList = []
894 self.updateMetadata(setDate=True, setCalibInfo=True)
895
896 catalog = Table(rows=self.dataIdList,
897 names=self.dimensions)
898 filteredMetadata = {k: v for k, v in self.getMetadata().toDict().items() if v is not None}
899 catalog.meta = filteredMetadata
900 tableList.append(catalog)
901 return tableList
Class for storing ordered metadata with comments.
fromDict(cls, dictionary, **kwargs)
Definition calibType.py:606
writeText(self, filename, format="auto")
Definition calibType.py:463
readText(cls, filename, **kwargs)
Definition calibType.py:425
calibInfoFromDict(self, dictionary)
Definition calibType.py:325
updateMetadataFromExposures(self, exposures)
Definition calibType.py:294
determineCalibClass(cls, metadata, message)
Definition calibType.py:391
__init__(self, camera=None, detector=None, log=None, **kwargs)
Definition calibType.py:68
updateMetadata(self, camera=None, detector=None, filterName=None, setCalibId=False, setCalibInfo=False, setDate=False, **kwargs)
Definition calibType.py:210
fromTable(cls, tableList, **kwargs)
Definition calibType.py:649
readFits(cls, filename, **kwargs)
Definition calibType.py:517
__init__(self, calibType="unknown", **kwargs)
Definition calibType.py:749
updateMetadata(self, setDate=False, **kwargs)
Definition calibType.py:764