LSST Applications  21.0.0-172-gfb10e10a+18fedfabac,22.0.0+297cba6710,22.0.0+80564b0ff1,22.0.0+8d77f4f51a,22.0.0+a28f4c53b1,22.0.0+dcf3732eb2,22.0.1-1-g7d6de66+2a20fdde0d,22.0.1-1-g8e32f31+297cba6710,22.0.1-1-geca5380+7fa3b7d9b6,22.0.1-12-g44dc1dc+2a20fdde0d,22.0.1-15-g6a90155+515f58c32b,22.0.1-16-g9282f48+790f5f2caa,22.0.1-2-g92698f7+dcf3732eb2,22.0.1-2-ga9b0f51+7fa3b7d9b6,22.0.1-2-gd1925c9+bf4f0e694f,22.0.1-24-g1ad7a390+a9625a72a8,22.0.1-25-g5bf6245+3ad8ecd50b,22.0.1-25-gb120d7b+8b5510f75f,22.0.1-27-g97737f7+2a20fdde0d,22.0.1-32-gf62ce7b1+aa4237961e,22.0.1-4-g0b3f228+2a20fdde0d,22.0.1-4-g243d05b+871c1b8305,22.0.1-4-g3a563be+32dcf1063f,22.0.1-4-g44f2e3d+9e4ab0f4fa,22.0.1-42-gca6935d93+ba5e5ca3eb,22.0.1-5-g15c806e+85460ae5f3,22.0.1-5-g58711c4+611d128589,22.0.1-5-g75bb458+99c117b92f,22.0.1-6-g1c63a23+7fa3b7d9b6,22.0.1-6-g50866e6+84ff5a128b,22.0.1-6-g8d3140d+720564cf76,22.0.1-6-gd805d02+cc5644f571,22.0.1-8-ge5750ce+85460ae5f3,master-g6e05de7fdc+babf819c66,master-g99da0e417a+8d77f4f51a,w.2021.48
LSST Data Management Base Package
Public Member Functions | Static Public Attributes | List of all members
lsst.pipe.drivers.constructCalibs.FlatTask Class Reference
Inheritance diagram for lsst.pipe.drivers.constructCalibs.FlatTask:
lsst.pipe.drivers.constructCalibs.CalibTask lsst.ctrl.pool.parallel.BatchPoolTask lsst.ctrl.pool.parallel.BatchCmdLineTask

Public Member Functions

def applyOverrides (cls, config)
 
def __init__ (self, *args, **kwargs)
 
def processResult (self, exposure)
 
def scale (self, ccdIdLists, data)
 
def batchWallTime (cls, time, parsedCmd, numCores)
 Return walltime request for batch job. More...
 
def runDataRef (self, expRefList, butler, calibId)
 Construct a calib from a list of exposure references. More...
 
def getOutputId (self, expRefList, calibId)
 Generate the data identifier for the output calib. More...
 
def getMjd (self, butler, dataId, timescale=dafBase.DateTime.UTC)
 
def getFilter (self, butler, dataId)
 
def addMissingKeys (self, dataId, butler, missingKeys=None, calibName=None)
 
def updateMetadata (self, calibImage, exposureTime, darkTime=None, **kwargs)
 Update the metadata from the VisitInfo. More...
 
def scatterProcess (self, pool, ccdIdLists)
 Scatter the processing among the nodes. More...
 
def process (self, cache, ccdId, outputName="postISRCCD", **kwargs)
 Process a CCD, specified by a data identifier. More...
 
def processSingle (self, dataRef)
 
def processWrite (self, dataRef, exposure, outputName="postISRCCD")
 Write the processed CCD. More...
 
def scatterCombine (self, pool, outputId, ccdIdLists, scales)
 Scatter the combination of exposures across multiple nodes. More...
 
def getFullyQualifiedOutputId (self, ccdName, butler, outputId)
 
def combine (self, cache, struct, outputId)
 Combine multiple exposures of a particular CCD and write the output. More...
 
def calculateOutputHeaderFromRaws (self, butler, calib, dataIdList, outputId)
 Calculate the output header from the raw headers. More...
 
def recordCalibInputs (self, butler, calib, dataIdList, outputId)
 Record metadata including the inputs and creation details. More...
 
def interpolateNans (self, image)
 
def write (self, butler, exposure, dataId)
 Write the final combined calib. More...
 
def makeCameraImage (self, camera, dataId, calibs)
 Create and write an image of the entire camera. More...
 
def checkCcdIdLists (self, ccdIdLists)
 
def parseAndRun (cls, *args, **kwargs)
 
def parseAndSubmit (cls, args=None, **kwargs)
 
def batchCommand (cls, args)
 Return command to run CmdLineTask. More...
 
def logOperation (self, operation, catch=False, trace=True)
 Provide a context manager for logging an operation. More...
 

Static Public Attributes

 ConfigClass = FlatConfig
 
string calibName = "flat"
 
 RunnerClass = CalibTaskRunner
 
 filterName = None
 
float exposureTime = 1.0
 

Detailed Description

Flat construction

The principal change from the base class involves gathering the background
values from each image and using them to determine the scalings for the final
combination.

Definition at line 1018 of file constructCalibs.py.

Constructor & Destructor Documentation

◆ __init__()

def lsst.pipe.drivers.constructCalibs.FlatTask.__init__ (   self,
*  args,
**  kwargs 
)
Constructor

Reimplemented from lsst.pipe.drivers.constructCalibs.CalibTask.

Definition at line 1035 of file constructCalibs.py.

1035  def __init__(self, *args, **kwargs):
1036  CalibTask.__init__(self, *args, **kwargs)
1037  self.makeSubtask("stats")
1038 

Member Function Documentation

◆ addMissingKeys()

def lsst.pipe.drivers.constructCalibs.CalibTask.addMissingKeys (   self,
  dataId,
  butler,
  missingKeys = None,
  calibName = None 
)
inherited

