LSSTApplications  16.0-10-g0ee56ad+5,16.0-11-ga33d1f2+5,16.0-12-g3ef5c14+3,16.0-12-g71e5ef5+18,16.0-12-gbdf3636+3,16.0-13-g118c103+3,16.0-13-g8f68b0a+3,16.0-15-gbf5c1cb+4,16.0-16-gfd17674+3,16.0-17-g7c01f5c+3,16.0-18-g0a50484+1,16.0-20-ga20f992+8,16.0-21-g0e05fd4+6,16.0-21-g15e2d33+4,16.0-22-g62d8060+4,16.0-22-g847a80f+4,16.0-25-gf00d9b8+1,16.0-28-g3990c221+4,16.0-3-gf928089+3,16.0-32-g88a4f23+5,16.0-34-gd7987ad+3,16.0-37-gc7333cb+2,16.0-4-g10fc685+2,16.0-4-g18f3627+26,16.0-4-g5f3a788+26,16.0-5-gaf5c3d7+4,16.0-5-gcc1f4bb+1,16.0-6-g3b92700+4,16.0-6-g4412fcd+3,16.0-6-g7235603+4,16.0-69-g2562ce1b+2,16.0-8-g14ebd58+4,16.0-8-g2df868b+1,16.0-8-g4cec79c+6,16.0-8-gadf6c7a+1,16.0-8-gfc7ad86,16.0-82-g59ec2a54a+1,16.0-9-g5400cdc+2,16.0-9-ge6233d7+5,master-g2880f2d8cf+3,v17.0.rc1
LSSTDataManagementBasePackage
constructCalibs.py
Go to the documentation of this file.
1 from __future__ import absolute_import, division, print_function
2 
3 import sys
4 import math
5 import time
6 import argparse
7 import traceback
8 import collections
9 
10 import numpy as np
11 from builtins import zip
12 from builtins import range
13 
14 from lsst.pex.config import Config, ConfigurableField, Field, ListField, ConfigField
15 from lsst.pipe.base import Task, Struct, TaskRunner, ArgumentParser
16 import lsst.daf.base as dafBase
17 import lsst.afw.math as afwMath
18 import lsst.afw.geom as afwGeom
19 import lsst.afw.detection as afwDet
20 import lsst.afw.image as afwImage
21 import lsst.meas.algorithms as measAlg
22 from lsst.pipe.tasks.repair import RepairTask
23 from lsst.ip.isr import IsrTask
24 
25 from lsst.ctrl.pool.parallel import BatchPoolTask
26 from lsst.ctrl.pool.pool import Pool, NODE
27 from lsst.pipe.drivers.background import SkyMeasurementTask, FocalPlaneBackground, FocalPlaneBackgroundConfig
28 from lsst.pipe.drivers.visualizeVisit import makeCameraImage
29 
30 from .checksum import checksum
31 from .utils import getDataRef
32 
33 
35  """Parameters controlling the measurement of background statistics"""
36  stat = Field(doc="Statistic to use to estimate background (from lsst.afw.math)", dtype=int,
37  default=int(afwMath.MEANCLIP))
38  clip = Field(doc="Clipping threshold for background",
39  dtype=float, default=3.0)
40  nIter = Field(doc="Clipping iterations for background",
41  dtype=int, default=3)
42  maxVisitsToCalcErrorFromInputVariance = Field(
43  doc="Maximum number of visits to estimate variance from input variance, not per-pixel spread",
44  dtype=int, default=2)
45  mask = ListField(doc="Mask planes to reject",
46  dtype=str, default=["DETECTED", "BAD", "NO_DATA"])
47 
48 
50  """Measure statistics on the background
51 
52  This can be useful for scaling the background, e.g., for flats and fringe frames.
53  """
54  ConfigClass = CalibStatsConfig
55 
56  def run(self, exposureOrImage):
57  """!Measure a particular statistic on an image (of some sort).
58 
59  @param exposureOrImage Exposure, MaskedImage or Image.
60  @return Value of desired statistic
61  """
62  stats = afwMath.StatisticsControl(self.config.clip, self.config.nIter,
63  afwImage.Mask.getPlaneBitMask(self.config.mask))
64  try:
65  image = exposureOrImage.getMaskedImage()
66  except Exception:
67  try:
68  image = exposureOrImage.getImage()
69  except Exception:
70  image = exposureOrImage
71 
72  return afwMath.makeStatistics(image, self.config.stat, stats).getValue()
73 
74 
76  """Configuration for combining calib images"""
77  rows = Field(doc="Number of rows to read at a time",
78  dtype=int, default=512)
79  mask = ListField(doc="Mask planes to respect", dtype=str,
80  default=["SAT", "DETECTED", "INTRP"])
81  combine = Field(doc="Statistic to use for combination (from lsst.afw.math)", dtype=int,
82  default=int(afwMath.MEANCLIP))
83  clip = Field(doc="Clipping threshold for combination",
84  dtype=float, default=3.0)
85  nIter = Field(doc="Clipping iterations for combination",
86  dtype=int, default=3)
87  stats = ConfigurableField(target=CalibStatsTask,
88  doc="Background statistics configuration")
89 
90 
92  """Task to combine calib images"""
93  ConfigClass = CalibCombineConfig
94 
95  def __init__(self, *args, **kwargs):
96  Task.__init__(self, *args, **kwargs)
97  self.makeSubtask("stats")
98 
99  def run(self, sensorRefList, expScales=None, finalScale=None, inputName="postISRCCD"):
100  """!Combine calib images for a single sensor
101 
102  @param sensorRefList List of data references to combine (for a single sensor)
103  @param expScales List of scales to apply for each exposure
104  @param finalScale Desired scale for final combined image
105  @param inputName Data set name for inputs
106  @return combined image
107  """
108  width, height = self.getDimensions(sensorRefList)
109  stats = afwMath.StatisticsControl(self.config.clip, self.config.nIter,
110  afwImage.Mask.getPlaneBitMask(self.config.mask))
111  numImages = len(sensorRefList)
112  if numImages < 1:
113  raise RuntimeError("No valid input data")
114  if numImages < self.config.stats.maxVisitsToCalcErrorFromInputVariance:
115  stats.setCalcErrorFromInputVariance(True)
116 
117  # Combine images
118  combined = afwImage.MaskedImageF(width, height)
119  imageList = [None]*numImages
120  for start in range(0, height, self.config.rows):
121  rows = min(self.config.rows, height - start)
122  box = afwGeom.Box2I(afwGeom.Point2I(0, start),
123  afwGeom.Extent2I(width, rows))
124  subCombined = combined.Factory(combined, box)
125 
126  for i, sensorRef in enumerate(sensorRefList):
127  if sensorRef is None:
128  imageList[i] = None
129  continue
130  exposure = sensorRef.get(inputName + "_sub", bbox=box)
131  if expScales is not None:
132  self.applyScale(exposure, expScales[i])
133  imageList[i] = exposure.getMaskedImage()
134 
135  self.combine(subCombined, imageList, stats)
136 
137  if finalScale is not None:
138  background = self.stats.run(combined)
139  self.log.info("%s: Measured background of stack is %f; adjusting to %f" %
140  (NODE, background, finalScale))
141  combined *= finalScale / background
142 
143  return combined
144 
145  def getDimensions(self, sensorRefList, inputName="postISRCCD"):
146  """Get dimensions of the inputs"""
147  dimList = []
148  for sensorRef in sensorRefList:
149  if sensorRef is None:
150  continue
151  md = sensorRef.get(inputName + "_md")
152  dimList.append(afwImage.bboxFromMetadata(md).getDimensions())
153  return getSize(dimList)
154 
155  def applyScale(self, exposure, scale=None):
156  """Apply scale to input exposure
157 
158  This implementation applies a flux scaling: the input exposure is
159  divided by the provided scale.
160  """
161  if scale is not None:
162  mi = exposure.getMaskedImage()
163  mi /= scale
164 
165  def combine(self, target, imageList, stats):
166  """!Combine multiple images
167 
168  @param target Target image to receive the combined pixels
169  @param imageList List of input images
170  @param stats Statistics control
171  """
172  images = [img for img in imageList if img is not None]
173  afwMath.statisticsStack(target, images, afwMath.Property(self.config.combine), stats)
174 
175 
176 def getSize(dimList):
177  """Determine a consistent size, given a list of image sizes"""
178  dim = set((w, h) for w, h in dimList)
179  dim.discard(None)
180  if len(dim) != 1:
181  raise RuntimeError("Inconsistent dimensions: %s" % dim)
182  return dim.pop()
183 
184 
185 def dictToTuple(dict_, keys):
186  """!Return a tuple of specific values from a dict
187 
188  This provides a hashable representation of the dict from certain keywords.
189  This can be useful for creating e.g., a tuple of the values in the DataId
190  that identify the CCD.
191 
192  @param dict_ dict to parse
193  @param keys keys to extract (order is important)
194  @return tuple of values
195  """
196  return tuple(dict_[k] for k in keys)
197 
198 
199 def getCcdIdListFromExposures(expRefList, level="sensor", ccdKeys=["ccd"]):
200  """!Determine a list of CCDs from exposure references
201 
202  This essentially inverts the exposure-level references (which
203  provides a list of CCDs for each exposure), by providing
204  a dataId list for each CCD. Consider an input list of exposures
205  [e1, e2, e3], and each exposure has CCDs c1 and c2. Then this
206  function returns:
207 
208  {(c1,): [e1c1, e2c1, e3c1], (c2,): [e1c2, e2c2, e3c2]}
209 
210  This is a dict whose keys are tuples of the identifying values of a
211  CCD (usually just the CCD number) and the values are lists of dataIds
212  for that CCD in each exposure. A missing dataId is given the value
213  None.
214 
215  @param expRefList List of data references for exposures
216  @param level Level for the butler to generate CCDs
217  @param ccdKeys DataId keywords that identify a CCD
218  @return dict of data identifier lists for each CCD;
219  keys are values of ccdKeys in order
220  """
221  expIdList = [[ccdRef.dataId for ccdRef in expRef.subItems(
222  level)] for expRef in expRefList]
223 
224  # Determine what additional keys make a CCD from an exposure
225  if len(ccdKeys) != len(set(ccdKeys)):
226  raise RuntimeError("Duplicate keys found in ccdKeys: %s" % ccdKeys)
227  ccdNames = set() # Set of tuples which are values for each of the CCDs in an exposure
228  for ccdIdList in expIdList:
229  for ccdId in ccdIdList:
230  name = dictToTuple(ccdId, ccdKeys)
231  ccdNames.add(name)
232 
233  # Turn the list of CCDs for each exposure into a list of exposures for
234  # each CCD
235  ccdLists = {}
236  for n, ccdIdList in enumerate(expIdList):
237  for ccdId in ccdIdList:
238  name = dictToTuple(ccdId, ccdKeys)
239  if name not in ccdLists:
240  ccdLists[name] = []
241  ccdLists[name].append(ccdId)
242 
243  for ccd in ccdLists:
244  # Sort the list by the dataId values (ordered by key)
245  ccdLists[ccd] = sorted(ccdLists[ccd], key=lambda dd: dictToTuple(dd, sorted(dd.keys())))
246 
247  return ccdLists
248 
249 
250 def mapToMatrix(pool, func, ccdIdLists, *args, **kwargs):
251  """Generate a matrix of results using pool.map
252 
253  The function should have the call signature:
254  func(cache, dataId, *args, **kwargs)
255 
256  We return a dict mapping 'ccd name' to a list of values for
257  each exposure.
258 
259  @param pool Process pool
260  @param func Function to call for each dataId
261  @param ccdIdLists Dict of data identifier lists for each CCD name
262  @return matrix of results
263  """
264  dataIdList = sum(ccdIdLists.values(), [])
265  resultList = pool.map(func, dataIdList, *args, **kwargs)
266  # Piece everything back together
267  data = dict((ccdName, [None] * len(expList)) for ccdName, expList in ccdIdLists.items())
268  indices = dict(sum([[(tuple(dataId.values()) if dataId is not None else None, (ccdName, expNum))
269  for expNum, dataId in enumerate(expList)]
270  for ccdName, expList in ccdIdLists.items()], []))
271  for dataId, result in zip(dataIdList, resultList):
272  if dataId is None:
273  continue
274  ccdName, expNum = indices[tuple(dataId.values())]
275  data[ccdName][expNum] = result
276  return data
277 
278 
279 class CalibIdAction(argparse.Action):
280  """Split name=value pairs and put the result in a dict"""
281 
282  def __call__(self, parser, namespace, values, option_string):
283  output = getattr(namespace, self.dest, {})
284  for nameValue in values:
285  name, sep, valueStr = nameValue.partition("=")
286  if not valueStr:
287  parser.error("%s value %s must be in form name=value" %
288  (option_string, nameValue))
289  output[name] = valueStr
290  setattr(namespace, self.dest, output)
291 
292 
294  """ArgumentParser for calibration construction"""
295 
296  def __init__(self, calibName, *args, **kwargs):
297  """Add a --calibId argument to the standard pipe_base argument parser"""
298  ArgumentParser.__init__(self, *args, **kwargs)
299  self.calibName = calibName
300  self.add_id_argument("--id", datasetType="raw",
301  help="input identifiers, e.g., --id visit=123 ccd=4")
302  self.add_argument("--calibId", nargs="*", action=CalibIdAction, default={},
303  help="identifiers for calib, e.g., --calibId version=1",
304  metavar="KEY=VALUE1[^VALUE2[^VALUE3...]")
305 
306  def parse_args(self, *args, **kwargs):
307  """Parse arguments
308 
309  Checks that the "--calibId" provided works.
310  """
311  namespace = ArgumentParser.parse_args(self, *args, **kwargs)
312 
313  keys = namespace.butler.getKeys(self.calibName)
314  parsed = {}
315  for name, value in namespace.calibId.items():
316  if name not in keys:
317  self.error(
318  "%s is not a relevant calib identifier key (%s)" % (name, keys))
319  parsed[name] = keys[name](value)
320  namespace.calibId = parsed
321 
322  return namespace
323 
324 
326  """Configuration for constructing calibs"""
327  clobber = Field(dtype=bool, default=True,
328  doc="Clobber existing processed images?")
329  isr = ConfigurableField(target=IsrTask, doc="ISR configuration")
330  dateObs = Field(dtype=str, default="dateObs",
331  doc="Key for observation date in exposure registry")
332  dateCalib = Field(dtype=str, default="calibDate",
333  doc="Key for calib date in calib registry")
334  filter = Field(dtype=str, default="filter",
335  doc="Key for filter name in exposure/calib registries")
336  combination = ConfigurableField(
337  target=CalibCombineTask, doc="Calib combination configuration")
338  ccdKeys = ListField(dtype=str, default=["ccd"],
339  doc="DataId keywords specifying a CCD")
340  visitKeys = ListField(dtype=str, default=["visit"],
341  doc="DataId keywords specifying a visit")
342  calibKeys = ListField(dtype=str, default=[],
343  doc="DataId keywords specifying a calibration")
344  doCameraImage = Field(dtype=bool, default=True, doc="Create camera overview image?")
345  binning = Field(dtype=int, default=64, doc="Binning to apply for camera image")
346 
347  def setDefaults(self):
348  self.isr.doWrite = False
349 
350 
352  """Get parsed values into the CalibTask.run"""
353  @staticmethod
354  def getTargetList(parsedCmd, **kwargs):
355  return [dict(expRefList=parsedCmd.id.refList, butler=parsedCmd.butler, calibId=parsedCmd.calibId)]
356 
357  def __call__(self, args):
358  """Call the Task with the kwargs from getTargetList"""
359  task = self.TaskClass(config=self.config, log=self.log)
360  exitStatus = 0 # exit status for the shell
361  if self.doRaise:
362  result = task.runDataRef(**args)
363  else:
364  try:
365  result = task.runDataRef(**args)
366  except Exception as e:
367  # n.b. The shell exit value is the number of dataRefs returning
368  # non-zero, so the actual value used here is lost
369  exitStatus = 1
370 
371  task.log.fatal("Failed: %s" % e)
372  traceback.print_exc(file=sys.stderr)
373 
374  if self.doReturnResults:
375  return Struct(
376  exitStatus=exitStatus,
377  args=args,
378  metadata=task.metadata,
379  result=result,
380  )
381  else:
382  return Struct(
383  exitStatus=exitStatus,
384  )
385 
386 
388  """!Base class for constructing calibs.
389 
390  This should be subclassed for each of the required calib types.
391  The subclass should be sure to define the following class variables:
392  * _DefaultName: default name of the task, used by CmdLineTask
393  * calibName: name of the calibration data set in the butler
394  The subclass may optionally set:
395  * filterName: filter name to give the resultant calib
396  """
397  ConfigClass = CalibConfig
398  RunnerClass = CalibTaskRunner
399  filterName = None
400  calibName = None
401  exposureTime = 1.0 # sets this exposureTime in the output
402 
403  def __init__(self, *args, **kwargs):
404  """Constructor"""
405  BatchPoolTask.__init__(self, *args, **kwargs)
406  self.makeSubtask("isr")
407  self.makeSubtask("combination")
408 
409  @classmethod
410  def batchWallTime(cls, time, parsedCmd, numCores):
411  numCcds = len(parsedCmd.butler.get("camera"))
412  numExps = len(cls.RunnerClass.getTargetList(
413  parsedCmd)[0]['expRefList'])
414  numCycles = int(numCcds/float(numCores) + 0.5)
415  return time*numExps*numCycles
416 
417  @classmethod
418  def _makeArgumentParser(cls, *args, **kwargs):
419  kwargs.pop("doBatch", False)
420  return CalibArgumentParser(calibName=cls.calibName, name=cls._DefaultName, *args, **kwargs)
421 
422  def runDataRef(self, expRefList, butler, calibId):
423  """!Construct a calib from a list of exposure references
424 
425  This is the entry point, called by the TaskRunner.__call__
426 
427  Only the master node executes this method.
428 
429  @param expRefList List of data references at the exposure level
430  @param butler Data butler
431  @param calibId Identifier dict for calib
432  """
433  if len(expRefList) < 1:
434  raise RuntimeError("No valid input data")
435 
436  for expRef in expRefList:
437  self.addMissingKeys(expRef.dataId, butler, self.config.ccdKeys, 'raw')
438 
439  outputId = self.getOutputId(expRefList, calibId)
440  ccdIdLists = getCcdIdListFromExposures(
441  expRefList, level="sensor", ccdKeys=self.config.ccdKeys)
442  self.checkCcdIdLists(ccdIdLists)
443 
444  # Ensure we can generate filenames for each output
445  outputIdItemList = list(outputId.items())
446  for ccdName in ccdIdLists:
447  dataId = dict([(k, ccdName[i]) for i, k in enumerate(self.config.ccdKeys)])
448  dataId.update(outputIdItemList)
449  self.addMissingKeys(dataId, butler)
450  dataId.update(outputIdItemList)
451 
452  try:
453  butler.get(self.calibName + "_filename", dataId)
454  except Exception as e:
455  raise RuntimeError(
456  "Unable to determine output filename \"%s_filename\" from %s: %s" %
457  (self.calibName, dataId, e))
458 
459  processPool = Pool("process")
460  processPool.storeSet(butler=butler)
461 
462  # Scatter: process CCDs independently
463  data = self.scatterProcess(processPool, ccdIdLists)
464 
465  # Gather: determine scalings
466  scales = self.scale(ccdIdLists, data)
467 
468  combinePool = Pool("combine")
469  combinePool.storeSet(butler=butler)
470 
471  # Scatter: combine
472  calibs = self.scatterCombine(combinePool, outputId, ccdIdLists, scales)
473 
474  if self.config.doCameraImage:
475  camera = butler.get("camera")
476  # Convert indexing of calibs from "ccdName" to detector ID (as used by makeImageFromCamera)
477  calibs = {butler.get("postISRCCD_detector",
478  dict(zip(self.config.ccdKeys, ccdName))).getId(): calibs[ccdName]
479  for ccdName in ccdIdLists}
480 
481  try:
482  cameraImage = self.makeCameraImage(camera, outputId, calibs)
483  butler.put(cameraImage, self.calibName + "_camera", dataId)
484  except Exception as exc:
485  self.log.warn("Unable to create camera image: %s" % (exc,))
486 
487  return Struct(
488  outputId=outputId,
489  ccdIdLists=ccdIdLists,
490  scales=scales,
491  calibs=calibs,
492  processPool=processPool,
493  combinePool=combinePool,
494  )
495 
496  def getOutputId(self, expRefList, calibId):
497  """!Generate the data identifier for the output calib
498 
499  The mean date and the common filter are included, using keywords
500  from the configuration. The CCD-specific part is not included
501  in the data identifier.
502 
503  @param expRefList List of data references at exposure level
504  @param calibId Data identifier elements for the calib provided by the user
505  @return data identifier
506  """
507  midTime = 0
508  filterName = None
509  for expRef in expRefList:
510  butler = expRef.getButler()
511  dataId = expRef.dataId
512 
513  midTime += self.getMjd(butler, dataId)
514  thisFilter = self.getFilter(
515  butler, dataId) if self.filterName is None else self.filterName
516  if filterName is None:
517  filterName = thisFilter
518  elif filterName != thisFilter:
519  raise RuntimeError("Filter mismatch for %s: %s vs %s" % (
520  dataId, thisFilter, filterName))
521 
522  midTime /= len(expRefList)
523  date = str(dafBase.DateTime(
524  midTime, dafBase.DateTime.MJD).toPython().date())
525 
526  outputId = {self.config.filter: filterName,
527  self.config.dateCalib: date}
528  outputId.update(calibId)
529  return outputId
530 
531  def getMjd(self, butler, dataId, timescale=dafBase.DateTime.UTC):
532  """Determine the Modified Julian Date (MJD; in TAI) from a data identifier"""
533  if self.config.dateObs in dataId:
534  dateObs = dataId[self.config.dateObs]
535  else:
536  dateObs = butler.queryMetadata('raw', [self.config.dateObs], dataId)[0]
537  if "T" not in dateObs:
538  dateObs = dateObs + "T12:00:00.0Z"
539  elif not dateObs.endswith("Z"):
540  dateObs += "Z"
541 
542  return dafBase.DateTime(dateObs, timescale).get(dafBase.DateTime.MJD)
543 
544  def getFilter(self, butler, dataId):
545  """Determine the filter from a data identifier"""
546  filt = butler.queryMetadata('raw', [self.config.filter], dataId)[0]
547  return filt
548 
549  def addMissingKeys(self, dataId, butler, missingKeys=None, calibName=None):
550  if calibName is None:
551  calibName = self.calibName
552 
553  if missingKeys is None:
554  missingKeys = set(butler.getKeys(calibName).keys()) - set(dataId.keys())
555 
556  for k in missingKeys:
557  try:
558  v = butler.queryMetadata('raw', [k], dataId) # n.b. --id refers to 'raw'
559  except Exception:
560  continue
561 
562  if len(v) == 0: # failed to lookup value
563  continue
564 
565  if len(v) == 1:
566  dataId[k] = v[0]
567  else:
568  raise RuntimeError("No unique lookup for %s: %s" % (k, v))
569 
570  def updateMetadata(self, calibImage, exposureTime, darkTime=None, **kwargs):
571  """!Update the metadata from the VisitInfo
572 
573  @param calibImage The image whose metadata is to be set
574  @param exposureTime The exposure time for the image
575  @param darkTime The time since the last read (default: exposureTime)
576  """
577 
578  if darkTime is None:
579  darkTime = exposureTime # avoid warning messages when using calibration products
580 
581  visitInfo = afwImage.makeVisitInfo(exposureTime=exposureTime, darkTime=darkTime, **kwargs)
582  md = calibImage.getMetadata()
583 
584  afwImage.setVisitInfoMetadata(md, visitInfo)
585 
586  def scatterProcess(self, pool, ccdIdLists):
587  """!Scatter the processing among the nodes
588 
589  We scatter each CCD independently (exposures aren't grouped together),
590  to make full use of all available processors. This necessitates piecing
591  everything back together in the same format as ccdIdLists afterwards.
592 
593  Only the master node executes this method.
594 
595  @param pool Process pool
596  @param ccdIdLists Dict of data identifier lists for each CCD name
597  @return Dict of lists of returned data for each CCD name
598  """
599  self.log.info("Scatter processing")
600  return mapToMatrix(pool, self.process, ccdIdLists)
601 
602  def process(self, cache, ccdId, outputName="postISRCCD", **kwargs):
603  """!Process a CCD, specified by a data identifier
604 
605  After processing, optionally returns a result (produced by
606  the 'processResult' method) calculated from the processed
607  exposure. These results will be gathered by the master node,
608  and is a means for coordinated scaling of all CCDs for flats,
609  etc.
610 
611  Only slave nodes execute this method.
612 
613  @param cache Process pool cache
614  @param ccdId Data identifier for CCD
615  @param outputName Output dataset name for butler
616  @return result from 'processResult'
617  """
618  if ccdId is None:
619  self.log.warn("Null identifier received on %s" % NODE)
620  return None
621  sensorRef = getDataRef(cache.butler, ccdId)
622  if self.config.clobber or not sensorRef.datasetExists(outputName):
623  self.log.info("Processing %s on %s" % (ccdId, NODE))
624  try:
625  exposure = self.processSingle(sensorRef, **kwargs)
626  except Exception as e:
627  self.log.warn("Unable to process %s: %s" % (ccdId, e))
628  raise
629  return None
630  self.processWrite(sensorRef, exposure)
631  else:
632  self.log.info(
633  "Using previously persisted processed exposure for %s" % (sensorRef.dataId,))
634  exposure = sensorRef.get(outputName)
635  return self.processResult(exposure)
636 
637  def processSingle(self, dataRef):
638  """Process a single CCD, specified by a data reference
639 
640  Generally, this simply means doing ISR.
641 
642  Only slave nodes execute this method.
643  """
644  return self.isr.runDataRef(dataRef).exposure
645 
646  def processWrite(self, dataRef, exposure, outputName="postISRCCD"):
647  """!Write the processed CCD
648 
649  We need to write these out because we can't hold them all in
650  memory at once.
651 
652  Only slave nodes execute this method.
653 
654  @param dataRef Data reference
655  @param exposure CCD exposure to write
656  @param outputName Output dataset name for butler.
657  """
658  dataRef.put(exposure, outputName)
659 
660  def processResult(self, exposure):
661  """Extract processing results from a processed exposure
662 
663  This method generates what is gathered by the master node.
664  This can be a background measurement or similar for scaling
665  flat-fields. It must be picklable!
666 
667  Only slave nodes execute this method.
668  """
669  return None
670 
671  def scale(self, ccdIdLists, data):
672  """!Determine scaling across CCDs and exposures
673 
674  This is necessary mainly for flats, so as to determine a
675  consistent scaling across the entire focal plane. This
676  implementation is simply a placeholder.
677 
678  Only the master node executes this method.
679 
680  @param ccdIdLists Dict of data identifier lists for each CCD tuple
681  @param data Dict of lists of returned data for each CCD tuple
682  @return dict of Struct(ccdScale: scaling for CCD,
683  expScales: scaling for each exposure
684  ) for each CCD tuple
685  """
686  self.log.info("Scale on %s" % NODE)
687  return dict((name, Struct(ccdScale=None, expScales=[None] * len(ccdIdLists[name])))
688  for name in ccdIdLists)
689 
690  def scatterCombine(self, pool, outputId, ccdIdLists, scales):
691  """!Scatter the combination of exposures across multiple nodes
692 
693  In this case, we can only scatter across as many nodes as
694  there are CCDs.
695 
696  Only the master node executes this method.
697 
698  @param pool Process pool
699  @param outputId Output identifier (exposure part only)
700  @param ccdIdLists Dict of data identifier lists for each CCD name
701  @param scales Dict of structs with scales, for each CCD name
702  @param dict of binned images
703  """
704  self.log.info("Scatter combination")
705  data = [Struct(ccdName=ccdName, ccdIdList=ccdIdLists[ccdName], scales=scales[ccdName]) for
706  ccdName in ccdIdLists]
707  images = pool.map(self.combine, data, outputId)
708  return dict(zip(ccdIdLists.keys(), images))
709 
710  def getFullyQualifiedOutputId(self, ccdName, butler, outputId):
711  """Get fully-qualified output data identifier
712 
713  We may need to look up keys that aren't in the output dataId.
714 
715  @param ccdName Name tuple for CCD
716  @param butler Data butler
717  @param outputId Data identifier for combined image (exposure part only)
718  @return fully-qualified output dataId
719  """
720  fullOutputId = {k: ccdName[i] for i, k in enumerate(self.config.ccdKeys)}
721  fullOutputId.update(outputId)
722  self.addMissingKeys(fullOutputId, butler)
723  fullOutputId.update(outputId) # must be after the call to queryMetadata in 'addMissingKeys'
724  return fullOutputId
725 
726  def combine(self, cache, struct, outputId):
727  """!Combine multiple exposures of a particular CCD and write the output
728 
729  Only the slave nodes execute this method.
730 
731  @param cache Process pool cache
732  @param struct Parameters for the combination, which has the following components:
733  * ccdName Name tuple for CCD
734  * ccdIdList List of data identifiers for combination
735  * scales Scales to apply (expScales are scalings for each exposure,
736  ccdScale is final scale for combined image)
737  @param outputId Data identifier for combined image (exposure part only)
738  @return binned calib image
739  """
740  outputId = self.getFullyQualifiedOutputId(struct.ccdName, cache.butler, outputId)
741  dataRefList = [getDataRef(cache.butler, dataId) if dataId is not None else None for
742  dataId in struct.ccdIdList]
743  self.log.info("Combining %s on %s" % (outputId, NODE))
744  calib = self.combination.run(dataRefList, expScales=struct.scales.expScales,
745  finalScale=struct.scales.ccdScale)
746 
747  if not hasattr(calib, "getMetadata"):
748  if hasattr(calib, "getVariance"):
749  calib = afwImage.makeExposure(calib)
750  else:
751  calib = afwImage.DecoratedImageF(calib.getImage()) # n.b. hardwires "F" for the output type
752 
753  self.updateMetadata(calib, self.exposureTime)
754 
755  self.recordCalibInputs(cache.butler, calib,
756  struct.ccdIdList, outputId)
757 
758  self.interpolateNans(calib)
759 
760  self.write(cache.butler, calib, outputId)
761 
762  return afwMath.binImage(calib.getImage(), self.config.binning)
763 
764  def recordCalibInputs(self, butler, calib, dataIdList, outputId):
765  """!Record metadata including the inputs and creation details
766 
767  This metadata will go into the FITS header.
768 
769  @param butler Data butler
770  @param calib Combined calib exposure.
771  @param dataIdList List of data identifiers for calibration inputs
772  @param outputId Data identifier for output
773  """
774  header = calib.getMetadata()
775  header.add("OBSTYPE", self.calibName) # Used by ingestCalibs.py
776 
777  # date, time, host, and root
778  now = time.localtime()
779  header.add("CALIB_CREATION_DATE", time.strftime("%Y-%m-%d", now))
780  header.add("CALIB_CREATION_TIME", time.strftime("%X %Z", now))
781  # add date-obs as its absence upsets ExposureInfo; use the mean date that the calibs were taken
782  header.add("DATE-OBS", "%sT00:00:00.00" % outputId[self.config.dateCalib])
783 
784  # Inputs
785  visits = [str(dictToTuple(dataId, self.config.visitKeys)) for dataId in dataIdList if
786  dataId is not None]
787  for i, v in enumerate(sorted(set(visits))):
788  header.add("CALIB_INPUT_%d" % (i,), v)
789 
790  header.add("CALIB_ID", " ".join("%s=%s" % (key, value)
791  for key, value in outputId.items()))
792  checksum(calib, header)
793 
794  def interpolateNans(self, image):
795  """Interpolate over NANs in the combined image
796 
797  NANs can result from masked areas on the CCD. We don't want them getting
798  into our science images, so we replace them with the median of the image.
799  """
800  if hasattr(image, "getMaskedImage"): # Deal with Exposure vs Image
801  self.interpolateNans(image.getMaskedImage().getVariance())
802  image = image.getMaskedImage().getImage()
803  if hasattr(image, "getImage"): # Deal with DecoratedImage or MaskedImage vs Image
804  image = image.getImage()
805  array = image.getArray()
806  bad = np.isnan(array)
807  array[bad] = np.median(array[np.logical_not(bad)])
808 
809  def write(self, butler, exposure, dataId):
810  """!Write the final combined calib
811 
812  Only the slave nodes execute this method
813 
814  @param butler Data butler
815  @param exposure CCD exposure to write
816  @param dataId Data identifier for output
817  """
818  self.log.info("Writing %s on %s" % (dataId, NODE))
819  butler.put(exposure, self.calibName, dataId)
820 
821  def makeCameraImage(self, camera, dataId, calibs):
822  """!Create and write an image of the entire camera
823 
824  This is useful for judging the quality or getting an overview of
825  the features of the calib.
826 
827  @param camera Camera object
828  @param dataId Data identifier for output
829  @param calibs Dict mapping CCD detector ID to calib image
830  """
831  return makeCameraImage(camera, calibs, self.config.binning)
832 
833  def checkCcdIdLists(self, ccdIdLists):
834  """Check that the list of CCD dataIds is consistent
835 
836  @param ccdIdLists Dict of data identifier lists for each CCD name
837  @return Number of exposures, number of CCDs
838  """
839  visitIdLists = collections.defaultdict(list)
840  for ccdName in ccdIdLists:
841  for dataId in ccdIdLists[ccdName]:
842  visitName = dictToTuple(dataId, self.config.visitKeys)
843  visitIdLists[visitName].append(dataId)
844 
845  numExps = set(len(expList) for expList in ccdIdLists.values())
846  numCcds = set(len(ccdList) for ccdList in visitIdLists.values())
847 
848  if len(numExps) != 1 or len(numCcds) != 1:
849  # Presumably a visit somewhere doesn't have the full complement available.
850  # Dump the information so the user can figure it out.
851  self.log.warn("Number of visits for each CCD: %s",
852  {ccdName: len(ccdIdLists[ccdName]) for ccdName in ccdIdLists})
853  self.log.warn("Number of CCDs for each visit: %s",
854  {vv: len(visitIdLists[vv]) for vv in visitIdLists})
855  raise RuntimeError("Inconsistent number of exposures/CCDs")
856 
857  return numExps.pop(), numCcds.pop()
858 
859 
861  """Configuration for bias construction.
862 
863  No changes required compared to the base class, but
864  subclassed for distinction.
865  """
866  pass
867 
868 
869 class BiasTask(CalibTask):
870  """Bias construction"""
871  ConfigClass = BiasConfig
872  _DefaultName = "bias"
873  calibName = "bias"
874  filterName = "NONE" # Sets this filter name in the output
875  exposureTime = 0.0 # sets this exposureTime in the output
876 
877  @classmethod
878  def applyOverrides(cls, config):
879  """Overrides to apply for bias construction"""
880  config.isr.doBias = False
881  config.isr.doDark = False
882  config.isr.doFlat = False
883  config.isr.doFringe = False
884 
885 
887  """Configuration for dark construction"""
888  doRepair = Field(dtype=bool, default=True, doc="Repair artifacts?")
889  psfFwhm = Field(dtype=float, default=3.0, doc="Repair PSF FWHM (pixels)")
890  psfSize = Field(dtype=int, default=21, doc="Repair PSF size (pixels)")
891  crGrow = Field(dtype=int, default=2, doc="Grow radius for CR (pixels)")
893  target=RepairTask, doc="Task to repair artifacts")
894 
895  def setDefaults(self):
896  CalibConfig.setDefaults(self)
897  self.combination.mask.append("CR")
898 
899 
901  """Dark construction
902 
903  The only major difference from the base class is a cosmic-ray
904  identification stage, and dividing each image by the dark time
905  to generate images of the dark rate.
906  """
907  ConfigClass = DarkConfig
908  _DefaultName = "dark"
909  calibName = "dark"
910  filterName = "NONE" # Sets this filter name in the output
911 
912  def __init__(self, *args, **kwargs):
913  CalibTask.__init__(self, *args, **kwargs)
914  self.makeSubtask("repair")
915 
916  @classmethod
917  def applyOverrides(cls, config):
918  """Overrides to apply for dark construction"""
919  config.isr.doDark = False
920  config.isr.doFlat = False
921  config.isr.doFringe = False
922 
923  def processSingle(self, sensorRef):
924  """Process a single CCD
925 
926  Besides the regular ISR, also masks cosmic-rays and divides each
927  processed image by the dark time to generate images of the dark rate.
928  The dark time is provided by the 'getDarkTime' method.
929  """
930  exposure = CalibTask.processSingle(self, sensorRef)
931 
932  if self.config.doRepair:
933  psf = measAlg.DoubleGaussianPsf(self.config.psfSize, self.config.psfSize,
934  self.config.psfFwhm/(2*math.sqrt(2*math.log(2))))
935  exposure.setPsf(psf)
936  self.repair.run(exposure, keepCRs=False)
937  if self.config.crGrow > 0:
938  mask = exposure.getMaskedImage().getMask().clone()
939  mask &= mask.getPlaneBitMask("CR")
940  fpSet = afwDet.FootprintSet(
941  mask, afwDet.Threshold(0.5))
942  fpSet = afwDet.FootprintSet(fpSet, self.config.crGrow, True)
943  fpSet.setMask(exposure.getMaskedImage().getMask(), "CR")
944 
945  mi = exposure.getMaskedImage()
946  mi /= self.getDarkTime(exposure)
947  return exposure
948 
949  def getDarkTime(self, exposure):
950  """Retrieve the dark time for an exposure"""
951  darkTime = exposure.getInfo().getVisitInfo().getDarkTime()
952  if not np.isfinite(darkTime):
953  raise RuntimeError("Non-finite darkTime")
954  return darkTime
955 
956 
958  """Configuration for flat construction"""
959  iterations = Field(dtype=int, default=10,
960  doc="Number of iterations for scale determination")
961  stats = ConfigurableField(target=CalibStatsTask,
962  doc="Background statistics configuration")
963 
964 
966  """Flat construction
967 
968  The principal change from the base class involves gathering the background
969  values from each image and using them to determine the scalings for the final
970  combination.
971  """
972  ConfigClass = FlatConfig
973  _DefaultName = "flat"
974  calibName = "flat"
975 
976  @classmethod
977  def applyOverrides(cls, config):
978  """Overrides for flat construction"""
979  config.isr.doFlat = False
980  config.isr.doFringe = False
981 
982  def __init__(self, *args, **kwargs):
983  CalibTask.__init__(self, *args, **kwargs)
984  self.makeSubtask("stats")
985 
986  def processResult(self, exposure):
987  return self.stats.run(exposure)
988 
989  def scale(self, ccdIdLists, data):
990  """Determine the scalings for the final combination
991 
992  We have a matrix B_ij = C_i E_j, where C_i is the relative scaling
993  of one CCD to all the others in an exposure, and E_j is the scaling
994  of the exposure. We convert everything to logarithms so we can work
995  with a linear system. We determine the C_i and E_j from B_ij by iteration,
996  under the additional constraint that the average CCD scale is unity.
997 
998  This algorithm comes from Eugene Magnier and Pan-STARRS.
999  """
1000  assert len(ccdIdLists.values()) > 0, "No successful CCDs"
1001  lengths = set([len(expList) for expList in ccdIdLists.values()])
1002  assert len(
1003  lengths) == 1, "Number of successful exposures for each CCD differs"
1004  assert tuple(lengths)[0] > 0, "No successful exposures"
1005  # Format background measurements into a matrix
1006  indices = dict((name, i) for i, name in enumerate(ccdIdLists))
1007  bgMatrix = np.array([[0.0] * len(expList)
1008  for expList in ccdIdLists.values()])
1009  for name in ccdIdLists:
1010  i = indices[name]
1011  bgMatrix[i] = [
1012  d if d is not None else np.nan for d in data[name]]
1013 
1014  numpyPrint = np.get_printoptions()
1015  np.set_printoptions(threshold=np.inf)
1016  self.log.info("Input backgrounds: %s" % bgMatrix)
1017 
1018  # Flat-field scaling
1019  numCcds = len(ccdIdLists)
1020  numExps = bgMatrix.shape[1]
1021  # log(Background) for each exposure/component
1022  bgMatrix = np.log(bgMatrix)
1023  bgMatrix = np.ma.masked_array(bgMatrix, np.isnan(bgMatrix))
1024  # Initial guess at log(scale) for each component
1025  compScales = np.zeros(numCcds)
1026  expScales = np.array(
1027  [(bgMatrix[:, i0] - compScales).mean() for i0 in range(numExps)])
1028 
1029  for iterate in range(self.config.iterations):
1030  compScales = np.array(
1031  [(bgMatrix[i1, :] - expScales).mean() for i1 in range(numCcds)])
1032  expScales = np.array(
1033  [(bgMatrix[:, i2] - compScales).mean() for i2 in range(numExps)])
1034 
1035  avgScale = np.average(np.exp(compScales))
1036  compScales -= np.log(avgScale)
1037  self.log.debug("Iteration %d exposure scales: %s",
1038  iterate, np.exp(expScales))
1039  self.log.debug("Iteration %d component scales: %s",
1040  iterate, np.exp(compScales))
1041 
1042  expScales = np.array(
1043  [(bgMatrix[:, i3] - compScales).mean() for i3 in range(numExps)])
1044 
1045  if np.any(np.isnan(expScales)):
1046  raise RuntimeError("Bad exposure scales: %s --> %s" %
1047  (bgMatrix, expScales))
1048 
1049  expScales = np.exp(expScales)
1050  compScales = np.exp(compScales)
1051 
1052  self.log.info("Exposure scales: %s" % expScales)
1053  self.log.info("Component relative scaling: %s" % compScales)
1054  np.set_printoptions(**numpyPrint)
1055 
1056  return dict((ccdName, Struct(ccdScale=compScales[indices[ccdName]], expScales=expScales))
1057  for ccdName in ccdIdLists)
1058 
1059 
1061  """Configuration for fringe construction"""
1062  stats = ConfigurableField(target=CalibStatsTask,
1063  doc="Background statistics configuration")
1064  subtractBackground = ConfigurableField(target=measAlg.SubtractBackgroundTask,
1065  doc="Background configuration")
1066  detection = ConfigurableField(
1067  target=measAlg.SourceDetectionTask, doc="Detection configuration")
1068  detectSigma = Field(dtype=float, default=1.0,
1069  doc="Detection PSF gaussian sigma")
1070 
1071  def setDefaults(self):
1072  CalibConfig.setDefaults(self)
1073  self.detection.reEstimateBackground = False
1074 
1075 
1077  """Fringe construction task
1078 
1079  The principal change from the base class is that the images are
1080  background-subtracted and rescaled by the background.
1081 
1082  XXX This is probably not right for a straight-up combination, as we
1083  are currently doing, since the fringe amplitudes need not scale with
1084  the continuum.
1085 
1086  XXX Would like to have this do PCA and generate multiple images, but
1087  that will take a bit of work with the persistence code.
1088  """
1089  ConfigClass = FringeConfig
1090  _DefaultName = "fringe"
1091  calibName = "fringe"
1092 
1093  @classmethod
1094  def applyOverrides(cls, config):
1095  """Overrides for fringe construction"""
1096  config.isr.doFringe = False
1097 
1098  def __init__(self, *args, **kwargs):
1099  CalibTask.__init__(self, *args, **kwargs)
1100  self.makeSubtask("detection")
1101  self.makeSubtask("stats")
1102  self.makeSubtask("subtractBackground")
1103 
1104  def processSingle(self, sensorRef):
1105  """Subtract the background and normalise by the background level"""
1106  exposure = CalibTask.processSingle(self, sensorRef)
1107  bgLevel = self.stats.run(exposure)
1108  self.subtractBackground.run(exposure)
1109  mi = exposure.getMaskedImage()
1110  mi /= bgLevel
1111  footprintSets = self.detection.detectFootprints(
1112  exposure, sigma=self.config.detectSigma)
1113  mask = exposure.getMaskedImage().getMask()
1114  detected = 1 << mask.addMaskPlane("DETECTED")
1115  for fpSet in (footprintSets.positive, footprintSets.negative):
1116  if fpSet is not None:
1117  afwDet.setMaskFromFootprintList(
1118  mask, fpSet.getFootprints(), detected)
1119  return exposure
1120 
1121 
1123  """Configuration for sky frame construction"""
1124  detection = ConfigurableField(target=measAlg.SourceDetectionTask, doc="Detection configuration")
1125  detectSigma = Field(dtype=float, default=2.0, doc="Detection PSF gaussian sigma")
1126  subtractBackground = ConfigurableField(target=measAlg.SubtractBackgroundTask,
1127  doc="Regular-scale background configuration, for object detection")
1128  largeScaleBackground = ConfigField(dtype=FocalPlaneBackgroundConfig,
1129  doc="Large-scale background configuration")
1130  sky = ConfigurableField(target=SkyMeasurementTask, doc="Sky measurement")
1131  maskThresh = Field(dtype=float, default=3.0, doc="k-sigma threshold for masking pixels")
1132  mask = ListField(dtype=str, default=["BAD", "SAT", "DETECTED", "NO_DATA"],
1133  doc="Mask planes to consider as contaminated")
1134 
1135 
1137  """Task for sky frame construction
1138 
1139  The sky frame is a (relatively) small-scale background
1140  model, the response of the camera to the sky.
1141 
1142  To construct, we first remove a large-scale background (e.g., caused
1143  by moonlight) which may vary from image to image. Then we construct a
1144  model of the sky, which is essentially a binned version of the image
1145  (important configuration parameters: sky.background.[xy]BinSize).
1146  It is these models which are coadded to yield the sky frame.
1147  """
1148  ConfigClass = SkyConfig
1149  _DefaultName = "sky"
1150  calibName = "sky"
1151 
1152  def __init__(self, *args, **kwargs):
1153  CalibTask.__init__(self, *args, **kwargs)
1154  self.makeSubtask("detection")
1155  self.makeSubtask("subtractBackground")
1156  self.makeSubtask("sky")
1157 
1158  def scatterProcess(self, pool, ccdIdLists):
1159  """!Scatter the processing among the nodes
1160 
1161  Only the master node executes this method, assigning work to the
1162  slaves.
1163 
1164  We measure and subtract off a large-scale background model across
1165  all CCDs, which requires a scatter/gather. Then we process the
1166  individual CCDs, subtracting the large-scale background model and
1167  the residual background model measured. These residuals will be
1168  combined for the sky frame.
1169 
1170  @param pool Process pool
1171  @param ccdIdLists Dict of data identifier lists for each CCD name
1172  @return Dict of lists of returned data for each CCD name
1173  """
1174  self.log.info("Scatter processing")
1175 
1176  numExps = set(len(expList) for expList in ccdIdLists.values()).pop()
1177 
1178  # First subtract off general gradients to make all the exposures look similar.
1179  # We want to preserve the common small-scale structure, which we will coadd.
1180  bgModelList = mapToMatrix(pool, self.measureBackground, ccdIdLists)
1181 
1182  backgrounds = {}
1183  scales = {}
1184  for exp in range(numExps):
1185  bgModels = [bgModelList[ccdName][exp] for ccdName in ccdIdLists]
1186  visit = set(tuple(ccdIdLists[ccdName][exp][key] for key in sorted(self.config.visitKeys)) for
1187  ccdName in ccdIdLists)
1188  assert len(visit) == 1
1189  visit = visit.pop()
1190  bgModel = bgModels[0]
1191  for bg in bgModels[1:]:
1192  bgModel.merge(bg)
1193  self.log.info("Background model min/max for visit %s: %f %f", visit,
1194  np.min(bgModel.getStatsImage().getArray()),
1195  np.max(bgModel.getStatsImage().getArray()))
1196  backgrounds[visit] = bgModel
1197  scales[visit] = np.median(bgModel.getStatsImage().getArray())
1198 
1199  return mapToMatrix(pool, self.process, ccdIdLists, backgrounds=backgrounds, scales=scales)
1200 
1201  def measureBackground(self, cache, dataId):
1202  """!Measure background model for CCD
1203 
1204  This method is executed by the slaves.
1205 
1206  The background models for all CCDs in an exposure will be
1207  combined to form a full focal-plane background model.
1208 
1209  @param cache Process pool cache
1210  @param dataId Data identifier
1211  @return Bcakground model
1212  """
1213  dataRef = getDataRef(cache.butler, dataId)
1214  exposure = self.processSingleBackground(dataRef)
1215 
1216  # NAOJ prototype smoothed and then combined the entire image, but it shouldn't be any different
1217  # to bin and combine the binned images except that there's fewer pixels to worry about.
1218  config = self.config.largeScaleBackground
1219  camera = dataRef.get("camera")
1220  bgModel = FocalPlaneBackground.fromCamera(config, camera)
1221  bgModel.addCcd(exposure)
1222  return bgModel
1223 
1224  def processSingleBackground(self, dataRef):
1225  """!Process a single CCD for the background
1226 
1227  This method is executed by the slaves.
1228 
1229  Because we're interested in the background, we detect and mask astrophysical
1230  sources, and pixels above the noise level.
1231 
1232  @param dataRef Data reference for CCD.
1233  @return processed exposure
1234  """
1235  if not self.config.clobber and dataRef.datasetExists("postISRCCD"):
1236  return dataRef.get("postISRCCD")
1237  exposure = CalibTask.processSingle(self, dataRef)
1238 
1239  # Detect sources. Requires us to remove the background; we'll restore it later.
1240  bgTemp = self.subtractBackground.run(exposure).background
1241  footprints = self.detection.detectFootprints(exposure, sigma=self.config.detectSigma)
1242  image = exposure.getMaskedImage()
1243  if footprints.background is not None:
1244  image += footprints.background.getImage()
1245 
1246  # Mask high pixels
1247  variance = image.getVariance()
1248  noise = np.sqrt(np.median(variance.getArray()))
1249  isHigh = image.getImage().getArray() > self.config.maskThresh*noise
1250  image.getMask().getArray()[isHigh] |= image.getMask().getPlaneBitMask("DETECTED")
1251 
1252  # Restore the background: it's what we want!
1253  image += bgTemp.getImage()
1254 
1255  # Set detected/bad pixels to background to ensure they don't corrupt the background
1256  maskVal = image.getMask().getPlaneBitMask(self.config.mask)
1257  isBad = image.getMask().getArray() & maskVal > 0
1258  bgLevel = np.median(image.getImage().getArray()[~isBad])
1259  image.getImage().getArray()[isBad] = bgLevel
1260  dataRef.put(exposure, "postISRCCD")
1261  return exposure
1262 
1263  def processSingle(self, dataRef, backgrounds, scales):
1264  """Process a single CCD, specified by a data reference
1265 
1266  We subtract the appropriate focal plane background model,
1267  divide by the appropriate scale and measure the background.
1268 
1269  Only slave nodes execute this method.
1270 
1271  @param dataRef Data reference for single CCD
1272  @param backgrounds Background model for each visit
1273  @param scales Scales for each visit
1274  @return Processed exposure
1275  """
1276  visit = tuple(dataRef.dataId[key] for key in sorted(self.config.visitKeys))
1277  exposure = dataRef.get("postISRCCD", immediate=True)
1278  image = exposure.getMaskedImage()
1279  detector = exposure.getDetector()
1280  bbox = image.getBBox()
1281 
1282  bgModel = backgrounds[visit]
1283  bg = bgModel.toCcdBackground(detector, bbox)
1284  image -= bg.getImage()
1285  image /= scales[visit]
1286 
1287  bg = self.sky.measureBackground(exposure.getMaskedImage())
1288  dataRef.put(bg, "icExpBackground")
1289  return exposure
1290 
1291  def combine(self, cache, struct, outputId):
1292  """!Combine multiple background models of a particular CCD and write the output
1293 
1294  Only the slave nodes execute this method.
1295 
1296  @param cache Process pool cache
1297  @param struct Parameters for the combination, which has the following components:
1298  * ccdName Name tuple for CCD
1299  * ccdIdList List of data identifiers for combination
1300  @param outputId Data identifier for combined image (exposure part only)
1301  @return binned calib image
1302  """
1303  outputId = self.getFullyQualifiedOutputId(struct.ccdName, cache.butler, outputId)
1304  dataRefList = [getDataRef(cache.butler, dataId) if dataId is not None else None for
1305  dataId in struct.ccdIdList]
1306  self.log.info("Combining %s on %s" % (outputId, NODE))
1307  bgList = [dataRef.get("icExpBackground", immediate=True).clone() for dataRef in dataRefList]
1308 
1309  bgExp = self.sky.averageBackgrounds(bgList)
1310 
1311  self.recordCalibInputs(cache.butler, bgExp, struct.ccdIdList, outputId)
1312  cache.butler.put(bgExp, "sky", outputId)
1313  return afwMath.binImage(self.sky.exposureToBackground(bgExp).getImage(), self.config.binning)
def processWrite(self, dataRef, exposure, outputName="postISRCCD")
Write the processed CCD.
def makeSubtask(self, name, keyArgs)
Definition: task.py:275
Class for handling dates/times, including MJD, UTC, and TAI.
Definition: DateTime.h:64
def scatterCombine(self, pool, outputId, ccdIdLists, scales)
Scatter the combination of exposures across multiple nodes.
def run(self, exposureOrImage)
Measure a particular statistic on an image (of some sort).
def checksum(obj, header=None, sumType="MD5")
Calculate a checksum of an object.
Definition: checksum.py:29
def getCcdIdListFromExposures(expRefList, level="sensor", ccdKeys=["ccd"])
Determine a list of CCDs from exposure references.
A Threshold is used to pass a threshold value to detection algorithms.
Definition: Threshold.h:43
std::shared_ptr< FrameSet > append(FrameSet const &first, FrameSet const &second)
Construct a FrameSet that performs two transformations in series.
Definition: functional.cc:33
Fit spatial kernel using approximate fluxes for candidates, and solving a linear system of equations...
daf::base::PropertySet * set
Definition: fits.cc:832
def getFullyQualifiedOutputId(self, ccdName, butler, outputId)
def __call__(self, parser, namespace, values, option_string)
int min
Statistics makeStatistics(lsst::afw::math::MaskedVector< EntryT > const &mv, std::vector< WeightPixel > const &vweights, int const flags, StatisticsControl const &sctrl=StatisticsControl())
The makeStatistics() overload to handle lsst::afw::math::MaskedVector<>
Definition: Statistics.h:520
def dictToTuple(dict_, keys)
Return a tuple of specific values from a dict.
std::shared_ptr< Exposure< ImagePixelT, MaskPixelT, VariancePixelT > > makeExposure(MaskedImage< ImagePixelT, MaskPixelT, VariancePixelT > &mimage, std::shared_ptr< geom::SkyWcs const > wcs=std::shared_ptr< geom::SkyWcs const >())
A function to return an Exposure of the correct type (cf.
Definition: Exposure.h:446
def getDataRef(butler, dataId, datasetType="raw")
Definition: utils.py:17
def updateMetadata(self, calibImage, exposureTime, darkTime=None, kwargs)
Update the metadata from the VisitInfo.
Pass parameters to a Statistics object.
Definition: Statistics.h:93
def process(self, cache, ccdId, outputName="postISRCCD", kwargs)
Process a CCD, specified by a data identifier.
def processSingle(self, dataRef, backgrounds, scales)
def makeCameraImage(self, camera, dataId, calibs)
Create and write an image of the entire camera.
def combine(self, target, imageList, stats)
Combine multiple images.
A set of Footprints, associated with a MaskedImage.
Definition: FootprintSet.h:53
def getMjd(self, butler, dataId, timescale=dafBase.DateTime.UTC)
def combine(self, cache, struct, outputId)
Combine multiple exposures of a particular CCD and write the output.
def runDataRef(self, expRefList, butler, calibId)
Construct a calib from a list of exposure references.
std::shared_ptr< image::MaskedImage< PixelT > > statisticsStack(image::MaskedImage< PixelT > const &image, Property flags, char dimension, StatisticsControl const &sctrl)
A function to compute statistics on the rows or columns of an image.
Definition: Stack.cc:478
def write(self, butler, exposure, dataId)
Write the final combined calib.
def combine(self, cache, struct, outputId)
Combine multiple background models of a particular CCD and write the output.
def measureBackground(self, cache, dataId)
Measure background model for CCD.
def recordCalibInputs(self, butler, calib, dataIdList, outputId)
Record metadata including the inputs and creation details.
def scatterProcess(self, pool, ccdIdLists)
Scatter the processing among the nodes.
def run(self, sensorRefList, expScales=None, finalScale=None, inputName="postISRCCD")
Combine calib images for a single sensor.
std::shared_ptr< ImageT > binImage(ImageT const &inImage, int const binsize, lsst::afw::math::Property const flags=lsst::afw::math::MEAN)
Definition: binImage.cc:38
def getOutputId(self, expRefList, calibId)
Generate the data identifier for the output calib.
def mapToMatrix(pool, func, ccdIdLists, args, kwargs)
def scale(self, ccdIdLists, data)
Determine scaling across CCDs and exposures.
def addMissingKeys(self, dataId, butler, missingKeys=None, calibName=None)
Property
control what is calculated
Definition: Statistics.h:63
def getDimensions(self, sensorRefList, inputName="postISRCCD")
An integer coordinate rectangle.
Definition: Box.h:54
daf::base::PropertyList * list
Definition: fits.cc:833
lsst::geom::Box2I bboxFromMetadata(daf::base::PropertySet &metadata)
Determine the image bounding box from its metadata (FITS header)
Definition: Image.cc:703
Base class for constructing calibs.
def add_id_argument(self, name, datasetType, help, level=None, doMakeDataRefList=True, ContainerClass=DataIdContainer)
def processSingleBackground(self, dataRef)
Process a single CCD for the background.
def scatterProcess(self, pool, ccdIdLists)
Scatter the processing among the nodes.
def batchWallTime(cls, time, parsedCmd, numCores)