24 __all__ = [
"Curve", 
"AmpCurve", 
"DetectorCurve", 
"ImageCurve"]
 
   26 from scipy.interpolate 
import interp1d
 
   27 from astropy.table 
import QTable
 
   28 import astropy.units 
as u
 
   29 from abc 
import ABC, abstractmethod
 
   39     """ An abstract class to represent an arbitrary curve with 
   45     def __init__(self, wavelength, efficiency, metadata):
 
   46         if not (isinstance(wavelength, u.Quantity) 
and wavelength.unit.physical_type == 
'length'):
 
   47             raise ValueError(
'The wavelength must be a quantity with a length sense.')
 
   48         if not isinstance(efficiency, u.Quantity) 
or efficiency.unit != u.percent:
 
   49             raise ValueError(
'The efficiency must be a quantity with units of percent.')
 
   53         metadata.update({
'MODE': self.
mode, 
'TYPE': 
'QE'})
 
   59         """Class method for constructing a `Curve` object. 
   63         table : `astropy.table.QTable` 
   64             Table containing metadata and columns necessary 
   65             for constructing a `Curve` object. 
   70             A `Curve` subclass of the appropriate type according 
   77         """Convert this `Curve` object to an `astropy.table.QTable`. 
   81         table : `astropy.table.QTable` 
   82             A table object containing the data from this `Curve`. 
   87     def evaluate(self, detector, position, wavelength, kind='linear', bounds_error=False, fill_value=0):
 
   88         """Interpolate the curve at the specified position and wavelength. 
   92         detector : `lsst.afw.cameraGeom.Detector` 
   93             Is used to find the appropriate curve given the position for 
   94             curves that vary over the detector.  Ignored in the case where 
   95             there is only a single curve per detector. 
   96         position : `lsst.geom.Point2D` 
   97             The position on the detector at which to evaluate the curve. 
   98         wavelength : `astropy.units.Quantity` 
   99             The wavelength(s) at which to make the interpolation. 
  100         kind : `str`, optional 
  101             The type of interpolation to do (default is 'linear'). 
  102             See documentation for `scipy.interpolate.interp1d` for 
  104         bounds_error : `bool`, optional 
  105             Raise error if interpolating outside the range of x? 
  107         fill_value : `float`, optional 
  108             Fill values outside the range of x with this value 
  113         value : `astropy.units.Quantity` 
  114             Interpolated value(s).  Number of values returned will match the 
  115             length of `wavelength`. 
  120             If the ``bounds_error`` is changed from the default, it will raise 
  121             a `ValueError` if evaluating outside the bounds of the curve. 
  127         """Register subclasses with the abstract base class""" 
  129         if cls.
mode in Curve.subclasses:
 
  130             raise ValueError(f
'Class for mode, {cls.mode}, already defined')
 
  131         Curve.subclasses[cls.
mode] = cls
 
  135         """Define equality for this class""" 
  139                          keys_to_compare=['MODE', 'TYPE', 'CALIBDATE', 'INSTRUME', 'OBSTYPE', 'DETECTOR']):
 
  140         """Compare metadata in this object to another. 
  145             The object with which to compare metadata. 
  146         keys_to_compare : `list` 
  147             List of metadata keys to compare. 
  152             Are the metadata the same? 
  154         for k 
in keys_to_compare:
 
  155             if self.metadata[k] != other.metadata[k]:
 
  159     def interpolate(self, wavelengths, values, wavelength, kind, bounds_error, fill_value):
 
  160         """Interplate the curve at the specified wavelength(s). 
  164         wavelengths : `astropy.units.Quantity` 
  165             The wavelength values for the curve. 
  166         values : `astropy.units.Quantity` 
  167             The y-values for the curve. 
  168         wavelength : `astropy.units.Quantity` 
  169             The wavelength(s) at which to make the interpolation. 
  171             The type of interpolation to do.  See documentation for 
  172             `scipy.interpolate.interp1d` for accepted values. 
  176         value : `astropy.units.Quantity` 
  177             Interpolated value(s) 
  179         if not isinstance(wavelength, u.Quantity):
 
  180             raise ValueError(
"Wavelengths at which to interpolate must be astropy quantities")
 
  181         if not (isinstance(wavelengths, u.Quantity) 
and isinstance(values, u.Quantity)):
 
  182             raise ValueError(
"Model to be interpreted must be astropy quantities")
 
  183         interp_wavelength = wavelength.to(wavelengths.unit)
 
  184         f = interp1d(wavelengths, values, kind=kind, bounds_error=bounds_error, fill_value=fill_value)
 
  185         return f(interp_wavelength.value)*values.unit
 
  193             Dictionary of metadata for this curve. 
  200         """Class method for constructing a `Curve` object from 
  201         the standardized text format. 
  206             Path to the text file to read. 
  211             A `Curve` subclass of the appropriate type according 
  212             to the table metadata 
  214         table = QTable.read(filename, format=
'ascii.ecsv')
 
  219         """Class method for constructing a `Curve` object from 
  220         the standardized FITS format. 
  225             Path to the FITS file to read. 
  230             A `Curve` subclass of the appropriate type according 
  231             to the table metadata 
  233         table = QTable.read(filename, format=
'fits')
 
  237     def _check_cols(cols, table):
 
  238         """Check that the columns are in the table""" 
  240             if col 