Definition at line 549 of file constructCalibs.py.

549  def addMissingKeys(self, dataId, butler, missingKeys=None, calibName=None):
550  if calibName is None:
551  calibName = self.calibName
552 
553  if missingKeys is None:
554  missingKeys = set(butler.getKeys(calibName).keys()) - set(dataId.keys())
555 
556  for k in missingKeys:
557  try:
558  v = butler.queryMetadata('raw', [k], dataId) # n.b. --id refers to 'raw'
559  except Exception:
560  continue
561 
562  if len(v) == 0: # failed to lookup value
563  continue
564 
565  if len(v) == 1:
566  dataId[k] = v[0]
567  else:
568  raise RuntimeError("No unique lookup for %s: %s" % (k, v))
569 
daf::base::PropertySet * set
Definition: fits.cc:912

◆ applyOverrides()

def lsst.pipe.drivers.constructCalibs.FlatTask.applyOverrides (   cls,
  config 
)
Overrides for flat construction

Definition at line 1030 of file constructCalibs.py.

1030  def applyOverrides(cls, config):
1031  """Overrides for flat construction"""
1032  config.isr.doFlat = False
1033  config.isr.doFringe = False
1034 

◆ batchCommand()

def lsst.ctrl.pool.parallel.BatchCmdLineTask.batchCommand (   cls,
  args 
)
inherited

Return command to run CmdLineTask.

    @param cls: Class
    @param args: Parsed batch job arguments (from BatchArgumentParser)

Definition at line 476 of file parallel.py.

476  def batchCommand(cls, args):
477  """!Return command to run CmdLineTask
478 
479  @param cls: Class
480  @param args: Parsed batch job arguments (from BatchArgumentParser)
481  """
482  job = args.job if args.job is not None else "job"
483  module = cls.__module__
484  script = ("import os; os.umask(%#05o); " +
485  "import lsst.base; lsst.base.disableImplicitThreading(); " +
486  "import lsst.ctrl.pool.log; lsst.ctrl.pool.log.jobLog(\"%s\"); ") % (UMASK, job)
487 
488  if args.batchStats:
489  script += ("import lsst.ctrl.pool.parallel; import atexit; " +
490  "atexit.register(lsst.ctrl.pool.parallel.printProcessStats); ")
491 
492  script += "import %s; %s.%s.parseAndRun();" % (module, module, cls.__name__)
493 
494  profilePre = "import cProfile; import os; cProfile.run(\"\"\""
495  profilePost = "\"\"\", filename=\"profile-" + job + "-%s-%d.dat\" % (os.uname()[1], os.getpid()))"
496 
497  return ("python -c '" + (profilePre if args.batchProfile else "") + script +
498  (profilePost if args.batchProfile else "") + "' " + shCommandFromArgs(args.leftover) +
499  " --noExit")
500 
def shCommandFromArgs(args)
Definition: parallel.py:42

◆ batchWallTime()

def lsst.pipe.drivers.constructCalibs.CalibTask.batchWallTime (   cls,
  time,
  parsedCmd,
  numCores 
)
inherited

Return walltime request for batch job.

    Subclasses should override if the walltime should be calculated
    differently (e.g., addition of some serial time).

    @param cls: Class
    @param time: Requested time per iteration
    @param parsedCmd: Results of argument parsing
    @param numCores: Number of cores

Reimplemented from lsst.ctrl.pool.parallel.BatchCmdLineTask.

Definition at line 410 of file constructCalibs.py.

410  def batchWallTime(cls, time, parsedCmd, numCores):
411  numCcds = len(parsedCmd.butler.get("camera"))
412  numExps = len(cls.RunnerClass.getTargetList(
413  parsedCmd)[0]['expRefList'])
414  numCycles = int(numCcds/float(numCores) + 0.5)
415  return time*numExps*numCycles
416 

◆ calculateOutputHeaderFromRaws()

def lsst.pipe.drivers.constructCalibs.CalibTask.calculateOutputHeaderFromRaws (   self,
  butler,
  calib,
  dataIdList,
  outputId 
)
inherited

Calculate the output header from the raw headers.

    This metadata will go into the output FITS header. It will include all
    headers that are identical in all inputs.

    @param butler  Data butler
    @param calib  Combined calib exposure.
    @param dataIdList  List of data identifiers for calibration inputs
    @param outputId  Data identifier for output

Definition at line 766 of file constructCalibs.py.

766  def calculateOutputHeaderFromRaws(self, butler, calib, dataIdList, outputId):
767  """!Calculate the output header from the raw headers.
768 
769  This metadata will go into the output FITS header. It will include all
770  headers that are identical in all inputs.
771 
772  @param butler Data butler
773  @param calib Combined calib exposure.
774  @param dataIdList List of data identifiers for calibration inputs
775  @param outputId Data identifier for output
776  """
777  header = calib.getMetadata()
778 
779  rawmd = [butler.get("raw_md", dataId) for dataId in dataIdList if
780  dataId is not None]
781 
782  merged = merge_headers(rawmd, mode="drop")
783 
784  # Place merged set into the PropertyList if a value is not
785  # present already
786  # Comments are not present in the merged version so copy them across
787  for k, v in merged.items():
788  if k not in header:
789  comment = rawmd[0].getComment(k) if k in rawmd[0] else None
790  header.set(k, v, comment=comment)
791 
792  # Create an observation group so we can add some standard headers
793  # independent of the form in the input files.
794  # Use try block in case we are dealing with unexpected data headers
795  try:
796  group = ObservationGroup(rawmd, pedantic=False)
797  except Exception:
798  group = None
799 
800  comments = {"TIMESYS": "Time scale for all dates",
801  "DATE-OBS": "Start date of earliest input observation",
802  "MJD-OBS": "[d] Start MJD of earliest input observation",
803  "DATE-END": "End date of oldest input observation",
804  "MJD-END": "[d] End MJD of oldest input observation",
805  "MJD-AVG": "[d] MJD midpoint of all input observations",
806  "DATE-AVG": "Midpoint date of all input observations"}
807 
808  if group is not None:
809  oldest, newest = group.extremes()
810  dateCards = dates_to_fits(oldest.datetime_begin, newest.datetime_end)
811  else:
812  # Fall back to setting a DATE-OBS from the calibDate
813  dateCards = {"DATE-OBS": "{}T00:00:00.00".format(outputId[self.config.dateCalib])}
814  comments["DATE-OBS"] = "Date of start of day of calibration midpoint"
815 
816  for k, v in dateCards.items():
817  header.set(k, v, comment=comments.get(k, None))
818 
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)
Definition: history.py:174

