LSST Applications g070148d5b3+33e5256705,g0d53e28543+25c8b88941,g0da5cf3356+2dd1178308,g1081da9e2a+62d12e78cb,g17e5ecfddb+7e422d6136,g1c76d35bf8+ede3a706f7,g295839609d+225697d880,g2e2c1a68ba+cc1f6f037e,g2ffcdf413f+853cd4dcde,g38293774b4+62d12e78cb,g3b44f30a73+d953f1ac34,g48ccf36440+885b902d19,g4b2f1765b6+7dedbde6d2,g5320a0a9f6+0c5d6105b6,g56b687f8c9+ede3a706f7,g5c4744a4d9+ef6ac23297,g5ffd174ac0+0c5d6105b6,g6075d09f38+66af417445,g667d525e37+2ced63db88,g670421136f+2ced63db88,g71f27ac40c+2ced63db88,g774830318a+463cbe8d1f,g7876bc68e5+1d137996f1,g7985c39107+62d12e78cb,g7fdac2220c+0fd8241c05,g96f01af41f+368e6903a7,g9ca82378b8+2ced63db88,g9d27549199+ef6ac23297,gabe93b2c52+e3573e3735,gb065e2a02a+3dfbe639da,gbc3249ced9+0c5d6105b6,gbec6a3398f+0c5d6105b6,gc9534b9d65+35b9f25267,gd01420fc67+0c5d6105b6,geee7ff78d7+a14128c129,gf63283c776+ede3a706f7,gfed783d017+0c5d6105b6,w.2022.47
LSST Data Management Base Package
Loading...
Searching...
No Matches
constructCalibs.py
Go to the documentation of this file.
1import sys
2import math
3import time
4import argparse
5import traceback
6import collections
7
8import numpy as np
9
10from astro_metadata_translator import merge_headers, ObservationGroup
11from astro_metadata_translator.serialize import dates_to_fits
12
13from lsst.pex.config import Config, ConfigurableField, Field, ListField, ConfigField
14from lsst.pipe.base import Task, Struct, TaskRunner, ArgumentParser
15import lsst.daf.base as dafBase
16import lsst.afw.math as afwMath
17import lsst.afw.detection as afwDet
18import lsst.afw.image as afwImage
19import lsst.geom as geom
20import lsst.meas.algorithms as measAlg
21from lsst.pipe.tasks.repair import RepairTask
22from lsst.ip.isr import IsrTask
23
24from lsst.ctrl.pool.parallel import BatchPoolTask
25from lsst.ctrl.pool.pool import Pool, NODE
26from lsst.pipe.drivers.background import (SkyMeasurementTask, FocalPlaneBackground,
27 FocalPlaneBackgroundConfig, MaskObjectsTask)
28from lsst.pipe.drivers.visualizeVisit import makeCameraImage
29
30from .checksum import checksum
31from .utils import getDataRef
32
33
35 """Parameters controlling the measurement of background statistics"""
36 stat = Field(doc="Statistic to use to estimate background (from lsst.afw.math)", dtype=int,
37 default=int(afwMath.MEANCLIP))
38 clip = Field(doc="Clipping threshold for background",
39 dtype=float, default=3.0)
40 nIter = Field(doc="Clipping iterations for background",
41 dtype=int, default=3)
42 maxVisitsToCalcErrorFromInputVariance = Field(
43 doc="Maximum number of visits to estimate variance from input variance, not per-pixel spread",
44 dtype=int, default=2)
45 mask = ListField(doc="Mask planes to reject",
46 dtype=str, default=["DETECTED", "BAD", "NO_DATA"])
47
48
49class CalibStatsTask(Task):
50 """Measure statistics on the background
51
52 This can be useful for scaling the background, e.g., for flats and fringe frames.
53 """
54 ConfigClass = CalibStatsConfig
55
56 def run(self, exposureOrImage):
57 """!Measure a particular statistic on an image (of some sort).
58
59 @param exposureOrImage Exposure, MaskedImage or Image.
60 @return Value of desired statistic
61 """
62 stats = afwMath.StatisticsControl(self.config.clip, self.config.nIter,
63 afwImage.Mask.getPlaneBitMask(self.config.mask))
64 try:
65 image = exposureOrImage.getMaskedImage()
66 except Exception:
67 try:
68 image = exposureOrImage.getImage()
69 except Exception:
70 image = exposureOrImage
71
72 return afwMath.makeStatistics(image, self.config.stat, stats).getValue()
73
74
76 """Configuration for combining calib images"""
77 rows = Field(doc="Number of rows to read at a time",
78 dtype=int, default=512)
79 mask = ListField(doc="Mask planes to respect", dtype=str,
80 default=["SAT", "DETECTED", "INTRP"])
81 combine = Field(doc="Statistic to use for combination (from lsst.afw.math)", dtype=int,
82 default=int(afwMath.MEANCLIP))
83 clip = Field(doc="Clipping threshold for combination",
84 dtype=float, default=3.0)
85 nIter = Field(doc="Clipping iterations for combination",
86 dtype=int, default=3)
87 stats = ConfigurableField(target=CalibStatsTask,
88 doc="Background statistics configuration")
89
90
91class CalibCombineTask(Task):
92 """Task to combine calib images"""
93 ConfigClass = CalibCombineConfig
94
95 def __init__(self, *args, **kwargs):
96 Task.__init__(self, *args, **kwargs)
97 self.makeSubtask("stats")
98
99 def run(self, sensorRefList, expScales=None, finalScale=None, inputName="postISRCCD"):
100 """!Combine calib images for a single sensor
101
102 @param sensorRefList List of data references to combine (for a single sensor)
103 @param expScales List of scales to apply for each exposure
104 @param finalScale Desired scale for final combined image
105 @param inputName Data set name for inputs
106 @return combined image
107 """
108 width, height = self.getDimensions(sensorRefList)
109 stats = afwMath.StatisticsControl(self.config.clip, self.config.nIter,
110 afwImage.Mask.getPlaneBitMask(self.config.mask))
111 numImages = len(sensorRefList)
112 if numImages < 1:
113 raise RuntimeError("No valid input data")
114 if numImages < self.config.stats.maxVisitsToCalcErrorFromInputVariance:
115 stats.setCalcErrorFromInputVariance(True)
116
117 # Combine images
118 combined = afwImage.MaskedImageF(width, height)
119 imageList = [None]*numImages
120 for start in range(0, height, self.config.rows):
121 rows = min(self.config.rows, height - start)
122 box = geom.Box2I(geom.Point2I(0, start),
123 geom.Extent2I(width, rows))
124 subCombined = combined.Factory(combined, box)
125
126 for i, sensorRef in enumerate(sensorRefList):
127 if sensorRef is None:
128 imageList[i] = None
129 continue
130 exposure = sensorRef.get(inputName + "_sub", bbox=box)
131 if expScales is not None:
132 self.applyScale(exposure, expScales[i])
133 imageList[i] = exposure.getMaskedImage()
134
135 self.combine(subCombined, imageList, stats)
136
137 if finalScale is not None:
138 background = self.stats.run(combined)
139 self.log.info("%s: Measured background of stack is %f; adjusting to %f" %
140 (NODE, background, finalScale))
141 combined *= finalScale / background
142
143 return combined
144
145 def getDimensions(self, sensorRefList, inputName="postISRCCD"):
146 """Get dimensions of the inputs"""
147 dimList = []
148 for sensorRef in sensorRefList:
149 if sensorRef is None:
150 continue
151 md = sensorRef.get(inputName + "_md")
152 dimList.append(afwImage.bboxFromMetadata(md).getDimensions())
153 return getSize(dimList)
154
155 def applyScale(self, exposure, scale=None):
156 """Apply scale to input exposure
157
158 This implementation applies a flux scaling: the input exposure is
159 divided by the provided scale.
160 """
161 if scale is not None:
162 mi = exposure.getMaskedImage()
163 mi /= scale
164
165 def combine(self, target, imageList, stats):
166 """!Combine multiple images
167
168 @param target Target image to receive the combined pixels
169 @param imageList List of input images
170 @param stats Statistics control
171 """
172 images = [img for img in imageList if img is not None]
173 afwMath.statisticsStack(target, images, afwMath.Property(self.config.combine), stats)
174
175
176def getSize(dimList):
177 """Determine a consistent size, given a list of image sizes"""
178 dim = set((w, h) for w, h in dimList)
179 dim.discard(None)
180 if len(dim) != 1:
181 raise RuntimeError("Inconsistent dimensions: %s" % dim)
182 return dim.pop()
183
184
185def dictToTuple(dict_, keys):
186 """!Return a tuple of specific values from a dict
187
188 This provides a hashable representation of the dict from certain keywords.
189 This can be useful for creating e.g., a tuple of the values in the DataId
190 that identify the CCD.
191
192 @param dict_ dict to parse
193 @param keys keys to extract (order is important)
194 @return tuple of values
195 """
196 return tuple(dict_[k] for k in keys)
197
198
199def getCcdIdListFromExposures(expRefList, level="sensor", ccdKeys=["ccd"]):
200 """!Determine a list of CCDs from exposure references
201
202 This essentially inverts the exposure-level references (which
203 provides a list of CCDs for each exposure), by providing
204 a dataId list for each CCD. Consider an input list of exposures
205 [e1, e2, e3], and each exposure has CCDs c1 and c2. Then this
206 function returns:
207
208 {(c1,): [e1c1, e2c1, e3c1], (c2,): [e1c2, e2c2, e3c2]}
209
210 This is a dict whose keys are tuples of the identifying values of a
211 CCD (usually just the CCD number) and the values are lists of dataIds
212 for that CCD in each exposure. A missing dataId is given the value
213 None.
214
215 @param expRefList List of data references for exposures
216 @param level Level for the butler to generate CCDs
217 @param ccdKeys DataId keywords that identify a CCD
218 @return dict of data identifier lists for each CCD;
219 keys are values of ccdKeys in order
220 """
221 expIdList = [[ccdRef.dataId for ccdRef in expRef.subItems(
222 level)] for expRef in expRefList]
223
224 # Determine what additional keys make a CCD from an exposure
225 if len(ccdKeys) != len(set(ccdKeys)):
226 raise RuntimeError("Duplicate keys found in ccdKeys: %s" % ccdKeys)
227 ccdNames = set() # Set of tuples which are values for each of the CCDs in an exposure
228 for ccdIdList in expIdList:
229 for ccdId in ccdIdList:
230 name = dictToTuple(ccdId, ccdKeys)
231 ccdNames.add(name)
232
233 # Turn the list of CCDs for each exposure into a list of exposures for
234 # each CCD
235 ccdLists = {}
236 for n, ccdIdList in enumerate(expIdList):
237 for ccdId in ccdIdList:
238 name = dictToTuple(ccdId, ccdKeys)
239 if name not in ccdLists:
240 ccdLists[name] = []
241 ccdLists[name].append(ccdId)
242
243 for ccd in ccdLists:
244 # Sort the list by the dataId values (ordered by key)
245 ccdLists[ccd] = sorted(ccdLists[ccd], key=lambda dd: dictToTuple(dd, sorted(dd.keys())))
246
247 return ccdLists
248
249
250def mapToMatrix(pool, func, ccdIdLists, *args, **kwargs):
251 """Generate a matrix of results using pool.map
252
253 The function should have the call signature:
254 func(cache, dataId, *args, **kwargs)
255
256 We return a dict mapping 'ccd name' to a list of values for
257 each exposure.
258
259 @param pool Process pool
260 @param func Function to call for each dataId
261 @param ccdIdLists Dict of data identifier lists for each CCD name
262 @return matrix of results
263 """
264 dataIdList = sum(ccdIdLists.values(), [])
265 resultList = pool.map(func, dataIdList, *args, **kwargs)
266 # Piece everything back together
267 data = dict((ccdName, [None] * len(expList)) for ccdName, expList in ccdIdLists.items())
268 indices = dict(sum([[(tuple(dataId.values()) if dataId is not None else None, (ccdName, expNum))
269 for expNum, dataId in enumerate(expList)]
270 for ccdName, expList in ccdIdLists.items()], []))
271 for dataId, result in zip(dataIdList, resultList):
272 if dataId is None:
273 continue
274 ccdName, expNum = indices[tuple(dataId.values())]
275 data[ccdName][expNum] = result
276 return data
277
278
279class CalibIdAction(argparse.Action):
280 """Split name=value pairs and put the result in a dict"""
281
282 def __call__(self, parser, namespace, values, option_string):
283 output = getattr(namespace, self.dest, {})
284 for nameValue in values:
285 name, sep, valueStr = nameValue.partition("=")
286 if not valueStr:
287 parser.error("%s value %s must be in form name=value" %
288 (option_string, nameValue))
289 output[name] = valueStr
290 setattr(namespace, self.dest, output)
291
292
293class CalibArgumentParser(ArgumentParser):
294 """ArgumentParser for calibration construction"""
295
296 def __init__(self, calibName, *args, **kwargs):
297 """Add a --calibId argument to the standard pipe_base argument parser"""
298 ArgumentParser.__init__(self, *args, **kwargs)
299 self.calibName = calibName
300 self.add_id_argument("--id", datasetType="raw",
301 help="input identifiers, e.g., --id visit=123 ccd=4")
302 self.add_argument("--calibId", nargs="*", action=CalibIdAction, default={},
303 help="identifiers for calib, e.g., --calibId version=1",
304 metavar="KEY=VALUE1[^VALUE2[^VALUE3...]")
305
306 def parse_args(self, *args, **kwargs):
307 """Parse arguments
308
309 Checks that the "--calibId" provided works.
310 """
311 namespace = ArgumentParser.parse_args(self, *args, **kwargs)
312
313 keys = namespace.butler.getKeys(self.calibName)
314 parsed = {}
315 for name, value in namespace.calibId.items():
316 if name not in keys:
317 self.error(
318 "%s is not a relevant calib identifier key (%s)" % (name, keys))
319 parsed[name] = keys[name](value)
320 namespace.calibId = parsed
321
322 return namespace
323
324
326 """Configuration for constructing calibs"""
327 clobber = Field(dtype=bool, default=True,
328 doc="Clobber existing processed images?")
329 isr = ConfigurableField(target=IsrTask, doc="ISR configuration")
330 dateObs = Field(dtype=str, default="dateObs",
331 doc="Key for observation date in exposure registry")
332 dateCalib = Field(dtype=str, default="calibDate",
333 doc="Key for calib date in calib registry")
334 filter = Field(dtype=str, default="filter",
335 doc="Key for filter name in exposure/calib registries")
336 combination = ConfigurableField(
337 target=CalibCombineTask, doc="Calib combination configuration")
338 ccdKeys = ListField(dtype=str, default=["ccd"],
339 doc="DataId keywords specifying a CCD")
340 visitKeys = ListField(dtype=str, default=["visit"],
341 doc="DataId keywords specifying a visit")
342 calibKeys = ListField(dtype=str, default=[],
343 doc="DataId keywords specifying a calibration")
344 doCameraImage = Field(dtype=bool, default=True, doc="Create camera overview image?")
345 binning = Field(dtype=int, default=64, doc="Binning to apply for camera image")
346
347 def setDefaults(self):
348 self.isr.doWrite = False
349
350
351class CalibTaskRunner(TaskRunner):
352 """Get parsed values into the CalibTask.run"""
353 @staticmethod
354 def getTargetList(parsedCmd, **kwargs):
355 return [dict(expRefList=parsedCmd.id.refList, butler=parsedCmd.butler, calibId=parsedCmd.calibId)]
356
357 def __call__(self, args):
358 """Call the Task with the kwargs from getTargetList"""
359 task = self.TaskClass(config=self.config, log=self.log)
360 exitStatus = 0 # exit status for the shell
361 if self.doRaise:
362 result = task.runDataRef(**args)
363 else:
364 try:
365 result = task.runDataRef(**args)
366 except Exception as e:
367 # n.b. The shell exit value is the number of dataRefs returning
368 # non-zero, so the actual value used here is lost
369 exitStatus = 1
370
371 task.log.fatal("Failed: %s" % e)
372 traceback.print_exc(file=sys.stderr)
373
374 if self.doReturnResults:
375 return Struct(
376 exitStatus=exitStatus,
377 args=args,
378 metadata=task.metadata,
379 result=result,
380 )
381 else:
382 return Struct(
383 exitStatus=exitStatus,
384 )
385
386
388 """!Base class for constructing calibs.
389
390 This should be subclassed for each of the required calib types.
391 The subclass should be sure to define the following class variables:
392 * _DefaultName: default name of the task, used by CmdLineTask
393 * calibName: name of the calibration data set in the butler
394 The subclass may optionally set:
395 * filterName: filter name to give the resultant calib
396 """
397 ConfigClass = CalibConfig
398 RunnerClass = CalibTaskRunner
399 filterName = None
400 calibName = None
401 exposureTime = 1.0 # sets this exposureTime in the output
402
403 def __init__(self, *args, **kwargs):
404 """Constructor"""
405 BatchPoolTask.__init__(self, *args, **kwargs)
406 self.makeSubtask("isr")
407 self.makeSubtask("combination")
408
409 @classmethod
410 def batchWallTime(cls, time, parsedCmd, numCores):
411 numCcds = len(parsedCmd.butler.get("camera"))
412 numExps = len(cls.RunnerClass.getTargetList(
413 parsedCmd)[0]['expRefList'])
414 numCycles = int(numCcds/float(numCores) + 0.5)
415 return time*numExps*numCycles
416
417 @classmethod
418 def _makeArgumentParser(cls, *args, **kwargs):
419 kwargs.pop("doBatch", False)
420 return CalibArgumentParser(calibName=cls.calibName, name=cls._DefaultName, *args, **kwargs)
421
422 def runDataRef(self, expRefList, butler, calibId):
423 """!Construct a calib from a list of exposure references
424
425 This is the entry point, called by the TaskRunner.__call__
426
427 Only the master node executes this method.
428
429 @param expRefList List of data references at the exposure level
430 @param butler Data butler
431 @param calibId Identifier dict for calib
432 """
433 if len(expRefList) < 1:
434 raise RuntimeError("No valid input data")
435
436 for expRef in expRefList:
437 self.addMissingKeys(expRef.dataId, butler, self.config.ccdKeys, 'raw')
438
439 outputId = self.getOutputId(expRefList, calibId)
440 ccdIdLists = getCcdIdListFromExposures(
441 expRefList, level="sensor", ccdKeys=self.config.ccdKeys)
442 self.checkCcdIdLists(ccdIdLists)
443
444 # Ensure we can generate filenames for each output
445 outputIdItemList = list(outputId.items())
446 for ccdName in ccdIdLists:
447 dataId = dict([(k, ccdName[i]) for i, k in enumerate(self.config.ccdKeys)])
448 dataId.update(outputIdItemList)
449 self.addMissingKeys(dataId, butler)
450 dataId.update(outputIdItemList)
451
452 try:
453 butler.get(self.calibName + "_filename", dataId)
454 except Exception as e:
455 raise RuntimeError(
456 "Unable to determine output filename \"%s_filename\" from %s: %s" %
457 (self.calibName, dataId, e))
458
459 processPool = Pool("process")
460 processPool.storeSet(butler=butler)
461
462 # Scatter: process CCDs independently
463 data = self.scatterProcess(processPool, ccdIdLists)
464
465 # Gather: determine scalings
466 scales = self.scale(ccdIdLists, data)
467
468 combinePool = Pool("combine")
469 combinePool.storeSet(butler=butler)
470
471 # Scatter: combine
472 calibs = self.scatterCombine(combinePool, outputId, ccdIdLists, scales)
473
474 if self.config.doCameraImage:
475 camera = butler.get("camera")
476 # Convert indexing of calibs from "ccdName" to detector ID (as used by makeImageFromCamera)
477 calibs = {butler.get("postISRCCD_detector",
478 dict(zip(self.config.ccdKeys, ccdName))).getId(): calibs[ccdName]
479 for ccdName in ccdIdLists}
480
481 try:
482 cameraImage = self.makeCameraImage(camera, outputId, calibs)
483 butler.put(cameraImage, self.calibName + "_camera", dataId)
484 except Exception as exc:
485 self.log.warn("Unable to create camera image: %s" % (exc,))
486
487 return Struct(
488 outputId=outputId,
489 ccdIdLists=ccdIdLists,
490 scales=scales,
491 calibs=calibs,
492 processPool=processPool,
493 combinePool=combinePool,
494 )
495
496 def getOutputId(self, expRefList, calibId):
497 """!Generate the data identifier for the output calib
498
499 The mean date and the common filter are included, using keywords
500 from the configuration. The CCD-specific part is not included
501 in the data identifier.
502
503 @param expRefList List of data references at exposure level
504 @param calibId Data identifier elements for the calib provided by the user
505 @return data identifier
506 """
507 midTime = 0
508 filterName = None
509 for expRef in expRefList:
510 butler = expRef.getButler()
511 dataId = expRef.dataId
512
513 midTime += self.getMjd(butler, dataId)
514 thisFilter = self.getFilter(
515 butler, dataId) if self.filterName is None else self.filterName
516 if filterName is None:
517 filterName = thisFilter
518 elif filterName != thisFilter:
519 raise RuntimeError("Filter mismatch for %s: %s vs %s" % (
520 dataId, thisFilter, filterName))
521
522 midTime /= len(expRefList)
523 date = str(dafBase.DateTime(
524 midTime, dafBase.DateTime.MJD).toPython().date())
525
526 outputId = {self.config.filter: filterName,
527 self.config.dateCalib: date}
528 outputId.update(calibId)
529 return outputId
530
531 def getMjd(self, butler, dataId, timescale=dafBase.DateTime.UTC):
532 """Determine the Modified Julian Date (MJD; in TAI) from a data identifier"""
533 if self.config.dateObs in dataId:
534 dateObs = dataId[self.config.dateObs]
535 else:
536 dateObs = butler.queryMetadata('raw', [self.config.dateObs], dataId)[0]
537 if "T" not in dateObs:
538 dateObs = dateObs + "T12:00:00.0Z"
539 elif not dateObs.endswith("Z"):
540 dateObs += "Z"
541
542 return dafBase.DateTime(dateObs, timescale).get(dafBase.DateTime.MJD)
543
544 def getFilter(self, butler, dataId):
545 """Determine the filter from a data identifier"""
546 filt = butler.queryMetadata('raw', [self.config.filter], dataId)[0]
547 return filt
548
549 def addMissingKeys(self, dataId, butler, missingKeys=None, calibName=None):
550 if calibName is None:
551 calibName = self.calibName
552
553 if missingKeys is None:
554 missingKeys = set(butler.getKeys(calibName).keys()) - set(dataId.keys())
555
556 for k in missingKeys:
557 try:
558 v = butler.queryMetadata('raw', [k], dataId) # n.b. --id refers to 'raw'
559 except Exception:
560 continue
561
562 if len(v) == 0: # failed to lookup value
563 continue
564
565 if len(v) == 1:
566 dataId[k] = v[0]
567 else:
568 raise RuntimeError("No unique lookup for %s: %s" % (k, v))
569
570 def updateMetadata(self, calibImage, exposureTime, darkTime=None, **kwargs):
571 """!Update the metadata from the VisitInfo
572
573 @param calibImage The image whose metadata is to be set
574 @param exposureTime The exposure time for the image
575 @param darkTime The time since the last read (default: exposureTime)
576 """
577
578 if darkTime is None:
579 darkTime = exposureTime # avoid warning messages when using calibration products
580
581 visitInfo = afwImage.VisitInfo(exposureTime=exposureTime, darkTime=darkTime, **kwargs)
582 md = calibImage.getMetadata()
583
584 afwImage.setVisitInfoMetadata(md, visitInfo)
585
586 def scatterProcess(self, pool, ccdIdLists):
587 """!Scatter the processing among the nodes
588
589 We scatter each CCD independently (exposures aren't grouped together),
590 to make full use of all available processors. This necessitates piecing
591 everything back together in the same format as ccdIdLists afterwards.
592
593 Only the master node executes this method.
594
595 @param pool Process pool
596 @param ccdIdLists Dict of data identifier lists for each CCD name
597 @return Dict of lists of returned data for each CCD name
598 """
599 self.log.info("Scatter processing")
600 return mapToMatrix(pool, self.process, ccdIdLists)
601
602 def process(self, cache, ccdId, outputName="postISRCCD", **kwargs):
603 """!Process a CCD, specified by a data identifier
604
605 After processing, optionally returns a result (produced by
606 the 'processResult' method) calculated from the processed
607 exposure. These results will be gathered by the master node,
608 and is a means for coordinated scaling of all CCDs for flats,
609 etc.
610
611 Only slave nodes execute this method.
612
613 @param cache Process pool cache
614 @param ccdId Data identifier for CCD
615 @param outputName Output dataset name for butler
616 @return result from 'processResult'
617 """
618 if ccdId is None:
619 self.log.warn("Null identifier received on %s" % NODE)
620 return None
621 sensorRef = getDataRef(cache.butler, ccdId)
622 if self.config.clobber or not sensorRef.datasetExists(outputName):
623 self.log.info("Processing %s on %s" % (ccdId, NODE))
624 try:
625 exposure = self.processSingle(sensorRef, **kwargs)
626 except Exception as e:
627 self.log.warn("Unable to process %s: %s" % (ccdId, e))
628 raise
629 return None
630 self.processWrite(sensorRef, exposure)
631 else:
632 self.log.info(
633 "Using previously persisted processed exposure for %s" % (sensorRef.dataId,))
634 exposure = sensorRef.get(outputName)
635 return self.processResult(exposure)
636
637 def processSingle(self, dataRef):
638 """Process a single CCD, specified by a data reference
639
640 Generally, this simply means doing ISR.
641
642 Only slave nodes execute this method.
643 """
644 return self.isr.runDataRef(dataRef).exposure
645
646 def processWrite(self, dataRef, exposure, outputName="postISRCCD"):
647 """!Write the processed CCD
648
649 We need to write these out because we can't hold them all in
650 memory at once.
651
652 Only slave nodes execute this method.
653
654 @param dataRef Data reference
655 @param exposure CCD exposure to write
656 @param outputName Output dataset name for butler.
657 """
658 dataRef.put(exposure, outputName)
659
660 def processResult(self, exposure):
661 """Extract processing results from a processed exposure
662
663 This method generates what is gathered by the master node.
664 This can be a background measurement or similar for scaling
665 flat-fields. It must be picklable!
666
667 Only slave nodes execute this method.
668 """
669 return None
670
671 def scale(self, ccdIdLists, data):
672 """!Determine scaling across CCDs and exposures
673
674 This is necessary mainly for flats, so as to determine a
675 consistent scaling across the entire focal plane. This
676 implementation is simply a placeholder.
677
678 Only the master node executes this method.
679
680 @param ccdIdLists Dict of data identifier lists for each CCD tuple
681 @param data Dict of lists of returned data for each CCD tuple
682 @return dict of Struct(ccdScale: scaling for CCD,
683 expScales: scaling for each exposure
684 ) for each CCD tuple
685 """
686 self.log.info("Scale on %s" % NODE)
687 return dict((name, Struct(ccdScale=None, expScales=[None] * len(ccdIdLists[name])))
688 for name in ccdIdLists)
689
690 def scatterCombine(self, pool, outputId, ccdIdLists, scales):
691 """!Scatter the combination of exposures across multiple nodes
692
693 In this case, we can only scatter across as many nodes as
694 there are CCDs.
695
696 Only the master node executes this method.
697
698 @param pool Process pool
699 @param outputId Output identifier (exposure part only)
700 @param ccdIdLists Dict of data identifier lists for each CCD name
701 @param scales Dict of structs with scales, for each CCD name
702 @param dict of binned images
703 """
704 self.log.info("Scatter combination")
705 data = [Struct(ccdName=ccdName, ccdIdList=ccdIdLists[ccdName], scales=scales[ccdName]) for
706 ccdName in ccdIdLists]
707 images = pool.map(self.combine, data, outputId)
708 return dict(zip(ccdIdLists.keys(), images))
709
710 def getFullyQualifiedOutputId(self, ccdName, butler, outputId):
711 """Get fully-qualified output data identifier
712
713 We may need to look up keys that aren't in the output dataId.
714
715 @param ccdName Name tuple for CCD
716 @param butler Data butler
717 @param outputId Data identifier for combined image (exposure part only)
718 @return fully-qualified output dataId
719 """
720 fullOutputId = {k: ccdName[i] for i, k in enumerate(self.config.ccdKeys)}
721 fullOutputId.update(outputId)
722 self.addMissingKeys(fullOutputId, butler)
723 fullOutputId.update(outputId) # must be after the call to queryMetadata in 'addMissingKeys'
724 return fullOutputId
725
726 def combine(self, cache, struct, outputId):
727 """!Combine multiple exposures of a particular CCD and write the output
728
729 Only the slave nodes execute this method.
730
731 @param cache Process pool cache
732 @param struct Parameters for the combination, which has the following components:
733 * ccdName Name tuple for CCD
734 * ccdIdList List of data identifiers for combination
735 * scales Scales to apply (expScales are scalings for each exposure,
736 ccdScale is final scale for combined image)
737 @param outputId Data identifier for combined image (exposure part only)
738 @return binned calib image
739 """
740 outputId = self.getFullyQualifiedOutputId(struct.ccdName, cache.butler, outputId)
741 dataRefList = [getDataRef(cache.butler, dataId) if dataId is not None else None for
742 dataId in struct.ccdIdList]
743 self.log.info("Combining %s on %s" % (outputId, NODE))
744 calib = self.combination.run(dataRefList, expScales=struct.scales.expScales,
745 finalScale=struct.scales.ccdScale)
746
747 if not hasattr(calib, "getMetadata"):
748 if hasattr(calib, "getVariance"):
749 calib = afwImage.makeExposure(calib)
750 else:
751 calib = afwImage.DecoratedImageF(calib.getImage()) # n.b. hardwires "F" for the output type
752
753 self.calculateOutputHeaderFromRaws(cache.butler, calib, struct.ccdIdList, outputId)
754
755 self.updateMetadata(calib, self.exposureTime)
756
757 self.recordCalibInputs(cache.butler, calib,
758 struct.ccdIdList, outputId)
759
760 self.interpolateNans(calib)
761
762 self.write(cache.butler, calib, outputId)
763
764 return afwMath.binImage(calib.getImage(), self.config.binning)
765
766 def calculateOutputHeaderFromRaws(self, butler, calib, dataIdList, outputId):
767 """!Calculate the output header from the raw headers.
768
769 This metadata will go into the output FITS header. It will include all
770 headers that are identical in all inputs.
771
772 @param butler Data butler
773 @param calib Combined calib exposure.
774 @param dataIdList List of data identifiers for calibration inputs
775 @param outputId Data identifier for output
776 """
777 header = calib.getMetadata()
778
779 rawmd = [butler.get("raw_md", dataId) for dataId in dataIdList if
780 dataId is not None]
781
782 merged = merge_headers(rawmd, mode="drop")
783
784 # Place merged set into the PropertyList if a value is not
785 # present already
786 # Comments are not present in the merged version so copy them across
787 for k, v in merged.items():
788 if k not in header:
789 comment = rawmd[0].getComment(k) if k in rawmd[0] else None
790 header.set(k, v, comment=comment)
791
792 # Create an observation group so we can add some standard headers
793 # independent of the form in the input files.
794 # Use try block in case we are dealing with unexpected data headers
795 try:
796 group = ObservationGroup(rawmd, pedantic=False)
797 except Exception:
798 group = None
799
800 comments = {"TIMESYS": "Time scale for all dates",
801 "DATE-OBS": "Start date of earliest input observation",
802 "MJD-OBS": "[d] Start MJD of earliest input observation",
803 "DATE-END": "End date of oldest input observation",
804 "MJD-END": "[d] End MJD of oldest input observation",
805 "MJD-AVG": "[d] MJD midpoint of all input observations",
806 "DATE-AVG": "Midpoint date of all input observations"}
807
808 if group is not None:
809 oldest, newest = group.extremes()
810 dateCards = dates_to_fits(oldest.datetime_begin, newest.datetime_end)
811 else:
812 # Fall back to setting a DATE-OBS from the calibDate
813 dateCards = {"DATE-OBS": "{}T00:00:00.00".format(outputId[self.config.dateCalib])}
814 comments["DATE-OBS"] = "Date of start of day of calibration midpoint"
815
816 for k, v in dateCards.items():
817 header.set(k, v, comment=comments.get(k, None))
818
819 def recordCalibInputs(self, butler, calib, dataIdList, outputId):
820 """!Record metadata including the inputs and creation details
821
822 This metadata will go into the FITS header.
823
824 @param butler Data butler
825 @param calib Combined calib exposure.
826 @param dataIdList List of data identifiers for calibration inputs
827 @param outputId Data identifier for output
828 """
829 header = calib.getMetadata()
830 header.set("OBSTYPE", self.calibName) # Used by ingestCalibs.py
831
832 # date, time, host, and root
833 now = time.localtime()
834 header.set("CALIB_CREATION_DATE", time.strftime("%Y-%m-%d", now))
835 header.set("CALIB_CREATION_TIME", time.strftime("%X %Z", now))
836
837 # Inputs
838 visits = [str(dictToTuple(dataId, self.config.visitKeys)) for dataId in dataIdList if
839 dataId is not None]
840 for i, v in enumerate(sorted(set(visits))):
841 header.set("CALIB_INPUT_%d" % (i,), v)
842
843 header.set("CALIB_ID", " ".join("%s=%s" % (key, value)
844 for key, value in outputId.items()))
845 checksum(calib, header)
846
847 def interpolateNans(self, image):
848 """Interpolate over NANs in the combined image
849
850 NANs can result from masked areas on the CCD. We don't want them getting
851 into our science images, so we replace them with the median of the image.
852 """
853 if hasattr(image, "getMaskedImage"): # Deal with Exposure vs Image
854 self.interpolateNans(image.getMaskedImage().getVariance())
855 image = image.getMaskedImage().getImage()
856 if hasattr(image, "getImage"): # Deal with DecoratedImage or MaskedImage vs Image
857 image = image.getImage()
858 array = image.getArray()
859 bad = np.isnan(array)
860 array[bad] = np.median(array[np.logical_not(bad)])
861
862 def write(self, butler, exposure, dataId):
863 """!Write the final combined calib
864
865 Only the slave nodes execute this method
866
867 @param butler Data butler
868 @param exposure CCD exposure to write
869 @param dataId Data identifier for output
870 """
871 self.log.info("Writing %s on %s" % (dataId, NODE))
872 butler.put(exposure, self.calibName, dataId)
873
874 def makeCameraImage(self, camera, dataId, calibs):
875 """!Create and write an image of the entire camera
876
877 This is useful for judging the quality or getting an overview of
878 the features of the calib.
879
880 @param camera Camera object
881 @param dataId Data identifier for output
882 @param calibs Dict mapping CCD detector ID to calib image
883 """
884 return makeCameraImage(camera, calibs, self.config.binning)
885
886 def checkCcdIdLists(self, ccdIdLists):
887 """Check that the list of CCD dataIds is consistent
888
889 @param ccdIdLists Dict of data identifier lists for each CCD name
890 @return Number of exposures, number of CCDs
891 """
892 visitIdLists = collections.defaultdict(list)
893 for ccdName in ccdIdLists:
894 for dataId in ccdIdLists[ccdName]:
895 visitName = dictToTuple(dataId, self.config.visitKeys)
896 visitIdLists[visitName].append(dataId)
897
898 numExps = set(len(expList) for expList in ccdIdLists.values())
899 numCcds = set(len(ccdList) for ccdList in visitIdLists.values())
900
901 if len(numExps) != 1 or len(numCcds) != 1:
902 # Presumably a visit somewhere doesn't have the full complement available.
903 # Dump the information so the user can figure it out.
904 self.log.warn("Number of visits for each CCD: %s",
905 {ccdName: len(ccdIdLists[ccdName]) for ccdName in ccdIdLists})
906 self.log.warn("Number of CCDs for each visit: %s",
907 {vv: len(visitIdLists[vv]) for vv in visitIdLists})
908 raise RuntimeError("Inconsistent number of exposures/CCDs")
909
910 return numExps.pop(), numCcds.pop()
911
912
914 """Configuration for bias construction.
915
916 No changes required compared to the base class, but
917 subclassed for distinction.
918 """
919 pass
920
921
923 """Bias construction"""
924 ConfigClass = BiasConfig
925 _DefaultName = "bias"
926 calibName = "bias"
927 filterName = "NONE" # Sets this filter name in the output
928 exposureTime = 0.0 # sets this exposureTime in the output
929
930 @classmethod
931 def applyOverrides(cls, config):
932 """Overrides to apply for bias construction"""
933 config.isr.doBias = False
934 config.isr.doDark = False
935 config.isr.doFlat = False
936 config.isr.doFringe = False
937
938
940 """Configuration for dark construction"""
941 doRepair = Field(dtype=bool, default=True, doc="Repair artifacts?")
942 psfFwhm = Field(dtype=float, default=3.0, doc="Repair PSF FWHM (pixels)")
943 psfSize = Field(dtype=int, default=21, doc="Repair PSF size (pixels)")
944 crGrow = Field(dtype=int, default=2, doc="Grow radius for CR (pixels)")
946 target=RepairTask, doc="Task to repair artifacts")
947
948 def setDefaults(self):
949 CalibConfig.setDefaults(self)
950 self.combination.mask.append("CR")
951
952
954 """Dark construction
955
956 The only major difference from the base class is a cosmic-ray
957 identification stage, and dividing each image by the dark time
958 to generate images of the dark rate.
959 """
960 ConfigClass = DarkConfig
961 _DefaultName = "dark"
962 calibName = "dark"
963 filterName = "NONE" # Sets this filter name in the output
964
965 def __init__(self, *args, **kwargs):
966 CalibTask.__init__(self, *args, **kwargs)
967 self.makeSubtask("repair")
968
969 @classmethod
970 def applyOverrides(cls, config):
971 """Overrides to apply for dark construction"""
972 config.isr.doDark = False
973 config.isr.doFlat = False
974 config.isr.doFringe = False
975
976 def processSingle(self, sensorRef):
977 """Process a single CCD
978
979 Besides the regular ISR, also masks cosmic-rays and divides each
980 processed image by the dark time to generate images of the dark rate.
981 The dark time is provided by the 'getDarkTime' method.
982 """
983 exposure = CalibTask.processSingle(self, sensorRef)
984
985 if self.config.doRepair:
986 psf = measAlg.DoubleGaussianPsf(self.config.psfSize, self.config.psfSize,
987 self.config.psfFwhm/(2*math.sqrt(2*math.log(2))))
988 exposure.setPsf(psf)
989 self.repair.run(exposure, keepCRs=False)
990 if self.config.crGrow > 0:
991 mask = exposure.getMaskedImage().getMask().clone()
992 mask &= mask.getPlaneBitMask("CR")
993 fpSet = afwDet.FootprintSet(
994 mask, afwDet.Threshold(0.5))
995 fpSet = afwDet.FootprintSet(fpSet, self.config.crGrow, True)
996 fpSet.setMask(exposure.getMaskedImage().getMask(), "CR")
997
998 mi = exposure.getMaskedImage()
999 mi /= self.getDarkTime(exposure)
1000 return exposure
1001
1002 def getDarkTime(self, exposure):
1003 """Retrieve the dark time for an exposure"""
1004 darkTime = exposure.getInfo().getVisitInfo().getDarkTime()
1005 if not np.isfinite(darkTime):
1006 raise RuntimeError("Non-finite darkTime")
1007 return darkTime
1008
1009
1011 """Configuration for flat construction"""
1012 iterations = Field(dtype=int, default=10,
1013 doc="Number of iterations for scale determination")
1014 stats = ConfigurableField(target=CalibStatsTask,
1015 doc="Background statistics configuration")
1016
1017
1019 """Flat construction
1020
1021 The principal change from the base class involves gathering the background
1022 values from each image and using them to determine the scalings for the final
1023 combination.
1024 """
1025 ConfigClass = FlatConfig
1026 _DefaultName = "flat"
1027 calibName = "flat"
1028
1029 @classmethod
1030 def applyOverrides(cls, config):
1031 """Overrides for flat construction"""
1032 config.isr.doFlat = False
1033 config.isr.doFringe = False
1034
1035 def __init__(self, *args, **kwargs):
1036 CalibTask.__init__(self, *args, **kwargs)
1037 self.makeSubtask("stats")
1038
1039 def processResult(self, exposure):
1040 return self.stats.run(exposure)
1041
1042 def scale(self, ccdIdLists, data):
1043 """Determine the scalings for the final combination
1044
1045 We have a matrix B_ij = C_i E_j, where C_i is the relative scaling
1046 of one CCD to all the others in an exposure, and E_j is the scaling
1047 of the exposure. We convert everything to logarithms so we can work
1048 with a linear system. We determine the C_i and E_j from B_ij by iteration,
1049 under the additional constraint that the average CCD scale is unity.
1050
1051 This algorithm comes from Eugene Magnier and Pan-STARRS.
1052 """
1053 assert len(ccdIdLists.values()) > 0, "No successful CCDs"
1054 lengths = set([len(expList) for expList in ccdIdLists.values()])
1055 assert len(lengths) == 1, "Number of successful exposures for each CCD differs"
1056 assert tuple(lengths)[0] > 0, "No successful exposures"
1057 # Format background measurements into a matrix
1058 indices = dict((name, i) for i, name in enumerate(ccdIdLists))
1059 bgMatrix = np.array([[0.0] * len(expList) for expList in ccdIdLists.values()])
1060 for name in ccdIdLists:
1061 i = indices[name]
1062 bgMatrix[i] = [d if d is not None else np.nan for d in data[name]]
1063
1064 numpyPrint = np.get_printoptions()
1065 np.set_printoptions(threshold=np.inf)
1066 self.log.info("Input backgrounds: %s" % bgMatrix)
1067
1068 # Flat-field scaling
1069 numCcds = len(ccdIdLists)
1070 numExps = bgMatrix.shape[1]
1071 # log(Background) for each exposure/component
1072 bgMatrix = np.log(bgMatrix)
1073 bgMatrix = np.ma.masked_array(bgMatrix, ~np.isfinite(bgMatrix))
1074 # Initial guess at log(scale) for each component
1075 compScales = np.zeros(numCcds)
1076 expScales = np.array([(bgMatrix[:, i0] - compScales).mean() for i0 in range(numExps)])
1077
1078 for iterate in range(self.config.iterations):
1079 compScales = np.array([(bgMatrix[i1, :] - expScales).mean() for i1 in range(numCcds)])
1080 bad = np.isnan(compScales)
1081 if np.any(bad):
1082 # Bad CCDs: just set them to the mean scale
1083 compScales[bad] = compScales[~bad].mean()
1084 expScales = np.array([(bgMatrix[:, i2] - compScales).mean() for i2 in range(numExps)])
1085
1086 avgScale = np.average(np.exp(compScales))
1087 compScales -= np.log(avgScale)
1088 self.log.debug("Iteration %d exposure scales: %s", iterate, np.exp(expScales))
1089 self.log.debug("Iteration %d component scales: %s", iterate, np.exp(compScales))
1090
1091 expScales = np.array([(bgMatrix[:, i3] - compScales).mean() for i3 in range(numExps)])
1092
1093 if np.any(np.isnan(expScales)):
1094 raise RuntimeError("Bad exposure scales: %s --> %s" % (bgMatrix, expScales))
1095
1096 expScales = np.exp(expScales)
1097 compScales = np.exp(compScales)
1098
1099 self.log.info("Exposure scales: %s" % expScales)
1100 self.log.info("Component relative scaling: %s" % compScales)
1101 np.set_printoptions(**numpyPrint)
1102
1103 return dict((ccdName, Struct(ccdScale=compScales[indices[ccdName]], expScales=expScales))
1104 for ccdName in ccdIdLists)
1105
1106
1108 """Configuration for fringe construction"""
1109 stats = ConfigurableField(target=CalibStatsTask,
1110 doc="Background statistics configuration")
1111 subtractBackground = ConfigurableField(target=measAlg.SubtractBackgroundTask,
1112 doc="Background configuration")
1114 target=measAlg.SourceDetectionTask, doc="Detection configuration")
1115 detectSigma = Field(dtype=float, default=1.0,
1116 doc="Detection PSF gaussian sigma")
1117
1118 def setDefaults(self):
1119 CalibConfig.setDefaults(self)
1120 self.detection.reEstimateBackground = False
1121
1122
1124 """Fringe construction task
1125
1126 The principal change from the base class is that the images are
1127 background-subtracted and rescaled by the background.
1128
1129 XXX This is probably not right for a straight-up combination, as we
1130 are currently doing, since the fringe amplitudes need not scale with
1131 the continuum.
1132
1133 XXX Would like to have this do PCA and generate multiple images, but
1134 that will take a bit of work with the persistence code.
1135 """
1136 ConfigClass = FringeConfig
1137 _DefaultName = "fringe"
1138 calibName = "fringe"
1139
1140 @classmethod
1141 def applyOverrides(cls, config):
1142 """Overrides for fringe construction"""
1143 config.isr.doFringe = False
1144
1145 def __init__(self, *args, **kwargs):
1146 CalibTask.__init__(self, *args, **kwargs)
1147 self.makeSubtask("detection")
1148 self.makeSubtask("stats")
1149 self.makeSubtask("subtractBackground")
1150
1151 def processSingle(self, sensorRef):
1152 """Subtract the background and normalise by the background level"""
1153 exposure = CalibTask.processSingle(self, sensorRef)
1154 bgLevel = self.stats.run(exposure)
1155 self.subtractBackground.run(exposure)
1156 mi = exposure.getMaskedImage()
1157 mi /= bgLevel
1158 footprintSets = self.detection.detectFootprints(
1159 exposure, sigma=self.config.detectSigma)
1160 mask = exposure.getMaskedImage().getMask()
1161 detected = 1 << mask.addMaskPlane("DETECTED")
1162 for fpSet in (footprintSets.positive, footprintSets.negative):
1163 if fpSet is not None:
1164 afwDet.setMaskFromFootprintList(
1165 mask, fpSet.getFootprints(), detected)
1166 return exposure
1167
1168
1170 """Configuration for sky frame construction"""
1171 detectSigma = Field(dtype=float, default=2.0, doc="Detection PSF gaussian sigma")
1172 maskObjects = ConfigurableField(target=MaskObjectsTask,
1173 doc="Configuration for masking objects aggressively")
1174 largeScaleBackground = ConfigField(dtype=FocalPlaneBackgroundConfig,
1175 doc="Large-scale background configuration")
1176 sky = ConfigurableField(target=SkyMeasurementTask, doc="Sky measurement")
1177 maskThresh = Field(dtype=float, default=3.0, doc="k-sigma threshold for masking pixels")
1178 mask = ListField(dtype=str, default=["BAD", "SAT", "DETECTED", "NO_DATA"],
1179 doc="Mask planes to consider as contaminated")
1180
1181
1183 """Task for sky frame construction
1184
1185 The sky frame is a (relatively) small-scale background
1186 model, the response of the camera to the sky.
1187
1188 To construct, we first remove a large-scale background (e.g., caused
1189 by moonlight) which may vary from image to image. Then we construct a
1190 model of the sky, which is essentially a binned version of the image
1191 (important configuration parameters: sky.background.[xy]BinSize).
1192 It is these models which are coadded to yield the sky frame.
1193 """
1194 ConfigClass = SkyConfig
1195 _DefaultName = "sky"
1196 calibName = "sky"
1197
1198 def __init__(self, *args, **kwargs):
1199 CalibTask.__init__(self, *args, **kwargs)
1200 self.makeSubtask("maskObjects")
1201 self.makeSubtask("sky")
1202
1203 def scatterProcess(self, pool, ccdIdLists):
1204 """!Scatter the processing among the nodes
1205
1206 Only the master node executes this method, assigning work to the
1207 slaves.
1208
1209 We measure and subtract off a large-scale background model across
1210 all CCDs, which requires a scatter/gather. Then we process the
1211 individual CCDs, subtracting the large-scale background model and
1212 the residual background model measured. These residuals will be
1213 combined for the sky frame.
1214
1215 @param pool Process pool
1216 @param ccdIdLists Dict of data identifier lists for each CCD name
1217 @return Dict of lists of returned data for each CCD name
1218 """
1219 self.log.info("Scatter processing")
1220
1221 numExps = set(len(expList) for expList in ccdIdLists.values()).pop()
1222
1223 # First subtract off general gradients to make all the exposures look similar.
1224 # We want to preserve the common small-scale structure, which we will coadd.
1225 bgModelList = mapToMatrix(pool, self.measureBackground, ccdIdLists)
1226
1227 backgrounds = {}
1228 scales = {}
1229 for exp in range(numExps):
1230 bgModels = [bgModelList[ccdName][exp] for ccdName in ccdIdLists]
1231 visit = set(tuple(ccdIdLists[ccdName][exp][key] for key in sorted(self.config.visitKeys)) for
1232 ccdName in ccdIdLists)
1233 assert len(visit) == 1
1234 visit = visit.pop()
1235 bgModel = bgModels[0]
1236 for bg in bgModels[1:]:
1237 bgModel.merge(bg)
1238 self.log.info("Background model min/max for visit %s: %f %f", visit,
1239 np.min(bgModel.getStatsImage().getArray()),
1240 np.max(bgModel.getStatsImage().getArray()))
1241 backgrounds[visit] = bgModel
1242 scales[visit] = np.median(bgModel.getStatsImage().getArray())
1243
1244 return mapToMatrix(pool, self.process, ccdIdLists, backgrounds=backgrounds, scales=scales)
1245
1246 def measureBackground(self, cache, dataId):
1247 """!Measure background model for CCD
1248
1249 This method is executed by the slaves.
1250
1251 The background models for all CCDs in an exposure will be
1252 combined to form a full focal-plane background model.
1253
1254 @param cache Process pool cache
1255 @param dataId Data identifier
1256 @return Bcakground model
1257 """
1258 dataRef = getDataRef(cache.butler, dataId)
1259 exposure = self.processSingleBackground(dataRef)
1260
1261 # NAOJ prototype smoothed and then combined the entire image, but it shouldn't be any different
1262 # to bin and combine the binned images except that there's fewer pixels to worry about.
1263 config = self.config.largeScaleBackground
1264 camera = dataRef.get("camera")
1265 bgModel = FocalPlaneBackground.fromCamera(config, camera)
1266 bgModel.addCcd(exposure)
1267 return bgModel
1268
1269 def processSingleBackground(self, dataRef):
1270 """!Process a single CCD for the background
1271
1272 This method is executed by the slaves.
1273
1274 Because we're interested in the background, we detect and mask astrophysical
1275 sources, and pixels above the noise level.
1276
1277 @param dataRef Data reference for CCD.
1278 @return processed exposure
1279 """
1280 if not self.config.clobber and dataRef.datasetExists("postISRCCD"):
1281 return dataRef.get("postISRCCD")
1282 exposure = CalibTask.processSingle(self, dataRef)
1283
1284 self.maskObjects.run(exposure, self.config.mask)
1285 dataRef.put(exposure, "postISRCCD")
1286 return exposure
1287
1288 def processSingle(self, dataRef, backgrounds, scales):
1289 """Process a single CCD, specified by a data reference
1290
1291 We subtract the appropriate focal plane background model,
1292 divide by the appropriate scale and measure the background.
1293
1294 Only slave nodes execute this method.
1295
1296 @param dataRef Data reference for single CCD
1297 @param backgrounds Background model for each visit
1298 @param scales Scales for each visit
1299 @return Processed exposure
1300 """
1301 visit = tuple(dataRef.dataId[key] for key in sorted(self.config.visitKeys))
1302 exposure = dataRef.get("postISRCCD", immediate=True)
1303 image = exposure.getMaskedImage()
1304 detector = exposure.getDetector()
1305 bbox = image.getBBox()
1306
1307 bgModel = backgrounds[visit]
1308 bg = bgModel.toCcdBackground(detector, bbox)
1309 image -= bg.getImage()
1310 image /= scales[visit]
1311
1312 bg = self.sky.measureBackground(exposure.getMaskedImage())
1313 dataRef.put(bg, "icExpBackground")
1314 return exposure
1315
1316 def combine(self, cache, struct, outputId):
1317 """!Combine multiple background models of a particular CCD and write the output
1318
1319 Only the slave nodes execute this method.
1320
1321 @param cache Process pool cache
1322 @param struct Parameters for the combination, which has the following components:
1323 * ccdName Name tuple for CCD
1324 * ccdIdList List of data identifiers for combination
1325 @param outputId Data identifier for combined image (exposure part only)
1326 @return binned calib image
1327 """
1328 outputId = self.getFullyQualifiedOutputId(struct.ccdName, cache.butler, outputId)
1329 dataRefList = [getDataRef(cache.butler, dataId) if dataId is not None else None for
1330 dataId in struct.ccdIdList]
1331 self.log.info("Combining %s on %s" % (outputId, NODE))
1332 bgList = [dataRef.get("icExpBackground", immediate=True).clone() for dataRef in dataRefList]
1333
1334 bgExp = self.sky.averageBackgrounds(bgList)
1335
1336 self.recordCalibInputs(cache.butler, bgExp, struct.ccdIdList, outputId)
1337 cache.butler.put(bgExp, "sky", outputId)
1338 return afwMath.binImage(self.sky.exposureToBackground(bgExp).getImage(), self.config.binning)
int min
table::Key< int > a
A set of Footprints, associated with a MaskedImage.
Definition: FootprintSet.h:55
A Threshold is used to pass a threshold value to detection algorithms.
Definition: Threshold.h:43
Information about a single exposure of an imaging camera.
Definition: VisitInfo.h:68
Pass parameters to a Statistics object.
Definition: Statistics.h:83
Class for handling dates/times, including MJD, UTC, and TAI.
Definition: DateTime.h:64
An integer coordinate rectangle.
Definition: Box.h:55
def __init__(self, calibName, *args, **kwargs)
def getDimensions(self, sensorRefList, inputName="postISRCCD")
def combine(self, target, imageList, stats)
Combine multiple images.
def __call__(self, parser, namespace, values, option_string)
Base class for constructing calibs.
def scatterCombine(self, pool, outputId, ccdIdLists, scales)
Scatter the combination of exposures across multiple nodes.
def getFullyQualifiedOutputId(self, ccdName, butler, outputId)
def scale(self, ccdIdLists, data)
Determine scaling across CCDs and exposures.
def makeCameraImage(self, camera, dataId, calibs)
Create and write an image of the entire camera.
def getMjd(self, butler, dataId, timescale=dafBase.DateTime.UTC)
def scatterProcess(self, pool, ccdIdLists)
Scatter the processing among the nodes.
def processWrite(self, dataRef, exposure, outputName="postISRCCD")
Write the processed CCD.
def process(self, cache, ccdId, outputName="postISRCCD", **kwargs)
Process a CCD, specified by a data identifier.
def write(self, butler, exposure, dataId)
Write the final combined calib.
def getOutputId(self, expRefList, calibId)
Generate the data identifier for the output calib.
def batchWallTime(cls, time, parsedCmd, numCores)
Return walltime request for batch job.
def addMissingKeys(self, dataId, butler, missingKeys=None, calibName=None)
def runDataRef(self, expRefList, butler, calibId)
Construct a calib from a list of exposure references.
def combine(self, cache, struct, outputId)
Combine multiple exposures of a particular CCD and write the output.
def updateMetadata(self, calibImage, exposureTime, darkTime=None, **kwargs)
Update the metadata from the VisitInfo.
def calculateOutputHeaderFromRaws(self, butler, calib, dataIdList, outputId)
Calculate the output header from the raw headers.
def recordCalibInputs(self, butler, calib, dataIdList, outputId)
Record metadata including the inputs and creation details.
def measureBackground(self, cache, dataId)
Measure background model for CCD.
def scatterProcess(self, pool, ccdIdLists)
Scatter the processing among the nodes.
def processSingle(self, dataRef, backgrounds, scales)
def processSingleBackground(self, dataRef)
Process a single CCD for the background.
def combine(self, cache, struct, outputId)
Combine multiple background models of a particular CCD and write the output.
daf::base::PropertyList * list
Definition: fits.cc:928
daf::base::PropertySet * set
Definition: fits.cc:927
lsst::geom::Box2I bboxFromMetadata(daf::base::PropertySet &metadata)
Determine the image bounding box from its metadata (FITS header)
Definition: Image.cc:721
std::shared_ptr< Exposure< ImagePixelT, MaskPixelT, VariancePixelT > > makeExposure(MaskedImage< ImagePixelT, MaskPixelT, VariancePixelT > &mimage, std::shared_ptr< geom::SkyWcs const > wcs=std::shared_ptr< geom::SkyWcs const >())
A function to return an Exposure of the correct type (cf.
Definition: Exposure.h:445
std::shared_ptr< lsst::afw::image::Image< PixelT > > statisticsStack(std::vector< std::shared_ptr< lsst::afw::image::Image< PixelT > > > &images, Property flags, StatisticsControl const &sctrl=StatisticsControl(), std::vector< lsst::afw::image::VariancePixel > const &wvector=std::vector< lsst::afw::image::VariancePixel >(0))
A function to compute some statistics of a stack of Images.
Statistics makeStatistics(lsst::afw::image::Image< Pixel > const &img, lsst::afw::image::Mask< image::MaskPixel > const &msk, int const flags, StatisticsControl const &sctrl=StatisticsControl())
Handle a watered-down front-end to the constructor (no variance)
Definition: Statistics.h:361
Property
control what is calculated
Definition: Statistics.h:53
std::shared_ptr< ImageT > binImage(ImageT const &inImage, int const binX, int const binY, lsst::afw::math::Property const flags=lsst::afw::math::MEAN)
Definition: binImage.cc:44
def dictToTuple(dict_, keys)
Return a tuple of specific values from a dict.
def getCcdIdListFromExposures(expRefList, level="sensor", ccdKeys=["ccd"])
Determine a list of CCDs from exposure references.
def mapToMatrix(pool, func, ccdIdLists, *args, **kwargs)