not in table.columns:
 
  241                 raise ValueError(f
'The table must include a column named "{col}".')
 
  243     def _to_table_with_meta(self):
 
  244         """Compute standard metadata before writing file out""" 
  245         now = datetime.datetime.utcnow()
 
  247         metadata = table.meta
 
  248         metadata[
"DATE"] = now.isoformat()
 
  249         metadata[
"CALIB_CREATION_DATE"] = now.strftime(
"%Y-%m-%d")
 
  250         metadata[
"CALIB_CREATION_TIME"] = now.strftime(
"%T %Z").
strip()
 
  254         """ Write the `Curve` out to a text file. 
  259             Path to the text file to write. 
  264             Because this method forces a particular extension return 
  265             the name of the file actually written. 
  269         path, ext = os.path.splitext(filename)
 
  270         filename = path + 
".ecsv" 
  271         table.write(filename, format=
"ascii.ecsv")
 
  275         """ Write the `Curve` out to a FITS file. 
  280             Path to the FITS file to write. 
  285             Because this method forces a particular extension return 
  286             the name of the file actually written. 
  290         path, ext = os.path.splitext(filename)
 
  291         filename = path + 
".fits" 
  292         table.write(filename, format=
"fits")
 
  297     """Subclass of `Curve` that represents a single curve per detector. 
  301     wavelength : `astropy.units.Quantity` 
  302         Wavelength values for this curve 
  303     efficiency : `astropy.units.Quantity` 
  304         Quantum efficiency values for this curve 
  306         Dictionary of metadata for this curve 
  312                 and numpy.array_equal(self.
wavelength, other.wavelength)
 
  313                 and numpy.array_equal(self.
wavelength, other.wavelength))
 
  318         cls.
_check_cols([
'wavelength', 
'efficiency'], table)
 
  319         return cls(table[
'wavelength'], table[
'efficiency'], table.meta)
 
  325     def evaluate(self, detector, position, wavelength, kind='linear', bounds_error=False, fill_value=0):
 
  328                                 kind=kind, bounds_error=bounds_error, fill_value=fill_value)
 
  332     """Subclass of `Curve` that represents a curve per amp. 
  336     amp_name_list : iterable of `str` 
  337         The name of the amp for each entry 
  338     wavelength : `astropy.units.Quantity` 
  339         Wavelength values for this curve 
  340     efficiency : `astropy.units.Quantity` 
  341         Quantum efficiency values for this curve 
  343         Dictionary of metadata for this curve 
  347     def __init__(self, amp_name_list, wavelength, efficiency, metadata):
 
  348         super().
__init__(wavelength, efficiency, metadata)
 
  349         amp_names = 
set(amp_name_list)
 
  351         for amp_name 
in amp_names:
 
  352             idx = numpy.where(amp_name_list == amp_name)[0]
 
  355             if isinstance(name, bytes):
 
  357             self.
data[name] = (wavelength[idx], efficiency[idx])
 
  363             if not numpy.array_equal(self.
data[k][0], other.data[k][0]):
 
  365             if not numpy.array_equal(self.
data[k][1], other.data[k][1]):
 
  372         cls.
_check_cols([
'amp_name', 
'wavelength', 
'efficiency'], table)
 
  373         return cls(table[
'amp_name'], table[
'wavelength'],
 
  374                    table[
'efficiency'], table.meta)
 
  380         names = numpy.array([])
 
  385             if wavelength 
is None:
 
  387                 wavelength = val[0].value
 
  389                 wavelength = numpy.concatenate([wavelength, val[0].value])
 
  390             if efficiency 
is None:
 
  392                 efficiency = val[1].value
 
  394                 efficiency = numpy.concatenate([efficiency, val[1].value])
 
  395             names = numpy.concatenate([names, numpy.full(val[0].shape, amp_name)])
 
  396         names = numpy.array(names)
 
  398         return QTable({
'amp_name': names, 
'wavelength': wavelength*wunit, 
'efficiency': efficiency*eunit},
 
  401     def evaluate(self, detector, position, wavelength, kind='linear', bounds_error=False, fill_value=0):
 
  403         amp = cgUtils.findAmp(detector, 
Point2I(position))  
 
  404         w, e = self.
data[amp.getName()]
 
  405         return self.
interpolate(w, e, wavelength, kind=kind, bounds_error=bounds_error,
 
  406                                 fill_value=fill_value)
 
  414         raise NotImplementedError()
 
  418         raise NotImplementedError()
 
  420     def evaluate(self, detector, position, wavelength, kind='linear', bounds_error=False, fill_value=0):
 
  422         raise NotImplementedError()