◆ checkCcdIdLists()

def lsst.pipe.drivers.constructCalibs.CalibTask.checkCcdIdLists (   self,
  ccdIdLists 
)
inherited
Check that the list of CCD dataIds is consistent

@param ccdIdLists  Dict of data identifier lists for each CCD name
@return Number of exposures, number of CCDs

Definition at line 886 of file constructCalibs.py.

886  def checkCcdIdLists(self, ccdIdLists):
887  """Check that the list of CCD dataIds is consistent
888 
889  @param ccdIdLists Dict of data identifier lists for each CCD name
890  @return Number of exposures, number of CCDs
891  """
892  visitIdLists = collections.defaultdict(list)
893  for ccdName in ccdIdLists:
894  for dataId in ccdIdLists[ccdName]:
895  visitName = dictToTuple(dataId, self.config.visitKeys)
896  visitIdLists[visitName].append(dataId)
897 
898  numExps = set(len(expList) for expList in ccdIdLists.values())
899  numCcds = set(len(ccdList) for ccdList in visitIdLists.values())
900 
901  if len(numExps) != 1 or len(numCcds) != 1:
902  # Presumably a visit somewhere doesn't have the full complement available.
903  # Dump the information so the user can figure it out.
904  self.log.warn("Number of visits for each CCD: %s",
905  {ccdName: len(ccdIdLists[ccdName]) for ccdName in ccdIdLists})
906  self.log.warn("Number of CCDs for each visit: %s",
907  {vv: len(visitIdLists[vv]) for vv in visitIdLists})
908  raise RuntimeError("Inconsistent number of exposures/CCDs")
909 
910  return numExps.pop(), numCcds.pop()
911 
912 
std::shared_ptr< FrameSet > append(FrameSet const &first, FrameSet const &second)
Construct a FrameSet that performs two transformations in series.
Definition: functional.cc:33
def dictToTuple(dict_, keys)
Return a tuple of specific values from a dict.

◆ combine()

def lsst.pipe.drivers.constructCalibs.CalibTask.combine (   self,
  cache,
  struct,
  outputId 
)
inherited

Combine multiple exposures of a particular CCD and write the output.

    Only the slave nodes execute this method.

    @param cache  Process pool cache
    @param struct  Parameters for the combination, which has the following components:
        * ccdName     Name tuple for CCD
        * ccdIdList   List of data identifiers for combination
        * scales      Scales to apply (expScales are scalings for each exposure,
                           ccdScale is final scale for combined image)
    @param outputId    Data identifier for combined image (exposure part only)
    @return binned calib image

Reimplemented in lsst.pipe.drivers.constructCalibs.SkyTask.

Definition at line 726 of file constructCalibs.py.

726  def combine(self, cache, struct, outputId):
727  """!Combine multiple exposures of a particular CCD and write the output
728 
729  Only the slave nodes execute this method.
730 
731  @param cache Process pool cache
732  @param struct Parameters for the combination, which has the following components:
733  * ccdName Name tuple for CCD
734  * ccdIdList List of data identifiers for combination
735  * scales Scales to apply (expScales are scalings for each exposure,
736  ccdScale is final scale for combined image)
737  @param outputId Data identifier for combined image (exposure part only)
738  @return binned calib image
739  """
740  outputId = self.getFullyQualifiedOutputId(struct.ccdName, cache.butler, outputId)
741  dataRefList = [getDataRef(cache.butler, dataId) if dataId is not None else None for
742  dataId in struct.ccdIdList]
743  self.log.info("Combining %s on %s" % (outputId, NODE))
744  calib = self.combination.run(dataRefList, expScales=struct.scales.expScales,
745  finalScale=struct.scales.ccdScale)
746 
747  if not hasattr(calib, "getMetadata"):
748  if hasattr(calib, "getVariance"):
749  calib = afwImage.makeExposure(calib)
750  else:
751  calib = afwImage.DecoratedImageF(calib.getImage()) # n.b. hardwires "F" for the output type
752 
753  self.calculateOutputHeaderFromRaws(cache.butler, calib, struct.ccdIdList, outputId)
754 
755  self.updateMetadata(calib, self.exposureTime)
756 
757  self.recordCalibInputs(cache.butler, calib,
758  struct.ccdIdList, outputId)
759 
760  self.interpolateNans(calib)
761 
762  self.write(cache.butler, calib, outputId)
763 
764  return afwMath.binImage(calib.getImage(), self.config.binning)
765 
std::shared_ptr< Exposure< ImagePixelT, MaskPixelT, VariancePixelT > > makeExposure(MaskedImage< ImagePixelT, MaskPixelT, VariancePixelT > &mimage, std::shared_ptr< geom::SkyWcs const > wcs=std::shared_ptr< geom::SkyWcs const >())
A function to return an Exposure of the correct type (cf. std::make_pair).
Definition: Exposure.h:462
std::shared_ptr< ImageT > binImage(ImageT const &inImage, int const binX, int const binY, lsst::afw::math::Property const flags=lsst::afw::math::MEAN)
Definition: binImage.cc:44
def run(self, coaddExposures, bbox, wcs)
Definition: getTemplate.py:603
def getDataRef(butler, dataId, datasetType="raw")
Definition: utils.py:16

