1 from __future__
import absolute_import, division, print_function
11 from builtins
import zip
12 from builtins
import range
14 from astro_metadata_translator
import merge_headers, ObservationGroup
15 from astro_metadata_translator.serialize
import dates_to_fits
17 from lsst.pex.config
import Config, ConfigurableField, Field, ListField, ConfigField
18 from lsst.pipe.base import Task, Struct, TaskRunner, ArgumentParser
31 FocalPlaneBackgroundConfig, MaskObjectsTask)
34 from .checksum
import checksum
35 from .utils
import getDataRef
39 """Parameters controlling the measurement of background statistics""" 40 stat = Field(doc=
"Statistic to use to estimate background (from lsst.afw.math)", dtype=int,
41 default=int(afwMath.MEANCLIP))
42 clip = Field(doc=
"Clipping threshold for background",
43 dtype=float, default=3.0)
44 nIter = Field(doc=
"Clipping iterations for background",
46 maxVisitsToCalcErrorFromInputVariance = Field(
47 doc=
"Maximum number of visits to estimate variance from input variance, not per-pixel spread",
49 mask = ListField(doc=
"Mask planes to reject",
50 dtype=str, default=[
"DETECTED",
"BAD",
"NO_DATA"])
54 """Measure statistics on the background 56 This can be useful for scaling the background, e.g., for flats and fringe frames. 58 ConfigClass = CalibStatsConfig
60 def run(self, exposureOrImage):
61 """!Measure a particular statistic on an image (of some sort). 63 @param exposureOrImage Exposure, MaskedImage or Image. 64 @return Value of desired statistic 67 afwImage.Mask.getPlaneBitMask(self.
config.mask))
69 image = exposureOrImage.getMaskedImage()
72 image = exposureOrImage.getImage()
74 image = exposureOrImage
80 """Configuration for combining calib images""" 81 rows = Field(doc=
"Number of rows to read at a time",
82 dtype=int, default=512)
83 mask = ListField(doc=
"Mask planes to respect", dtype=str,
84 default=[
"SAT",
"DETECTED",
"INTRP"])
85 combine = Field(doc=
"Statistic to use for combination (from lsst.afw.math)", dtype=int,
86 default=int(afwMath.MEANCLIP))
87 clip = Field(doc=
"Clipping threshold for combination",
88 dtype=float, default=3.0)
89 nIter = Field(doc=
"Clipping iterations for combination",
91 stats = ConfigurableField(target=CalibStatsTask,
92 doc=
"Background statistics configuration")
96 """Task to combine calib images""" 97 ConfigClass = CalibCombineConfig
100 Task.__init__(self, *args, **kwargs)
103 def run(self, sensorRefList, expScales=None, finalScale=None, inputName="postISRCCD"):
104 """!Combine calib images for a single sensor 106 @param sensorRefList List of data references to combine (for a single sensor) 107 @param expScales List of scales to apply for each exposure 108 @param finalScale Desired scale for final combined image 109 @param inputName Data set name for inputs 110 @return combined image 114 afwImage.Mask.getPlaneBitMask(self.
config.mask))
115 numImages = len(sensorRefList)
117 raise RuntimeError(
"No valid input data")
118 if numImages < self.
config.stats.maxVisitsToCalcErrorFromInputVariance:
119 stats.setCalcErrorFromInputVariance(
True)
122 combined = afwImage.MaskedImageF(width, height)
123 imageList = [
None]*numImages
124 for start
in range(0, height, self.
config.rows):
125 rows =
min(self.
config.rows, height - start)
128 subCombined = combined.Factory(combined, box)
130 for i, sensorRef
in enumerate(sensorRefList):
131 if sensorRef
is None:
134 exposure = sensorRef.get(inputName +
"_sub", bbox=box)
135 if expScales
is not None:
137 imageList[i] = exposure.getMaskedImage()
139 self.
combine(subCombined, imageList, stats)
141 if finalScale
is not None:
142 background = self.stats.
run(combined)
143 self.
log.
info(
"%s: Measured background of stack is %f; adjusting to %f" %
144 (NODE, background, finalScale))
145 combined *= finalScale / background
150 """Get dimensions of the inputs""" 152 for sensorRef
in sensorRefList:
153 if sensorRef
is None:
155 md = sensorRef.get(inputName +
"_md")
160 """Apply scale to input exposure 162 This implementation applies a flux scaling: the input exposure is 163 divided by the provided scale. 165 if scale
is not None:
166 mi = exposure.getMaskedImage()
170 """!Combine multiple images 172 @param target Target image to receive the combined pixels 173 @param imageList List of input images 174 @param stats Statistics control 176 images = [img
for img
in imageList
if img
is not None]
181 """Determine a consistent size, given a list of image sizes""" 182 dim =
set((w, h)
for w, h
in dimList)
185 raise RuntimeError(
"Inconsistent dimensions: %s" % dim)
190 """!Return a tuple of specific values from a dict 192 This provides a hashable representation of the dict from certain keywords. 193 This can be useful for creating e.g., a tuple of the values in the DataId 194 that identify the CCD. 196 @param dict_ dict to parse 197 @param keys keys to extract (order is important) 198 @return tuple of values 200 return tuple(dict_[k]
for k
in keys)
204 """!Determine a list of CCDs from exposure references 206 This essentially inverts the exposure-level references (which 207 provides a list of CCDs for each exposure), by providing 208 a dataId list for each CCD. Consider an input list of exposures 209 [e1, e2, e3], and each exposure has CCDs c1 and c2. Then this 212 {(c1,): [e1c1, e2c1, e3c1], (c2,): [e1c2, e2c2, e3c2]} 214 This is a dict whose keys are tuples of the identifying values of a 215 CCD (usually just the CCD number) and the values are lists of dataIds 216 for that CCD in each exposure. A missing dataId is given the value 219 @param expRefList List of data references for exposures 220 @param level Level for the butler to generate CCDs 221 @param ccdKeys DataId keywords that identify a CCD 222 @return dict of data identifier lists for each CCD; 223 keys are values of ccdKeys in order 225 expIdList = [[ccdRef.dataId
for ccdRef
in expRef.subItems(
226 level)]
for expRef
in expRefList]
229 if len(ccdKeys) != len(
set(ccdKeys)):
230 raise RuntimeError(
"Duplicate keys found in ccdKeys: %s" % ccdKeys)
232 for ccdIdList
in expIdList:
233 for ccdId
in ccdIdList:
240 for n, ccdIdList
in enumerate(expIdList):
241 for ccdId
in ccdIdList:
243 if name
not in ccdLists:
245 ccdLists[name].
append(ccdId)
249 ccdLists[ccd] = sorted(ccdLists[ccd], key=
lambda dd:
dictToTuple(dd, sorted(dd.keys())))
255 """Generate a matrix of results using pool.map 257 The function should have the call signature: 258 func(cache, dataId, *args, **kwargs) 260 We return a dict mapping 'ccd name' to a list of values for 263 @param pool Process pool 264 @param func Function to call for each dataId 265 @param ccdIdLists Dict of data identifier lists for each CCD name 266 @return matrix of results 268 dataIdList = sum(ccdIdLists.values(), [])
269 resultList = pool.map(func, dataIdList, *args, **kwargs)
271 data = dict((ccdName, [
None] * len(expList))
for ccdName, expList
in ccdIdLists.items())
272 indices = dict(sum([[(tuple(dataId.values())
if dataId
is not None else None, (ccdName, expNum))
273 for expNum, dataId
in enumerate(expList)]
274 for ccdName, expList
in ccdIdLists.items()], []))
275 for dataId, result
in zip(dataIdList, resultList):
278 ccdName, expNum = indices[tuple(dataId.values())]
279 data[ccdName][expNum] = result
284 """Split name=value pairs and put the result in a dict""" 286 def __call__(self, parser, namespace, values, option_string):
287 output = getattr(namespace, self.dest, {})
288 for nameValue
in values:
289 name, sep, valueStr = nameValue.partition(
"=")
291 parser.error(
"%s value %s must be in form name=value" %
292 (option_string, nameValue))
293 output[name] = valueStr
294 setattr(namespace, self.dest, output)
298 """ArgumentParser for calibration construction""" 301 """Add a --calibId argument to the standard pipe_base argument parser""" 302 ArgumentParser.__init__(self, *args, **kwargs)
305 help=
"input identifiers, e.g., --id visit=123 ccd=4")
306 self.add_argument(
"--calibId", nargs=
"*", action=CalibIdAction, default={},
307 help=
"identifiers for calib, e.g., --calibId version=1",
308 metavar=
"KEY=VALUE1[^VALUE2[^VALUE3...]")
313 Checks that the "--calibId" provided works. 315 namespace = ArgumentParser.parse_args(self, *args, **kwargs)
317 keys = namespace.butler.getKeys(self.
calibName)
319 for name, value
in namespace.calibId.items():
322 "%s is not a relevant calib identifier key (%s)" % (name, keys))
323 parsed[name] = keys[name](value)
324 namespace.calibId = parsed
330 """Configuration for constructing calibs""" 331 clobber = Field(dtype=bool, default=
True,
332 doc=
"Clobber existing processed images?")
333 isr = ConfigurableField(target=IsrTask, doc=
"ISR configuration")
334 dateObs = Field(dtype=str, default=
"dateObs",
335 doc=
"Key for observation date in exposure registry")
336 dateCalib = Field(dtype=str, default=
"calibDate",
337 doc=
"Key for calib date in calib registry")
338 filter = Field(dtype=str, default=
"filter",
339 doc=
"Key for filter name in exposure/calib registries")
340 combination = ConfigurableField(
341 target=CalibCombineTask, doc=
"Calib combination configuration")
342 ccdKeys = ListField(dtype=str, default=[
"ccd"],
343 doc=
"DataId keywords specifying a CCD")
344 visitKeys = ListField(dtype=str, default=[
"visit"],
345 doc=
"DataId keywords specifying a visit")
346 calibKeys = ListField(dtype=str, default=[],
347 doc=
"DataId keywords specifying a calibration")
348 doCameraImage = Field(dtype=bool, default=
True, doc=
"Create camera overview image?")
349 binning = Field(dtype=int, default=64, doc=
"Binning to apply for camera image")
352 self.
isr.doWrite =
False 356 """Get parsed values into the CalibTask.run""" 359 return [dict(expRefList=parsedCmd.id.refList, butler=parsedCmd.butler, calibId=parsedCmd.calibId)]
362 """Call the Task with the kwargs from getTargetList""" 366 result = task.runDataRef(**args)
369 result = task.runDataRef(**args)
370 except Exception
as e:
375 task.log.fatal(
"Failed: %s" % e)
376 traceback.print_exc(file=sys.stderr)
380 exitStatus=exitStatus,
382 metadata=task.metadata,
387 exitStatus=exitStatus,
392 """!Base class for constructing calibs. 394 This should be subclassed for each of the required calib types. 395 The subclass should be sure to define the following class variables: 396 * _DefaultName: default name of the task, used by CmdLineTask 397 * calibName: name of the calibration data set in the butler 398 The subclass may optionally set: 399 * filterName: filter name to give the resultant calib 401 ConfigClass = CalibConfig
402 RunnerClass = CalibTaskRunner
409 BatchPoolTask.__init__(self, *args, **kwargs)
415 numCcds = len(parsedCmd.butler.get(
"camera"))
417 parsedCmd)[0][
'expRefList'])
418 numCycles = int(numCcds/float(numCores) + 0.5)
419 return time*numExps*numCycles
422 def _makeArgumentParser(cls, *args, **kwargs):
423 kwargs.pop(
"doBatch",
False)
427 """!Construct a calib from a list of exposure references 429 This is the entry point, called by the TaskRunner.__call__ 431 Only the master node executes this method. 433 @param expRefList List of data references at the exposure level 434 @param butler Data butler 435 @param calibId Identifier dict for calib 437 if len(expRefList) < 1:
438 raise RuntimeError(
"No valid input data")
440 for expRef
in expRefList:
445 expRefList, level=
"sensor", ccdKeys=self.
config.ccdKeys)
449 outputIdItemList =
list(outputId.items())
450 for ccdName
in ccdIdLists:
451 dataId = dict([(k, ccdName[i])
for i, k
in enumerate(self.
config.ccdKeys)])
452 dataId.update(outputIdItemList)
454 dataId.update(outputIdItemList)
457 butler.get(self.
calibName +
"_filename", dataId)
458 except Exception
as e:
460 "Unable to determine output filename \"%s_filename\" from %s: %s" %
463 processPool =
Pool(
"process")
464 processPool.storeSet(butler=butler)
470 scales = self.
scale(ccdIdLists, data)
472 combinePool =
Pool(
"combine")
473 combinePool.storeSet(butler=butler)
476 calibs = self.
scatterCombine(combinePool, outputId, ccdIdLists, scales)
478 if self.
config.doCameraImage:
479 camera = butler.get(
"camera")
481 calibs = {butler.get(
"postISRCCD_detector",
482 dict(zip(self.
config.ccdKeys, ccdName))).getId(): calibs[ccdName]
483 for ccdName
in ccdIdLists}
487 butler.put(cameraImage, self.
calibName +
"_camera", dataId)
488 except Exception
as exc:
489 self.
log.
warn(
"Unable to create camera image: %s" % (exc,))
493 ccdIdLists=ccdIdLists,
496 processPool=processPool,
497 combinePool=combinePool,
501 """!Generate the data identifier for the output calib 503 The mean date and the common filter are included, using keywords 504 from the configuration. The CCD-specific part is not included 505 in the data identifier. 507 @param expRefList List of data references at exposure level 508 @param calibId Data identifier elements for the calib provided by the user 509 @return data identifier 513 for expRef
in expRefList:
514 butler = expRef.getButler()
515 dataId = expRef.dataId
517 midTime += self.
getMjd(butler, dataId)
520 if filterName
is None:
521 filterName = thisFilter
522 elif filterName != thisFilter:
523 raise RuntimeError(
"Filter mismatch for %s: %s vs %s" % (
524 dataId, thisFilter, filterName))
526 midTime /= len(expRefList)
528 midTime, dafBase.DateTime.MJD).toPython().date())
530 outputId = {self.
config.filter: filterName,
531 self.
config.dateCalib: date}
532 outputId.update(calibId)
535 def getMjd(self, butler, dataId, timescale=dafBase.DateTime.UTC):
536 """Determine the Modified Julian Date (MJD; in TAI) from a data identifier""" 537 if self.
config.dateObs
in dataId:
538 dateObs = dataId[self.
config.dateObs]
540 dateObs = butler.queryMetadata(
'raw', [self.
config.dateObs], dataId)[0]
541 if "T" not in dateObs:
542 dateObs = dateObs +
"T12:00:00.0Z" 543 elif not dateObs.endswith(
"Z"):
549 """Determine the filter from a data identifier""" 550 filt = butler.queryMetadata(
'raw', [self.
config.filter], dataId)[0]
554 if calibName
is None:
557 if missingKeys
is None:
558 missingKeys =
set(butler.getKeys(calibName).
keys()) -
set(dataId.keys())
560 for k
in missingKeys:
562 v = butler.queryMetadata(
'raw', [k], dataId)
572 raise RuntimeError(
"No unique lookup for %s: %s" % (k, v))
575 """!Update the metadata from the VisitInfo 577 @param calibImage The image whose metadata is to be set 578 @param exposureTime The exposure time for the image 579 @param darkTime The time since the last read (default: exposureTime) 583 darkTime = exposureTime
585 visitInfo =
afwImage.VisitInfo(exposureTime=exposureTime, darkTime=darkTime, **kwargs)
586 md = calibImage.getMetadata()
588 afwImage.setVisitInfoMetadata(md, visitInfo)
591 """!Scatter the processing among the nodes 593 We scatter each CCD independently (exposures aren't grouped together), 594 to make full use of all available processors. This necessitates piecing 595 everything back together in the same format as ccdIdLists afterwards. 597 Only the master node executes this method. 599 @param pool Process pool 600 @param ccdIdLists Dict of data identifier lists for each CCD name 601 @return Dict of lists of returned data for each CCD name 603 self.
log.
info(
"Scatter processing")
606 def process(self, cache, ccdId, outputName="postISRCCD", **kwargs):
607 """!Process a CCD, specified by a data identifier 609 After processing, optionally returns a result (produced by 610 the 'processResult' method) calculated from the processed 611 exposure. These results will be gathered by the master node, 612 and is a means for coordinated scaling of all CCDs for flats, 615 Only slave nodes execute this method. 617 @param cache Process pool cache 618 @param ccdId Data identifier for CCD 619 @param outputName Output dataset name for butler 620 @return result from 'processResult' 623 self.
log.
warn(
"Null identifier received on %s" % NODE)
626 if self.
config.clobber
or not sensorRef.datasetExists(outputName):
627 self.
log.
info(
"Processing %s on %s" % (ccdId, NODE))
630 except Exception
as e:
631 self.
log.
warn(
"Unable to process %s: %s" % (ccdId, e))
637 "Using previously persisted processed exposure for %s" % (sensorRef.dataId,))
638 exposure = sensorRef.get(outputName)
642 """Process a single CCD, specified by a data reference 644 Generally, this simply means doing ISR. 646 Only slave nodes execute this method. 651 """!Write the processed CCD 653 We need to write these out because we can't hold them all in 656 Only slave nodes execute this method. 658 @param dataRef Data reference 659 @param exposure CCD exposure to write 660 @param outputName Output dataset name for butler. 662 dataRef.put(exposure, outputName)
665 """Extract processing results from a processed exposure 667 This method generates what is gathered by the master node. 668 This can be a background measurement or similar for scaling 669 flat-fields. It must be picklable! 671 Only slave nodes execute this method. 676 """!Determine scaling across CCDs and exposures 678 This is necessary mainly for flats, so as to determine a 679 consistent scaling across the entire focal plane. This 680 implementation is simply a placeholder. 682 Only the master node executes this method. 684 @param ccdIdLists Dict of data identifier lists for each CCD tuple 685 @param data Dict of lists of returned data for each CCD tuple 686 @return dict of Struct(ccdScale: scaling for CCD, 687 expScales: scaling for each exposure 690 self.
log.
info(
"Scale on %s" % NODE)
691 return dict((name,
Struct(ccdScale=
None, expScales=[
None] * len(ccdIdLists[name])))
692 for name
in ccdIdLists)
695 """!Scatter the combination of exposures across multiple nodes 697 In this case, we can only scatter across as many nodes as 700 Only the master node executes this method. 702 @param pool Process pool 703 @param outputId Output identifier (exposure part only) 704 @param ccdIdLists Dict of data identifier lists for each CCD name 705 @param scales Dict of structs with scales, for each CCD name 706 @param dict of binned images 708 self.
log.
info(
"Scatter combination")
709 data = [
Struct(ccdName=ccdName, ccdIdList=ccdIdLists[ccdName], scales=scales[ccdName])
for 710 ccdName
in ccdIdLists]
711 images = pool.map(self.
combine, data, outputId)
712 return dict(zip(ccdIdLists.keys(), images))
715 """Get fully-qualified output data identifier 717 We may need to look up keys that aren't in the output dataId. 719 @param ccdName Name tuple for CCD 720 @param butler Data butler 721 @param outputId Data identifier for combined image (exposure part only) 722 @return fully-qualified output dataId 724 fullOutputId = {k: ccdName[i]
for i, k
in enumerate(self.
config.ccdKeys)}
725 fullOutputId.update(outputId)
727 fullOutputId.update(outputId)
731 """!Combine multiple exposures of a particular CCD and write the output 733 Only the slave nodes execute this method. 735 @param cache Process pool cache 736 @param struct Parameters for the combination, which has the following components: 737 * ccdName Name tuple for CCD 738 * ccdIdList List of data identifiers for combination 739 * scales Scales to apply (expScales are scalings for each exposure, 740 ccdScale is final scale for combined image) 741 @param outputId Data identifier for combined image (exposure part only) 742 @return binned calib image 745 dataRefList = [
getDataRef(cache.butler, dataId)
if dataId
is not None else None for 746 dataId
in struct.ccdIdList]
747 self.
log.
info(
"Combining %s on %s" % (outputId, NODE))
748 calib = self.combination.
run(dataRefList, expScales=struct.scales.expScales,
749 finalScale=struct.scales.ccdScale)
751 if not hasattr(calib,
"getMetadata"):
752 if hasattr(calib,
"getVariance"):
755 calib = afwImage.DecoratedImageF(calib.getImage())
762 struct.ccdIdList, outputId)
766 self.
write(cache.butler, calib, outputId)
771 """!Calculate the output header from the raw headers. 773 This metadata will go into the output FITS header. It will include all 774 headers that are identical in all inputs. 776 @param butler Data butler 777 @param calib Combined calib exposure. 778 @param dataIdList List of data identifiers for calibration inputs 779 @param outputId Data identifier for output 781 header = calib.getMetadata()
783 rawmd = [butler.get(
"raw_md", dataId)
for dataId
in dataIdList
if 786 merged = merge_headers(rawmd, mode=
"drop")
791 for k, v
in merged.items():
793 comment = rawmd[0].getComment(k)
if k
in rawmd[0]
else None 794 header.set(k, v, comment=comment)
800 group = ObservationGroup(rawmd, pedantic=
False)
804 comments = {
"TIMESYS":
"Time scale for all dates",
805 "DATE-OBS":
"Start date of earliest input observation",
806 "MJD-OBS":
"[d] Start MJD of earliest input observation",
807 "DATE-END":
"End date of oldest input observation",
808 "MJD-END":
"[d] End MJD of oldest input observation",
809 "MJD-AVG":
"[d] MJD midpoint of all input observations",
810 "DATE-AVG":
"Midpoint date of all input observations"}
812 if group
is not None:
813 oldest, newest = group.extremes()
814 dateCards = dates_to_fits(oldest.datetime_begin, newest.datetime_end)
817 dateCards = {
"DATE-OBS":
"{}T00:00:00.00".
format(outputId[self.
config.dateCalib])}
818 comments[
"DATE-OBS"] =
"Date of start of day of calibration midpoint" 820 for k, v
in dateCards.items():
821 header.set(k, v, comment=comments.get(k,
None))
824 """!Record metadata including the inputs and creation details 826 This metadata will go into the FITS header. 828 @param butler Data butler 829 @param calib Combined calib exposure. 830 @param dataIdList List of data identifiers for calibration inputs 831 @param outputId Data identifier for output 833 header = calib.getMetadata()
837 now = time.localtime()
838 header.set(
"CALIB_CREATION_DATE", time.strftime(
"%Y-%m-%d", now))
839 header.set(
"CALIB_CREATION_TIME", time.strftime(
"%X %Z", now))
842 visits = [str(
dictToTuple(dataId, self.
config.visitKeys))
for dataId
in dataIdList
if 844 for i, v
in enumerate(sorted(
set(visits))):
845 header.set(
"CALIB_INPUT_%d" % (i,), v)
847 header.set(
"CALIB_ID",
" ".join(
"%s=%s" % (key, value)
848 for key, value
in outputId.items()))
852 """Interpolate over NANs in the combined image 854 NANs can result from masked areas on the CCD. We don't want them getting 855 into our science images, so we replace them with the median of the image. 857 if hasattr(image,
"getMaskedImage"):
859 image = image.getMaskedImage().getImage()
860 if hasattr(image,
"getImage"):
861 image = image.getImage()
862 array = image.getArray()
863 bad = np.isnan(array)
864 array[bad] = np.median(array[np.logical_not(bad)])
866 def write(self, butler, exposure, dataId):
867 """!Write the final combined calib 869 Only the slave nodes execute this method 871 @param butler Data butler 872 @param exposure CCD exposure to write 873 @param dataId Data identifier for output 875 self.
log.
info(
"Writing %s on %s" % (dataId, NODE))
876 butler.put(exposure, self.
calibName, dataId)
879 """!Create and write an image of the entire camera 881 This is useful for judging the quality or getting an overview of 882 the features of the calib. 884 @param camera Camera object 885 @param dataId Data identifier for output 886 @param calibs Dict mapping CCD detector ID to calib image 891 """Check that the list of CCD dataIds is consistent 893 @param ccdIdLists Dict of data identifier lists for each CCD name 894 @return Number of exposures, number of CCDs 896 visitIdLists = collections.defaultdict(list)
897 for ccdName
in ccdIdLists:
898 for dataId
in ccdIdLists[ccdName]:
900 visitIdLists[visitName].
append(dataId)
902 numExps =
set(len(expList)
for expList
in ccdIdLists.values())
903 numCcds =
set(len(ccdList)
for ccdList
in visitIdLists.values())
905 if len(numExps) != 1
or len(numCcds) != 1:
908 self.
log.
warn(
"Number of visits for each CCD: %s",
909 {ccdName: len(ccdIdLists[ccdName])
for ccdName
in ccdIdLists})
910 self.
log.
warn(
"Number of CCDs for each visit: %s",
911 {vv: len(visitIdLists[vv])
for vv
in visitIdLists})
912 raise RuntimeError(
"Inconsistent number of exposures/CCDs")
914 return numExps.pop(), numCcds.pop()
918 """Configuration for bias construction. 920 No changes required compared to the base class, but 921 subclassed for distinction. 926 class BiasTask(CalibTask):
927 """Bias construction""" 928 ConfigClass = BiasConfig
929 _DefaultName =
"bias" 936 """Overrides to apply for bias construction""" 937 config.isr.doBias =
False 938 config.isr.doDark =
False 939 config.isr.doFlat =
False 940 config.isr.doFringe =
False 944 """Configuration for dark construction""" 945 doRepair = Field(dtype=bool, default=
True, doc=
"Repair artifacts?")
946 psfFwhm = Field(dtype=float, default=3.0, doc=
"Repair PSF FWHM (pixels)")
947 psfSize = Field(dtype=int, default=21, doc=
"Repair PSF size (pixels)")
948 crGrow = Field(dtype=int, default=2, doc=
"Grow radius for CR (pixels)")
949 repair = ConfigurableField(
950 target=RepairTask, doc=
"Task to repair artifacts")
953 CalibConfig.setDefaults(self)
960 The only major difference from the base class is a cosmic-ray 961 identification stage, and dividing each image by the dark time 962 to generate images of the dark rate. 964 ConfigClass = DarkConfig
965 _DefaultName =
"dark" 970 CalibTask.__init__(self, *args, **kwargs)
975 """Overrides to apply for dark construction""" 976 config.isr.doDark =
False 977 config.isr.doFlat =
False 978 config.isr.doFringe =
False 981 """Process a single CCD 983 Besides the regular ISR, also masks cosmic-rays and divides each 984 processed image by the dark time to generate images of the dark rate. 985 The dark time is provided by the 'getDarkTime' method. 987 exposure = CalibTask.processSingle(self, sensorRef)
990 psf = measAlg.DoubleGaussianPsf(self.
config.psfSize, self.
config.psfSize,
991 self.
config.psfFwhm/(2*math.sqrt(2*math.log(2))))
993 self.repair.
run(exposure, keepCRs=
False)
994 if self.
config.crGrow > 0:
995 mask = exposure.getMaskedImage().getMask().
clone()
996 mask &= mask.getPlaneBitMask(
"CR")
1000 fpSet.setMask(exposure.getMaskedImage().getMask(),
"CR")
1002 mi = exposure.getMaskedImage()
1007 """Retrieve the dark time for an exposure""" 1008 darkTime = exposure.getInfo().getVisitInfo().
getDarkTime()
1009 if not np.isfinite(darkTime):
1010 raise RuntimeError(
"Non-finite darkTime")
1015 """Configuration for flat construction""" 1016 iterations = Field(dtype=int, default=10,
1017 doc=
"Number of iterations for scale determination")
1018 stats = ConfigurableField(target=CalibStatsTask,
1019 doc=
"Background statistics configuration")
1023 """Flat construction 1025 The principal change from the base class involves gathering the background 1026 values from each image and using them to determine the scalings for the final 1029 ConfigClass = FlatConfig
1030 _DefaultName =
"flat" 1035 """Overrides for flat construction""" 1036 config.isr.doFlat =
False 1037 config.isr.doFringe =
False 1040 CalibTask.__init__(self, *args, **kwargs)
1044 return self.stats.
run(exposure)
1047 """Determine the scalings for the final combination 1049 We have a matrix B_ij = C_i E_j, where C_i is the relative scaling 1050 of one CCD to all the others in an exposure, and E_j is the scaling 1051 of the exposure. We convert everything to logarithms so we can work 1052 with a linear system. We determine the C_i and E_j from B_ij by iteration, 1053 under the additional constraint that the average CCD scale is unity. 1055 This algorithm comes from Eugene Magnier and Pan-STARRS. 1057 assert len(ccdIdLists.values()) > 0,
"No successful CCDs" 1058 lengths =
set([len(expList)
for expList
in ccdIdLists.values()])
1059 assert len(lengths) == 1,
"Number of successful exposures for each CCD differs" 1060 assert tuple(lengths)[0] > 0,
"No successful exposures" 1062 indices = dict((name, i)
for i, name
in enumerate(ccdIdLists))
1063 bgMatrix = np.array([[0.0] * len(expList)
for expList
in ccdIdLists.values()])
1064 for name
in ccdIdLists:
1066 bgMatrix[i] = [d
if d
is not None else np.nan
for d
in data[name]]
1068 numpyPrint = np.get_printoptions()
1069 np.set_printoptions(threshold=np.inf)
1070 self.
log.
info(
"Input backgrounds: %s" % bgMatrix)
1073 numCcds = len(ccdIdLists)
1074 numExps = bgMatrix.shape[1]
1076 bgMatrix = np.log(bgMatrix)
1077 bgMatrix = np.ma.masked_array(bgMatrix, ~np.isfinite(bgMatrix))
1079 compScales = np.zeros(numCcds)
1080 expScales = np.array([(bgMatrix[:, i0] - compScales).mean()
for i0
in range(numExps)])
1082 for iterate
in range(self.
config.iterations):
1083 compScales = np.array([(bgMatrix[i1, :] - expScales).mean()
for i1
in range(numCcds)])
1084 bad = np.isnan(compScales)
1087 compScales[bad] = compScales[~bad].mean()
1088 expScales = np.array([(bgMatrix[:, i2] - compScales).mean()
for i2
in range(numExps)])
1090 avgScale = np.average(np.exp(compScales))
1091 compScales -= np.log(avgScale)
1092 self.
log.
debug(
"Iteration %d exposure scales: %s", iterate, np.exp(expScales))
1093 self.
log.
debug(
"Iteration %d component scales: %s", iterate, np.exp(compScales))
1095 expScales = np.array([(bgMatrix[:, i3] - compScales).mean()
for i3
in range(numExps)])
1097 if np.any(np.isnan(expScales)):
1098 raise RuntimeError(
"Bad exposure scales: %s --> %s" % (bgMatrix, expScales))
1100 expScales = np.exp(expScales)
1101 compScales = np.exp(compScales)
1103 self.
log.
info(
"Exposure scales: %s" % expScales)
1104 self.
log.
info(
"Component relative scaling: %s" % compScales)
1105 np.set_printoptions(**numpyPrint)
1107 return dict((ccdName,
Struct(ccdScale=compScales[indices[ccdName]], expScales=expScales))
1108 for ccdName
in ccdIdLists)
1112 """Configuration for fringe construction""" 1113 stats = ConfigurableField(target=CalibStatsTask,
1114 doc=
"Background statistics configuration")
1115 subtractBackground = ConfigurableField(target=measAlg.SubtractBackgroundTask,
1116 doc=
"Background configuration")
1117 detection = ConfigurableField(
1118 target=measAlg.SourceDetectionTask, doc=
"Detection configuration")
1119 detectSigma = Field(dtype=float, default=1.0,
1120 doc=
"Detection PSF gaussian sigma")
1123 CalibConfig.setDefaults(self)
1124 self.
detection.reEstimateBackground =
False 1128 """Fringe construction task 1130 The principal change from the base class is that the images are 1131 background-subtracted and rescaled by the background. 1133 XXX This is probably not right for a straight-up combination, as we 1134 are currently doing, since the fringe amplitudes need not scale with 1137 XXX Would like to have this do PCA and generate multiple images, but 1138 that will take a bit of work with the persistence code. 1140 ConfigClass = FringeConfig
1141 _DefaultName =
"fringe" 1142 calibName =
"fringe" 1146 """Overrides for fringe construction""" 1147 config.isr.doFringe =
False 1150 CalibTask.__init__(self, *args, **kwargs)
1156 """Subtract the background and normalise by the background level""" 1157 exposure = CalibTask.processSingle(self, sensorRef)
1158 bgLevel = self.stats.
run(exposure)
1159 self.subtractBackground.
run(exposure)
1160 mi = exposure.getMaskedImage()
1162 footprintSets = self.detection.detectFootprints(
1163 exposure, sigma=self.
config.detectSigma)
1164 mask = exposure.getMaskedImage().getMask()
1165 detected = 1 << mask.addMaskPlane(
"DETECTED")
1166 for fpSet
in (footprintSets.positive, footprintSets.negative):
1167 if fpSet
is not None:
1168 afwDet.setMaskFromFootprintList(
1169 mask, fpSet.getFootprints(), detected)
1174 """Configuration for sky frame construction""" 1175 detectSigma = Field(dtype=float, default=2.0, doc=
"Detection PSF gaussian sigma")
1176 maskObjects = ConfigurableField(target=MaskObjectsTask,
1177 doc=
"Configuration for masking objects aggressively")
1178 largeScaleBackground = ConfigField(dtype=FocalPlaneBackgroundConfig,
1179 doc=
"Large-scale background configuration")
1180 sky = ConfigurableField(target=SkyMeasurementTask, doc=
"Sky measurement")
1181 maskThresh = Field(dtype=float, default=3.0, doc=
"k-sigma threshold for masking pixels")
1182 mask = ListField(dtype=str, default=[
"BAD",
"SAT",
"DETECTED",
"NO_DATA"],
1183 doc=
"Mask planes to consider as contaminated")
1187 """Task for sky frame construction 1189 The sky frame is a (relatively) small-scale background 1190 model, the response of the camera to the sky. 1192 To construct, we first remove a large-scale background (e.g., caused 1193 by moonlight) which may vary from image to image. Then we construct a 1194 model of the sky, which is essentially a binned version of the image 1195 (important configuration parameters: sky.background.[xy]BinSize). 1196 It is these models which are coadded to yield the sky frame. 1198 ConfigClass = SkyConfig
1199 _DefaultName =
"sky" 1203 CalibTask.__init__(self, *args, **kwargs)
1208 """!Scatter the processing among the nodes 1210 Only the master node executes this method, assigning work to the 1213 We measure and subtract off a large-scale background model across 1214 all CCDs, which requires a scatter/gather. Then we process the 1215 individual CCDs, subtracting the large-scale background model and 1216 the residual background model measured. These residuals will be 1217 combined for the sky frame. 1219 @param pool Process pool 1220 @param ccdIdLists Dict of data identifier lists for each CCD name 1221 @return Dict of lists of returned data for each CCD name 1223 self.
log.
info(
"Scatter processing")
1225 numExps =
set(len(expList)
for expList
in ccdIdLists.values()).pop()
1233 for exp
in range(numExps):
1234 bgModels = [bgModelList[ccdName][exp]
for ccdName
in ccdIdLists]
1235 visit =
set(tuple(ccdIdLists[ccdName][exp][key]
for key
in sorted(self.
config.visitKeys))
for 1236 ccdName
in ccdIdLists)
1237 assert len(visit) == 1
1239 bgModel = bgModels[0]
1240 for bg
in bgModels[1:]:
1242 self.
log.
info(
"Background model min/max for visit %s: %f %f", visit,
1243 np.min(bgModel.getStatsImage().getArray()),
1244 np.max(bgModel.getStatsImage().getArray()))
1245 backgrounds[visit] = bgModel
1246 scales[visit] = np.median(bgModel.getStatsImage().getArray())
1248 return mapToMatrix(pool, self.
process, ccdIdLists, backgrounds=backgrounds, scales=scales)
1251 """!Measure background model for CCD 1253 This method is executed by the slaves. 1255 The background models for all CCDs in an exposure will be 1256 combined to form a full focal-plane background model. 1258 @param cache Process pool cache 1259 @param dataId Data identifier 1260 @return Bcakground model 1267 config = self.
config.largeScaleBackground
1268 camera = dataRef.get(
"camera")
1269 bgModel = FocalPlaneBackground.fromCamera(config, camera)
1270 bgModel.addCcd(exposure)
1274 """!Process a single CCD for the background 1276 This method is executed by the slaves. 1278 Because we're interested in the background, we detect and mask astrophysical 1279 sources, and pixels above the noise level. 1281 @param dataRef Data reference for CCD. 1282 @return processed exposure 1284 if not self.
config.clobber
and dataRef.datasetExists(
"postISRCCD"):
1285 return dataRef.get(
"postISRCCD")
1286 exposure = CalibTask.processSingle(self, dataRef)
1288 self.maskObjects.
run(exposure, self.
config.mask)
1289 dataRef.put(exposure,
"postISRCCD")
1293 """Process a single CCD, specified by a data reference 1295 We subtract the appropriate focal plane background model, 1296 divide by the appropriate scale and measure the background. 1298 Only slave nodes execute this method. 1300 @param dataRef Data reference for single CCD 1301 @param backgrounds Background model for each visit 1302 @param scales Scales for each visit 1303 @return Processed exposure 1305 visit = tuple(dataRef.dataId[key]
for key
in sorted(self.
config.visitKeys))
1306 exposure = dataRef.get(
"postISRCCD", immediate=
True)
1307 image = exposure.getMaskedImage()
1308 detector = exposure.getDetector()
1309 bbox = image.getBBox()
1311 bgModel = backgrounds[visit]
1312 bg = bgModel.toCcdBackground(detector, bbox)
1313 image -= bg.getImage()
1314 image /= scales[visit]
1317 dataRef.put(bg,
"icExpBackground")
1321 """!Combine multiple background models of a particular CCD and write the output 1323 Only the slave nodes execute this method. 1325 @param cache Process pool cache 1326 @param struct Parameters for the combination, which has the following components: 1327 * ccdName Name tuple for CCD 1328 * ccdIdList List of data identifiers for combination 1329 @param outputId Data identifier for combined image (exposure part only) 1330 @return binned calib image 1333 dataRefList = [
getDataRef(cache.butler, dataId)
if dataId
is not None else None for 1334 dataId
in struct.ccdIdList]
1335 self.
log.
info(
"Combining %s on %s" % (outputId, NODE))
1336 bgList = [dataRef.get(
"icExpBackground", immediate=
True).
clone()
for dataRef
in dataRefList]
1338 bgExp = self.sky.averageBackgrounds(bgList)
1341 cache.butler.put(bgExp,
"sky", outputId)
def applyOverrides(cls, config)
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)
def getFilter(self, butler, dataId)
def __init__(self, args, kwargs)
def processWrite(self, dataRef, exposure, outputName="postISRCCD")
Write the processed CCD.
def makeSubtask(self, name, keyArgs)
Class for handling dates/times, including MJD, UTC, and TAI.
def scatterCombine(self, pool, outputId, ccdIdLists, scales)
Scatter the combination of exposures across multiple nodes.
def __init__(self, args, kwargs)
def run(self, exposureOrImage)
Measure a particular statistic on an image (of some sort).
def processResult(self, exposure)
def checksum(obj, header=None, sumType="MD5")
Calculate a checksum of an object.
Information about a single exposure of an imaging camera.
def getCcdIdListFromExposures(expRefList, level="sensor", ccdKeys=["ccd"])
Determine a list of CCDs from exposure references.
def __init__(self, calibName, args, kwargs)
def applyScale(self, exposure, scale=None)
def processSingle(self, sensorRef)
A Threshold is used to pass a threshold value to detection algorithms.
def applyOverrides(cls, config)
std::shared_ptr< FrameSet > append(FrameSet const &first, FrameSet const &second)
Construct a FrameSet that performs two transformations in series.
Fit spatial kernel using approximate fluxes for candidates, and solving a linear system of equations...
daf::base::PropertySet * set
def processSingle(self, sensorRef)
def getFullyQualifiedOutputId(self, ccdName, butler, outputId)
def __call__(self, parser, namespace, values, option_string)
def processSingle(self, dataRef)
Statistics makeStatistics(lsst::afw::math::MaskedVector< EntryT > const &mv, std::vector< WeightPixel > const &vweights, int const flags, StatisticsControl const &sctrl=StatisticsControl())
The makeStatistics() overload to handle lsst::afw::math::MaskedVector<>
def dictToTuple(dict_, keys)
Return a tuple of specific values from a dict.
def interpolateNans(self, image)
std::shared_ptr< Exposure< ImagePixelT, MaskPixelT, VariancePixelT > > makeExposure(MaskedImage< ImagePixelT, MaskPixelT, VariancePixelT > &mimage, std::shared_ptr< geom::SkyWcs const > wcs=std::shared_ptr< geom::SkyWcs const >())
A function to return an Exposure of the correct type (cf.
def getDataRef(butler, dataId, datasetType="raw")
def updateMetadata(self, calibImage, exposureTime, darkTime=None, kwargs)
Update the metadata from the VisitInfo.
Pass parameters to a Statistics object.
def scale(self, ccdIdLists, data)
def process(self, cache, ccdId, outputName="postISRCCD", kwargs)
Process a CCD, specified by a data identifier.
def parse_args(self, args, kwargs)
def processSingle(self, dataRef, backgrounds, scales)
def makeCameraImage(self, camera, dataId, calibs)
Create and write an image of the entire camera.
def combine(self, target, imageList, stats)
Combine multiple images.
def run(self, skyInfo, tempExpRefList, imageScalerList, weightList, altMaskList=None, mask=None, supplementaryData=None)
def getMjd(self, butler, dataId, timescale=dafBase.DateTime.UTC)
def getDarkTime(self, exposure)
def combine(self, cache, struct, outputId)
Combine multiple exposures of a particular CCD and write the output.
def checkCcdIdLists(self, ccdIdLists)
def runDataRef(self, expRefList, butler, calibId)
Construct a calib from a list of exposure references.
std::shared_ptr< image::MaskedImage< PixelT > > statisticsStack(image::MaskedImage< PixelT > const &image, Property flags, char dimension, StatisticsControl const &sctrl)
A function to compute statistics on the rows or columns of an image.
def write(self, butler, exposure, dataId)
Write the final combined calib.
def combine(self, cache, struct, outputId)
Combine multiple background models of a particular CCD and write the output.
def measureBackground(self, cache, dataId)
Measure background model for CCD.
def recordCalibInputs(self, butler, calib, dataIdList, outputId)
Record metadata including the inputs and creation details.
def scatterProcess(self, pool, ccdIdLists)
Scatter the processing among the nodes.
def run(self, sensorRefList, expScales=None, finalScale=None, inputName="postISRCCD")
Combine calib images for a single sensor.
std::shared_ptr< ImageT > binImage(ImageT const &inImage, int const binsize, lsst::afw::math::Property const flags=lsst::afw::math::MEAN)
def __init__(self, args, kwargs)
def __init__(self, args, kwargs)
def applyOverrides(cls, config)
def getOutputId(self, expRefList, calibId)
Generate the data identifier for the output calib.
def processResult(self, exposure)
def mapToMatrix(pool, func, ccdIdLists, args, kwargs)
def __init__(self, args, kwargs)
Backwards-compatibility support for depersisting the old Calib (FluxMag0/FluxMag0Err) objects...
def scale(self, ccdIdLists, data)
Determine scaling across CCDs and exposures.
def addMissingKeys(self, dataId, butler, missingKeys=None, calibName=None)
Property
control what is calculated
def getDimensions(self, sensorRefList, inputName="postISRCCD")
def getTargetList(parsedCmd, kwargs)
def __init__(self, args, kwargs)
An integer coordinate rectangle.
daf::base::PropertyList * list
lsst::geom::Box2I bboxFromMetadata(daf::base::PropertySet &metadata)
Determine the image bounding box from its metadata (FITS header)
Base class for constructing calibs.
def applyOverrides(cls, config)
def add_id_argument(self, name, datasetType, help, level=None, doMakeDataRefList=True, ContainerClass=DataIdContainer)
def calculateOutputHeaderFromRaws(self, butler, calib, dataIdList, outputId)
Calculate the output header from the raw headers.
def processSingleBackground(self, dataRef)
Process a single CCD for the background.
def scatterProcess(self, pool, ccdIdLists)
Scatter the processing among the nodes.
def batchWallTime(cls, time, parsedCmd, numCores)