LSST Applications  21.0.0-147-g0e635eb1+1acddb5be5,22.0.0+052faf71bd,22.0.0+1ea9a8b2b2,22.0.0+6312710a6c,22.0.0+729191ecac,22.0.0+7589c3a021,22.0.0+9f079a9461,22.0.1-1-g7d6de66+b8044ec9de,22.0.1-1-g87000a6+536b1ee016,22.0.1-1-g8e32f31+6312710a6c,22.0.1-10-gd060f87+016f7cdc03,22.0.1-12-g9c3108e+df145f6f68,22.0.1-16-g314fa6d+c825727ab8,22.0.1-19-g93a5c75+d23f2fb6d8,22.0.1-19-gb93eaa13+aab3ef7709,22.0.1-2-g8ef0a89+b8044ec9de,22.0.1-2-g92698f7+9f079a9461,22.0.1-2-ga9b0f51+052faf71bd,22.0.1-2-gac51dbf+052faf71bd,22.0.1-2-gb66926d+6312710a6c,22.0.1-2-gcb770ba+09e3807989,22.0.1-20-g32debb5+b8044ec9de,22.0.1-23-gc2439a9a+fb0756638e,22.0.1-3-g496fd5d+09117f784f,22.0.1-3-g59f966b+1e6ba2c031,22.0.1-3-g849a1b8+f8b568069f,22.0.1-3-gaaec9c0+c5c846a8b1,22.0.1-32-g5ddfab5d3+60ce4897b0,22.0.1-4-g037fbe1+64e601228d,22.0.1-4-g8623105+b8044ec9de,22.0.1-5-g096abc9+d18c45d440,22.0.1-5-g15c806e+57f5c03693,22.0.1-7-gba73697+57f5c03693,master-g6e05de7fdc+c1283a92b8,master-g72cdda8301+729191ecac,w.2021.39
LSST Data Management Base Package
constructCalibs.py
Go to the documentation of this file.
1 import sys
2 import math
3 import time
4 import argparse
5 import traceback
6 import collections
7 
8 import numpy as np
9 
10 from astro_metadata_translator import merge_headers, ObservationGroup
11 from astro_metadata_translator.serialize import dates_to_fits
12 
13 from lsst.pex.config import Config, ConfigurableField, Field, ListField, ConfigField
14 from lsst.pipe.base import Task, Struct, TaskRunner, ArgumentParser
15 import lsst.daf.base as dafBase
16 import lsst.afw.math as afwMath
17 import lsst.afw.detection as afwDet
18 import lsst.afw.image as afwImage
19 import lsst.geom as geom
20 import lsst.meas.algorithms as measAlg
21 from lsst.pipe.tasks.repair import RepairTask
22 from lsst.ip.isr import IsrTask
23 
24 from lsst.ctrl.pool.parallel import BatchPoolTask
25 from lsst.ctrl.pool.pool import Pool, NODE
26 from lsst.pipe.drivers.background import (SkyMeasurementTask, FocalPlaneBackground,
27  FocalPlaneBackgroundConfig, MaskObjectsTask)
28 from lsst.pipe.drivers.visualizeVisit import makeCameraImage
29 
30 from .checksum import checksum
31 from .utils import getDataRef
32 
33 
35  """Parameters controlling the measurement of background statistics"""
36  stat = Field(doc="Statistic to use to estimate background (from lsst.afw.math)", dtype=int,
37  default=int(afwMath.MEANCLIP))
38  clip = Field(doc="Clipping threshold for background",
39  dtype=float, default=3.0)
40  nIter = Field(doc="Clipping iterations for background",
41  dtype=int, default=3)
42  maxVisitsToCalcErrorFromInputVariance = Field(
43  doc="Maximum number of visits to estimate variance from input variance, not per-pixel spread",
44  dtype=int, default=2)
45  mask = ListField(doc="Mask planes to reject",
46  dtype=str, default=["DETECTED", "BAD", "NO_DATA"])
47 
48 
49 class CalibStatsTask(Task):
50  """Measure statistics on the background
51 
52  This can be useful for scaling the background, e.g., for flats and fringe frames.
53  """
54  ConfigClass = CalibStatsConfig
55 
56  def run(self, exposureOrImage):
57  """!Measure a particular statistic on an image (of some sort).
58 
59  @param exposureOrImage Exposure, MaskedImage or Image.
60  @return Value of desired statistic
61  """
62  stats = afwMath.StatisticsControl(self.config.clip, self.config.nIter,
63  afwImage.Mask.getPlaneBitMask(self.config.mask))
64  try:
65  image = exposureOrImage.getMaskedImage()
66  except Exception:
67  try:
68  image = exposureOrImage.getImage()
69  except Exception:
70  image = exposureOrImage
71 
72  return afwMath.makeStatistics(image, self.config.stat, stats).getValue()
73 
74 
76  """Configuration for combining calib images"""
77  rows = Field(doc="Number of rows to read at a time",
78  dtype=int, default=512)
79  mask = ListField(doc="Mask planes to respect", dtype=str,
80  default=["SAT", "DETECTED", "INTRP"])
81  combine = Field(doc="Statistic to use for combination (from lsst.afw.math)", dtype=int,
82  default=int(afwMath.MEANCLIP))
83  clip = Field(doc="Clipping threshold for combination",
84  dtype=float, default=3.0)
85  nIter = Field(doc="Clipping iterations for combination",
86  dtype=int, default=3)
87  stats = ConfigurableField(target=CalibStatsTask,
88  doc="Background statistics configuration")
89 
90 
91 class CalibCombineTask(Task):
92  """Task to combine calib images"""
93  ConfigClass = CalibCombineConfig
94 
95  def __init__(self, *args, **kwargs):
96  Task.__init__(self, *args, **kwargs)
97  self.makeSubtask("stats")
98 
99  def run(self, sensorRefList, expScales=None, finalScale=None, inputName="postISRCCD"):
100  """!Combine calib images for a single sensor
101 
102  @param sensorRefList List of data references to combine (for a single sensor)
103  @param expScales List of scales to apply for each exposure
104  @param finalScale Desired scale for final combined image
105  @param inputName Data set name for inputs
106  @return combined image
107  """
108  width, height = self.getDimensionsgetDimensions(sensorRefList)
109  stats = afwMath.StatisticsControl(self.config.clip, self.config.nIter,
110  afwImage.Mask.getPlaneBitMask(self.config.mask))
111  numImages = len(sensorRefList)
112  if numImages < 1:
113  raise RuntimeError("No valid input data")
114  if numImages < self.config.stats.maxVisitsToCalcErrorFromInputVariance:
115  stats.setCalcErrorFromInputVariance(True)
116 
117  # Combine images
118  combined = afwImage.MaskedImageF(width, height)
119  imageList = [None]*numImages
120  for start in range(0, height, self.config.rows):
121  rows = min(self.config.rows, height - start)
122  box = geom.Box2I(geom.Point2I(0, start),
123  geom.Extent2I(width, rows))
124  subCombined = combined.Factory(combined, box)
125 
126  for i, sensorRef in enumerate(sensorRefList):
127  if sensorRef is None:
128  imageList[i] = None
129  continue
130  exposure = sensorRef.get(inputName + "_sub", bbox=box)
131  if expScales is not None:
132  self.applyScaleapplyScale(exposure, expScales[i])
133  imageList[i] = exposure.getMaskedImage()
134 
135  self.combinecombine(subCombined, imageList, stats)
136 
137  if finalScale is not None:
138  background = self.stats.run(combined)
139  self.log.info("%s: Measured background of stack is %f; adjusting to %f" %
140  (NODE, background, finalScale))
141  combined *= finalScale / background
142 
143  return combined
144 
145  def getDimensions(self, sensorRefList, inputName="postISRCCD"):
146  """Get dimensions of the inputs"""
147  dimList = []
148  for sensorRef in sensorRefList:
149  if sensorRef is None:
150  continue
151  md = sensorRef.get(inputName + "_md")
152  dimList.append(afwImage.bboxFromMetadata(md).getDimensions())
153  return getSize(dimList)
154 
155  def applyScale(self, exposure, scale=None):
156  """Apply scale to input exposure
157 
158  This implementation applies a flux scaling: the input exposure is
159  divided by the provided scale.
160  """
161  if scale is not None:
162  mi = exposure.getMaskedImage()
163  mi /= scale
164 
165  def combine(self, target, imageList, stats):
166  """!Combine multiple images
167 
168  @param target Target image to receive the combined pixels
169  @param imageList List of input images
170  @param stats Statistics control
171  """
172  images = [img for img in imageList if img is not None]
173  afwMath.statisticsStack(target, images, afwMath.Property(self.config.combine), stats)
174 
175 
176 def getSize(dimList):
177  """Determine a consistent size, given a list of image sizes"""
178  dim = set((w, h) for w, h in dimList)
179  dim.discard(None)
180  if len(dim) != 1:
181  raise RuntimeError("Inconsistent dimensions: %s" % dim)
182  return dim.pop()
183 
184 
185 def dictToTuple(dict_, keys):
186  """!Return a tuple of specific values from a dict
187 
188  This provides a hashable representation of the dict from certain keywords.
189  This can be useful for creating e.g., a tuple of the values in the DataId
190  that identify the CCD.
191 
192  @param dict_ dict to parse
193  @param keys keys to extract (order is important)
194  @return tuple of values
195  """
196  return tuple(dict_[k] for k in keys)
197 
198 
199 def getCcdIdListFromExposures(expRefList, level="sensor", ccdKeys=["ccd"]):
200  """!Determine a list of CCDs from exposure references
201 
202  This essentially inverts the exposure-level references (which
203  provides a list of CCDs for each exposure), by providing
204  a dataId list for each CCD. Consider an input list of exposures
205  [e1, e2, e3], and each exposure has CCDs c1 and c2. Then this
206  function returns:
207 
208  {(c1,): [e1c1, e2c1, e3c1], (c2,): [e1c2, e2c2, e3c2]}
209 
210  This is a dict whose keys are tuples of the identifying values of a
211  CCD (usually just the CCD number) and the values are lists of dataIds
212  for that CCD in each exposure. A missing dataId is given the value
213  None.
214 
215  @param expRefList List of data references for exposures
216  @param level Level for the butler to generate CCDs
217  @param ccdKeys DataId keywords that identify a CCD
218  @return dict of data identifier lists for each CCD;
219  keys are values of ccdKeys in order
220  """
221  expIdList = [[ccdRef.dataId for ccdRef in expRef.subItems(
222  level)] for expRef in expRefList]
223 
224  # Determine what additional keys make a CCD from an exposure
225  if len(ccdKeys) != len(set(ccdKeys)):
226  raise RuntimeError("Duplicate keys found in ccdKeys: %s" % ccdKeys)
227  ccdNames = set() # Set of tuples which are values for each of the CCDs in an exposure
228  for ccdIdList in expIdList:
229  for ccdId in ccdIdList:
230  name = dictToTuple(ccdId, ccdKeys)
231  ccdNames.add(name)
232 
233  # Turn the list of CCDs for each exposure into a list of exposures for
234  # each CCD
235  ccdLists = {}
236  for n, ccdIdList in enumerate(expIdList):
237  for ccdId in ccdIdList:
238  name = dictToTuple(ccdId, ccdKeys)
239  if name not in ccdLists:
240  ccdLists[name] = []
241  ccdLists[name].append(ccdId)
242 
243  for ccd in ccdLists:
244  # Sort the list by the dataId values (ordered by key)
245  ccdLists[ccd] = sorted(ccdLists[ccd], key=lambda dd: dictToTuple(dd, sorted(dd.keys())))
246 
247  return ccdLists
248 
249 
250 def mapToMatrix(pool, func, ccdIdLists, *args, **kwargs):
251  """Generate a matrix of results using pool.map
252 
253  The function should have the call signature:
254  func(cache, dataId, *args, **kwargs)
255 
256  We return a dict mapping 'ccd name' to a list of values for
257  each exposure.
258 
259  @param pool Process pool
260  @param func Function to call for each dataId
261  @param ccdIdLists Dict of data identifier lists for each CCD name
262  @return matrix of results
263  """
264  dataIdList = sum(ccdIdLists.values(), [])
265  resultList = pool.map(func, dataIdList, *args, **kwargs)
266  # Piece everything back together
267  data = dict((ccdName, [None] * len(expList)) for ccdName, expList in ccdIdLists.items())
268  indices = dict(sum([[(tuple(dataId.values()) if dataId is not None else None, (ccdName, expNum))
269  for expNum, dataId in enumerate(expList)]
270  for ccdName, expList in ccdIdLists.items()], []))
271  for dataId, result in zip(dataIdList, resultList):
272  if dataId is None:
273  continue
274  ccdName, expNum = indices[tuple(dataId.values())]
275  data[ccdName][expNum] = result
276  return data
277 
278 
279 class CalibIdAction(argparse.Action):
280  """Split name=value pairs and put the result in a dict"""
281 
282  def __call__(self, parser, namespace, values, option_string):
283  output = getattr(namespace, self.dest, {})
284  for nameValue in values:
285  name, sep, valueStr = nameValue.partition("=")
286  if not valueStr:
287  parser.error("%s value %s must be in form name=value" %
288  (option_string, nameValue))
289  output[name] = valueStr
290  setattr(namespace, self.dest, output)
291 
292 
293 class CalibArgumentParser(ArgumentParser):
294  """ArgumentParser for calibration construction"""
295 
296  def __init__(self, calibName, *args, **kwargs):
297  """Add a --calibId argument to the standard pipe_base argument parser"""
298  ArgumentParser.__init__(self, *args, **kwargs)
299  self.calibNamecalibName = calibName
300  self.add_id_argument("--id", datasetType="raw",
301  help="input identifiers, e.g., --id visit=123 ccd=4")
302  self.add_argument("--calibId", nargs="*", action=CalibIdAction, default={},
303  help="identifiers for calib, e.g., --calibId version=1",
304  metavar="KEY=VALUE1[^VALUE2[^VALUE3...]")
305 
306  def parse_args(self, *args, **kwargs):
307  """Parse arguments
308 
309  Checks that the "--calibId" provided works.
310  """
311  namespace = ArgumentParser.parse_args(self, *args, **kwargs)
312 
313  keys = namespace.butler.getKeys(self.calibNamecalibName)
314  parsed = {}
315  for name, value in namespace.calibId.items():
316  if name not in keys:
317  self.error(
318  "%s is not a relevant calib identifier key (%s)" % (name, keys))
319  parsed[name] = keys[name](value)
320  namespace.calibId = parsed
321 
322  return namespace
323 
324 
326  """Configuration for constructing calibs"""
327  clobber = Field(dtype=bool, default=True,
328  doc="Clobber existing processed images?")
329  isr = ConfigurableField(target=IsrTask, doc="ISR configuration")
330  dateObs = Field(dtype=str, default="dateObs",
331  doc="Key for observation date in exposure registry")
332  dateCalib = Field(dtype=str, default="calibDate",
333  doc="Key for calib date in calib registry")
334  filter = Field(dtype=str, default="filter",
335  doc="Key for filter name in exposure/calib registries")
336  combination = ConfigurableField(
337  target=CalibCombineTask, doc="Calib combination configuration")
338  ccdKeys = ListField(dtype=str, default=["ccd"],
339  doc="DataId keywords specifying a CCD")
340  visitKeys = ListField(dtype=str, default=["visit"],
341  doc="DataId keywords specifying a visit")
342  calibKeys = ListField(dtype=str, default=[],
343  doc="DataId keywords specifying a calibration")
344  doCameraImage = Field(dtype=bool, default=True, doc="Create camera overview image?")
345  binning = Field(dtype=int, default=64, doc="Binning to apply for camera image")
346 
347  def setDefaults(self):
348  self.isrisr.doWrite = False
349 
350 
351 class CalibTaskRunner(TaskRunner):
352  """Get parsed values into the CalibTask.run"""
353  @staticmethod
354  def getTargetList(parsedCmd, **kwargs):
355  return [dict(expRefList=parsedCmd.id.refList, butler=parsedCmd.butler, calibId=parsedCmd.calibId)]
356 
357  def __call__(self, args):
358  """Call the Task with the kwargs from getTargetList"""
359  task = self.TaskClass(config=self.config, log=self.log)
360  exitStatus = 0 # exit status for the shell
361  if self.doRaise:
362  result = task.runDataRef(**args)
363  else:
364  try:
365  result = task.runDataRef(**args)
366  except Exception as e:
367  # n.b. The shell exit value is the number of dataRefs returning
368  # non-zero, so the actual value used here is lost
369  exitStatus = 1
370 
371  task.log.fatal("Failed: %s" % e)
372  traceback.print_exc(file=sys.stderr)
373 
374  if self.doReturnResults:
375  return Struct(
376  exitStatus=exitStatus,
377  args=args,
378  metadata=task.metadata,
379  result=result,
380  )
381  else:
382  return Struct(
383  exitStatus=exitStatus,
384  )
385 
386 
388  """!Base class for constructing calibs.
389 
390  This should be subclassed for each of the required calib types.
391  The subclass should be sure to define the following class variables:
392  * _DefaultName: default name of the task, used by CmdLineTask
393  * calibName: name of the calibration data set in the butler
394  The subclass may optionally set:
395  * filterName: filter name to give the resultant calib
396  """
397  ConfigClass = CalibConfig
398  RunnerClass = CalibTaskRunner
399  filterName = None
400  calibName = None
401  exposureTime = 1.0 # sets this exposureTime in the output
402 
403  def __init__(self, *args, **kwargs):
404  """Constructor"""
405  BatchPoolTask.__init__(self, *args, **kwargs)
406  self.makeSubtask("isr")
407  self.makeSubtask("combination")
408 
409  @classmethod
410  def batchWallTime(cls, time, parsedCmd, numCores):
411  numCcds = len(parsedCmd.butler.get("camera"))
412  numExps = len(cls.RunnerClassRunnerClass.getTargetList(
413  parsedCmd)[0]['expRefList'])
414  numCycles = int(numCcds/float(numCores) + 0.5)
415  return time*numExps*numCycles
416 
417  @classmethod
418  def _makeArgumentParser(cls, *args, **kwargs):
419  kwargs.pop("doBatch", False)
420  return CalibArgumentParser(calibName=cls.calibNamecalibName, name=cls._DefaultName, *args, **kwargs)
421 
422  def runDataRef(self, expRefList, butler, calibId):
423  """!Construct a calib from a list of exposure references
424 
425  This is the entry point, called by the TaskRunner.__call__
426 
427  Only the master node executes this method.
428 
429  @param expRefList List of data references at the exposure level
430  @param butler Data butler
431  @param calibId Identifier dict for calib
432  """
433  if len(expRefList) < 1:
434  raise RuntimeError("No valid input data")
435 
436  for expRef in expRefList:
437  self.addMissingKeysaddMissingKeys(expRef.dataId, butler, self.config.ccdKeys, 'raw')
438 
439  outputId = self.getOutputIdgetOutputId(expRefList, calibId)
440  ccdIdLists = getCcdIdListFromExposures(
441  expRefList, level="sensor", ccdKeys=self.config.ccdKeys)
442  self.checkCcdIdListscheckCcdIdLists(ccdIdLists)
443 
444  # Ensure we can generate filenames for each output
445  outputIdItemList = list(outputId.items())
446  for ccdName in ccdIdLists:
447  dataId = dict([(k, ccdName[i]) for i, k in enumerate(self.config.ccdKeys)])
448  dataId.update(outputIdItemList)
449  self.addMissingKeysaddMissingKeys(dataId, butler)
450  dataId.update(outputIdItemList)
451 
452  try:
453  butler.get(self.calibNamecalibName + "_filename", dataId)
454  except Exception as e:
455  raise RuntimeError(
456  "Unable to determine output filename \"%s_filename\" from %s: %s" %
457  (self.calibNamecalibName, dataId, e))
458 
459  processPool = Pool("process")
460  processPool.storeSet(butler=butler)
461 
462  # Scatter: process CCDs independently
463  data = self.scatterProcessscatterProcess(processPool, ccdIdLists)
464 
465  # Gather: determine scalings
466  scales = self.scalescale(ccdIdLists, data)
467 
468  combinePool = Pool("combine")
469  combinePool.storeSet(butler=butler)
470 
471  # Scatter: combine
472  calibs = self.scatterCombinescatterCombine(combinePool, outputId, ccdIdLists, scales)
473 
474  if self.config.doCameraImage:
475  camera = butler.get("camera")
476  # Convert indexing of calibs from "ccdName" to detector ID (as used by makeImageFromCamera)
477  calibs = {butler.get("postISRCCD_detector",
478  dict(zip(self.config.ccdKeys, ccdName))).getId(): calibs[ccdName]
479  for ccdName in ccdIdLists}
480 
481  try:
482  cameraImage = self.makeCameraImagemakeCameraImage(camera, outputId, calibs)
483  butler.put(cameraImage, self.calibNamecalibName + "_camera", dataId)
484  except Exception as exc:
485  self.log.warn("Unable to create camera image: %s" % (exc,))
486 
487  return Struct(
488  outputId=outputId,
489  ccdIdLists=ccdIdLists,
490  scales=scales,
491  calibs=calibs,
492  processPool=processPool,
493  combinePool=combinePool,
494  )
495 
496  def getOutputId(self, expRefList, calibId):
497  """!Generate the data identifier for the output calib
498 
499  The mean date and the common filter are included, using keywords
500  from the configuration. The CCD-specific part is not included
501  in the data identifier.
502 
503  @param expRefList List of data references at exposure level
504  @param calibId Data identifier elements for the calib provided by the user
505  @return data identifier
506  """
507  midTime = 0
508  filterName = None
509  for expRef in expRefList:
510  butler = expRef.getButler()
511  dataId = expRef.dataId
512 
513  midTime += self.getMjdgetMjd(butler, dataId)
514  thisFilter = self.getFiltergetFilter(
515  butler, dataId) if self.filterNamefilterName is None else self.filterNamefilterName
516  if filterName is None:
517  filterName = thisFilter
518  elif filterName != thisFilter:
519  raise RuntimeError("Filter mismatch for %s: %s vs %s" % (
520  dataId, thisFilter, filterName))
521 
522  midTime /= len(expRefList)
523  date = str(dafBase.DateTime(
524  midTime, dafBase.DateTime.MJD).toPython().date())
525 
526  outputId = {self.config.filter: filterName,
527  self.config.dateCalib: date}
528  outputId.update(calibId)
529  return outputId
530 
531  def getMjd(self, butler, dataId, timescale=dafBase.DateTime.UTC):
532  """Determine the Modified Julian Date (MJD; in TAI) from a data identifier"""
533  if self.config.dateObs in dataId:
534  dateObs = dataId[self.config.dateObs]
535  else:
536  dateObs = butler.queryMetadata('raw', [self.config.dateObs], dataId)[0]
537  if "T" not in dateObs:
538  dateObs = dateObs + "T12:00:00.0Z"
539  elif not dateObs.endswith("Z"):
540  dateObs += "Z"
541 
542  return dafBase.DateTime(dateObs, timescale).get(dafBase.DateTime.MJD)
543 
544  def getFilter(self, butler, dataId):
545  """Determine the filter from a data identifier"""
546  filt = butler.queryMetadata('raw', [self.config.filter], dataId)[0]
547  return filt
548 
549  def addMissingKeys(self, dataId, butler, missingKeys=None, calibName=None):
550  if calibName is None:
551  calibName = self.calibNamecalibName
552 
553  if missingKeys is None:
554  missingKeys = set(butler.getKeys(calibName).keys()) - set(dataId.keys())
555 
556  for k in missingKeys:
557  try:
558  v = butler.queryMetadata('raw', [k], dataId) # n.b. --id refers to 'raw'
559  except Exception:
560  continue
561 
562  if len(v) == 0: # failed to lookup value
563  continue
564 
565  if len(v) == 1:
566  dataId[k] = v[0]
567  else:
568  raise RuntimeError("No unique lookup for %s: %s" % (k, v))
569 
570  def updateMetadata(self, calibImage, exposureTime, darkTime=None, **kwargs):
571  """!Update the metadata from the VisitInfo
572 
573  @param calibImage The image whose metadata is to be set
574  @param exposureTime The exposure time for the image
575  @param darkTime The time since the last read (default: exposureTime)
576  """
577 
578  if darkTime is None:
579  darkTime = exposureTime # avoid warning messages when using calibration products
580 
581  visitInfo = afwImage.VisitInfo(exposureTime=exposureTime, darkTime=darkTime, **kwargs)
582  md = calibImage.getMetadata()
583 
584  afwImage.setVisitInfoMetadata(md, visitInfo)
585 
586  def scatterProcess(self, pool, ccdIdLists):
587  """!Scatter the processing among the nodes
588 
589  We scatter each CCD independently (exposures aren't grouped together),
590  to make full use of all available processors. This necessitates piecing
591  everything back together in the same format as ccdIdLists afterwards.
592 
593  Only the master node executes this method.
594 
595  @param pool Process pool
596  @param ccdIdLists Dict of data identifier lists for each CCD name
597  @return Dict of lists of returned data for each CCD name
598  """
599  self.log.info("Scatter processing")
600  return mapToMatrix(pool, self.processprocess, ccdIdLists)
601 
602  def process(self, cache, ccdId, outputName="postISRCCD", **kwargs):
603  """!Process a CCD, specified by a data identifier
604 
605  After processing, optionally returns a result (produced by
606  the 'processResult' method) calculated from the processed
607  exposure. These results will be gathered by the master node,
608  and is a means for coordinated scaling of all CCDs for flats,
609  etc.
610 
611  Only slave nodes execute this method.
612 
613  @param cache Process pool cache
614  @param ccdId Data identifier for CCD
615  @param outputName Output dataset name for butler
616  @return result from 'processResult'
617  """
618  if ccdId is None:
619  self.log.warn("Null identifier received on %s" % NODE)
620  return None
621  sensorRef = getDataRef(cache.butler, ccdId)
622  if self.config.clobber or not sensorRef.datasetExists(outputName):
623  self.log.info("Processing %s on %s" % (ccdId, NODE))
624  try:
625  exposure = self.processSingleprocessSingle(sensorRef, **kwargs)
626  except Exception as e:
627  self.log.warn("Unable to process %s: %s" % (ccdId, e))
628  raise
629  return None
630  self.processWriteprocessWrite(sensorRef, exposure)
631  else:
632  self.log.info(
633  "Using previously persisted processed exposure for %s" % (sensorRef.dataId,))
634  exposure = sensorRef.get(outputName)
635  return self.processResultprocessResult(exposure)
636 
637  def processSingle(self, dataRef):
638  """Process a single CCD, specified by a data reference
639 
640  Generally, this simply means doing ISR.
641 
642  Only slave nodes execute this method.
643  """
644  return self.isr.runDataRef(dataRef).exposure
645 
646  def processWrite(self, dataRef, exposure, outputName="postISRCCD"):
647  """!Write the processed CCD
648 
649  We need to write these out because we can't hold them all in
650  memory at once.
651 
652  Only slave nodes execute this method.
653 
654  @param dataRef Data reference
655  @param exposure CCD exposure to write
656  @param outputName Output dataset name for butler.
657  """
658  dataRef.put(exposure, outputName)
659 
660  def processResult(self, exposure):
661  """Extract processing results from a processed exposure
662 
663  This method generates what is gathered by the master node.
664  This can be a background measurement or similar for scaling
665  flat-fields. It must be picklable!
666 
667  Only slave nodes execute this method.
668  """
669  return None
670 
671  def scale(self, ccdIdLists, data):
672  """!Determine scaling across CCDs and exposures
673 
674  This is necessary mainly for flats, so as to determine a
675  consistent scaling across the entire focal plane. This
676  implementation is simply a placeholder.
677 
678  Only the master node executes this method.
679 
680  @param ccdIdLists Dict of data identifier lists for each CCD tuple
681  @param data Dict of lists of returned data for each CCD tuple
682  @return dict of Struct(ccdScale: scaling for CCD,
683  expScales: scaling for each exposure
684  ) for each CCD tuple
685  """
686  self.log.info("Scale on %s" % NODE)
687  return dict((name, Struct(ccdScale=None, expScales=[None] * len(ccdIdLists[name])))
688  for name in ccdIdLists)
689 
690  def scatterCombine(self, pool, outputId, ccdIdLists, scales):
691  """!Scatter the combination of exposures across multiple nodes
692 
693  In this case, we can only scatter across as many nodes as
694  there are CCDs.
695 
696  Only the master node executes this method.
697 
698  @param pool Process pool
699  @param outputId Output identifier (exposure part only)
700  @param ccdIdLists Dict of data identifier lists for each CCD name
701  @param scales Dict of structs with scales, for each CCD name
702  @param dict of binned images
703  """
704  self.log.info("Scatter combination")
705  data = [Struct(ccdName=ccdName, ccdIdList=ccdIdLists[ccdName], scales=scales[ccdName]) for
706  ccdName in ccdIdLists]
707  images = pool.map(self.combinecombine, data, outputId)
708  return dict(zip(ccdIdLists.keys(), images))
709 
710  def getFullyQualifiedOutputId(self, ccdName, butler, outputId):
711  """Get fully-qualified output data identifier
712 
713  We may need to look up keys that aren't in the output dataId.
714 
715  @param ccdName Name tuple for CCD
716  @param butler Data butler
717  @param outputId Data identifier for combined image (exposure part only)
718  @return fully-qualified output dataId
719  """
720  fullOutputId = {k: ccdName[i] for i, k in enumerate(self.config.ccdKeys)}
721  fullOutputId.update(outputId)
722  self.addMissingKeysaddMissingKeys(fullOutputId, butler)
723  fullOutputId.update(outputId) # must be after the call to queryMetadata in 'addMissingKeys'
724  return fullOutputId
725 
726  def combine(self, cache, struct, outputId):
727  """!Combine multiple exposures of a particular CCD and write the output
728 
729  Only the slave nodes execute this method.
730 
731  @param cache Process pool cache
732  @param struct Parameters for the combination, which has the following components:
733  * ccdName Name tuple for CCD
734  * ccdIdList List of data identifiers for combination
735  * scales Scales to apply (expScales are scalings for each exposure,
736  ccdScale is final scale for combined image)
737  @param outputId Data identifier for combined image (exposure part only)
738  @return binned calib image
739  """
740  outputId = self.getFullyQualifiedOutputIdgetFullyQualifiedOutputId(struct.ccdName, cache.butler, outputId)
741  dataRefList = [getDataRef(cache.butler, dataId) if dataId is not None else None for
742  dataId in struct.ccdIdList]
743  self.log.info("Combining %s on %s" % (outputId, NODE))
744  calib = self.combination.run(dataRefList, expScales=struct.scales.expScales,
745  finalScale=struct.scales.ccdScale)
746 
747  if not hasattr(calib, "getMetadata"):
748  if hasattr(calib, "getVariance"):
749  calib = afwImage.makeExposure(calib)
750  else:
751  calib = afwImage.DecoratedImageF(calib.getImage()) # n.b. hardwires "F" for the output type
752 
753  self.calculateOutputHeaderFromRawscalculateOutputHeaderFromRaws(cache.butler, calib, struct.ccdIdList, outputId)
754 
755  self.updateMetadataupdateMetadata(calib, self.exposureTimeexposureTime)
756 
757  self.recordCalibInputsrecordCalibInputs(cache.butler, calib,
758  struct.ccdIdList, outputId)
759 
760  self.interpolateNansinterpolateNans(calib)
761 
762  self.writewrite(cache.butler, calib, outputId)
763 
764  return afwMath.binImage(calib.getImage(), self.config.binning)
765 
766  def calculateOutputHeaderFromRaws(self, butler, calib, dataIdList, outputId):
767  """!Calculate the output header from the raw headers.
768 
769  This metadata will go into the output FITS header. It will include all
770  headers that are identical in all inputs.
771 
772  @param butler Data butler
773  @param calib Combined calib exposure.
774  @param dataIdList List of data identifiers for calibration inputs
775  @param outputId Data identifier for output
776  """
777  header = calib.getMetadata()
778 
779  rawmd = [butler.get("raw_md", dataId) for dataId in dataIdList if
780  dataId is not None]
781 
782  merged = merge_headers(rawmd, mode="drop")
783 
784  # Place merged set into the PropertyList if a value is not
785  # present already
786  # Comments are not present in the merged version so copy them across
787  for k, v in merged.items():
788  if k not in header:
789  comment = rawmd[0].getComment(k) if k in rawmd[0] else None
790  header.set(k, v, comment=comment)
791 
792  # Create an observation group so we can add some standard headers
793  # independent of the form in the input files.
794  # Use try block in case we are dealing with unexpected data headers
795  try:
796  group = ObservationGroup(rawmd, pedantic=False)
797  except Exception:
798  group = None
799 
800  comments = {"TIMESYS": "Time scale for all dates",
801  "DATE-OBS": "Start date of earliest input observation",
802  "MJD-OBS": "[d] Start MJD of earliest input observation",
803  "DATE-END": "End date of oldest input observation",
804  "MJD-END": "[d] End MJD of oldest input observation",
805  "MJD-AVG": "[d] MJD midpoint of all input observations",
806  "DATE-AVG": "Midpoint date of all input observations"}
807 
808  if group is not None:
809  oldest, newest = group.extremes()
810  dateCards = dates_to_fits(oldest.datetime_begin, newest.datetime_end)
811  else:
812  # Fall back to setting a DATE-OBS from the calibDate
813  dateCards = {"DATE-OBS": "{}T00:00:00.00".format(outputId[self.config.dateCalib])}
814  comments["DATE-OBS"] = "Date of start of day of calibration midpoint"
815 
816  for k, v in dateCards.items():
817  header.set(k, v, comment=comments.get(k, None))
818 
819  def recordCalibInputs(self, butler, calib, dataIdList, outputId):
820  """!Record metadata including the inputs and creation details
821 
822  This metadata will go into the FITS header.
823 
824  @param butler Data butler
825  @param calib Combined calib exposure.
826  @param dataIdList List of data identifiers for calibration inputs
827  @param outputId Data identifier for output
828  """
829  header = calib.getMetadata()
830  header.set("OBSTYPE", self.calibNamecalibName) # Used by ingestCalibs.py
831 
832  # date, time, host, and root
833  now = time.localtime()
834  header.set("CALIB_CREATION_DATE", time.strftime("%Y-%m-%d", now))
835  header.set("CALIB_CREATION_TIME", time.strftime("%X %Z", now))
836 
837  # Inputs
838  visits = [str(dictToTuple(dataId, self.config.visitKeys)) for dataId in dataIdList if
839  dataId is not None]
840  for i, v in enumerate(sorted(set(visits))):
841  header.set("CALIB_INPUT_%d" % (i,), v)
842 
843  header.set("CALIB_ID", " ".join("%s=%s" % (key, value)
844  for key, value in outputId.items()))
845  checksum(calib, header)
846 
847  def interpolateNans(self, image):
848  """Interpolate over NANs in the combined image
849 
850  NANs can result from masked areas on the CCD. We don't want them getting
851  into our science images, so we replace them with the median of the image.
852  """
853  if hasattr(image, "getMaskedImage"): # Deal with Exposure vs Image
854  self.interpolateNansinterpolateNans(image.getMaskedImage().getVariance())
855  image = image.getMaskedImage().getImage()
856  if hasattr(image, "getImage"): # Deal with DecoratedImage or MaskedImage vs Image
857  image = image.getImage()
858  array = image.getArray()
859  bad = np.isnan(array)
860  array[bad] = np.median(array[np.logical_not(bad)])
861 
862  def write(self, butler, exposure, dataId):
863  """!Write the final combined calib
864 
865  Only the slave nodes execute this method
866 
867  @param butler Data butler
868  @param exposure CCD exposure to write
869  @param dataId Data identifier for output
870  """
871  self.log.info("Writing %s on %s" % (dataId, NODE))
872  butler.put(exposure, self.calibNamecalibName, dataId)
873 
874  def makeCameraImage(self, camera, dataId, calibs):
875  """!Create and write an image of the entire camera
876 
877  This is useful for judging the quality or getting an overview of
878  the features of the calib.
879 
880  @param camera Camera object
881  @param dataId Data identifier for output
882  @param calibs Dict mapping CCD detector ID to calib image
883  """
884  return makeCameraImage(camera, calibs, self.config.binning)
885 
886  def checkCcdIdLists(self, ccdIdLists):
887  """Check that the list of CCD dataIds is consistent
888 
889  @param ccdIdLists Dict of data identifier lists for each CCD name
890  @return Number of exposures, number of CCDs
891  """
892  visitIdLists = collections.defaultdict(list)
893  for ccdName in ccdIdLists:
894  for dataId in ccdIdLists[ccdName]:
895  visitName = dictToTuple(dataId, self.config.visitKeys)
896  visitIdLists[visitName].append(dataId)
897 
898  numExps = set(len(expList) for expList in ccdIdLists.values())
899  numCcds = set(len(ccdList) for ccdList in visitIdLists.values())
900 
901  if len(numExps) != 1 or len(numCcds) != 1:
902  # Presumably a visit somewhere doesn't have the full complement available.
903  # Dump the information so the user can figure it out.
904  self.log.warn("Number of visits for each CCD: %s",
905  {ccdName: len(ccdIdLists[ccdName]) for ccdName in ccdIdLists})
906  self.log.warn("Number of CCDs for each visit: %s",
907  {vv: len(visitIdLists[vv]) for vv in visitIdLists})
908  raise RuntimeError("Inconsistent number of exposures/CCDs")
909 
910  return numExps.pop(), numCcds.pop()
911 
912 
914  """Configuration for bias construction.
915 
916  No changes required compared to the base class, but
917  subclassed for distinction.
918  """
919  pass
920 
921 
922 class BiasTask(CalibTask):
923  """Bias construction"""
924  ConfigClass = BiasConfig
925  _DefaultName = "bias"
926  calibName = "bias"
927  filterName = "NONE" # Sets this filter name in the output
928  exposureTime = 0.0 # sets this exposureTime in the output
929 
930  @classmethod
931  def applyOverrides(cls, config):
932  """Overrides to apply for bias construction"""
933  config.isr.doBias = False
934  config.isr.doDark = False
935  config.isr.doFlat = False
936  config.isr.doFringe = False
937 
938 
940  """Configuration for dark construction"""
941  doRepair = Field(dtype=bool, default=True, doc="Repair artifacts?")
942  psfFwhm = Field(dtype=float, default=3.0, doc="Repair PSF FWHM (pixels)")
943  psfSize = Field(dtype=int, default=21, doc="Repair PSF size (pixels)")
944  crGrow = Field(dtype=int, default=2, doc="Grow radius for CR (pixels)")
946  target=RepairTask, doc="Task to repair artifacts")
947 
948  def setDefaults(self):
949  CalibConfig.setDefaults(self)
950  self.combinationcombination.mask.append("CR")
951 
952 
954  """Dark construction
955 
956  The only major difference from the base class is a cosmic-ray
957  identification stage, and dividing each image by the dark time
958  to generate images of the dark rate.
959  """
960  ConfigClass = DarkConfig
961  _DefaultName = "dark"
962  calibName = "dark"
963  filterName = "NONE" # Sets this filter name in the output
964 
965  def __init__(self, *args, **kwargs):
966  CalibTask.__init__(self, *args, **kwargs)
967  self.makeSubtask("repair")
968 
969  @classmethod
970  def applyOverrides(cls, config):
971  """Overrides to apply for dark construction"""
972  config.isr.doDark = False
973  config.isr.doFlat = False
974  config.isr.doFringe = False
975 
976  def processSingle(self, sensorRef):
977  """Process a single CCD
978 
979  Besides the regular ISR, also masks cosmic-rays and divides each
980  processed image by the dark time to generate images of the dark rate.
981  The dark time is provided by the 'getDarkTime' method.
982  """
983  exposure = CalibTask.processSingle(self, sensorRef)
984 
985  if self.config.doRepair:
986  psf = measAlg.DoubleGaussianPsf(self.config.psfSize, self.config.psfSize,
987  self.config.psfFwhm/(2*math.sqrt(2*math.log(2))))
988  exposure.setPsf(psf)
989  self.repair.run(exposure, keepCRs=False)
990  if self.config.crGrow > 0:
991  mask = exposure.getMaskedImage().getMask().clone()
992  mask &= mask.getPlaneBitMask("CR")
993  fpSet = afwDet.FootprintSet(
994  mask, afwDet.Threshold(0.5))
995  fpSet = afwDet.FootprintSet(fpSet, self.config.crGrow, True)
996  fpSet.setMask(exposure.getMaskedImage().getMask(), "CR")
997 
998  mi = exposure.getMaskedImage()
999  mi /= self.getDarkTimegetDarkTime(exposure)
1000  return exposure
1001 
1002  def getDarkTime(self, exposure):
1003  """Retrieve the dark time for an exposure"""
1004  darkTime = exposure.getInfo().getVisitInfo().getDarkTime()
1005  if not np.isfinite(darkTime):
1006  raise RuntimeError("Non-finite darkTime")
1007  return darkTime
1008 
1009 
1011  """Configuration for flat construction"""
1012  iterations = Field(dtype=int, default=10,
1013  doc="Number of iterations for scale determination")
1014  stats = ConfigurableField(target=CalibStatsTask,
1015  doc="Background statistics configuration")
1016 
1017 
1019  """Flat construction
1020 
1021  The principal change from the base class involves gathering the background
1022  values from each image and using them to determine the scalings for the final
1023  combination.
1024  """
1025  ConfigClass = FlatConfig
1026  _DefaultName = "flat"
1027  calibName = "flat"
1028 
1029  @classmethod
1030  def applyOverrides(cls, config):
1031  """Overrides for flat construction"""
1032  config.isr.doFlat = False
1033  config.isr.doFringe = False
1034 
1035  def __init__(self, *args, **kwargs):
1036  CalibTask.__init__(self, *args, **kwargs)
1037  self.makeSubtask("stats")
1038 
1039  def processResult(self, exposure):
1040  return self.stats.run(exposure)
1041 
1042  def scale(self, ccdIdLists, data):
1043  """Determine the scalings for the final combination
1044 
1045  We have a matrix B_ij = C_i E_j, where C_i is the relative scaling
1046  of one CCD to all the others in an exposure, and E_j is the scaling
1047  of the exposure. We convert everything to logarithms so we can work
1048  with a linear system. We determine the C_i and E_j from B_ij by iteration,
1049  under the additional constraint that the average CCD scale is unity.
1050 
1051  This algorithm comes from Eugene Magnier and Pan-STARRS.
1052  """
1053  assert len(ccdIdLists.values()) > 0, "No successful CCDs"
1054  lengths = set([len(expList) for expList in ccdIdLists.values()])
1055  assert len(lengths) == 1, "Number of successful exposures for each CCD differs"
1056  assert tuple(lengths)[0] > 0, "No successful exposures"
1057  # Format background measurements into a matrix
1058  indices = dict((name, i) for i, name in enumerate(ccdIdLists))
1059  bgMatrix = np.array([[0.0] * len(expList) for expList in ccdIdLists.values()])
1060  for name in ccdIdLists:
1061  i = indices[name]
1062  bgMatrix[i] = [d if d is not None else np.nan for d in data[name]]
1063 
1064  numpyPrint = np.get_printoptions()
1065  np.set_printoptions(threshold=np.inf)
1066  self.log.info("Input backgrounds: %s" % bgMatrix)
1067 
1068  # Flat-field scaling
1069  numCcds = len(ccdIdLists)
1070  numExps = bgMatrix.shape[1]
1071  # log(Background) for each exposure/component
1072  bgMatrix = np.log(bgMatrix)
1073  bgMatrix = np.ma.masked_array(bgMatrix, ~np.isfinite(bgMatrix))
1074  # Initial guess at log(scale) for each component
1075  compScales = np.zeros(numCcds)
1076  expScales = np.array([(bgMatrix[:, i0] - compScales).mean() for i0 in range(numExps)])
1077 
1078  for iterate in range(self.config.iterations):
1079  compScales = np.array([(bgMatrix[i1, :] - expScales).mean() for i1 in range(numCcds)])
1080  bad = np.isnan(compScales)
1081  if np.any(bad):
1082  # Bad CCDs: just set them to the mean scale
1083  compScales[bad] = compScales[~bad].mean()
1084  expScales = np.array([(bgMatrix[:, i2] - compScales).mean() for i2 in range(numExps)])
1085 
1086  avgScale = np.average(np.exp(compScales))
1087  compScales -= np.log(avgScale)
1088  self.log.debug("Iteration %d exposure scales: %s", iterate, np.exp(expScales))
1089  self.log.debug("Iteration %d component scales: %s", iterate, np.exp(compScales))
1090 
1091  expScales = np.array([(bgMatrix[:, i3] - compScales).mean() for i3 in range(numExps)])
1092 
1093  if np.any(np.isnan(expScales)):
1094  raise RuntimeError("Bad exposure scales: %s --> %s" % (bgMatrix, expScales))
1095 
1096  expScales = np.exp(expScales)
1097  compScales = np.exp(compScales)
1098 
1099  self.log.info("Exposure scales: %s" % expScales)
1100  self.log.info("Component relative scaling: %s" % compScales)
1101  np.set_printoptions(**numpyPrint)
1102 
1103  return dict((ccdName, Struct(ccdScale=compScales[indices[ccdName]], expScales=expScales))
1104  for ccdName in ccdIdLists)
1105 
1106 
1108  """Configuration for fringe construction"""
1109  stats = ConfigurableField(target=CalibStatsTask,
1110  doc="Background statistics configuration")
1111  subtractBackground = ConfigurableField(target=measAlg.SubtractBackgroundTask,
1112  doc="Background configuration")
1113  detection = ConfigurableField(
1114  target=measAlg.SourceDetectionTask, doc="Detection configuration")
1115  detectSigma = Field(dtype=float, default=1.0,
1116  doc="Detection PSF gaussian sigma")
1117 
1118  def setDefaults(self):
1119  CalibConfig.setDefaults(self)
1120  self.detectiondetection.reEstimateBackground = False
1121 
1122 
1124  """Fringe construction task
1125 
1126  The principal change from the base class is that the images are
1127  background-subtracted and rescaled by the background.
1128 
1129  XXX This is probably not right for a straight-up combination, as we
1130  are currently doing, since the fringe amplitudes need not scale with
1131  the continuum.
1132 
1133  XXX Would like to have this do PCA and generate multiple images, but
1134  that will take a bit of work with the persistence code.
1135  """
1136  ConfigClass = FringeConfig
1137  _DefaultName = "fringe"
1138  calibName = "fringe"
1139 
1140  @classmethod
1141  def applyOverrides(cls, config):
1142  """Overrides for fringe construction"""
1143  config.isr.doFringe = False
1144 
1145  def __init__(self, *args, **kwargs):
1146  CalibTask.__init__(self, *args, **kwargs)
1147  self.makeSubtask("detection")
1148  self.makeSubtask("stats")
1149  self.makeSubtask("subtractBackground")
1150 
1151  def processSingle(self, sensorRef):
1152  """Subtract the background and normalise by the background level"""
1153  exposure = CalibTask.processSingle(self, sensorRef)
1154  bgLevel = self.stats.run(exposure)
1155  self.subtractBackground.run(exposure)
1156  mi = exposure.getMaskedImage()
1157  mi /= bgLevel
1158  footprintSets = self.detection.detectFootprints(
1159  exposure, sigma=self.config.detectSigma)
1160  mask = exposure.getMaskedImage().getMask()
1161  detected = 1 << mask.addMaskPlane("DETECTED")
1162  for fpSet in (footprintSets.positive, footprintSets.negative):
1163  if fpSet is not None:
1164  afwDet.setMaskFromFootprintList(
1165  mask, fpSet.getFootprints(), detected)
1166  return exposure
1167 
1168 
1170  """Configuration for sky frame construction"""
1171  detectSigma = Field(dtype=float, default=2.0, doc="Detection PSF gaussian sigma")
1172  maskObjects = ConfigurableField(target=MaskObjectsTask,
1173  doc="Configuration for masking objects aggressively")
1174  largeScaleBackground = ConfigField(dtype=FocalPlaneBackgroundConfig,
1175  doc="Large-scale background configuration")
1176  sky = ConfigurableField(target=SkyMeasurementTask, doc="Sky measurement")
1177  maskThresh = Field(dtype=float, default=3.0, doc="k-sigma threshold for masking pixels")
1178  mask = ListField(dtype=str, default=["BAD", "SAT", "DETECTED", "NO_DATA"],
1179  doc="Mask planes to consider as contaminated")
1180 
1181 
1183  """Task for sky frame construction
1184 
1185  The sky frame is a (relatively) small-scale background
1186  model, the response of the camera to the sky.
1187 
1188  To construct, we first remove a large-scale background (e.g., caused
1189  by moonlight) which may vary from image to image. Then we construct a
1190  model of the sky, which is essentially a binned version of the image
1191  (important configuration parameters: sky.background.[xy]BinSize).
1192  It is these models which are coadded to yield the sky frame.
1193  """
1194  ConfigClass = SkyConfig
1195  _DefaultName = "sky"
1196  calibName = "sky"
1197 
1198  def __init__(self, *args, **kwargs):
1199  CalibTask.__init__(self, *args, **kwargs)
1200  self.makeSubtask("maskObjects")
1201  self.makeSubtask("sky")
1202 
1203  def scatterProcess(self, pool, ccdIdLists):
1204  """!Scatter the processing among the nodes
1205 
1206  Only the master node executes this method, assigning work to the
1207  slaves.
1208 
1209  We measure and subtract off a large-scale background model across
1210  all CCDs, which requires a scatter/gather. Then we process the
1211  individual CCDs, subtracting the large-scale background model and
1212  the residual background model measured. These residuals will be
1213  combined for the sky frame.
1214 
1215  @param pool Process pool
1216  @param ccdIdLists Dict of data identifier lists for each CCD name
1217  @return Dict of lists of returned data for each CCD name
1218  """
1219  self.log.info("Scatter processing")
1220 
1221  numExps = set(len(expList) for expList in ccdIdLists.values()).pop()
1222 
1223  # First subtract off general gradients to make all the exposures look similar.
1224  # We want to preserve the common small-scale structure, which we will coadd.
1225  bgModelList = mapToMatrix(pool, self.measureBackgroundmeasureBackground, ccdIdLists)
1226 
1227  backgrounds = {}
1228  scales = {}
1229  for exp in range(numExps):
1230  bgModels = [bgModelList[ccdName][exp] for ccdName in ccdIdLists]
1231  visit = set(tuple(ccdIdLists[ccdName][exp][key] for key in sorted(self.config.visitKeys)) for
1232  ccdName in ccdIdLists)
1233  assert len(visit) == 1
1234  visit = visit.pop()
1235  bgModel = bgModels[0]
1236  for bg in bgModels[1:]:
1237  bgModel.merge(bg)
1238  self.log.info("Background model min/max for visit %s: %f %f", visit,
1239  np.min(bgModel.getStatsImage().getArray()),
1240  np.max(bgModel.getStatsImage().getArray()))
1241  backgrounds[visit] = bgModel
1242  scales[visit] = np.median(bgModel.getStatsImage().getArray())
1243 
1244  return mapToMatrix(pool, self.processprocess, ccdIdLists, backgrounds=backgrounds, scales=scales)
1245 
1246  def measureBackground(self, cache, dataId):
1247  """!Measure background model for CCD
1248 
1249  This method is executed by the slaves.
1250 
1251  The background models for all CCDs in an exposure will be
1252  combined to form a full focal-plane background model.
1253 
1254  @param cache Process pool cache
1255  @param dataId Data identifier
1256  @return Bcakground model
1257  """
1258  dataRef = getDataRef(cache.butler, dataId)
1259  exposure = self.processSingleBackgroundprocessSingleBackground(dataRef)
1260 
1261  # NAOJ prototype smoothed and then combined the entire image, but it shouldn't be any different
1262  # to bin and combine the binned images except that there's fewer pixels to worry about.
1263  config = self.config.largeScaleBackground
1264  camera = dataRef.get("camera")
1265  bgModel = FocalPlaneBackground.fromCamera(config, camera)
1266  bgModel.addCcd(exposure)
1267  return bgModel
1268 
1269  def processSingleBackground(self, dataRef):
1270  """!Process a single CCD for the background
1271 
1272  This method is executed by the slaves.
1273 
1274  Because we're interested in the background, we detect and mask astrophysical
1275  sources, and pixels above the noise level.
1276 
1277  @param dataRef Data reference for CCD.
1278  @return processed exposure
1279  """
1280  if not self.config.clobber and dataRef.datasetExists("postISRCCD"):
1281  return dataRef.get("postISRCCD")
1282  exposure = CalibTask.processSingle(self, dataRef)
1283 
1284  self.maskObjects.run(exposure, self.config.mask)
1285  dataRef.put(exposure, "postISRCCD")
1286  return exposure
1287 
1288  def processSingle(self, dataRef, backgrounds, scales):
1289  """Process a single CCD, specified by a data reference
1290 
1291  We subtract the appropriate focal plane background model,
1292  divide by the appropriate scale and measure the background.
1293 
1294  Only slave nodes execute this method.
1295 
1296  @param dataRef Data reference for single CCD
1297  @param backgrounds Background model for each visit
1298  @param scales Scales for each visit
1299  @return Processed exposure
1300  """
1301  visit = tuple(dataRef.dataId[key] for key in sorted(self.config.visitKeys))
1302  exposure = dataRef.get("postISRCCD", immediate=True)
1303  image = exposure.getMaskedImage()
1304  detector = exposure.getDetector()
1305  bbox = image.getBBox()
1306 
1307  bgModel = backgrounds[visit]
1308  bg = bgModel.toCcdBackground(detector, bbox)
1309  image -= bg.getImage()
1310  image /= scales[visit]
1311 
1312  bg = self.sky.measureBackground(exposure.getMaskedImage())
1313  dataRef.put(bg, "icExpBackground")
1314  return exposure
1315 
1316  def combine(self, cache, struct, outputId):
1317  """!Combine multiple background models of a particular CCD and write the output
1318 
1319  Only the slave nodes execute this method.
1320 
1321  @param cache Process pool cache
1322  @param struct Parameters for the combination, which has the following components:
1323  * ccdName Name tuple for CCD
1324  * ccdIdList List of data identifiers for combination
1325  @param outputId Data identifier for combined image (exposure part only)
1326  @return binned calib image
1327  """
1328  outputId = self.getFullyQualifiedOutputIdgetFullyQualifiedOutputId(struct.ccdName, cache.butler, outputId)
1329  dataRefList = [getDataRef(cache.butler, dataId) if dataId is not None else None for
1330  dataId in struct.ccdIdList]
1331  self.log.info("Combining %s on %s" % (outputId, NODE))
1332  bgList = [dataRef.get("icExpBackground", immediate=True).clone() for dataRef in dataRefList]
1333 
1334  bgExp = self.sky.averageBackgrounds(bgList)
1335 
1336  self.recordCalibInputsrecordCalibInputs(cache.butler, bgExp, struct.ccdIdList, outputId)
1337  cache.butler.put(bgExp, "sky", outputId)
1338  return afwMath.binImage(self.sky.exposureToBackground(bgExp).getImage(), self.config.binning)
int min
A set of Footprints, associated with a MaskedImage.
Definition: FootprintSet.h:53
A Threshold is used to pass a threshold value to detection algorithms.
Definition: Threshold.h:43
Information about a single exposure of an imaging camera.
Definition: VisitInfo.h:68
Pass parameters to a Statistics object.
Definition: Statistics.h:92
Class for handling dates/times, including MJD, UTC, and TAI.
Definition: DateTime.h:64
An integer coordinate rectangle.
Definition: Box.h:55
def __init__(self, calibName, *args, **kwargs)
def run(self, sensorRefList, expScales=None, finalScale=None, inputName="postISRCCD")
Combine calib images for a single sensor.
def getDimensions(self, sensorRefList, inputName="postISRCCD")
def combine(self, target, imageList, stats)
Combine multiple images.
def __call__(self, parser, namespace, values, option_string)
def run(self, exposureOrImage)
Measure a particular statistic on an image (of some sort).
Base class for constructing calibs.
def scatterCombine(self, pool, outputId, ccdIdLists, scales)
Scatter the combination of exposures across multiple nodes.
def getFullyQualifiedOutputId(self, ccdName, butler, outputId)
def scale(self, ccdIdLists, data)
Determine scaling across CCDs and exposures.
def makeCameraImage(self, camera, dataId, calibs)
Create and write an image of the entire camera.
def getMjd(self, butler, dataId, timescale=dafBase.DateTime.UTC)
def scatterProcess(self, pool, ccdIdLists)
Scatter the processing among the nodes.
def processWrite(self, dataRef, exposure, outputName="postISRCCD")
Write the processed CCD.
def process(self, cache, ccdId, outputName="postISRCCD", **kwargs)
Process a CCD, specified by a data identifier.
def write(self, butler, exposure, dataId)
Write the final combined calib.
def getOutputId(self, expRefList, calibId)
Generate the data identifier for the output calib.
def batchWallTime(cls, time, parsedCmd, numCores)
Return walltime request for batch job.
def addMissingKeys(self, dataId, butler, missingKeys=None, calibName=None)
def runDataRef(self, expRefList, butler, calibId)
Construct a calib from a list of exposure references.
def combine(self, cache, struct, outputId)
Combine multiple exposures of a particular CCD and write the output.
def updateMetadata(self, calibImage, exposureTime, darkTime=None, **kwargs)
Update the metadata from the VisitInfo.
def calculateOutputHeaderFromRaws(self, butler, calib, dataIdList, outputId)
Calculate the output header from the raw headers.
def recordCalibInputs(self, butler, calib, dataIdList, outputId)
Record metadata including the inputs and creation details.
def measureBackground(self, cache, dataId)
Measure background model for CCD.
def scatterProcess(self, pool, ccdIdLists)
Scatter the processing among the nodes.
def processSingle(self, dataRef, backgrounds, scales)
def processSingleBackground(self, dataRef)
Process a single CCD for the background.
def combine(self, cache, struct, outputId)
Combine multiple background models of a particular CCD and write the output.
daf::base::PropertyList * list
Definition: fits.cc:913
daf::base::PropertySet * set
Definition: fits.cc:912
std::shared_ptr< FrameSet > append(FrameSet const &first, FrameSet const &second)
Construct a FrameSet that performs two transformations in series.
Definition: functional.cc:33
Backwards-compatibility support for depersisting the old Calib (FluxMag0/FluxMag0Err) objects.
lsst::geom::Box2I bboxFromMetadata(daf::base::PropertySet &metadata)
Determine the image bounding box from its metadata (FITS header)
Definition: Image.cc:682
std::shared_ptr< Exposure< ImagePixelT, MaskPixelT, VariancePixelT > > makeExposure(MaskedImage< ImagePixelT, MaskPixelT, VariancePixelT > &mimage, std::shared_ptr< geom::SkyWcs const > wcs=std::shared_ptr< geom::SkyWcs const >())
A function to return an Exposure of the correct type (cf.
Definition: Exposure.h:462
Statistics makeStatistics(lsst::afw::image::Image< Pixel > const &img, lsst::afw::image::Mask< image::MaskPixel > const &msk, int const flags, StatisticsControl const &sctrl=StatisticsControl())
Handle a watered-down front-end to the constructor (no variance)
Definition: Statistics.h:359
Property
control what is calculated
Definition: Statistics.h:62
std::shared_ptr< lsst::afw::image::Image< PixelT > > statisticsStack(std::vector< std::shared_ptr< lsst::afw::image::Image< PixelT >>> &images, Property flags, StatisticsControl const &sctrl=StatisticsControl(), std::vector< lsst::afw::image::VariancePixel > const &wvector=std::vector< lsst::afw::image::VariancePixel >(0))
A function to compute some statistics of a stack of Images.
std::shared_ptr< ImageT > binImage(ImageT const &inImage, int const binX, int const binY, lsst::afw::math::Property const flags=lsst::afw::math::MEAN)
Definition: binImage.cc:44
def run(self, coaddExposures, bbox, wcs)
Definition: getTemplate.py:603
Fit spatial kernel using approximate fluxes for candidates, and solving a linear system of equations.
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)
Definition: history.py:174
def checksum(obj, header=None, sumType="MD5")
Calculate a checksum of an object.
Definition: checksum.py:24
def dictToTuple(dict_, keys)
Return a tuple of specific values from a dict.
def getCcdIdListFromExposures(expRefList, level="sensor", ccdKeys=["ccd"])
Determine a list of CCDs from exposure references.
def mapToMatrix(pool, func, ccdIdLists, *args, **kwargs)
def getDataRef(butler, dataId, datasetType="raw")
Definition: utils.py:16