◆ getFilter()

def lsst.pipe.drivers.constructCalibs.CalibTask.getFilter (   self,
  butler,
  dataId 
)
inherited
Determine the filter from a data identifier

Definition at line 544 of file constructCalibs.py.

544  def getFilter(self, butler, dataId):
545  """Determine the filter from a data identifier"""
546  filt = butler.queryMetadata('raw', [self.config.filter], dataId)[0]
547  return filt
548 

◆ getFullyQualifiedOutputId()

def lsst.pipe.drivers.constructCalibs.CalibTask.getFullyQualifiedOutputId (   self,
  ccdName,
  butler,
  outputId 
)
inherited
Get fully-qualified output data identifier

We may need to look up keys that aren't in the output dataId.

@param ccdName  Name tuple for CCD
@param butler  Data butler
@param outputId  Data identifier for combined image (exposure part only)
@return fully-qualified output dataId

Definition at line 710 of file constructCalibs.py.

710  def getFullyQualifiedOutputId(self, ccdName, butler, outputId):
711  """Get fully-qualified output data identifier
712 
713  We may need to look up keys that aren't in the output dataId.
714 
715  @param ccdName Name tuple for CCD
716  @param butler Data butler
717  @param outputId Data identifier for combined image (exposure part only)
718  @return fully-qualified output dataId
719  """
720  fullOutputId = {k: ccdName[i] for i, k in enumerate(self.config.ccdKeys)}
721  fullOutputId.update(outputId)
722  self.addMissingKeys(fullOutputId, butler)
723  fullOutputId.update(outputId) # must be after the call to queryMetadata in 'addMissingKeys'
724  return fullOutputId
725 

◆ getMjd()

def lsst.pipe.drivers.constructCalibs.CalibTask.getMjd (   self,
  butler,
  dataId,
  timescale = dafBase.DateTime.UTC 
)
inherited
Determine the Modified Julian Date (MJD; in TAI) from a data identifier

Definition at line 531 of file constructCalibs.py.

531  def getMjd(self, butler, dataId, timescale=dafBase.DateTime.UTC):
532  """Determine the Modified Julian Date (MJD; in TAI) from a data identifier"""
533  if self.config.dateObs in dataId:
534  dateObs = dataId[self.config.dateObs]
535  else:
536  dateObs = butler.queryMetadata('raw', [self.config.dateObs], dataId)[0]
537  if "T" not in dateObs:
538  dateObs = dateObs + "T12:00:00.0Z"
539  elif not dateObs.endswith("Z"):
540  dateObs += "Z"
541 
542  return dafBase.DateTime(dateObs, timescale).get(dafBase.DateTime.MJD)
543 
Class for handling dates/times, including MJD, UTC, and TAI.
Definition: DateTime.h:64

◆ getOutputId()

def lsst.pipe.drivers.constructCalibs.CalibTask.getOutputId (   self,
  expRefList,
  calibId 
)
inherited

Generate the data identifier for the output calib.

    The mean date and the common filter are included, using keywords
    from the configuration.  The CCD-specific part is not included
    in the data identifier.

    @param expRefList  List of data references at exposure level
    @param calibId  Data identifier elements for the calib provided by the user
    @return data identifier

Definition at line 496 of file constructCalibs.py.

496  def getOutputId(self, expRefList, calibId):
497  """!Generate the data identifier for the output calib
498 
499  The mean date and the common filter are included, using keywords
500  from the configuration. The CCD-specific part is not included
501  in the data identifier.
502 
503  @param expRefList List of data references at exposure level
504  @param calibId Data identifier elements for the calib provided by the user
505  @return data identifier
506  """
507  midTime = 0
508  filterName = None
509  for expRef in expRefList:
510  butler = expRef.getButler()
511  dataId = expRef.dataId
512 
513  midTime += self.getMjd(butler, dataId)
514  thisFilter = self.getFilter(
515  butler, dataId) if self.filterName is None else self.filterName
516  if filterName is None:
517  filterName = thisFilter
518  elif filterName != thisFilter:
519  raise RuntimeError("Filter mismatch for %s: %s vs %s" % (
520  dataId, thisFilter, filterName))
521 
522  midTime /= len(expRefList)
523  date = str(dafBase.DateTime(
524  midTime, dafBase.DateTime.MJD).toPython().date())
525 
526  outputId = {self.config.filter: filterName,
527  self.config.dateCalib: date}
528  outputId.update(calibId)
529  return outputId
530 

◆ interpolateNans()

def lsst.pipe.drivers.constructCalibs.CalibTask.interpolateNans (   self,
  image 
)
inherited
Interpolate over NANs in the combined image

NANs can result from masked areas on the CCD.  We don't want them getting
into our science images, so we replace them with the median of the image.

Definition at line 847 of file constructCalibs.py.

847  def interpolateNans(self, image):
848  """Interpolate over NANs in the combined image
849 
850  NANs can result from masked areas on the CCD. We don't want them getting
851  into our science images, so we replace them with the median of the image.
852  """
853  if hasattr(image, "getMaskedImage"): # Deal with Exposure vs Image
854  self.interpolateNans(image.getMaskedImage().getVariance())
855  image = image.getMaskedImage().getImage()
856  if hasattr(image, "getImage"): # Deal with DecoratedImage or MaskedImage vs Image
857  image = image.getImage()
858  array = image.getArray()
859  bad = np.isnan(array)
860  array[bad] = np.median(array[np.logical_not(bad)])
861 

◆ logOperation()

def lsst.ctrl.pool.parallel.BatchCmdLineTask.logOperation (   self,
  operation,
  catch = False,
  trace = True 
)
inherited

Provide a context manager for logging an operation.

    @param operation: description of operation (string)
    @param catch: Catch all exceptions?
    @param trace: Log a traceback of caught exception?

    Note that if 'catch' is True, all exceptions are swallowed, but there may
    be other side-effects such as undefined variables.

