LSSTApplications  17.0+11,17.0+34,17.0+56,17.0+57,17.0+59,17.0+7,17.0-1-g377950a+33,17.0.1-1-g114240f+2,17.0.1-1-g4d4fbc4+28,17.0.1-1-g55520dc+49,17.0.1-1-g5f4ed7e+52,17.0.1-1-g6dd7d69+17,17.0.1-1-g8de6c91+11,17.0.1-1-gb9095d2+7,17.0.1-1-ge9fec5e+5,17.0.1-1-gf4e0155+55,17.0.1-1-gfc65f5f+50,17.0.1-1-gfc6fb1f+20,17.0.1-10-g87f9f3f+1,17.0.1-11-ge9de802+16,17.0.1-16-ga14f7d5c+4,17.0.1-17-gc79d625+1,17.0.1-17-gdae4c4a+8,17.0.1-2-g26618f5+29,17.0.1-2-g54f2ebc+9,17.0.1-2-gf403422+1,17.0.1-20-g2ca2f74+6,17.0.1-23-gf3eadeb7+1,17.0.1-3-g7e86b59+39,17.0.1-3-gb5ca14a,17.0.1-3-gd08d533+40,17.0.1-30-g596af8797,17.0.1-4-g59d126d+4,17.0.1-4-gc69c472+5,17.0.1-6-g5afd9b9+4,17.0.1-7-g35889ee+1,17.0.1-7-gc7c8782+18,17.0.1-9-gc4bbfb2+3,w.2019.22
LSSTDataManagementBasePackage
constructCalibs.py
Go to the documentation of this file.
1 from __future__ import absolute_import, division, print_function
2 
3 import sys
4 import math
5 import time
6 import argparse
7 import traceback
8 import collections
9 
10 import numpy as np
11 from builtins import zip
12 from builtins import range
13 
14 from astro_metadata_translator import merge_headers, ObservationGroup
15 from astro_metadata_translator.serialize import dates_to_fits
16 
17 from lsst.pex.config import Config, ConfigurableField, Field, ListField, ConfigField
18 from lsst.pipe.base import Task, Struct, TaskRunner, ArgumentParser
19 import lsst.daf.base as dafBase
20 import lsst.afw.math as afwMath
21 import lsst.afw.geom as afwGeom
22 import lsst.afw.detection as afwDet
23 import lsst.afw.image as afwImage
24 import lsst.meas.algorithms as measAlg
25 from lsst.pipe.tasks.repair import RepairTask
26 from lsst.ip.isr import IsrTask
27 
28 from lsst.ctrl.pool.parallel import BatchPoolTask
29 from lsst.ctrl.pool.pool import Pool, NODE
30 from lsst.pipe.drivers.background import (SkyMeasurementTask, FocalPlaneBackground,
31  FocalPlaneBackgroundConfig, MaskObjectsTask)
32 from lsst.pipe.drivers.visualizeVisit import makeCameraImage
33 
34 from .checksum import checksum
35 from .utils import getDataRef
36 
37 
39  """Parameters controlling the measurement of background statistics"""
40  stat = Field(doc="Statistic to use to estimate background (from lsst.afw.math)", dtype=int,
41  default=int(afwMath.MEANCLIP))
42  clip = Field(doc="Clipping threshold for background",
43  dtype=float, default=3.0)
44  nIter = Field(doc="Clipping iterations for background",
45  dtype=int, default=3)
46  maxVisitsToCalcErrorFromInputVariance = Field(
47  doc="Maximum number of visits to estimate variance from input variance, not per-pixel spread",
48  dtype=int, default=2)
49  mask = ListField(doc="Mask planes to reject",
50  dtype=str, default=["DETECTED", "BAD", "NO_DATA"])
51 
52 
54  """Measure statistics on the background
55 
56  This can be useful for scaling the background, e.g., for flats and fringe frames.
57  """
58  ConfigClass = CalibStatsConfig
59 
60  def run(self, exposureOrImage):
61  """!Measure a particular statistic on an image (of some sort).
62 
63  @param exposureOrImage Exposure, MaskedImage or Image.
64  @return Value of desired statistic
65  """
66  stats = afwMath.StatisticsControl(self.config.clip, self.config.nIter,
67  afwImage.Mask.getPlaneBitMask(self.config.mask))
68  try:
69  image = exposureOrImage.getMaskedImage()
70  except Exception:
71  try:
72  image = exposureOrImage.getImage()
73  except Exception:
74  image = exposureOrImage
75 
76  return afwMath.makeStatistics(image, self.config.stat, stats).getValue()
77 
78 
80  """Configuration for combining calib images"""
81  rows = Field(doc="Number of rows to read at a time",
82  dtype=int, default=512)
83  mask = ListField(doc="Mask planes to respect", dtype=str,
84  default=["SAT", "DETECTED", "INTRP"])
85  combine = Field(doc="Statistic to use for combination (from lsst.afw.math)", dtype=int,
86  default=int(afwMath.MEANCLIP))
87  clip = Field(doc="Clipping threshold for combination",
88  dtype=float, default=3.0)
89  nIter = Field(doc="Clipping iterations for combination",
90  dtype=int, default=3)
91  stats = ConfigurableField(target=CalibStatsTask,
92  doc="Background statistics configuration")
93 
94 
96  """Task to combine calib images"""
97  ConfigClass = CalibCombineConfig
98 
99  def __init__(self, *args, **kwargs):
100  Task.__init__(self, *args, **kwargs)
101  self.makeSubtask("stats")
102 
103  def run(self, sensorRefList, expScales=None, finalScale=None, inputName="postISRCCD"):
104  """!Combine calib images for a single sensor
105 
106  @param sensorRefList List of data references to combine (for a single sensor)
107  @param expScales List of scales to apply for each exposure
108  @param finalScale Desired scale for final combined image
109  @param inputName Data set name for inputs
110  @return combined image
111  """
112  width, height = self.getDimensions(sensorRefList)
113  stats = afwMath.StatisticsControl(self.config.clip, self.config.nIter,
114  afwImage.Mask.getPlaneBitMask(self.config.mask))
115  numImages = len(sensorRefList)
116  if numImages < 1:
117  raise RuntimeError("No valid input data")
118  if numImages < self.config.stats.maxVisitsToCalcErrorFromInputVariance:
119  stats.setCalcErrorFromInputVariance(True)
120 
121  # Combine images
122  combined = afwImage.MaskedImageF(width, height)
123  imageList = [None]*numImages
124  for start in range(0, height, self.config.rows):
125  rows = min(self.config.rows, height - start)
126  box = afwGeom.Box2I(afwGeom.Point2I(0, start),
127  afwGeom.Extent2I(width, rows))
128  subCombined = combined.Factory(combined, box)
129 
130  for i, sensorRef in enumerate(sensorRefList):
131  if sensorRef is None:
132  imageList[i] = None
133  continue
134  exposure = sensorRef.get(inputName + "_sub", bbox=box)
135  if expScales is not None:
136  self.applyScale(exposure, expScales[i])
137  imageList[i] = exposure.getMaskedImage()
138 
139  self.combine(subCombined, imageList, stats)
140 
141  if finalScale is not None:
142  background = self.stats.run(combined)
143  self.log.info("%s: Measured background of stack is %f; adjusting to %f" %
144  (NODE, background, finalScale))
145  combined *= finalScale / background
146 
147  return combined
148 
149  def getDimensions(self, sensorRefList, inputName="postISRCCD"):
150  """Get dimensions of the inputs"""
151  dimList = []
152  for sensorRef in sensorRefList:
153  if sensorRef is None:
154  continue
155  md = sensorRef.get(inputName + "_md")
156  dimList.append(afwImage.bboxFromMetadata(md).getDimensions())
157  return getSize(dimList)
158 
159  def applyScale(self, exposure, scale=None):
160  """Apply scale to input exposure
161 
162  This implementation applies a flux scaling: the input exposure is
163  divided by the provided scale.
164  """
165  if scale is not None:
166  mi = exposure.getMaskedImage()
167  mi /= scale
168 
169  def combine(self, target, imageList, stats):
170  """!Combine multiple images
171 
172  @param target Target image to receive the combined pixels
173  @param imageList List of input images
174  @param stats Statistics control
175  """
176  images = [img for img in imageList if img is not None]
177  afwMath.statisticsStack(target, images, afwMath.Property(self.config.combine), stats)
178 
179 
180 def getSize(dimList):
181  """Determine a consistent size, given a list of image sizes"""
182  dim = set((w, h) for w, h in dimList)
183  dim.discard(None)
184  if len(dim) != 1:
185  raise RuntimeError("Inconsistent dimensions: %s" % dim)
186  return dim.pop()
187 
188 
189 def dictToTuple(dict_, keys):
190  """!Return a tuple of specific values from a dict
191 
192  This provides a hashable representation of the dict from certain keywords.
193  This can be useful for creating e.g., a tuple of the values in the DataId
194  that identify the CCD.
195 
196  @param dict_ dict to parse
197  @param keys keys to extract (order is important)
198  @return tuple of values
199  """
200  return tuple(dict_[k] for k in keys)
201 
202 
203 def getCcdIdListFromExposures(expRefList, level="sensor", ccdKeys=["ccd"]):
204  """!Determine a list of CCDs from exposure references
205 
206  This essentially inverts the exposure-level references (which
207  provides a list of CCDs for each exposure), by providing
208  a dataId list for each CCD. Consider an input list of exposures
209  [e1, e2, e3], and each exposure has CCDs c1 and c2. Then this
210  function returns:
211 
212  {(c1,): [e1c1, e2c1, e3c1], (c2,): [e1c2, e2c2, e3c2]}
213 
214  This is a dict whose keys are tuples of the identifying values of a
215  CCD (usually just the CCD number) and the values are lists of dataIds
216  for that CCD in each exposure. A missing dataId is given the value
217  None.
218 
219  @param expRefList List of data references for exposures
220  @param level Level for the butler to generate CCDs
221  @param ccdKeys DataId keywords that identify a CCD
222  @return dict of data identifier lists for each CCD;
223  keys are values of ccdKeys in order
224  """
225  expIdList = [[ccdRef.dataId for ccdRef in expRef.subItems(
226  level)] for expRef in expRefList]
227 
228  # Determine what additional keys make a CCD from an exposure
229  if len(ccdKeys) != len(set(ccdKeys)):
230  raise RuntimeError("Duplicate keys found in ccdKeys: %s" % ccdKeys)
231  ccdNames = set() # Set of tuples which are values for each of the CCDs in an exposure
232  for ccdIdList in expIdList:
233  for ccdId in ccdIdList:
234  name = dictToTuple(ccdId, ccdKeys)
235  ccdNames.add(name)
236 
237  # Turn the list of CCDs for each exposure into a list of exposures for
238  # each CCD
239  ccdLists = {}
240  for n, ccdIdList in enumerate(expIdList):
241  for ccdId in ccdIdList:
242  name = dictToTuple(ccdId, ccdKeys)
243  if name not in ccdLists:
244  ccdLists[name] = []
245  ccdLists[name].append(ccdId)
246 
247  for ccd in ccdLists:
248  # Sort the list by the dataId values (ordered by key)
249  ccdLists[ccd] = sorted(ccdLists[ccd], key=lambda dd: dictToTuple(dd, sorted(dd.keys())))
250 
251  return ccdLists
252 
253 
254 def mapToMatrix(pool, func, ccdIdLists, *args, **kwargs):
255  """Generate a matrix of results using pool.map
256 
257  The function should have the call signature:
258  func(cache, dataId, *args, **kwargs)
259 
260  We return a dict mapping 'ccd name' to a list of values for
261  each exposure.
262 
263  @param pool Process pool
264  @param func Function to call for each dataId
265  @param ccdIdLists Dict of data identifier lists for each CCD name
266  @return matrix of results
267  """
268  dataIdList = sum(ccdIdLists.values(), [])
269  resultList = pool.map(func, dataIdList, *args, **kwargs)
270  # Piece everything back together
271  data = dict((ccdName, [None] * len(expList)) for ccdName, expList in ccdIdLists.items())
272  indices = dict(sum([[(tuple(dataId.values()) if dataId is not None else None, (ccdName, expNum))
273  for expNum, dataId in enumerate(expList)]
274  for ccdName, expList in ccdIdLists.items()], []))
275  for dataId, result in zip(dataIdList, resultList):
276  if dataId is None:
277  continue
278  ccdName, expNum = indices[tuple(dataId.values())]
279  data[ccdName][expNum] = result
280  return data
281 
282 
283 class CalibIdAction(argparse.Action):
284  """Split name=value pairs and put the result in a dict"""
285 
286  def __call__(self, parser, namespace, values, option_string):
287  output = getattr(namespace, self.dest, {})
288  for nameValue in values:
289  name, sep, valueStr = nameValue.partition("=")
290  if not valueStr:
291  parser.error("%s value %s must be in form name=value" %
292  (option_string, nameValue))
293  output[name] = valueStr
294  setattr(namespace, self.dest, output)
295 
296 
298  """ArgumentParser for calibration construction"""
299 
300  def __init__(self, calibName, *args, **kwargs):
301  """Add a --calibId argument to the standard pipe_base argument parser"""
302  ArgumentParser.__init__(self, *args, **kwargs)
303  self.calibName = calibName
304  self.add_id_argument("--id", datasetType="raw",
305  help="input identifiers, e.g., --id visit=123 ccd=4")
306  self.add_argument("--calibId", nargs="*", action=CalibIdAction, default={},
307  help="identifiers for calib, e.g., --calibId version=1",
308  metavar="KEY=VALUE1[^VALUE2[^VALUE3...]")
309 
310  def parse_args(self, *args, **kwargs):
311  """Parse arguments
312 
313  Checks that the "--calibId" provided works.
314  """
315  namespace = ArgumentParser.parse_args(self, *args, **kwargs)
316 
317  keys = namespace.butler.getKeys(self.calibName)
318  parsed = {}
319  for name, value in namespace.calibId.items():
320  if name not in keys:
321  self.error(
322  "%s is not a relevant calib identifier key (%s)" % (name, keys))
323  parsed[name] = keys[name](value)
324  namespace.calibId = parsed
325 
326  return namespace
327 
328 
330  """Configuration for constructing calibs"""
331  clobber = Field(dtype=bool, default=True,
332  doc="Clobber existing processed images?")
333  isr = ConfigurableField(target=IsrTask, doc="ISR configuration")
334  dateObs = Field(dtype=str, default="dateObs",
335  doc="Key for observation date in exposure registry")
336  dateCalib = Field(dtype=str, default="calibDate",
337  doc="Key for calib date in calib registry")
338  filter = Field(dtype=str, default="filter",
339  doc="Key for filter name in exposure/calib registries")
340  combination = ConfigurableField(
341  target=CalibCombineTask, doc="Calib combination configuration")
342  ccdKeys = ListField(dtype=str, default=["ccd"],
343  doc="DataId keywords specifying a CCD")
344  visitKeys = ListField(dtype=str, default=["visit"],
345  doc="DataId keywords specifying a visit")
346  calibKeys = ListField(dtype=str, default=[],
347  doc="DataId keywords specifying a calibration")
348  doCameraImage = Field(dtype=bool, default=True, doc="Create camera overview image?")
349  binning = Field(dtype=int, default=64, doc="Binning to apply for camera image")
350 
351  def setDefaults(self):
352  self.isr.doWrite = False
353 
354 
356  """Get parsed values into the CalibTask.run"""
357  @staticmethod
358  def getTargetList(parsedCmd, **kwargs):
359  return [dict(expRefList=parsedCmd.id.refList, butler=parsedCmd.butler, calibId=parsedCmd.calibId)]
360 
361  def __call__(self, args):
362  """Call the Task with the kwargs from getTargetList"""
363  task = self.TaskClass(config=self.config, log=self.log)
364  exitStatus = 0 # exit status for the shell
365  if self.doRaise:
366  result = task.runDataRef(**args)
367  else:
368  try:
369  result = task.runDataRef(**args)
370  except Exception as e:
371  # n.b. The shell exit value is the number of dataRefs returning
372  # non-zero, so the actual value used here is lost
373  exitStatus = 1
374 
375  task.log.fatal("Failed: %s" % e)
376  traceback.print_exc(file=sys.stderr)
377 
378  if self.doReturnResults:
379  return Struct(
380  exitStatus=exitStatus,
381  args=args,
382  metadata=task.metadata,
383  result=result,
384  )
385  else:
386  return Struct(
387  exitStatus=exitStatus,
388  )
389 
390 
392  """!Base class for constructing calibs.
393 
394  This should be subclassed for each of the required calib types.
395  The subclass should be sure to define the following class variables:
396  * _DefaultName: default name of the task, used by CmdLineTask
397  * calibName: name of the calibration data set in the butler
398  The subclass may optionally set:
399  * filterName: filter name to give the resultant calib
400  """
401  ConfigClass = CalibConfig
402  RunnerClass = CalibTaskRunner
403  filterName = None
404  calibName = None
405  exposureTime = 1.0 # sets this exposureTime in the output
406 
407  def __init__(self, *args, **kwargs):
408  """Constructor"""
409  BatchPoolTask.__init__(self, *args, **kwargs)
410  self.makeSubtask("isr")
411  self.makeSubtask("combination")
412 
413  @classmethod
414  def batchWallTime(cls, time, parsedCmd, numCores):
415  numCcds = len(parsedCmd.butler.get("camera"))
416  numExps = len(cls.RunnerClass.getTargetList(
417  parsedCmd)[0]['expRefList'])
418  numCycles = int(numCcds/float(numCores) + 0.5)
419  return time*numExps*numCycles
420 
421  @classmethod
422  def _makeArgumentParser(cls, *args, **kwargs):
423  kwargs.pop("doBatch", False)
424  return CalibArgumentParser(calibName=cls.calibName, name=cls._DefaultName, *args, **kwargs)
425 
426  def runDataRef(self, expRefList, butler, calibId):
427  """!Construct a calib from a list of exposure references
428 
429  This is the entry point, called by the TaskRunner.__call__
430 
431  Only the master node executes this method.
432 
433  @param expRefList List of data references at the exposure level
434  @param butler Data butler
435  @param calibId Identifier dict for calib
436  """
437  if len(expRefList) < 1:
438  raise RuntimeError("No valid input data")
439 
440  for expRef in expRefList:
441  self.addMissingKeys(expRef.dataId, butler, self.config.ccdKeys, 'raw')
442 
443  outputId = self.getOutputId(expRefList, calibId)
444  ccdIdLists = getCcdIdListFromExposures(
445  expRefList, level="sensor", ccdKeys=self.config.ccdKeys)
446  self.checkCcdIdLists(ccdIdLists)
447 
448  # Ensure we can generate filenames for each output
449  outputIdItemList = list(outputId.items())
450  for ccdName in ccdIdLists:
451  dataId = dict([(k, ccdName[i]) for i, k in enumerate(self.config.ccdKeys)])
452  dataId.update(outputIdItemList)
453  self.addMissingKeys(dataId, butler)
454  dataId.update(outputIdItemList)
455 
456  try:
457  butler.get(self.calibName + "_filename", dataId)
458  except Exception as e:
459  raise RuntimeError(
460  "Unable to determine output filename \"%s_filename\" from %s: %s" %
461  (self.calibName, dataId, e))
462 
463  processPool = Pool("process")
464  processPool.storeSet(butler=butler)
465 
466  # Scatter: process CCDs independently
467  data = self.scatterProcess(processPool, ccdIdLists)
468 
469  # Gather: determine scalings
470  scales = self.scale(ccdIdLists, data)
471 
472  combinePool = Pool("combine")
473  combinePool.storeSet(butler=butler)
474 
475  # Scatter: combine
476  calibs = self.scatterCombine(combinePool, outputId, ccdIdLists, scales)
477 
478  if self.config.doCameraImage:
479  camera = butler.get("camera")
480  # Convert indexing of calibs from "ccdName" to detector ID (as used by makeImageFromCamera)
481  calibs = {butler.get("postISRCCD_detector",
482  dict(zip(self.config.ccdKeys, ccdName))).getId(): calibs[ccdName]
483  for ccdName in ccdIdLists}
484 
485  try:
486  cameraImage = self.makeCameraImage(camera, outputId, calibs)
487  butler.put(cameraImage, self.calibName + "_camera", dataId)
488  except Exception as exc:
489  self.log.warn("Unable to create camera image: %s" % (exc,))
490 
491  return Struct(
492  outputId=outputId,
493  ccdIdLists=ccdIdLists,
494  scales=scales,
495  calibs=calibs,
496  processPool=processPool,
497  combinePool=combinePool,
498  )
499 
500  def getOutputId(self, expRefList, calibId):
501  """!Generate the data identifier for the output calib
502 
503  The mean date and the common filter are included, using keywords
504  from the configuration. The CCD-specific part is not included
505  in the data identifier.
506 
507  @param expRefList List of data references at exposure level
508  @param calibId Data identifier elements for the calib provided by the user
509  @return data identifier
510  """
511  midTime = 0
512  filterName = None
513  for expRef in expRefList:
514  butler = expRef.getButler()
515  dataId = expRef.dataId
516 
517  midTime += self.getMjd(butler, dataId)
518  thisFilter = self.getFilter(
519  butler, dataId) if self.filterName is None else self.filterName
520  if filterName is None:
521  filterName = thisFilter
522  elif filterName != thisFilter:
523  raise RuntimeError("Filter mismatch for %s: %s vs %s" % (
524  dataId, thisFilter, filterName))
525 
526  midTime /= len(expRefList)
527  date = str(dafBase.DateTime(
528  midTime, dafBase.DateTime.MJD).toPython().date())
529 
530  outputId = {self.config.filter: filterName,
531  self.config.dateCalib: date}
532  outputId.update(calibId)
533  return outputId
534 
535  def getMjd(self, butler, dataId, timescale=dafBase.DateTime.UTC):
536  """Determine the Modified Julian Date (MJD; in TAI) from a data identifier"""
537  if self.config.dateObs in dataId:
538  dateObs = dataId[self.config.dateObs]
539  else:
540  dateObs = butler.queryMetadata('raw', [self.config.dateObs], dataId)[0]
541  if "T" not in dateObs:
542  dateObs = dateObs + "T12:00:00.0Z"
543  elif not dateObs.endswith("Z"):
544  dateObs += "Z"
545 
546  return dafBase.DateTime(dateObs, timescale).get(dafBase.DateTime.MJD)
547 
548  def getFilter(self, butler, dataId):
549  """Determine the filter from a data identifier"""
550  filt = butler.queryMetadata('raw', [self.config.filter], dataId)[0]
551  return filt
552 
553  def addMissingKeys(self, dataId, butler, missingKeys=None, calibName=None):
554  if calibName is None:
555  calibName = self.calibName
556 
557  if missingKeys is None:
558  missingKeys = set(butler.getKeys(calibName).keys()) - set(dataId.keys())
559 
560  for k in missingKeys:
561  try:
562  v = butler.queryMetadata('raw', [k], dataId) # n.b. --id refers to 'raw'
563  except Exception:
564  continue
565 
566  if len(v) == 0: # failed to lookup value
567  continue
568 
569  if len(v) == 1:
570  dataId[k] = v[0]
571  else:
572  raise RuntimeError("No unique lookup for %s: %s" % (k, v))
573 
574  def updateMetadata(self, calibImage, exposureTime, darkTime=None, **kwargs):
575  """!Update the metadata from the VisitInfo
576 
577  @param calibImage The image whose metadata is to be set
578  @param exposureTime The exposure time for the image
579  @param darkTime The time since the last read (default: exposureTime)
580  """
581 
582  if darkTime is None:
583  darkTime = exposureTime # avoid warning messages when using calibration products
584 
585  visitInfo = afwImage.makeVisitInfo(exposureTime=exposureTime, darkTime=darkTime, **kwargs)
586  md = calibImage.getMetadata()
587 
588  afwImage.setVisitInfoMetadata(md, visitInfo)
589 
590  def scatterProcess(self, pool, ccdIdLists):
591  """!Scatter the processing among the nodes
592 
593  We scatter each CCD independently (exposures aren't grouped together),
594  to make full use of all available processors. This necessitates piecing
595  everything back together in the same format as ccdIdLists afterwards.
596 
597  Only the master node executes this method.
598 
599  @param pool Process pool
600  @param ccdIdLists Dict of data identifier lists for each CCD name
601  @return Dict of lists of returned data for each CCD name
602  """
603  self.log.info("Scatter processing")
604  return mapToMatrix(pool, self.process, ccdIdLists)
605 
606  def process(self, cache, ccdId, outputName="postISRCCD", **kwargs):
607  """!Process a CCD, specified by a data identifier
608 
609  After processing, optionally returns a result (produced by
610  the 'processResult' method) calculated from the processed
611  exposure. These results will be gathered by the master node,
612  and is a means for coordinated scaling of all CCDs for flats,
613  etc.
614 
615  Only slave nodes execute this method.
616 
617  @param cache Process pool cache
618  @param ccdId Data identifier for CCD
619  @param outputName Output dataset name for butler
620  @return result from 'processResult'
621  """
622  if ccdId is None:
623  self.log.warn("Null identifier received on %s" % NODE)
624  return None
625  sensorRef = getDataRef(cache.butler, ccdId)
626  if self.config.clobber or not sensorRef.datasetExists(outputName):
627  self.log.info("Processing %s on %s" % (ccdId, NODE))
628  try:
629  exposure = self.processSingle(sensorRef, **kwargs)
630  except Exception as e:
631  self.log.warn("Unable to process %s: %s" % (ccdId, e))
632  raise
633  return None
634  self.processWrite(sensorRef, exposure)
635  else:
636  self.log.info(
637  "Using previously persisted processed exposure for %s" % (sensorRef.dataId,))
638  exposure = sensorRef.get(outputName)
639  return self.processResult(exposure)
640 
641  def processSingle(self, dataRef):
642  """Process a single CCD, specified by a data reference
643 
644  Generally, this simply means doing ISR.
645 
646  Only slave nodes execute this method.
647  """
648  return self.isr.runDataRef(dataRef).exposure
649 
650  def processWrite(self, dataRef, exposure, outputName="postISRCCD"):
651  """!Write the processed CCD
652 
653  We need to write these out because we can't hold them all in
654  memory at once.
655 
656  Only slave nodes execute this method.
657 
658  @param dataRef Data reference
659  @param exposure CCD exposure to write
660  @param outputName Output dataset name for butler.
661  """
662  dataRef.put(exposure, outputName)
663 
664  def processResult(self, exposure):
665  """Extract processing results from a processed exposure
666 
667  This method generates what is gathered by the master node.
668  This can be a background measurement or similar for scaling
669  flat-fields. It must be picklable!
670 
671  Only slave nodes execute this method.
672  """
673  return None
674 
675  def scale(self, ccdIdLists, data):
676  """!Determine scaling across CCDs and exposures
677 
678  This is necessary mainly for flats, so as to determine a
679  consistent scaling across the entire focal plane. This
680  implementation is simply a placeholder.
681 
682  Only the master node executes this method.
683 
684  @param ccdIdLists Dict of data identifier lists for each CCD tuple
685  @param data Dict of lists of returned data for each CCD tuple
686  @return dict of Struct(ccdScale: scaling for CCD,
687  expScales: scaling for each exposure
688  ) for each CCD tuple
689  """
690  self.log.info("Scale on %s" % NODE)
691  return dict((name, Struct(ccdScale=None, expScales=[None] * len(ccdIdLists[name])))
692  for name in ccdIdLists)
693 
694  def scatterCombine(self, pool, outputId, ccdIdLists, scales):
695  """!Scatter the combination of exposures across multiple nodes
696 
697  In this case, we can only scatter across as many nodes as
698  there are CCDs.
699 
700  Only the master node executes this method.
701 
702  @param pool Process pool
703  @param outputId Output identifier (exposure part only)
704  @param ccdIdLists Dict of data identifier lists for each CCD name
705  @param scales Dict of structs with scales, for each CCD name
706  @param dict of binned images
707  """
708  self.log.info("Scatter combination")
709  data = [Struct(ccdName=ccdName, ccdIdList=ccdIdLists[ccdName], scales=scales[ccdName]) for
710  ccdName in ccdIdLists]
711  images = pool.map(self.combine, data, outputId)
712  return dict(zip(ccdIdLists.keys(), images))
713 
714  def getFullyQualifiedOutputId(self, ccdName, butler, outputId):
715  """Get fully-qualified output data identifier
716 
717  We may need to look up keys that aren't in the output dataId.
718 
719  @param ccdName Name tuple for CCD
720  @param butler Data butler
721  @param outputId Data identifier for combined image (exposure part only)
722  @return fully-qualified output dataId
723  """
724  fullOutputId = {k: ccdName[i] for i, k in enumerate(self.config.ccdKeys)}
725  fullOutputId.update(outputId)
726  self.addMissingKeys(fullOutputId, butler)
727  fullOutputId.update(outputId) # must be after the call to queryMetadata in 'addMissingKeys'
728  return fullOutputId
729 
730  def combine(self, cache, struct, outputId):
731  """!Combine multiple exposures of a particular CCD and write the output
732 
733  Only the slave nodes execute this method.
734 
735  @param cache Process pool cache
736  @param struct Parameters for the combination, which has the following components:
737  * ccdName Name tuple for CCD
738  * ccdIdList List of data identifiers for combination
739  * scales Scales to apply (expScales are scalings for each exposure,
740  ccdScale is final scale for combined image)
741  @param outputId Data identifier for combined image (exposure part only)
742  @return binned calib image
743  """
744  outputId = self.getFullyQualifiedOutputId(struct.ccdName, cache.butler, outputId)
745  dataRefList = [getDataRef(cache.butler, dataId) if dataId is not None else None for
746  dataId in struct.ccdIdList]
747  self.log.info("Combining %s on %s" % (outputId, NODE))
748  calib = self.combination.run(dataRefList, expScales=struct.scales.expScales,
749  finalScale=struct.scales.ccdScale)
750 
751  if not hasattr(calib, "getMetadata"):
752  if hasattr(calib, "getVariance"):
753  calib = afwImage.makeExposure(calib)
754  else:
755  calib = afwImage.DecoratedImageF(calib.getImage()) # n.b. hardwires "F" for the output type
756 
757  self.calculateOutputHeaderFromRaws(cache.butler, calib, struct.ccdIdList, outputId)
758 
759  self.updateMetadata(calib, self.exposureTime)
760 
761  self.recordCalibInputs(cache.butler, calib,
762  struct.ccdIdList, outputId)
763 
764  self.interpolateNans(calib)
765 
766  self.write(cache.butler, calib, outputId)
767 
768  return afwMath.binImage(calib.getImage(), self.config.binning)
769 
770  def calculateOutputHeaderFromRaws(self, butler, calib, dataIdList, outputId):
771  """!Calculate the output header from the raw headers.
772 
773  This metadata will go into the output FITS header. It will include all
774  headers that are identical in all inputs.
775 
776  @param butler Data butler
777  @param calib Combined calib exposure.
778  @param dataIdList List of data identifiers for calibration inputs
779  @param outputId Data identifier for output
780  """
781  header = calib.getMetadata()
782 
783  rawmd = [butler.get("raw_md", dataId) for dataId in dataIdList if
784  dataId is not None]
785 
786  merged = merge_headers(rawmd, mode="drop")
787 
788  # Place merged set into the PropertyList if a value is not
789  # present already
790  # Comments are not present in the merged version so copy them across
791  for k, v in merged.items():
792  if k not in header:
793  comment = rawmd[0].getComment(k) if k in rawmd[0] else None
794  header.set(k, v, comment=comment)
795 
796  # Create an observation group so we can add some standard headers
797  # independent of the form in the input files.
798  # Use try block in case we are dealing with unexpected data headers
799  try:
800  group = ObservationGroup(rawmd, pedantic=False)
801  except Exception:
802  group = None
803 
804  comments = {"TIMESYS": "Time scale for all dates",
805  "DATE-OBS": "Start date of earliest input observation",
806  "MJD-OBS": "[d] Start MJD of earliest input observation",
807  "DATE-END": "End date of oldest input observation",
808  "MJD-END": "[d] End MJD of oldest input observation",
809  "MJD-AVG": "[d] MJD midpoint of all input observations",
810  "DATE-AVG": "Midpoint date of all input observations"}
811 
812  if group is not None:
813  oldest, newest = group.extremes()
814  dateCards = dates_to_fits(oldest.datetime_begin, newest.datetime_end)
815  else:
816  # Fall back to setting a DATE-OBS from the calibDate
817  dateCards = {"DATE-OBS": "{}T00:00:00.00".format(outputId[self.config.dateCalib])}
818  comments["DATE-OBS"] = "Date of start of day of calibration midpoint"
819 
820  for k, v in dateCards.items():
821  header.set(k, v, comment=comments.get(k, None))
822 
823  def recordCalibInputs(self, butler, calib, dataIdList, outputId):
824  """!Record metadata including the inputs and creation details
825 
826  This metadata will go into the FITS header.
827 
828  @param butler Data butler
829  @param calib Combined calib exposure.
830  @param dataIdList List of data identifiers for calibration inputs
831  @param outputId Data identifier for output
832  """
833  header = calib.getMetadata()
834  header.set("OBSTYPE", self.calibName) # Used by ingestCalibs.py
835 
836  # date, time, host, and root
837  now = time.localtime()
838  header.set("CALIB_CREATION_DATE", time.strftime("%Y-%m-%d", now))
839  header.set("CALIB_CREATION_TIME", time.strftime("%X %Z", now))
840 
841  # Inputs
842  visits = [str(dictToTuple(dataId, self.config.visitKeys)) for dataId in dataIdList if
843  dataId is not None]
844  for i, v in enumerate(sorted(set(visits))):
845  header.set("CALIB_INPUT_%d" % (i,), v)
846 
847  header.set("CALIB_ID", " ".join("%s=%s" % (key, value)
848  for key, value in outputId.items()))
849  checksum(calib, header)
850 
851  def interpolateNans(self, image):
852  """Interpolate over NANs in the combined image
853 
854  NANs can result from masked areas on the CCD. We don't want them getting
855  into our science images, so we replace them with the median of the image.
856  """
857  if hasattr(image, "getMaskedImage"): # Deal with Exposure vs Image
858  self.interpolateNans(image.getMaskedImage().getVariance())
859  image = image.getMaskedImage().getImage()
860  if hasattr(image, "getImage"): # Deal with DecoratedImage or MaskedImage vs Image
861  image = image.getImage()
862  array = image.getArray()
863  bad = np.isnan(array)
864  array[bad] = np.median(array[np.logical_not(bad)])
865 
866  def write(self, butler, exposure, dataId):
867  """!Write the final combined calib
868 
869  Only the slave nodes execute this method
870 
871  @param butler Data butler
872  @param exposure CCD exposure to write
873  @param dataId Data identifier for output
874  """
875  self.log.info("Writing %s on %s" % (dataId, NODE))
876  butler.put(exposure, self.calibName, dataId)
877 
878  def makeCameraImage(self, camera, dataId, calibs):
879  """!Create and write an image of the entire camera
880 
881  This is useful for judging the quality or getting an overview of
882  the features of the calib.
883 
884  @param camera Camera object
885  @param dataId Data identifier for output
886  @param calibs Dict mapping CCD detector ID to calib image
887  """
888  return makeCameraImage(camera, calibs, self.config.binning)
889 
890  def checkCcdIdLists(self, ccdIdLists):
891  """Check that the list of CCD dataIds is consistent
892 
893  @param ccdIdLists Dict of data identifier lists for each CCD name
894  @return Number of exposures, number of CCDs
895  """
896  visitIdLists = collections.defaultdict(list)
897  for ccdName in ccdIdLists:
898  for dataId in ccdIdLists[ccdName]:
899  visitName = dictToTuple(dataId, self.config.visitKeys)
900  visitIdLists[visitName].append(dataId)
901 
902  numExps = set(len(expList) for expList in ccdIdLists.values())
903  numCcds = set(len(ccdList) for ccdList in visitIdLists.values())
904 
905  if len(numExps) != 1 or len(numCcds) != 1:
906  # Presumably a visit somewhere doesn't have the full complement available.
907  # Dump the information so the user can figure it out.
908  self.log.warn("Number of visits for each CCD: %s",
909  {ccdName: len(ccdIdLists[ccdName]) for ccdName in ccdIdLists})
910  self.log.warn("Number of CCDs for each visit: %s",
911  {vv: len(visitIdLists[vv]) for vv in visitIdLists})
912  raise RuntimeError("Inconsistent number of exposures/CCDs")
913 
914  return numExps.pop(), numCcds.pop()
915 
916 
918  """Configuration for bias construction.
919 
920  No changes required compared to the base class, but
921  subclassed for distinction.
922  """
923  pass
924 
925 
926 class BiasTask(CalibTask):
927  """Bias construction"""
928  ConfigClass = BiasConfig
929  _DefaultName = "bias"
930  calibName = "bias"
931  filterName = "NONE" # Sets this filter name in the output
932  exposureTime = 0.0 # sets this exposureTime in the output
933 
934  @classmethod
935  def applyOverrides(cls, config):
936  """Overrides to apply for bias construction"""
937  config.isr.doBias = False
938  config.isr.doDark = False
939  config.isr.doFlat = False
940  config.isr.doFringe = False
941 
942 
944  """Configuration for dark construction"""
945  doRepair = Field(dtype=bool, default=True, doc="Repair artifacts?")
946  psfFwhm = Field(dtype=float, default=3.0, doc="Repair PSF FWHM (pixels)")
947  psfSize = Field(dtype=int, default=21, doc="Repair PSF size (pixels)")
948  crGrow = Field(dtype=int, default=2, doc="Grow radius for CR (pixels)")
950  target=RepairTask, doc="Task to repair artifacts")
951 
952  def setDefaults(self):
953  CalibConfig.setDefaults(self)
954  self.combination.mask.append("CR")
955 
956 
958  """Dark construction
959 
960  The only major difference from the base class is a cosmic-ray
961  identification stage, and dividing each image by the dark time
962  to generate images of the dark rate.
963  """
964  ConfigClass = DarkConfig
965  _DefaultName = "dark"
966  calibName = "dark"
967  filterName = "NONE" # Sets this filter name in the output
968 
969  def __init__(self, *args, **kwargs):
970  CalibTask.__init__(self, *args, **kwargs)
971  self.makeSubtask("repair")
972 
973  @classmethod
974  def applyOverrides(cls, config):
975  """Overrides to apply for dark construction"""
976  config.isr.doDark = False
977  config.isr.doFlat = False
978  config.isr.doFringe = False
979 
980  def processSingle(self, sensorRef):
981  """Process a single CCD
982 
983  Besides the regular ISR, also masks cosmic-rays and divides each
984  processed image by the dark time to generate images of the dark rate.
985  The dark time is provided by the 'getDarkTime' method.
986  """
987  exposure = CalibTask.processSingle(self, sensorRef)
988 
989  if self.config.doRepair:
990  psf = measAlg.DoubleGaussianPsf(self.config.psfSize, self.config.psfSize,
991  self.config.psfFwhm/(2*math.sqrt(2*math.log(2))))
992  exposure.setPsf(psf)
993  self.repair.run(exposure, keepCRs=False)
994  if self.config.crGrow > 0:
995  mask = exposure.getMaskedImage().getMask().clone()
996  mask &= mask.getPlaneBitMask("CR")
997  fpSet = afwDet.FootprintSet(
998  mask, afwDet.Threshold(0.5))
999  fpSet = afwDet.FootprintSet(fpSet, self.config.crGrow, True)
1000  fpSet.setMask(exposure.getMaskedImage().getMask(), "CR")
1001 
1002  mi = exposure.getMaskedImage()
1003  mi /= self.getDarkTime(exposure)
1004  return exposure
1005 
1006  def getDarkTime(self, exposure):
1007  """Retrieve the dark time for an exposure"""
1008  darkTime = exposure.getInfo().getVisitInfo().getDarkTime()
1009  if not np.isfinite(darkTime):
1010  raise RuntimeError("Non-finite darkTime")
1011  return darkTime
1012 
1013 
1015  """Configuration for flat construction"""
1016  iterations = Field(dtype=int, default=10,
1017  doc="Number of iterations for scale determination")
1018  stats = ConfigurableField(target=CalibStatsTask,
1019  doc="Background statistics configuration")
1020 
1021 
1023  """Flat construction
1024 
1025  The principal change from the base class involves gathering the background
1026  values from each image and using them to determine the scalings for the final
1027  combination.
1028  """
1029  ConfigClass = FlatConfig
1030  _DefaultName = "flat"
1031  calibName = "flat"
1032 
1033  @classmethod
1034  def applyOverrides(cls, config):
1035  """Overrides for flat construction"""
1036  config.isr.doFlat = False
1037  config.isr.doFringe = False
1038 
1039  def __init__(self, *args, **kwargs):
1040  CalibTask.__init__(self, *args, **kwargs)
1041  self.makeSubtask("stats")
1042 
1043  def processResult(self, exposure):
1044  return self.stats.run(exposure)
1045 
1046  def scale(self, ccdIdLists, data):
1047  """Determine the scalings for the final combination
1048 
1049  We have a matrix B_ij = C_i E_j, where C_i is the relative scaling
1050  of one CCD to all the others in an exposure, and E_j is the scaling
1051  of the exposure. We convert everything to logarithms so we can work
1052  with a linear system. We determine the C_i and E_j from B_ij by iteration,
1053  under the additional constraint that the average CCD scale is unity.
1054 
1055  This algorithm comes from Eugene Magnier and Pan-STARRS.
1056  """
1057  assert len(ccdIdLists.values()) > 0, "No successful CCDs"
1058  lengths = set([len(expList) for expList in ccdIdLists.values()])
1059  assert len(lengths) == 1, "Number of successful exposures for each CCD differs"
1060  assert tuple(lengths)[0] > 0, "No successful exposures"
1061  # Format background measurements into a matrix
1062  indices = dict((name, i) for i, name in enumerate(ccdIdLists))
1063  bgMatrix = np.array([[0.0] * len(expList) for expList in ccdIdLists.values()])
1064  for name in ccdIdLists:
1065  i = indices[name]
1066  bgMatrix[i] = [d if d is not None else np.nan for d in data[name]]
1067 
1068  numpyPrint = np.get_printoptions()
1069  np.set_printoptions(threshold=np.inf)
1070  self.log.info("Input backgrounds: %s" % bgMatrix)
1071 
1072  # Flat-field scaling
1073  numCcds = len(ccdIdLists)
1074  numExps = bgMatrix.shape[1]
1075  # log(Background) for each exposure/component
1076  bgMatrix = np.log(bgMatrix)
1077  bgMatrix = np.ma.masked_array(bgMatrix, ~np.isfinite(bgMatrix))
1078  # Initial guess at log(scale) for each component
1079  compScales = np.zeros(numCcds)
1080  expScales = np.array([(bgMatrix[:, i0] - compScales).mean() for i0 in range(numExps)])
1081 
1082  for iterate in range(self.config.iterations):
1083  compScales = np.array([(bgMatrix[i1, :] - expScales).mean() for i1 in range(numCcds)])
1084  bad = np.isnan(compScales)
1085  if np.any(bad):
1086  # Bad CCDs: just set them to the mean scale
1087  compScales[bad] = compScales[~bad].mean()
1088  expScales = np.array([(bgMatrix[:, i2] - compScales).mean() for i2 in range(numExps)])
1089 
1090  avgScale = np.average(np.exp(compScales))
1091  compScales -= np.log(avgScale)
1092  self.log.debug("Iteration %d exposure scales: %s", iterate, np.exp(expScales))
1093  self.log.debug("Iteration %d component scales: %s", iterate, np.exp(compScales))
1094 
1095  expScales = np.array([(bgMatrix[:, i3] - compScales).mean() for i3 in range(numExps)])
1096 
1097  if np.any(np.isnan(expScales)):
1098  raise RuntimeError("Bad exposure scales: %s --> %s" % (bgMatrix, expScales))
1099 
1100  expScales = np.exp(expScales)
1101  compScales = np.exp(compScales)
1102 
1103  self.log.info("Exposure scales: %s" % expScales)
1104  self.log.info("Component relative scaling: %s" % compScales)
1105  np.set_printoptions(**numpyPrint)
1106 
1107  return dict((ccdName, Struct(ccdScale=compScales[indices[ccdName]], expScales=expScales))
1108  for ccdName in ccdIdLists)
1109 
1110 
1112  """Configuration for fringe construction"""
1113  stats = ConfigurableField(target=CalibStatsTask,
1114  doc="Background statistics configuration")
1115  subtractBackground = ConfigurableField(target=measAlg.SubtractBackgroundTask,
1116  doc="Background configuration")
1117  detection = ConfigurableField(
1118  target=measAlg.SourceDetectionTask, doc="Detection configuration")
1119  detectSigma = Field(dtype=float, default=1.0,
1120  doc="Detection PSF gaussian sigma")
1121 
1122  def setDefaults(self):
1123  CalibConfig.setDefaults(self)
1124  self.detection.reEstimateBackground = False
1125 
1126 
1128  """Fringe construction task
1129 
1130  The principal change from the base class is that the images are
1131  background-subtracted and rescaled by the background.
1132 
1133  XXX This is probably not right for a straight-up combination, as we
1134  are currently doing, since the fringe amplitudes need not scale with
1135  the continuum.
1136 
1137  XXX Would like to have this do PCA and generate multiple images, but
1138  that will take a bit of work with the persistence code.
1139  """
1140  ConfigClass = FringeConfig
1141  _DefaultName = "fringe"
1142  calibName = "fringe"
1143 
1144  @classmethod
1145  def applyOverrides(cls, config):
1146  """Overrides for fringe construction"""
1147  config.isr.doFringe = False
1148 
1149  def __init__(self, *args, **kwargs):
1150  CalibTask.__init__(self, *args, **kwargs)
1151  self.makeSubtask("detection")
1152  self.makeSubtask("stats")
1153  self.makeSubtask("subtractBackground")
1154 
1155  def processSingle(self, sensorRef):
1156  """Subtract the background and normalise by the background level"""
1157  exposure = CalibTask.processSingle(self, sensorRef)
1158  bgLevel = self.stats.run(exposure)
1159  self.subtractBackground.run(exposure)
1160  mi = exposure.getMaskedImage()
1161  mi /= bgLevel
1162  footprintSets = self.detection.detectFootprints(
1163  exposure, sigma=self.config.detectSigma)
1164  mask = exposure.getMaskedImage().getMask()
1165  detected = 1 << mask.addMaskPlane("DETECTED")
1166  for fpSet in (footprintSets.positive, footprintSets.negative):
1167  if fpSet is not None:
1168  afwDet.setMaskFromFootprintList(
1169  mask, fpSet.getFootprints(), detected)
1170  return exposure
1171 
1172 
1174  """Configuration for sky frame construction"""
1175  detectSigma = Field(dtype=float, default=2.0, doc="Detection PSF gaussian sigma")
1176  maskObjects = ConfigurableField(target=MaskObjectsTask,
1177  doc="Configuration for masking objects aggressively")
1178  largeScaleBackground = ConfigField(dtype=FocalPlaneBackgroundConfig,
1179  doc="Large-scale background configuration")
1180  sky = ConfigurableField(target=SkyMeasurementTask, doc="Sky measurement")
1181  maskThresh = Field(dtype=float, default=3.0, doc="k-sigma threshold for masking pixels")
1182  mask = ListField(dtype=str, default=["BAD", "SAT", "DETECTED", "NO_DATA"],
1183  doc="Mask planes to consider as contaminated")
1184 
1185 
1187  """Task for sky frame construction
1188 
1189  The sky frame is a (relatively) small-scale background
1190  model, the response of the camera to the sky.
1191 
1192  To construct, we first remove a large-scale background (e.g., caused
1193  by moonlight) which may vary from image to image. Then we construct a
1194  model of the sky, which is essentially a binned version of the image
1195  (important configuration parameters: sky.background.[xy]BinSize).
1196  It is these models which are coadded to yield the sky frame.
1197  """
1198  ConfigClass = SkyConfig
1199  _DefaultName = "sky"
1200  calibName = "sky"
1201 
1202  def __init__(self, *args, **kwargs):
1203  CalibTask.__init__(self, *args, **kwargs)
1204  self.makeSubtask("maskObjects")
1205  self.makeSubtask("sky")
1206 
1207  def scatterProcess(self, pool, ccdIdLists):
1208  """!Scatter the processing among the nodes
1209 
1210  Only the master node executes this method, assigning work to the
1211  slaves.
1212 
1213  We measure and subtract off a large-scale background model across
1214  all CCDs, which requires a scatter/gather. Then we process the
1215  individual CCDs, subtracting the large-scale background model and
1216  the residual background model measured. These residuals will be
1217  combined for the sky frame.
1218 
1219  @param pool Process pool
1220  @param ccdIdLists Dict of data identifier lists for each CCD name
1221  @return Dict of lists of returned data for each CCD name
1222  """
1223  self.log.info("Scatter processing")
1224 
1225  numExps = set(len(expList) for expList in ccdIdLists.values()).pop()
1226 
1227  # First subtract off general gradients to make all the exposures look similar.
1228  # We want to preserve the common small-scale structure, which we will coadd.
1229  bgModelList = mapToMatrix(pool, self.measureBackground, ccdIdLists)
1230 
1231  backgrounds = {}
1232  scales = {}
1233  for exp in range(numExps):
1234  bgModels = [bgModelList[ccdName][exp] for ccdName in ccdIdLists]
1235  visit = set(tuple(ccdIdLists[ccdName][exp][key] for key in sorted(self.config.visitKeys)) for
1236  ccdName in ccdIdLists)
1237  assert len(visit) == 1
1238  visit = visit.pop()
1239  bgModel = bgModels[0]
1240  for bg in bgModels[1:]:
1241  bgModel.merge(bg)
1242  self.log.info("Background model min/max for visit %s: %f %f", visit,
1243  np.min(bgModel.getStatsImage().getArray()),
1244  np.max(bgModel.getStatsImage().getArray()))
1245  backgrounds[visit] = bgModel
1246  scales[visit] = np.median(bgModel.getStatsImage().getArray())
1247 
1248  return mapToMatrix(pool, self.process, ccdIdLists, backgrounds=backgrounds, scales=scales)
1249 
1250  def measureBackground(self, cache, dataId):
1251  """!Measure background model for CCD
1252 
1253  This method is executed by the slaves.
1254 
1255  The background models for all CCDs in an exposure will be
1256  combined to form a full focal-plane background model.
1257 
1258  @param cache Process pool cache
1259  @param dataId Data identifier
1260  @return Bcakground model
1261  """
1262  dataRef = getDataRef(cache.butler, dataId)
1263  exposure = self.processSingleBackground(dataRef)
1264 
1265  # NAOJ prototype smoothed and then combined the entire image, but it shouldn't be any different
1266  # to bin and combine the binned images except that there's fewer pixels to worry about.
1267  config = self.config.largeScaleBackground
1268  camera = dataRef.get("camera")
1269  bgModel = FocalPlaneBackground.fromCamera(config, camera)
1270  bgModel.addCcd(exposure)
1271  return bgModel
1272 
1273  def processSingleBackground(self, dataRef):
1274  """!Process a single CCD for the background
1275 
1276  This method is executed by the slaves.
1277 
1278  Because we're interested in the background, we detect and mask astrophysical
1279  sources, and pixels above the noise level.
1280 
1281  @param dataRef Data reference for CCD.
1282  @return processed exposure
1283  """
1284  if not self.config.clobber and dataRef.datasetExists("postISRCCD"):
1285  return dataRef.get("postISRCCD")
1286  exposure = CalibTask.processSingle(self, dataRef)
1287 
1288  self.maskObjects.run(exposure, self.config.mask)
1289  dataRef.put(exposure, "postISRCCD")
1290  return exposure
1291 
1292  def processSingle(self, dataRef, backgrounds, scales):
1293  """Process a single CCD, specified by a data reference
1294 
1295  We subtract the appropriate focal plane background model,
1296  divide by the appropriate scale and measure the background.
1297 
1298  Only slave nodes execute this method.
1299 
1300  @param dataRef Data reference for single CCD
1301  @param backgrounds Background model for each visit
1302  @param scales Scales for each visit
1303  @return Processed exposure
1304  """
1305  visit = tuple(dataRef.dataId[key] for key in sorted(self.config.visitKeys))
1306  exposure = dataRef.get("postISRCCD", immediate=True)
1307  image = exposure.getMaskedImage()
1308  detector = exposure.getDetector()
1309  bbox = image.getBBox()
1310 
1311  bgModel = backgrounds[visit]
1312  bg = bgModel.toCcdBackground(detector, bbox)
1313  image -= bg.getImage()
1314  image /= scales[visit]
1315 
1316  bg = self.sky.measureBackground(exposure.getMaskedImage())
1317  dataRef.put(bg, "icExpBackground")
1318  return exposure
1319 
1320  def combine(self, cache, struct, outputId):
1321  """!Combine multiple background models of a particular CCD and write the output
1322 
1323  Only the slave nodes execute this method.
1324 
1325  @param cache Process pool cache
1326  @param struct Parameters for the combination, which has the following components:
1327  * ccdName Name tuple for CCD
1328  * ccdIdList List of data identifiers for combination
1329  @param outputId Data identifier for combined image (exposure part only)
1330  @return binned calib image
1331  """
1332  outputId = self.getFullyQualifiedOutputId(struct.ccdName, cache.butler, outputId)
1333  dataRefList = [getDataRef(cache.butler, dataId) if dataId is not None else None for
1334  dataId in struct.ccdIdList]
1335  self.log.info("Combining %s on %s" % (outputId, NODE))
1336  bgList = [dataRef.get("icExpBackground", immediate=True).clone() for dataRef in dataRefList]
1337 
1338  bgExp = self.sky.averageBackgrounds(bgList)
1339 
1340  self.recordCalibInputs(cache.butler, bgExp, struct.ccdIdList, outputId)
1341  cache.butler.put(bgExp, "sky", outputId)
1342  return afwMath.binImage(self.sky.exposureToBackground(bgExp).getImage(), self.config.binning)
def processWrite(self, dataRef, exposure, outputName="postISRCCD")
Write the processed CCD.
def makeSubtask(self, name, keyArgs)
Definition: task.py:275
Class for handling dates/times, including MJD, UTC, and TAI.
Definition: DateTime.h:64
def scatterCombine(self, pool, outputId, ccdIdLists, scales)
Scatter the combination of exposures across multiple nodes.
def run(self, exposureOrImage)
Measure a particular statistic on an image (of some sort).
def checksum(obj, header=None, sumType="MD5")
Calculate a checksum of an object.
Definition: checksum.py:29
def getCcdIdListFromExposures(expRefList, level="sensor", ccdKeys=["ccd"])
Determine a list of CCDs from exposure references.
A Threshold is used to pass a threshold value to detection algorithms.
Definition: Threshold.h:43
std::shared_ptr< FrameSet > append(FrameSet const &first, FrameSet const &second)
Construct a FrameSet that performs two transformations in series.
Definition: functional.cc:33
Fit spatial kernel using approximate fluxes for candidates, and solving a linear system of equations...
daf::base::PropertySet * set
Definition: fits.cc:884
def getFullyQualifiedOutputId(self, ccdName, butler, outputId)
def __call__(self, parser, namespace, values, option_string)
int min
Statistics makeStatistics(lsst::afw::math::MaskedVector< EntryT > const &mv, std::vector< WeightPixel > const &vweights, int const flags, StatisticsControl const &sctrl=StatisticsControl())
The makeStatistics() overload to handle lsst::afw::math::MaskedVector<>
Definition: Statistics.h:520
def dictToTuple(dict_, keys)
Return a tuple of specific values from a dict.
std::shared_ptr< Exposure< ImagePixelT, MaskPixelT, VariancePixelT > > makeExposure(MaskedImage< ImagePixelT, MaskPixelT, VariancePixelT > &mimage, std::shared_ptr< geom::SkyWcs const > wcs=std::shared_ptr< geom::SkyWcs const >())
A function to return an Exposure of the correct type (cf.
Definition: Exposure.h:457
def getDataRef(butler, dataId, datasetType="raw")
Definition: utils.py:17
def updateMetadata(self, calibImage, exposureTime, darkTime=None, kwargs)
Update the metadata from the VisitInfo.
Pass parameters to a Statistics object.
Definition: Statistics.h:93
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)
Definition: history.py:168
def process(self, cache, ccdId, outputName="postISRCCD", kwargs)
Process a CCD, specified by a data identifier.
def processSingle(self, dataRef, backgrounds, scales)
def makeCameraImage(self, camera, dataId, calibs)
Create and write an image of the entire camera.
def combine(self, target, imageList, stats)
Combine multiple images.
A set of Footprints, associated with a MaskedImage.
Definition: FootprintSet.h:53
def getMjd(self, butler, dataId, timescale=dafBase.DateTime.UTC)
def combine(self, cache, struct, outputId)
Combine multiple exposures of a particular CCD and write the output.
def runDataRef(self, expRefList, butler, calibId)
Construct a calib from a list of exposure references.
std::shared_ptr< image::MaskedImage< PixelT > > statisticsStack(image::MaskedImage< PixelT > const &image, Property flags, char dimension, StatisticsControl const &sctrl)
A function to compute statistics on the rows or columns of an image.
Definition: Stack.cc:478
def write(self, butler, exposure, dataId)
Write the final combined calib.
def combine(self, cache, struct, outputId)
Combine multiple background models of a particular CCD and write the output.
def measureBackground(self, cache, dataId)
Measure background model for CCD.
def recordCalibInputs(self, butler, calib, dataIdList, outputId)
Record metadata including the inputs and creation details.
def scatterProcess(self, pool, ccdIdLists)
Scatter the processing among the nodes.
def run(self, sensorRefList, expScales=None, finalScale=None, inputName="postISRCCD")
Combine calib images for a single sensor.
std::shared_ptr< ImageT > binImage(ImageT const &inImage, int const binsize, lsst::afw::math::Property const flags=lsst::afw::math::MEAN)
Definition: binImage.cc:38
def getOutputId(self, expRefList, calibId)
Generate the data identifier for the output calib.
def mapToMatrix(pool, func, ccdIdLists, args, kwargs)
Backwards-compatibility support for depersisting the old Calib (FluxMag0/FluxMag0Err) objects...
def scale(self, ccdIdLists, data)
Determine scaling across CCDs and exposures.
def addMissingKeys(self, dataId, butler, missingKeys=None, calibName=None)
Property
control what is calculated
Definition: Statistics.h:63
def getDimensions(self, sensorRefList, inputName="postISRCCD")
An integer coordinate rectangle.
Definition: Box.h:54
daf::base::PropertyList * list
Definition: fits.cc:885
lsst::geom::Box2I bboxFromMetadata(daf::base::PropertySet &metadata)
Determine the image bounding box from its metadata (FITS header)
Definition: Image.cc:709
Base class for constructing calibs.
def add_id_argument(self, name, datasetType, help, level=None, doMakeDataRefList=True, ContainerClass=DataIdContainer)
def calculateOutputHeaderFromRaws(self, butler, calib, dataIdList, outputId)
Calculate the output header from the raw headers.
def processSingleBackground(self, dataRef)
Process a single CCD for the background.
def scatterProcess(self, pool, ccdIdLists)
Scatter the processing among the nodes.
def batchWallTime(cls, time, parsedCmd, numCores)