LSST Applications g04e9c324dd+8c5ae1fdc5,g134cb467dc+b203dec576,g18429d2f64+358861cd2c,g199a45376c+0ba108daf9,g1fd858c14a+dd066899e3,g262e1987ae+ebfced1d55,g29ae962dfc+72fd90588e,g2cef7863aa+aef1011c0b,g35bb328faa+8c5ae1fdc5,g3fd5ace14f+b668f15bc5,g4595892280+3897dae354,g47891489e3+abcf9c3559,g4d44eb3520+fb4ddce128,g53246c7159+8c5ae1fdc5,g67b6fd64d1+abcf9c3559,g67fd3c3899+1f72b5a9f7,g74acd417e5+cb6b47f07b,g786e29fd12+668abc6043,g87389fa792+8856018cbb,g89139ef638+abcf9c3559,g8d7436a09f+bcf525d20c,g8ea07a8fe4+9f5ccc88ac,g90f42f885a+6054cc57f1,g97be763408+06f794da49,g9dd6db0277+1f72b5a9f7,ga681d05dcb+7e36ad54cd,gabf8522325+735880ea63,gac2eed3f23+abcf9c3559,gb89ab40317+abcf9c3559,gbf99507273+8c5ae1fdc5,gd8ff7fe66e+1f72b5a9f7,gdab6d2f7ff+cb6b47f07b,gdc713202bf+1f72b5a9f7,gdfd2d52018+8225f2b331,ge365c994fd+375fc21c71,ge410e46f29+abcf9c3559,geaed405ab2+562b3308c0,gf9a733ac38+8c5ae1fdc5,w.2025.35
LSST Data Management Base Package
Loading...
Searching...
No Matches
shutterMotion.py
Go to the documentation of this file.
1# This file is part of ip_isr.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21"""
22Shutter motion profile storage class
23"""
24
25__all__ = ["ShutterMotionProfile"]
26
27from astropy.table import Table
28from scipy.optimize import newton
29import numpy as np
30
31from lsst.ip.isr import IsrCalib
32
33
35 """Shutter motion profile measurements.
36
37 Parameters
38 ----------
39 log : `logging.Logger`, optional
40 Log to write messages to. If `None` a default logger will be used.
41 **kwargs :
42 Additional parameters.
43 """
44
45 _OBSTYPE = "shutterMotionProfile"
46 _SCHEMA = "ShutterMotionProfile"
47 _VERSION = 1.0
48
def __init__(self, **kwargs):
    """Create an empty shutter motion profile.

    Parameters
    ----------
    **kwargs :
        Passed through unchanged to the parent class constructor.
    """
    super().__init__(**kwargs)

    # Quantities that come from `encodeSamples`
    self.time_tai = []
    self.time_mjd = []
    self.position = []

    # Quantities filled by `readHallTransitions`
    self.hall_time_tai = []
    self.hall_time_mjd = []
    self.hall_position = []
    self.hall_sensorId = []
    self.hall_isOn = []

    # Quantities filled by `readFitResults`.  Every list that
    # `readFitResults` appends to must exist here, including
    # ``fit_start_time``, which is also a required attribute below.
    self.fit_name = []
    self.fit_start_time = []
    self.fit_pivot1 = []
    self.fit_pivot2 = []
    self.fit_jerk0 = []
    self.fit_jerk1 = []
    self.fit_jerk2 = []

    self.requiredAttributes.update(["time_tai", "time_mjd", "position",
                                    "hall_time_tai", "hall_time_mjd", "hall_position",
                                    "hall_sensorId", "hall_isOn",
                                    "fit_name", "fit_start_time", "fit_pivot1",
                                    "fit_pivot2", "fit_jerk0", "fit_jerk1", "fit_jerk2",
                                    ])
75
def calculateMidpoint(self, modelName="hallSensorFit"):
    """Calculate time of midpoint of travel for this profile.

    Derived from Shuang Liang's CTN-002 (https://ctn-002.lsst.io).
    Equation numbers listed are from this document.  As the fits
    have already been done, we can ignore the raw position/Hall
    sensor data.

    Parameters
    ----------
    modelName : `str`
        Fit model to use to calculate the midpoint.

    Returns
    -------
    tm_accel : `float`
        The time of the midpoint, as derived from the point where
        the acceleration on the shutter is zero.  The fit start
        time of the selected model is added back before returning.
        NaN if the root finder failed.
    tm_position : `float`
        The time of the midpoint, as derived from the point where
        the shutter position is midway between its starting and
        ending locations.  The fit start time of the selected model
        is added back before returning.  NaN if the root finder
        failed.

    Raises
    ------
    RuntimeError
        Raised if the requested ``modelName`` is not found in the
        calibration.
    """
    # If the name appears more than once, the last entry wins.
    modelIndex = -1
    for idx, name in enumerate(self.fit_name):
        if name == modelName:
            modelIndex = idx
    if modelIndex == -1:
        raise RuntimeError(f"Unknown model {modelName} requested.")

    # Alias to follow technote
    t0 = self.fit_start_time[modelIndex]
    t1 = self.fit_pivot1[modelIndex]
    t2 = self.fit_pivot2[modelIndex]

    # Equation (3.1)
    j0 = self.fit_jerk0[modelIndex]
    j1 = self.fit_jerk1[modelIndex]

    # Equation (3.2)
    a1 = j0*t1

    # Equation (3.4)
    A1 = a1 - j1*t1

    # First estimate of midpoint, where acceleration is zero.
    # a = 0 = A1 + j1*t (Equation 5.1)
    def acc(t):
        return A1 + j1 * t

    # A failed solve is reported and turned into NaN rather than
    # aborting; `warning` is used because `warn` is a deprecated alias.
    try:
        tm_accel = newton(acc, 0.5*(t2 + t1))
    except Exception as e:
        self.log.warning(f"Midpoint calculation (from acceleration) failed to converge: {e}")
        tm_accel = np.nan

    # Second estimate of midpoint, when s is halfway between
    # start and final position.  Equation (5.2).
    V1 = t1**2 * (j0 - j1)/2. - t1*A1
    S1 = t1**3 * (j0 - j1)/6. - t1**2 * A1/2. - t1*V1
    Smid = 0.5*(self.metadata["startPosition"] + self.metadata["endPosition"])

    def pos(t):
        return j1*(t**3)/6. + A1*(t**2)/2. + V1*t + S1 - Smid

    # The acceleration-based estimate seeds the position-based solve.
    try:
        tm_position = newton(pos, tm_accel)
    except Exception as e:
        self.log.warning(f"Midpoint calculation (from position) failed to converge: {e}")
        tm_position = np.nan

    # Restore t0 so these can be compared to raw timestamps.
    return tm_accel + t0, tm_position + t0
157
@classmethod
def fromDict(cls, dictionary):
    """Construct a ShutterMotionProfile from a dictionary of properties.

    The input dictionary and its "motionProfile" and "fitResults"
    entries are shallow-copied before use, so the caller's data is
    left unmodified.

    Parameters
    ----------
    dictionary : `dict`
        Dictionary of properties.

    Returns
    -------
    calib : `lsst.ip.isr.ShutterMotionProfile`
        Constructed calibration.

    Raises
    ------
    RuntimeError
        Raised if the supplied dictionary is for a different
        calibration type.
    """
    calib = cls()

    # The type check and the message must read the same key; the
    # message previously indexed a key that the check never required.
    if calib._OBSTYPE != dictionary["fileType"]:
        raise RuntimeError(f"Incorrect calibration supplied. Expected {calib._OBSTYPE}, "
                           f"found {dictionary['fileType']}")

    # Work on copies so the pops below do not consume the caller's data.
    dictionary = dict(dictionary)
    motionProfile = dict(dictionary.pop("motionProfile"))

    encodeSamples = motionProfile.pop("encodeSamples")
    hallTransitions = motionProfile.pop("hallTransitions")
    fitResults = motionProfile.pop("fitResults")

    # Flatten any nested "metadata" entries into the top level before
    # handing everything that remains to the calibration metadata.
    if "metadata" in dictionary:
        metadata = dictionary.pop("metadata")
        for key, value in metadata.items():
            dictionary[key] = value
    calib.setMetadata(dictionary)

    formatVersion = calib.metadata["version"]

    startTime = motionProfile.pop("startTime")
    if formatVersion == 1.0:
        # Original format.
        motionProfile["startTime_tai"] = startTime["tai"]
        motionProfile["startTime_mjd"] = startTime["mjd"]
    else:
        # Update to clarify all times are in the TAI system.
        motionProfile["startTime_tai"] = startTime["tai"]["isot"]
        motionProfile["startTime_mjd"] = startTime["tai"]["mjd"]

    calib.readEncodeSamples(encodeSamples, formatVersion)
    calib.readHallTransitions(hallTransitions, formatVersion)
    # Pass a copy in case the reader consumes entries from its input.
    calib.readFitResults(dict(fitResults))

    # The scalar entries left in the motion profile become metadata.
    calib.updateMetadata(**motionProfile)
    return calib
213
def toDict(self):
    """Export the calibration as a nested dictionary.

    The result is laid out so that it can be fed back through
    `fromDict`.  The only difference between the two supported
    layouts is how the start time is nested.

    Returns
    -------
    dictionary : `dict`
        Dictionary of properties.

    Raises
    ------
    RuntimeError
        Raised if the metadata "version" is neither 1.0 nor 2.0.
    """
    self.updateMetadata()
    meta = self.metadata
    formatVersion = meta["version"]

    isOriginalLayout = formatVersion == 1.0
    if not isOriginalLayout and formatVersion != 2.0:
        raise RuntimeError(f"Unknown file version: {formatVersion}")

    return {
        "fileName": meta["fileName"],
        "fileType": meta["fileType"],
        "metadata": {
            "CALIBCLS": "lsst.ip.isr.ShutterMotionProfile",
            "OBSTYPE": self._OBSTYPE,
        },
        "obsId": meta["obsId"],
        "version": meta.get("version", -1),
        "motionProfile": {
            # Version 1.0 stores the times directly; version 2.0 nests
            # them one level deeper under "tai" and renames the string
            # timestamp to "isot".
            "startTime": (
                {
                    "tai": meta["startTime_tai"],
                    "mjd": meta["startTime_mjd"],
                }
                if isOriginalLayout else
                {
                    "tai": {
                        "isot": meta["startTime_tai"],
                        "mjd": meta["startTime_mjd"],
                    },
                }
            ),
            "startPosition": meta["startPosition"],
            "targetPosition": meta["targetPosition"],
            "endPosition": meta["endPosition"],
            "targetDuration": meta["targetDuration"],
            "actionDuration": meta["actionDuration"],
            "side": meta["side"],
            "isOpen": meta["isOpen"],
            "encodeSamples": self.writeEncodeSamples(),
            "hallTransitions": self.writeHallTransitions(),
            "fitResults": self.writeFitResults(),
        },
    }
287
@classmethod
def fromTable(cls, tableList):
    """Construct calibration from a list of tables.

    The column data are copied onto a new calibration as plain
    Python lists, and the metadata of the first table becomes the
    calibration metadata.  The input tables are not modified.

    Parameters
    ----------
    tableList : `list` [`astropy.table.Table`]
        List of tables to use to construct the shutter motion
        profile.  The first table contains the samples, the second
        the Hall transition data, and the third the model fits.

    Returns
    -------
    calib : `lsst.ip.isr.ShutterMotionProfile`
        The calibration defined in the tables.
    """
    def readColumn(table, name, text=False):
        # Flatten the column to a list.  ``squeeze`` drops
        # length-one axes, which turns a single-row column into a
        # 0-d array; ``atleast_1d`` restores the row axis so the
        # result is always a list, never a bare scalar.
        values = np.atleast_1d(np.squeeze(table[name].data)).tolist()
        # String columns may come back as bytes; decode them.  The
        # emptiness guard avoids indexing into an empty column.
        if text and values and hasattr(values[0], "decode"):
            values = [value.decode("utf-8") for value in values]
        return values

    samples = tableList[0]
    transitions = tableList[1]
    modelFits = tableList[2]

    # Copy so the key renaming below does not alter the input table.
    metadata = dict(samples.meta)

    calib = cls()
    calib.time_tai = readColumn(samples, "TIME_TAI", text=True)
    calib.time_mjd = readColumn(samples, "TIME_MJD")
    calib.position = readColumn(samples, "POSITION")

    calib.hall_time_tai = readColumn(transitions, "HALL_TIME_TAI", text=True)
    calib.hall_time_mjd = readColumn(transitions, "HALL_TIME_MJD")
    calib.hall_position = readColumn(transitions, "HALL_POSITION")
    calib.hall_sensorId = readColumn(transitions, "HALL_SENSORID")
    calib.hall_isOn = readColumn(transitions, "HALL_ISON")

    calib.fit_model = modelFits.meta["FIT_MODEL"]

    calib.fit_name = readColumn(modelFits, "FIT_NAME", text=True)
    calib.fit_start_time = readColumn(modelFits, "FIT_START_TIME")
    calib.fit_pivot1 = readColumn(modelFits, "FIT_PIVOT1")
    calib.fit_pivot2 = readColumn(modelFits, "FIT_PIVOT2")
    calib.fit_jerk0 = readColumn(modelFits, "FIT_JERK0")
    calib.fit_jerk1 = readColumn(modelFits, "FIT_JERK1")
    calib.fit_jerk2 = readColumn(modelFits, "FIT_JERK2")

    metadata.setdefault("OBSTYPE", cls._OBSTYPE)

    # This translation is needed to support correct
    # round-tripping.  It's not an elegant solution.
    for key in ("fileName", "fileType", "obsId", "version", "side", "isOpen"):
        if key.upper() in metadata:
            metadata[key] = metadata.pop(key.upper())
    # `toTable` writes `None` values as empty strings; map them back.
    for key in ("CALIB_ID", "DETECTOR", "DET_NAME", "DET_SER", "FILTER", "INSTRUME",
                "RAFTNAME", "SEQCKSUM", "SEQFILE", "SEQNAME", "SLOTNAME"):
        if key in metadata and metadata[key] == "":
            metadata[key] = None

    calib.updateMetadata(**metadata)
    return calib
359
def toTable(self):
    """Export the calibration as a list of three tables.

    The tables hold, in order, the encoder samples, the Hall
    transitions, and the model fits.  Passing the list to this
    class's `fromTable` method should recreate the calibration.

    Returns
    -------
    tableList : `list` [`astropy.table.Table`]
        List of tables containing the shutter motion profile
        information.
    """
    self.updateMetadata()

    sampleNames = ("TIME_TAI", "TIME_MJD", "POSITION")
    sampleColumns = {
        "TIME_TAI": np.array(self.time_tai).tolist(),
        "TIME_MJD": np.array(self.time_mjd).tolist(),
        "POSITION": np.array(self.position).tolist(),
    }
    samples = Table(sampleColumns, names=sampleNames, dtype=("U32", "f8", "f8"))

    transitionNames = ("HALL_TIME_TAI", "HALL_TIME_MJD", "HALL_POSITION",
                       "HALL_SENSORID", "HALL_ISON")
    transitionColumns = {
        "HALL_TIME_TAI": np.array(self.hall_time_tai).tolist(),
        "HALL_TIME_MJD": np.array(self.hall_time_mjd).tolist(),
        "HALL_POSITION": np.array(self.hall_position).tolist(),
        "HALL_SENSORID": np.array(self.hall_sensorId).tolist(),
        "HALL_ISON": np.array(self.hall_isOn).tolist(),
    }
    transitions = Table(transitionColumns, names=transitionNames,
                        dtype=("U32", "f8", "f8", "i4", "?"))

    fitNames = ("FIT_NAME", "FIT_START_TIME", "FIT_PIVOT1", "FIT_PIVOT2",
                "FIT_JERK0", "FIT_JERK1", "FIT_JERK2")
    fitColumns = {
        "FIT_NAME": np.array(self.fit_name).tolist(),
        "FIT_START_TIME": np.array(self.fit_start_time).tolist(),
        "FIT_PIVOT1": np.array(self.fit_pivot1).tolist(),
        "FIT_PIVOT2": np.array(self.fit_pivot2).tolist(),
        "FIT_JERK0": np.array(self.fit_jerk0).tolist(),
        "FIT_JERK1": np.array(self.fit_jerk1).tolist(),
        "FIT_JERK2": np.array(self.fit_jerk2).tolist(),
    }
    modelFits = Table(fitColumns, names=fitNames,
                      dtype=("U32", "f8", "f8", "f8", "f8", "f8", "f8"))
    modelFits.meta["FIT_MODEL"] = self.fit_model

    # Only the samples table carries the calibration metadata.
    # Entries with real values come first, followed by the `None`
    # entries written as empty strings.
    populated = {}
    blanks = {}
    for key, value in self.getMetadata().toDict().items():
        if value is None:
            blanks[key] = ""
        else:
            populated[key] = value
    populated.update(blanks)
    samples.meta = populated

    return [samples, transitions, modelFits]
412
def readEncodeSamples(self, inputSamples, formatVersion):
    """Load encoder samples into this calibration.

    Parameters
    ----------
    inputSamples : `list` [`dict`]
        One dictionary per sample.  For version 1.0 the timestamps
        are read from ``sample["time"]["tai"]`` and
        ``sample["time"]["mjd"]``; for version 2.0 from
        ``sample["tai"]["isot"]`` and ``sample["tai"]["mjd"]``.
        The position is always read from ``sample["position"]``.
    formatVersion : `float`
        Version of the file format to read.

    Raises
    ------
    RuntimeError
        Raised if the calibration has already read samples, or if
        the format is not known.
    """
    if len(self.time_tai) > 0:
        raise RuntimeError("Cannot re-read already-read calibration.")

    # The layouts differ only in the name of the time block and the
    # name of the string timestamp inside it.
    if formatVersion == 1.0:
        blockKey, stampKey = "time", "tai"
    elif formatVersion == 2.0:
        blockKey, stampKey = "tai", "isot"
    else:
        raise RuntimeError(f"Unknown file version: {formatVersion}")

    for entry in inputSamples:
        timeBlock = entry[blockKey]
        self.time_tai.append(timeBlock[stampKey])
        self.time_mjd.append(timeBlock["mjd"])
        self.position.append(entry["position"])
444
def writeEncodeSamples(self):
    """Return list of samples as dictionaries.

    The layout of each dictionary follows the "version" entry of the
    calibration metadata and mirrors what `readEncodeSamples`
    accepts for that version.

    Returns
    -------
    inputSamples : `list` [`dict`]
        List of dictionaries of samples.

    Raises
    ------
    RuntimeError
        Raised if the calibration has not read samples, or if the
        metadata version is not known.
    """
    if len(self.time_tai) == 0:
        raise RuntimeError("Cannot export empty calibration.")

    formatVersion = self.metadata["version"]

    samples = []
    if formatVersion == 1.0:
        for tai, mjd, position in zip(self.time_tai, self.time_mjd, self.position):
            sample = {"time": {"tai": tai, "mjd": mjd},
                      "position": position}
            samples.append(sample)
    elif formatVersion == 2.0:
        for tai, mjd, position in zip(self.time_tai, self.time_mjd, self.position):
            sample = {"tai": {"isot": tai, "mjd": mjd},
                      "position": position}
            samples.append(sample)
    else:
        raise RuntimeError(f"Unknown file version: {formatVersion}")

    return samples
478
def readHallTransitions(self, inputTransitions, formatVersion):
    """Load Hall sensor transitions into this calibration.

    Parameters
    ----------
    inputTransitions : `list` [`dict`]
        One dictionary per transition.  For version 1.0 the
        timestamps are read from ``transition["time"]["tai"]`` and
        ``transition["time"]["mjd"]``; for version 2.0 from
        ``transition["tai"]["isot"]`` and
        ``transition["tai"]["mjd"]``.  The "position", "sensorId"
        and "isOn" entries are read the same way in both versions.
    formatVersion : `float`
        Version of the file format to read.

    Raises
    ------
    RuntimeError
        Raised if the calibration has already read transitions, or
        if the format is not known.
    """
    if len(self.hall_time_tai) > 0:
        raise RuntimeError("Cannot re-read alreday-read calibration.")

    # The layouts differ only in the name of the time block and the
    # name of the string timestamp inside it.
    if formatVersion == 1.0:
        blockKey, stampKey = "time", "tai"
    elif formatVersion == 2.0:
        blockKey, stampKey = "tai", "isot"
    else:
        raise RuntimeError(f"Unknown file version: {formatVersion}")

    for entry in inputTransitions:
        timeBlock = entry[blockKey]
        self.hall_time_tai.append(timeBlock[stampKey])
        self.hall_time_mjd.append(timeBlock["mjd"])
        self.hall_position.append(entry["position"])
        self.hall_sensorId.append(entry["sensorId"])
        # Stored as a real boolean whatever the input type was.
        self.hall_isOn.append(bool(entry["isOn"]))
514
def writeHallTransitions(self):
    """Return list of Hall transitions as dictionaries.

    The layout of each dictionary follows the "version" entry of the
    calibration metadata and mirrors what `readHallTransitions`
    accepts for that version.

    Returns
    -------
    inputTransitions : `list` [`dict`]
        List of dictionaries of Hall transitions.

    Raises
    ------
    RuntimeError
        Raised if the calibration has not read Hall transitions, or
        if the metadata version is not known.
    """
    if len(self.hall_time_tai) == 0:
        raise RuntimeError("Cannot export empty calibration.")

    formatVersion = self.metadata["version"]
    if formatVersion not in (1.0, 2.0):
        raise RuntimeError(f"Unknown file version: {formatVersion}")
    transitions = []

    for tai, mjd, position, sensorId, isOn in zip(
            self.hall_time_tai,
            self.hall_time_mjd,
            self.hall_position,
            self.hall_sensorId,
            self.hall_isOn):
        # The version was validated above, so exactly one branch runs.
        if formatVersion == 1.0:
            transition = {"time": {"tai": tai, "mjd": mjd},
                          "position": position,
                          "sensorId": sensorId,
                          "isOn": isOn}
        elif formatVersion == 2.0:
            transition = {"tai": {"isot": tai, "mjd": mjd},
                          "position": position,
                          "sensorId": sensorId,
                          "isOn": isOn}
        transitions.append(transition)
    return transitions
555
def readFitResults(self, fitResults):
    """Read a dictionary of fit results into the calibration.

    The input dictionary is read without being modified.

    Parameters
    ----------
    fitResults : `dict`
        Dictionary of fit results.  The "Model" entry is stored as
        the fit model; every other entry maps a fit name to a
        dictionary with the keys "ModelStartTime", "PivotPoint1",
        "PivotPoint2", "Jerk0", "Jerk1" and "Jerk2".

    Raises
    ------
    RuntimeError
        Raised if the calibration has already read fit results.
    """
    if len(self.fit_name) != 0:
        raise RuntimeError("Cannot re-read already-read fit results.")
    # Index rather than pop so the caller's dictionary stays intact.
    self.fit_model = fitResults["Model"]

    for fitName, fitModel in fitResults.items():
        if fitName == "Model":
            # Already handled above; this is not a per-fit entry.
            continue
        if hasattr(fitName, "decode"):
            fitName = fitName.decode("utf-8")
        self.fit_name.append(fitName)
        self.fit_start_time.append(fitModel["ModelStartTime"])
        self.fit_pivot1.append(fitModel["PivotPoint1"])
        self.fit_pivot2.append(fitModel["PivotPoint2"])
        self.fit_jerk0.append(fitModel["Jerk0"])
        self.fit_jerk1.append(fitModel["Jerk1"])
        self.fit_jerk2.append(fitModel["Jerk2"])
583
def writeFitResults(self):
    """Return the fit results as a dictionary.

    The layout mirrors what `readFitResults` accepts.

    Returns
    -------
    fitResults : `dict`
        Dictionary holding the fit model under "Model", plus one
        entry per fit name containing that fit's parameters.

    Raises
    ------
    RuntimeError
        Raised if the calibration has not read fit results.
    """
    if len(self.fit_name) == 0:
        raise RuntimeError("Cannot export empty calibration.")

    fitResults = {"Model": self.fit_model}
    for fitName, startTime, pivot1, pivot2, jerk0, jerk1, jerk2 in zip(
            self.fit_name, self.fit_start_time,
            self.fit_pivot1, self.fit_pivot2,
            self.fit_jerk0, self.fit_jerk1, self.fit_jerk2):
        fitResults[fitName] = {"ModelStartTime": startTime,
                               "PivotPoint1": pivot1,
                               "PivotPoint2": pivot2,
                               "Jerk0": jerk0,
                               "Jerk1": jerk1,
                               "Jerk2": jerk2}
    return fitResults
updateMetadata(self, camera=None, detector=None, filterName=None, setCalibId=False, setCalibInfo=False, setDate=False, **kwargs)
Definition calibType.py:210
calculateMidpoint(self, modelName="hallSensorFit")
readEncodeSamples(self, inputSamples, formatVersion)
readHallTransitions(self, inputTransitions, formatVersion)