Definition at line 502 of file parallel.py.

502  def logOperation(self, operation, catch=False, trace=True):
503  """!Provide a context manager for logging an operation
504 
505  @param operation: description of operation (string)
506  @param catch: Catch all exceptions?
507  @param trace: Log a traceback of caught exception?
508 
509  Note that if 'catch' is True, all exceptions are swallowed, but there may
510  be other side-effects such as undefined variables.
511  """
512  self.log.info("%s: Start %s" % (NODE, operation))
513  try:
514  yield
515  except Exception:
516  if catch:
517  cls, e, _ = sys.exc_info()
518  self.log.warn("%s: Caught %s while %s: %s" % (NODE, cls.__name__, operation, e))
519  if trace:
520  self.log.info("%s: Traceback:\n%s" % (NODE, traceback.format_exc()))
521  return
522  raise
523  finally:
524  self.log.info("%s: Finished %s" % (NODE, operation))
525 
526 

◆ makeCameraImage()

def lsst.pipe.drivers.constructCalibs.CalibTask.makeCameraImage (   self,
  camera,
  dataId,
  calibs 
)
inherited

Create and write an image of the entire camera.

    This is useful for judging the quality or getting an overview of
    the features of the calib.

    @param camera  Camera object
    @param dataId  Data identifier for output
    @param calibs  Dict mapping CCD detector ID to calib image

Definition at line 874 of file constructCalibs.py.

874  def makeCameraImage(self, camera, dataId, calibs):
875  """!Create and write an image of the entire camera
876 
877  This is useful for judging the quality or getting an overview of
878  the features of the calib.
879 
880  @param camera Camera object
881  @param dataId Data identifier for output
882  @param calibs Dict mapping CCD detector ID to calib image
883  """
884  return makeCameraImage(camera, calibs, self.config.binning)
885 
def makeCameraImage(camera, exposures, filename=None, binning=8)

◆ parseAndRun()

def lsst.ctrl.pool.parallel.BatchPoolTask.parseAndRun (   cls,
*  args,
**  kwargs 
)
inherited
Run with a MPI process pool

Definition at line 534 of file parallel.py.

534  def parseAndRun(cls, *args, **kwargs):
535  """Run with a MPI process pool"""
536  pool = startPool()
537  super(BatchPoolTask, cls).parseAndRun(*args, **kwargs)
538  pool.exit()
539 
540 
def startPool(comm=None, root=0, killSlaves=True)
Start a process pool.
Definition: pool.py:1216

◆ parseAndSubmit()

def lsst.ctrl.pool.parallel.BatchCmdLineTask.parseAndSubmit (   cls,
  args = None,
**  kwargs 
)
inherited

Definition at line 435 of file parallel.py.

435  def parseAndSubmit(cls, args=None, **kwargs):
436  taskParser = cls._makeArgumentParser(doBatch=True, add_help=False)
437  batchParser = BatchArgumentParser(parent=taskParser)
438  batchArgs = batchParser.parse_args(config=cls.ConfigClass(), args=args, override=cls.applyOverrides,
439  **kwargs)
440 
441  if not cls.RunnerClass(cls, batchArgs.parent).precall(batchArgs.parent): # Write config, schema
442  taskParser.error("Error in task preparation")
443 
444  setBatchType(batchArgs.batch)
445 
446  if batchArgs.batch is None: # don't use a batch system
447  sys.argv = [sys.argv[0]] + batchArgs.leftover # Remove all batch arguments
448 
449  return cls.parseAndRun()
450  else:
451  if batchArgs.walltime > 0:
452  walltime = batchArgs.walltime
453  else:
454  numCores = batchArgs.cores if batchArgs.cores > 0 else batchArgs.nodes*batchArgs.procs
455  walltime = cls.batchWallTime(batchArgs.time, batchArgs.parent, numCores)
456 
457  command = cls.batchCommand(batchArgs)
458  batchArgs.batch.run(command, walltime=walltime)
459 
def setBatchType(batchType)
Definition: pool.py:101

◆ process()

def lsst.pipe.drivers.constructCalibs.CalibTask.process (   self,
  cache,
  ccdId,
  outputName = "postISRCCD",
**  kwargs 
)
inherited

Process a CCD, specified by a data identifier.

    After processing, optionally returns a result (produced by
    the 'processResult' method) calculated from the processed
    exposure.  These results will be gathered by the master node,
    and are a means for coordinated scaling of all CCDs for flats,
    etc.

    Only slave nodes execute this method.

    @param cache  Process pool cache
    @param ccdId  Data identifier for CCD
    @param outputName  Output dataset name for butler
    @return result from 'processResult'

Definition at line 602 of file constructCalibs.py.

602  def process(self, cache, ccdId, outputName="postISRCCD", **kwargs):
603  """!Process a CCD, specified by a data identifier
604 
605  After processing, optionally returns a result (produced by
606  the 'processResult' method) calculated from the processed
607  exposure. These results will be gathered by the master node,
608  and is a means for coordinated scaling of all CCDs for flats,
609  etc.
610 
611  Only slave nodes execute this method.
612 
613  @param cache Process pool cache
614  @param ccdId Data identifier for CCD
615  @param outputName Output dataset name for butler
616  @return result from 'processResult'
617  """
618  if ccdId is None:
619  self.log.warn("Null identifier received on %s" % NODE)
620  return None
621  sensorRef = getDataRef(cache.butler, ccdId)
622  if self.config.clobber or not sensorRef.datasetExists(outputName):
623  self.log.info("Processing %s on %s" % (ccdId, NODE))
624  try:
625  exposure = self.processSingle(sensorRef, **kwargs)
626  except Exception as e:
627  self.log.warn("Unable to process %s: %s" % (ccdId, e))
628  raise
629  return None
630  self.processWrite(sensorRef, exposure)
631  else:
632  self.log.info(
633  "Using previously persisted processed exposure for %s" % (sensorRef.dataId,))
634  exposure = sensorRef.get(outputName)
635  return self.processResult(exposure)
636 

◆ processResult()

def lsst.pipe.drivers.constructCalibs.FlatTask.processResult (   self,
  exposure 
)
Extract processing results from a processed exposure

This method generates what is gathered by the master node.
This can be a background measurement or similar for scaling
flat-fields.  It must be picklable!

Only slave nodes execute this method.

Reimplemented from lsst.pipe.drivers.constructCalibs.CalibTask.

Definition at line 1039 of file constructCalibs.py.

1039  def processResult(self, exposure):
1040  return self.stats.run(exposure)
1041 

◆ processSingle()

def lsst.pipe.drivers.constructCalibs.CalibTask.processSingle (   self,
  dataRef 
)
inherited
Process a single CCD, specified by a data reference

Generally, this simply means doing ISR.

Only slave nodes execute this method.

Reimplemented in lsst.pipe.drivers.constructCalibs.FringeTask, and lsst.pipe.drivers.constructCalibs.DarkTask.

Definition at line 637 of file constructCalibs.py.

637  def processSingle(self, dataRef):
638  """Process a single CCD, specified by a data reference
639 
640  Generally, this simply means doing ISR.
641 
642  Only slave nodes execute this method.
643  """
644  return self.isr.runDataRef(dataRef).exposure
645 

◆ processWrite()

def lsst.pipe.drivers.constructCalibs.CalibTask.processWrite (   self,
  dataRef,
  exposure,
  outputName = "postISRCCD" 
)
inherited

Write the processed CCD.

    We need to write these out because we can't hold them all in
    memory at once.

    Only slave nodes execute this method.

    @param dataRef     Data reference
    @param exposure    CCD exposure to write
    @param outputName  Output dataset name for butler.

Definition at line 646 of file constructCalibs.py.

646  def processWrite(self, dataRef, exposure, outputName="postISRCCD"):
647  """!Write the processed CCD
648 
649  We need to write these out because we can't hold them all in
650  memory at once.
651 
652  Only slave nodes execute this method.
653 
654  @param dataRef Data reference
655  @param exposure CCD exposure to write
656  @param outputName Output dataset name for butler.
657  """
658  dataRef.put(exposure, outputName)
659 

◆ recordCalibInputs()

def lsst.pipe.drivers.constructCalibs.CalibTask.recordCalibInputs (   self,
  butler,
  calib,
  dataIdList,
  outputId 
)
inherited

Record metadata including the inputs and creation details.

    This metadata will go into the FITS header.

    @param butler  Data butler
    @param calib  Combined calib exposure.
    @param dataIdList  List of data identifiers for calibration inputs
    @param outputId  Data identifier for output

Definition at line 819 of file constructCalibs.py.

819  def recordCalibInputs(self, butler, calib, dataIdList, outputId):
820  """!Record metadata including the inputs and creation details
821 
822  This metadata will go into the FITS header.
823 
824  @param butler Data butler
825  @param calib Combined calib exposure.
826  @param dataIdList List of data identifiers for calibration inputs
827  @param outputId Data identifier for output
828  """
829  header = calib.getMetadata()
830  header.set("OBSTYPE", self.calibName) # Used by ingestCalibs.py
831 
832  # date, time, host, and root
833  now = time.localtime()
834  header.set("CALIB_CREATION_DATE", time.strftime("%Y-%m-%d", now))
835  header.set("CALIB_CREATION_TIME", time.strftime("%X %Z", now))
836 
837  # Inputs
838  visits = [str(dictToTuple(dataId, self.config.visitKeys)) for dataId in dataIdList if
839  dataId is not None]
840  for i, v in enumerate(sorted(set(visits))):
841  header.set("CALIB_INPUT_%d" % (i,), v)
842 
843  header.set("CALIB_ID", " ".join("%s=%s" % (key, value)
844  for key, value in outputId.items()))
845  checksum(calib, header)
846 
def checksum(obj, header=None, sumType="MD5")
Calculate a checksum of an object.
Definition: checksum.py:24

◆ runDataRef()

def lsst.pipe.drivers.constructCalibs.CalibTask.runDataRef (   self,
  expRefList,
  butler,
  calibId 
)
inherited

Construct a calib from a list of exposure references.

    This is the entry point, called by TaskRunner.__call__.

    Only the master node executes this method.

    @param expRefList  List of data references at the exposure level
    @param butler      Data butler
    @param calibId   Identifier dict for calib

Definition at line 422 of file constructCalibs.py.

422  def runDataRef(self, expRefList, butler, calibId):
423  """!Construct a calib from a list of exposure references
424 
425  This is the entry point, called by the TaskRunner.__call__
426 
427  Only the master node executes this method.
428 
429  @param expRefList List of data references at the exposure level
430  @param butler Data butler
431  @param calibId Identifier dict for calib
432  """
433  if len(expRefList) < 1:
434  raise RuntimeError("No valid input data")
435 
436  for expRef in expRefList:
437  self.addMissingKeys(expRef.dataId, butler, self.config.ccdKeys, 'raw')
438 
439  outputId = self.getOutputId(expRefList, calibId)
440  ccdIdLists = getCcdIdListFromExposures(
441  expRefList, level="sensor", ccdKeys=self.config.ccdKeys)
442  self.checkCcdIdLists(ccdIdLists)
443 
444  # Ensure we can generate filenames for each output
445  outputIdItemList = list(outputId.items())
446  for ccdName in ccdIdLists:
447  dataId = dict([(k, ccdName[i]) for i, k in enumerate(self.config.ccdKeys)])
448  dataId.update(outputIdItemList)
449  self.addMissingKeys(dataId, butler)
450  dataId.update(outputIdItemList)
451 
452  try:
453  butler.get(self.calibName + "_filename", dataId)
454  except Exception as e:
455  raise RuntimeError(
456  "Unable to determine output filename \"%s_filename\" from %s: %s" %
457  (self.calibName, dataId, e))
458 
459  processPool = Pool("process")
460  processPool.storeSet(butler=butler)
461 
462  # Scatter: process CCDs independently
463  data = self.scatterProcess(processPool, ccdIdLists)
464 
465  # Gather: determine scalings
466  scales = self.scale(ccdIdLists, data)
467 
468  combinePool = Pool("combine")
469  combinePool.storeSet(butler=butler)
470 
471  # Scatter: combine
472  calibs = self.scatterCombine(combinePool, outputId, ccdIdLists, scales)
473 
474  if self.config.doCameraImage:
475  camera = butler.get("camera")
476  # Convert indexing of calibs from "ccdName" to detector ID (as used by makeImageFromCamera)
477  calibs = {butler.get("postISRCCD_detector",
478  dict(zip(self.config.ccdKeys, ccdName))).getId(): calibs[ccdName]
479  for ccdName in ccdIdLists}
480 
481  try:
482  cameraImage = self.makeCameraImage(camera, outputId, calibs)
483  butler.put(cameraImage, self.calibName + "_camera", dataId)
484  except Exception as exc:
485  self.log.warn("Unable to create camera image: %s" % (exc,))
486 
487  return Struct(
488  outputId=outputId,
489  ccdIdLists=ccdIdLists,
490  scales=scales,
491  calibs=calibs,
492  processPool=processPool,
493  combinePool=combinePool,
494  )
495 
daf::base::PropertyList * list
Definition: fits.cc:913
def getCcdIdListFromExposures(expRefList, level="sensor", ccdKeys=["ccd"])
Determine a list of CCDs from exposure references.

◆ scale()

def lsst.pipe.drivers.constructCalibs.FlatTask.scale (   self,
  ccdIdLists,
  data 
)
Determine the scalings for the final combination

We have a matrix B_ij = C_i E_j, where C_i is the relative scaling
of one CCD to all the others in an exposure, and E_j is the scaling
of the exposure.  We convert everything to logarithms so we can work
with a linear system.  We determine the C_i and E_j from B_ij by iteration,
under the additional constraint that the average CCD scale is unity.

This algorithm comes from Eugene Magnier and Pan-STARRS.

Reimplemented from lsst.pipe.drivers.constructCalibs.CalibTask.

Definition at line 1042 of file constructCalibs.py.

1042  def scale(self, ccdIdLists, data):
1043  """Determine the scalings for the final combination
1044 
1045  We have a matrix B_ij = C_i E_j, where C_i is the relative scaling
1046  of one CCD to all the others in an exposure, and E_j is the scaling
1047  of the exposure. We convert everything to logarithms so we can work
1048  with a linear system. We determine the C_i and E_j from B_ij by iteration,
1049  under the additional constraint that the average CCD scale is unity.
1050 
1051  This algorithm comes from Eugene Magnier and Pan-STARRS.
1052  """
1053  assert len(ccdIdLists.values()) > 0, "No successful CCDs"
1054  lengths = set([len(expList) for expList in ccdIdLists.values()])
1055  assert len(lengths) == 1, "Number of successful exposures for each CCD differs"
1056  assert tuple(lengths)[0] > 0, "No successful exposures"
1057  # Format background measurements into a matrix
1058  indices = dict((name, i) for i, name in enumerate(ccdIdLists))
1059  bgMatrix = np.array([[0.0] * len(expList) for expList in ccdIdLists.values()])
1060  for name in ccdIdLists:
1061  i = indices[name]
1062  bgMatrix[i] = [d if d is not None else np.nan for d in data[name]]
1063 
1064  numpyPrint = np.get_printoptions()
1065  np.set_printoptions(threshold=np.inf)
1066  self.log.info("Input backgrounds: %s" % bgMatrix)
1067 
1068  # Flat-field scaling
1069  numCcds = len(ccdIdLists)
1070  numExps = bgMatrix.shape[1]
1071  # log(Background) for each exposure/component
1072  bgMatrix = np.log(bgMatrix)
1073  bgMatrix = np.ma.masked_array(bgMatrix, ~np.isfinite(bgMatrix))
1074  # Initial guess at log(scale) for each component
1075  compScales = np.zeros(numCcds)
1076  expScales = np.array([(bgMatrix[:, i0] - compScales).mean() for i0 in range(numExps)])
1077 
1078  for iterate in range(self.config.iterations):
1079  compScales = np.array([(bgMatrix[i1, :] - expScales).mean() for i1 in range(numCcds)])
1080  bad = np.isnan(compScales)
1081  if np.any(bad):
1082  # Bad CCDs: just set them to the mean scale
1083  compScales[bad] = compScales[~bad].mean()
1084  expScales = np.array([(bgMatrix[:, i2] - compScales).mean() for i2 in range(numExps)])
1085 
1086  avgScale = np.average(np.exp(compScales))
1087  compScales -= np.log(avgScale)
1088  self.log.debug("Iteration %d exposure scales: %s", iterate, np.exp(expScales))
1089  self.log.debug("Iteration %d component scales: %s", iterate, np.exp(compScales))
1090 
1091  expScales = np.array([(bgMatrix[:, i3] - compScales).mean() for i3 in range(numExps)])
1092 
1093  if np.any(np.isnan(expScales)):
1094  raise RuntimeError("Bad exposure scales: %s --> %s" % (bgMatrix, expScales))
1095 
1096  expScales = np.exp(expScales)
1097  compScales = np.exp(compScales)
1098 
1099  self.log.info("Exposure scales: %s" % expScales)
1100  self.log.info("Component relative scaling: %s" % compScales)
1101  np.set_printoptions(**numpyPrint)
1102 
1103  return dict((ccdName, Struct(ccdScale=compScales[indices[ccdName]], expScales=expScales))
1104  for ccdName in ccdIdLists)
1105 
1106 
def scale(algorithm, min, max=None, frame=None)
Definition: ds9.py:108

◆ scatterCombine()

def lsst.pipe.drivers.constructCalibs.CalibTask.scatterCombine (   self,
  pool,
  outputId,
  ccdIdLists,
  scales 
)
inherited

Scatter the combination of exposures across multiple nodes.

    In this case, we can only scatter across as many nodes as
    there are CCDs.

    Only the master node executes this method.

    @param pool  Process pool
    @param outputId  Output identifier (exposure part only)
    @param ccdIdLists  Dict of data identifier lists for each CCD name
    @param scales  Dict of structs with scales, for each CCD name
    @return Dict of binned images, keyed by CCD name

Definition at line 690 of file constructCalibs.py.

690  def scatterCombine(self, pool, outputId, ccdIdLists, scales):
691  """!Scatter the combination of exposures across multiple nodes
692 
693  In this case, we can only scatter across as many nodes as
694  there are CCDs.
695 
696  Only the master node executes this method.
697 
698  @param pool Process pool
699  @param outputId Output identifier (exposure part only)
700  @param ccdIdLists Dict of data identifier lists for each CCD name
701  @param scales Dict of structs with scales, for each CCD name
702  @return Dict of binned images, keyed by CCD name
703  """
704  self.log.info("Scatter combination")
705  data = [Struct(ccdName=ccdName, ccdIdList=ccdIdLists[ccdName], scales=scales[ccdName]) for
706  ccdName in ccdIdLists]
707  images = pool.map(self.combine, data, outputId)
708  return dict(zip(ccdIdLists.keys(), images))
709 

◆ scatterProcess()

def lsst.pipe.drivers.constructCalibs.CalibTask.scatterProcess (   self,
  pool,
  ccdIdLists 
)
inherited

Scatter the processing among the nodes.

    We scatter each CCD independently (exposures aren't grouped together),
    to make full use of all available processors. This necessitates piecing
    everything back together in the same format as ccdIdLists afterwards.

    Only the master node executes this method.

    @param pool  Process pool
    @param ccdIdLists  Dict of data identifier lists for each CCD name
    @return Dict of lists of returned data for each CCD name

Reimplemented in lsst.pipe.drivers.constructCalibs.SkyTask.

Definition at line 586 of file constructCalibs.py.

586  def scatterProcess(self, pool, ccdIdLists):
587  """!Scatter the processing among the nodes
588 
589  We scatter each CCD independently (exposures aren't grouped together),
590  to make full use of all available processors. This necessitates piecing
591  everything back together in the same format as ccdIdLists afterwards.
592 
593  Only the master node executes this method.
594 
595  @param pool Process pool
596  @param ccdIdLists Dict of data identifier lists for each CCD name
597  @return Dict of lists of returned data for each CCD name
598  """
599  self.log.info("Scatter processing")
600  return mapToMatrix(pool, self.process, ccdIdLists)
601 
def mapToMatrix(pool, func, ccdIdLists, *args, **kwargs)

◆ updateMetadata()

def lsst.pipe.drivers.constructCalibs.CalibTask.updateMetadata (   self,
  calibImage,
  exposureTime,
  darkTime = None,
**  kwargs 
)
inherited

Update the metadata from the VisitInfo.

    @param calibImage       The image whose metadata is to be set
    @param exposureTime     The exposure time for the image
    @param darkTime         The time since the last read (default: exposureTime)

Definition at line 570 of file constructCalibs.py.

570  def updateMetadata(self, calibImage, exposureTime, darkTime=None, **kwargs):
571  """!Update the metadata from the VisitInfo
572 
573  @param calibImage The image whose metadata is to be set
574  @param exposureTime The exposure time for the image
575  @param darkTime The time since the last read (default: exposureTime)
576  """
577 
578  if darkTime is None:
579  darkTime = exposureTime # avoid warning messages when using calibration products
580 
581  visitInfo = afwImage.VisitInfo(exposureTime=exposureTime, darkTime=darkTime, **kwargs)
582  md = calibImage.getMetadata()
583 
584  afwImage.setVisitInfoMetadata(md, visitInfo)
585 
Information about a single exposure of an imaging camera.
Definition: VisitInfo.h:68

◆ write()

def lsst.pipe.drivers.constructCalibs.CalibTask.write (   self,
  butler,
  exposure,
  dataId 
)
inherited

Write the final combined calib.

    Only the slave nodes execute this method.

    @param butler  Data butler
    @param exposure  CCD exposure to write
    @param dataId  Data identifier for output

Definition at line 862 of file constructCalibs.py.

862  def write(self, butler, exposure, dataId):
863  """!Write the final combined calib
864 
865  Only the slave nodes execute this method
866 
867  @param butler Data butler
868  @param exposure CCD exposure to write
869  @param dataId Data identifier for output
870  """
871  self.log.info("Writing %s on %s" % (dataId, NODE))
872  butler.put(exposure, self.calibName, dataId)
873 
void write(OutputArchiveHandle &handle) const override

Member Data Documentation

◆ calibName

string lsst.pipe.drivers.constructCalibs.FlatTask.calibName = "flat"
static

Definition at line 1027 of file constructCalibs.py.

◆ ConfigClass

lsst.pipe.drivers.constructCalibs.FlatTask.ConfigClass = FlatConfig
static

Definition at line 1025 of file constructCalibs.py.

◆ exposureTime

float lsst.pipe.drivers.constructCalibs.CalibTask.exposureTime = 1.0
staticinherited

Definition at line 401 of file constructCalibs.py.

◆ filterName

lsst.pipe.drivers.constructCalibs.CalibTask.filterName = None
staticinherited

Definition at line 399 of file constructCalibs.py.

◆ RunnerClass

lsst.pipe.drivers.constructCalibs.CalibTask.RunnerClass = CalibTaskRunner
staticinherited

Definition at line 398 of file constructCalibs.py.


The documentation for this class was generated from the following file: