LSST Applications  21.0.0-147-g0e635eb1+1acddb5be5,22.0.0+052faf71bd,22.0.0+1ea9a8b2b2,22.0.0+6312710a6c,22.0.0+729191ecac,22.0.0+7589c3a021,22.0.0+9f079a9461,22.0.1-1-g7d6de66+b8044ec9de,22.0.1-1-g87000a6+536b1ee016,22.0.1-1-g8e32f31+6312710a6c,22.0.1-10-gd060f87+016f7cdc03,22.0.1-12-g9c3108e+df145f6f68,22.0.1-16-g314fa6d+c825727ab8,22.0.1-19-g93a5c75+d23f2fb6d8,22.0.1-19-gb93eaa13+aab3ef7709,22.0.1-2-g8ef0a89+b8044ec9de,22.0.1-2-g92698f7+9f079a9461,22.0.1-2-ga9b0f51+052faf71bd,22.0.1-2-gac51dbf+052faf71bd,22.0.1-2-gb66926d+6312710a6c,22.0.1-2-gcb770ba+09e3807989,22.0.1-20-g32debb5+b8044ec9de,22.0.1-23-gc2439a9a+fb0756638e,22.0.1-3-g496fd5d+09117f784f,22.0.1-3-g59f966b+1e6ba2c031,22.0.1-3-g849a1b8+f8b568069f,22.0.1-3-gaaec9c0+c5c846a8b1,22.0.1-32-g5ddfab5d3+60ce4897b0,22.0.1-4-g037fbe1+64e601228d,22.0.1-4-g8623105+b8044ec9de,22.0.1-5-g096abc9+d18c45d440,22.0.1-5-g15c806e+57f5c03693,22.0.1-7-gba73697+57f5c03693,master-g6e05de7fdc+c1283a92b8,master-g72cdda8301+729191ecac,w.2021.39
LSST Data Management Base Package
Public Member Functions | Static Public Attributes | List of all members
lsst.pipe.drivers.skyCorrection.SkyCorrectionTask Class Reference
Inheritance diagram for lsst.pipe.drivers.skyCorrection.SkyCorrectionTask:
lsst.ctrl.pool.parallel.BatchPoolTask lsst.ctrl.pool.parallel.BatchCmdLineTask

Public Member Functions

def runQuantum (self, butlerQC, inputRefs, outputRefs)
 
def __init__ (self, *args, **kwargs)
 
def batchWallTime (cls, time, parsedCmd, numCores)
 
def runDataRef (self, expRef)
 
def focalPlaneBackground (self, camera, pool, dataIdList, config)
 
def focalPlaneBackgroundRun (self, camera, cacheExposures, idList, config)
 
def run (self, calExpArray, calBkgArray, skyCalibs, camera)
 
def loadImage (self, cache, dataId)
 
def loadImageRun (self, calExp, calExpBkg)
 
def measureSkyFrame (self, cache, dataId)
 
def subtractSkyFrame (self, cache, dataId, scale)
 
def accumulateModel (self, cache, data)
 
def subtractModel (self, cache, dataId, bgModel)
 
def subtractModelRun (self, exposure, bgModel)
 
def realiseModel (self, cache, dataId, bgModel)
 
def collectBinnedImage (self, exposure, image)
 
def collect (self, cache)
 
def collectOriginal (self, cache, dataId)
 
def collectSky (self, cache, dataId)
 
def collectMask (self, cache, dataId)
 
def write (self, cache, dataId)
 
def parseAndRun (cls, *args, **kwargs)
 
def parseAndSubmit (cls, args=None, **kwargs)
 
def batchCommand (cls, args)
 Return command to run CmdLineTask. More...
 
def logOperation (self, operation, catch=False, trace=True)
 Provide a context manager for logging an operation. More...
 

Static Public Attributes

 ConfigClass = SkyCorrectionConfig
 

Detailed Description

Correct sky over entire focal plane

Definition at line 197 of file skyCorrection.py.

Constructor & Destructor Documentation

◆ __init__()

def lsst.pipe.drivers.skyCorrection.SkyCorrectionTask.__init__ (   self,
*  args,
**  kwargs 
)

Definition at line 220 of file skyCorrection.py.

220  def __init__(self, *args, **kwargs):
221  super().__init__(**kwargs)
222 
223  self.makeSubtask("sky")
224  self.makeSubtask("maskObjects")
225 

Member Function Documentation

◆ accumulateModel()

def lsst.pipe.drivers.skyCorrection.SkyCorrectionTask.accumulateModel (   self,
  cache,
  data 
)
Fit background model for CCD

This method runs on the slave nodes.

Parameters
----------
cache : `lsst.pipe.base.Struct`
    Process pool cache.
data : `lsst.pipe.base.Struct`
    Data identifier, with `dataId` (data identifier) and `bgModel`
    (background model) elements.

Returns
-------
bgModel : `lsst.pipe.drivers.background.FocalPlaneBackground`
    Background model.

Definition at line 621 of file skyCorrection.py.

621  def accumulateModel(self, cache, data):
622  """Fit background model for CCD
623 
624  This method runs on the slave nodes.
625 
626  Parameters
627  ----------
628  cache : `lsst.pipe.base.Struct`
629  Process pool cache.
630  data : `lsst.pipe.base.Struct`
631  Data identifier, with `dataId` (data identifier) and `bgModel`
632  (background model) elements.
633 
634  Returns
635  -------
636  bgModel : `lsst.pipe.drivers.background.FocalPlaneBackground`
637  Background model.
638  """
639  assert cache.dataId == data.dataId
640  data.bgModel.addCcd(cache.exposure)
641  return data.bgModel
642 

◆ batchCommand()

def lsst.ctrl.pool.parallel.BatchCmdLineTask.batchCommand (   cls,
  args 
)
inherited

Return command to run CmdLineTask.

    @param cls: Class
    @param args: Parsed batch job arguments (from BatchArgumentParser)

Definition at line 476 of file parallel.py.

476  def batchCommand(cls, args):
477  """!Return command to run CmdLineTask
478 
479  @param cls: Class
480  @param args: Parsed batch job arguments (from BatchArgumentParser)
481  """
482  job = args.job if args.job is not None else "job"
483  module = cls.__module__
484  script = ("import os; os.umask(%#05o); " +
485  "import lsst.base; lsst.base.disableImplicitThreading(); " +
486  "import lsst.ctrl.pool.log; lsst.ctrl.pool.log.jobLog(\"%s\"); ") % (UMASK, job)
487 
488  if args.batchStats:
489  script += ("import lsst.ctrl.pool.parallel; import atexit; " +
490  "atexit.register(lsst.ctrl.pool.parallel.printProcessStats); ")
491 
492  script += "import %s; %s.%s.parseAndRun();" % (module, module, cls.__name__)
493 
494  profilePre = "import cProfile; import os; cProfile.run(\"\"\""
495  profilePost = "\"\"\", filename=\"profile-" + job + "-%s-%d.dat\" % (os.uname()[1], os.getpid()))"
496 
497  return ("python -c '" + (profilePre if args.batchProfile else "") + script +
498  (profilePost if args.batchProfile else "") + "' " + shCommandFromArgs(args.leftover) +
499  " --noExit")
500 
def shCommandFromArgs(args)
Definition: parallel.py:42

◆ batchWallTime()

def lsst.pipe.drivers.skyCorrection.SkyCorrectionTask.batchWallTime (   cls,
  time,
  parsedCmd,
  numCores 
)
Return walltime request for batch job

Subclasses should override if the walltime should be calculated
differently (e.g., addition of some serial time).

Parameters
----------
time : `float`
    Requested time per iteration.
parsedCmd : `argparse.Namespace`
    Results of argument parsing.
numCores : `int`
    Number of cores.

Reimplemented from lsst.ctrl.pool.parallel.BatchCmdLineTask.

Definition at line 236 of file skyCorrection.py.

236  def batchWallTime(cls, time, parsedCmd, numCores):
237  """Return walltime request for batch job
238 
239  Subclasses should override if the walltime should be calculated
240  differently (e.g., addition of some serial time).
241 
242  Parameters
243  ----------
244  time : `float`
245  Requested time per iteration.
246  parsedCmd : `argparse.Namespace`
247  Results of argument parsing.
248  numCores : `int`
249  Number of cores.
250  """
251  numTargets = len(cls.RunnerClass.getTargetList(parsedCmd))
252  return time*numTargets
253 

◆ collect()

def lsst.pipe.drivers.skyCorrection.SkyCorrectionTask.collect (   self,
  cache 
)
Collect exposure for potential visualisation

This method runs on the slave nodes.

Parameters
----------
cache : `lsst.pipe.base.Struct`
    Process pool cache.

Returns
-------
detId : `int`
    Detector identifier.
image : `lsst.afw.image.MaskedImage`
    Binned image.

Definition at line 752 of file skyCorrection.py.

752  def collect(self, cache):
753  """Collect exposure for potential visualisation
754 
755  This method runs on the slave nodes.
756 
757  Parameters
758  ----------
759  cache : `lsst.pipe.base.Struct`
760  Process pool cache.
761 
762  Returns
763  -------
764  detId : `int`
765  Detector identifier.
766  image : `lsst.afw.image.MaskedImage`
767  Binned image.
768  """
769  return self.collectBinnedImage(cache.exposure, cache.exposure.maskedImage)
770 

◆ collectBinnedImage()

def lsst.pipe.drivers.skyCorrection.SkyCorrectionTask.collectBinnedImage (   self,
  exposure,
  image 
)
Return the binned image required for visualization

This method just helps to cut down on boilerplate.

Parameters
----------
exposure : `lsst.afw.image.Exposure`
    Exposure whose detector supplies the returned identifier.
image : `lsst.afw.image.MaskedImage`
    Image to go into visualisation.

Returns
-------
detId : `int`
    Detector identifier.
image : `lsst.afw.image.MaskedImage`
    Binned image.

Definition at line 733 of file skyCorrection.py.

733  def collectBinnedImage(self, exposure, image):
734  """Return the binned image required for visualization
735 
736  This method just helps to cut down on boilerplate.
737 
738  Parameters
739  ----------
740  image : `lsst.afw.image.MaskedImage`
741  Image to go into visualisation.
742 
743  Returns
744  -------
745  detId : `int`
746  Detector identifier.
747  image : `lsst.afw.image.MaskedImage`
748  Binned image.
749  """
750  return (exposure.getDetector().getId(), afwMath.binImage(image, self.config.binning))
751 
std::shared_ptr< ImageT > binImage(ImageT const &inImage, int const binX, int const binY, lsst::afw::math::Property const flags=lsst::afw::math::MEAN)
Definition: binImage.cc:44

◆ collectMask()

def lsst.pipe.drivers.skyCorrection.SkyCorrectionTask.collectMask (   self,
  cache,
  dataId 
)
Collect mask for visualisation

This method runs on the slave nodes.

Parameters
----------
cache : `lsst.pipe.base.Struct`
    Process pool cache.
dataId : `dict`
    Data identifier.

Returns
-------
detId : `int`
    Detector identifier.
image : `lsst.afw.image.Image`
    Binned image.

Definition at line 814 of file skyCorrection.py.

814  def collectMask(self, cache, dataId):
815  """Collect mask for visualisation
816 
817  This method runs on the slave nodes.
818 
819  Parameters
820  ----------
821  cache : `lsst.pipe.base.Struct`
822  Process pool cache.
823  dataId : `dict`
824  Data identifier.
825 
826  Returns
827  -------
828  detId : `int`
829  Detector identifier.
830  image : `lsst.afw.image.Image`
831  Binned image.
832  """
833  # Convert Mask to floating-point image, because that's what's required for focal plane construction
834  image = afwImage.ImageF(cache.exposure.maskedImage.getBBox())
835  image.array[:] = cache.exposure.maskedImage.mask.array
836  return self.collectBinnedImage(cache.exposure, image)
837 

◆ collectOriginal()

def lsst.pipe.drivers.skyCorrection.SkyCorrectionTask.collectOriginal (   self,
  cache,
  dataId 
)
Collect original image for visualisation

This method runs on the slave nodes.

Parameters
----------
cache : `lsst.pipe.base.Struct`
    Process pool cache.
dataId : `dict`
    Data identifier.

Returns
-------
detId : `int`
    Detector identifier.
image : `lsst.afw.image.MaskedImage`
    Binned image.

Definition at line 771 of file skyCorrection.py.

771  def collectOriginal(self, cache, dataId):
772  """Collect original image for visualisation
773 
774  This method runs on the slave nodes.
775 
776  Parameters
777  ----------
778  cache : `lsst.pipe.base.Struct`
779  Process pool cache.
780  dataId : `dict`
781  Data identifier.
782 
783  Returns
784  -------
785  detId : `int`
786  Detector identifier.
787  image : `lsst.afw.image.MaskedImage`
788  Binned image.
789  """
790  exposure = cache.butler.get("calexp", dataId, immediate=True)
791  return self.collectBinnedImage(exposure, exposure.maskedImage)
792 

◆ collectSky()

def lsst.pipe.drivers.skyCorrection.SkyCorrectionTask.collectSky (   self,
  cache,
  dataId 
)
Collect sky frame image for visualisation

This method runs on the slave nodes.

Parameters
----------
cache : `lsst.pipe.base.Struct`
    Process pool cache.
dataId : `dict`
    Data identifier.

Returns
-------
detId : `int`
    Detector identifier.
image : `lsst.afw.image.MaskedImage`
    Binned image.

Definition at line 793 of file skyCorrection.py.

793  def collectSky(self, cache, dataId):
794  """Collect original image for visualisation
795 
796  This method runs on the slave nodes.
797 
798  Parameters
799  ----------
800  cache : `lsst.pipe.base.Struct`
801  Process pool cache.
802  dataId : `dict`
803  Data identifier.
804 
805  Returns
806  -------
807  detId : `int`
808  Detector identifier.
809  image : `lsst.afw.image.MaskedImage`
810  Binned image.
811  """
812  return self.collectBinnedImage(cache.exposure, cache.sky.getImage())
813 

◆ focalPlaneBackground()

def lsst.pipe.drivers.skyCorrection.SkyCorrectionTask.focalPlaneBackground (   self,
  camera,
  pool,
  dataIdList,
  config 
)
Perform full focal-plane background subtraction

This method runs on the master node.

Parameters
----------
camera : `lsst.afw.cameraGeom.Camera`
    Camera description.
pool : `lsst.ctrl.pool.Pool`
    Process pool.
dataIdList : iterable of `dict`
    List of data identifiers for the CCDs.
config : `lsst.pipe.drivers.background.FocalPlaneBackgroundConfig`
    Configuration to use for background subtraction.

Returns
-------
exposures : `list` of `lsst.afw.image.Image`
    List of binned images, for creating focal plane image.

Definition at line 321 of file skyCorrection.py.

321  def focalPlaneBackground(self, camera, pool, dataIdList, config):
322  """Perform full focal-plane background subtraction
323 
324  This method runs on the master node.
325 
326  Parameters
327  ----------
328  camera : `lsst.afw.cameraGeom.Camera`
329  Camera description.
330  pool : `lsst.ctrl.pool.Pool`
331  Process pool.
332  dataIdList : iterable of `dict`
333  List of data identifiers for the CCDs.
334  config : `lsst.pipe.drivers.background.FocalPlaneBackgroundConfig`
335  Configuration to use for background subtraction.
336 
337  Returns
338  -------
339  exposures : `list` of `lsst.afw.image.Image`
340  List of binned images, for creating focal plane image.
341  """
342  bgModel = FocalPlaneBackground.fromCamera(config, camera)
343  data = [pipeBase.Struct(dataId=dataId, bgModel=bgModel.clone()) for dataId in dataIdList]
344  bgModelList = pool.mapToPrevious(self.accumulateModel, data)
345  for ii, bg in enumerate(bgModelList):
346  self.log.info("Background %d: %d pixels", ii, bg._numbers.array.sum())
347  bgModel.merge(bg)
348  return pool.mapToPrevious(self.subtractModel, dataIdList, bgModel)
349 

◆ focalPlaneBackgroundRun()

def lsst.pipe.drivers.skyCorrection.SkyCorrectionTask.focalPlaneBackgroundRun (   self,
  camera,
  cacheExposures,
  idList,
  config 
)
Perform full focal-plane background subtraction

This method runs on the master node.

Parameters
----------
camera : `lsst.afw.cameraGeom.Camera`
    Camera description.
cacheExposures : `list` of `lsst.afw.image.Exposure`
    List of loaded and processed input calExp.
idList : `list` of `int`
    List of detector ids to iterate over.
config : `lsst.pipe.drivers.background.FocalPlaneBackgroundConfig`
    Configuration to use for background subtraction.

Returns
-------
exposures : `list` of `lsst.afw.image.Image`
    List of binned images, for creating focal plane image.
newCacheBgList : `list` of `lsst.afw.math.BackgroundList`
    Background lists generated.
cacheBgModel : `list` of `FocalPlaneBackground`
    Focal plane background models, one entry per input exposure.

Definition at line 350 of file skyCorrection.py.

350  def focalPlaneBackgroundRun(self, camera, cacheExposures, idList, config):
351  """Perform full focal-plane background subtraction
352 
353  This method runs on the master node.
354 
355  Parameters
356  ----------
357  camera : `lsst.afw.cameraGeom.Camera`
358  Camera description.
359  cacheExposures : `list` of `lsst.afw.image.Exposures`
360  List of loaded and processed input calExp.
361  idList : `list` of `int`
362  List of detector ids to iterate over.
363  config : `lsst.pipe.drivers.background.FocalPlaneBackgroundConfig`
364  Configuration to use for background subtraction.
365 
366  Returns
367  -------
368  exposures : `list` of `lsst.afw.image.Image`
369  List of binned images, for creating focal plane image.
370  newCacheBgList : `list` of `lsst.afwMath.backgroundList`
371  Background lists generated.
372  cacheBgModel : `FocalPlaneBackground`
373  Full focal plane background model.
374  """
375  bgModel = FocalPlaneBackground.fromCamera(config, camera)
376  data = [pipeBase.Struct(id=id, bgModel=bgModel.clone()) for id in idList]
377 
378  bgModelList = []
379  for nodeData, cacheExp in zip(data, cacheExposures):
380  nodeData.bgModel.addCcd(cacheExp)
381  bgModelList.append(nodeData.bgModel)
382 
383  for ii, bg in enumerate(bgModelList):
384  self.log.info("Background %d: %d pixels", ii, bg._numbers.getArray().sum())
385  bgModel.merge(bg)
386 
387  exposures = []
388  newCacheBgList = []
389  cacheBgModel = []
390  for cacheExp in cacheExposures:
391  nodeExp, nodeBgModel, nodeBgList = self.subtractModelRun(cacheExp, bgModel)
392  exposures.append(afwMath.binImage(nodeExp.getMaskedImage(), self.config.binning))
393  cacheBgModel.append(nodeBgModel)
394  newCacheBgList.append(nodeBgList)
395 
396  return exposures, newCacheBgList, cacheBgModel
397 

◆ loadImage()

def lsst.pipe.drivers.skyCorrection.SkyCorrectionTask.loadImage (   self,
  cache,
  dataId 
)
Load original image and restore the sky

This method runs on the slave nodes.

Parameters
----------
cache : `lsst.pipe.base.Struct`
    Process pool cache.
dataId : `dict`
    Data identifier.

Returns
-------
detId : `int`
    Detector identifier.
image : `lsst.afw.image.MaskedImage`
    Binned image of the background-restored exposure.

Definition at line 504 of file skyCorrection.py.

504  def loadImage(self, cache, dataId):
505  """Load original image and restore the sky
506 
507  This method runs on the slave nodes.
508 
509  Parameters
510  ----------
511  cache : `lsst.pipe.base.Struct`
512  Process pool cache.
513  dataId : `dict`
514  Data identifier.
515 
516  Returns
517  -------
518  exposure : `lsst.afw.image.Exposure`
519  Resultant exposure.
520  """
521  cache.dataId = dataId
522  cache.exposure = cache.butler.get(self.config.calexpType, dataId, immediate=True).clone()
523  bgOld = cache.butler.get("calexpBackground", dataId, immediate=True)
524  image = cache.exposure.getMaskedImage()
525 
526  # We're removing the old background, so change the sense of all its components
527  for bgData in bgOld:
528  statsImage = bgData[0].getStatsImage()
529  statsImage *= -1
530 
531  image -= bgOld.getImage()
532  cache.bgList = afwMath.BackgroundList()
533  for bgData in bgOld:
534  cache.bgList.append(bgData)
535 
536  if self.config.doMaskObjects:
537  self.maskObjects.findObjects(cache.exposure)
538 
539  return self.collect(cache)
540 

◆ loadImageRun()

def lsst.pipe.drivers.skyCorrection.SkyCorrectionTask.loadImageRun (   self,
  calExp,
  calExpBkg 
)
Serial implementation of self.loadImage() for Gen3.

Restore the previously subtracted background (calExpBkg) to calExp.

Parameters
----------
calExp : `lsst.afw.image.Exposure`
    Detector level calExp image to process.
calExpBkg : `lsst.afw.math.BackgroundList`
    Detector level background list associated with the calExp.

Returns
-------
calExp : `lsst.afw.image.Exposure`
    Background restored calExp.
bgList : `lsst.afw.math.BackgroundList`
    New background list containing the restoration background.

Definition at line 541 of file skyCorrection.py.

541  def loadImageRun(self, calExp, calExpBkg):
542  """Serial implementation of self.loadImage() for Gen3.
543 
544  Load and restore background to calExp and calExpBkg.
545 
546  Parameters
547  ----------
548  calExp : `lsst.afw.image.Exposure`
549  Detector level calExp image to process.
550  calExpBkg : `lsst.afw.math.BackgroundList`
551  Detector level background list associated with the calExp.
552 
553  Returns
554  -------
555  calExp : `lsst.afw.image.Exposure`
556  Background restored calExp.
557  bgList : `lsst.afw.math.BackgroundList`
558  New background list containing the restoration background.
559  """
560  image = calExp.getMaskedImage()
561 
562  for bgOld in calExpBkg:
563  statsImage = bgOld[0].getStatsImage()
564  statsImage *= -1
565 
566  image -= calExpBkg.getImage()
567  bgList = afwMath.BackgroundList()
568  for bgData in calExpBkg:
569  bgList.append(bgData)
570 
571  if self.config.doMaskObjects:
572  self.maskObjects.findObjects(calExp)
573 
574  return (calExp, bgList)
575 

◆ logOperation()

def lsst.ctrl.pool.parallel.BatchCmdLineTask.logOperation (   self,
  operation,
  catch = False,
  trace = True 
)
inherited

Provide a context manager for logging an operation.

    @param operation: description of operation (string)
    @param catch: Catch all exceptions?
    @param trace: Log a traceback of caught exception?

    Note that if 'catch' is True, all exceptions are swallowed, but there may
    be other side-effects such as undefined variables.

Definition at line 502 of file parallel.py.

502  def logOperation(self, operation, catch=False, trace=True):
503  """!Provide a context manager for logging an operation
504 
505  @param operation: description of operation (string)
506  @param catch: Catch all exceptions?
507  @param trace: Log a traceback of caught exception?
508 
509  Note that if 'catch' is True, all exceptions are swallowed, but there may
510  be other side-effects such as undefined variables.
511  """
512  self.log.info("%s: Start %s" % (NODE, operation))
513  try:
514  yield
515  except Exception:
516  if catch:
517  cls, e, _ = sys.exc_info()
518  self.log.warn("%s: Caught %s while %s: %s" % (NODE, cls.__name__, operation, e))
519  if trace:
520  self.log.info("%s: Traceback:\n%s" % (NODE, traceback.format_exc()))
521  return
522  raise
523  finally:
524  self.log.info("%s: Finished %s" % (NODE, operation))
525 
526 

◆ measureSkyFrame()

def lsst.pipe.drivers.skyCorrection.SkyCorrectionTask.measureSkyFrame (   self,
  cache,
  dataId 
)
Measure scale for sky frame

This method runs on the slave nodes.

Parameters
----------
cache : `lsst.pipe.base.Struct`
    Process pool cache.
dataId : `dict`
    Data identifier.

Returns
-------
scale : `float`
    Scale for sky frame.

Definition at line 576 of file skyCorrection.py.

576  def measureSkyFrame(self, cache, dataId):
577  """Measure scale for sky frame
578 
579  This method runs on the slave nodes.
580 
581  Parameters
582  ----------
583  cache : `lsst.pipe.base.Struct`
584  Process pool cache.
585  dataId : `dict`
586  Data identifier.
587 
588  Returns
589  -------
590  scale : `float`
591  Scale for sky frame.
592  """
593  assert cache.dataId == dataId
594  cache.sky = self.sky.getSkyData(cache.butler, dataId)
595  scale = self.sky.measureScale(cache.exposure.getMaskedImage(), cache.sky)
596  return scale
597 

◆ parseAndRun()

def lsst.ctrl.pool.parallel.BatchPoolTask.parseAndRun (   cls,
*  args,
**  kwargs 
)
inherited
Run with a MPI process pool

Definition at line 534 of file parallel.py.

534  def parseAndRun(cls, *args, **kwargs):
535  """Run with a MPI process pool"""
536  pool = startPool()
537  super(BatchPoolTask, cls).parseAndRun(*args, **kwargs)
538  pool.exit()
539 
540 
def startPool(comm=None, root=0, killSlaves=True)
Start a process pool.
Definition: pool.py:1216

◆ parseAndSubmit()

def lsst.ctrl.pool.parallel.BatchCmdLineTask.parseAndSubmit (   cls,
  args = None,
**  kwargs 
)
inherited

Definition at line 435 of file parallel.py.

435  def parseAndSubmit(cls, args=None, **kwargs):
436  taskParser = cls._makeArgumentParser(doBatch=True, add_help=False)
437  batchParser = BatchArgumentParser(parent=taskParser)
438  batchArgs = batchParser.parse_args(config=cls.ConfigClass(), args=args, override=cls.applyOverrides,
439  **kwargs)
440 
441  if not cls.RunnerClass(cls, batchArgs.parent).precall(batchArgs.parent): # Write config, schema
442  taskParser.error("Error in task preparation")
443 
444  setBatchType(batchArgs.batch)
445 
446  if batchArgs.batch is None: # don't use a batch system
447  sys.argv = [sys.argv[0]] + batchArgs.leftover # Remove all batch arguments
448 
449  return cls.parseAndRun()
450  else:
451  if batchArgs.walltime > 0:
452  walltime = batchArgs.walltime
453  else:
454  numCores = batchArgs.cores if batchArgs.cores > 0 else batchArgs.nodes*batchArgs.procs
455  walltime = cls.batchWallTime(batchArgs.time, batchArgs.parent, numCores)
456 
457  command = cls.batchCommand(batchArgs)
458  batchArgs.batch.run(command, walltime=walltime)
459 
def setBatchType(batchType)
Definition: pool.py:101

◆ realiseModel()

def lsst.pipe.drivers.skyCorrection.SkyCorrectionTask.realiseModel (   self,
  cache,
  dataId,
  bgModel 
)
Generate an image of the background model for visualisation

Useful for debugging.

Parameters
----------
cache : `lsst.pipe.base.Struct`
    Process pool cache.
dataId : `dict`
    Data identifier.
bgModel : `lsst.pipe.drivers.background.FocalPlaneBackground`
    Background model.

Returns
-------
detId : `int`
    Detector identifier.
image : `lsst.afw.image.MaskedImage`
    Binned background model image.

Definition at line 705 of file skyCorrection.py.

705  def realiseModel(self, cache, dataId, bgModel):
706  """Generate an image of the background model for visualisation
707 
708  Useful for debugging.
709 
710  Parameters
711  ----------
712  cache : `lsst.pipe.base.Struct`
713  Process pool cache.
714  dataId : `dict`
715  Data identifier.
716  bgModel : `lsst.pipe.drivers.background.FocalPlaneBackround`
717  Background model.
718 
719  Returns
720  -------
721  detId : `int`
722  Detector identifier.
723  image : `lsst.afw.image.MaskedImage`
724  Binned background model image.
725  """
726  assert cache.dataId == dataId
727  exposure = cache.exposure
728  detector = exposure.getDetector()
729  bbox = exposure.getMaskedImage().getBBox()
730  image = bgModel.toCcdBackground(detector, bbox).getImage()
731  return self.collectBinnedImage(exposure, image)
732 

◆ run()

def lsst.pipe.drivers.skyCorrection.SkyCorrectionTask.run (   self,
  calExpArray,
  calBkgArray,
  skyCalibs,
  camera 
)
Duplicate runDataRef method without ctrl_pool for Gen3.

Parameters
----------
calExpArray : `list` of `lsst.afw.image.Exposure`
    Array of detector input calExp images for the exposure to
    process.
calBkgArray : `list` of `lsst.afw.math.BackgroundList`
    Array of detector input background lists matching the
    calExps to process.
skyCalibs : `list` of `lsst.afw.image.Exposure`
    Array of SKY calibrations for the input detectors to be
    processed.
camera : `lsst.afw.cameraGeom.Camera`
    Camera matching the input data to process.

Returns
-------
results : `pipeBase.Struct` containing
    calExpCamera : `lsst.afw.image.Exposure`
        Full camera image of the sky-corrected data.
    skyCorr : `list` of `lsst.afw.math.BackgroundList`
        Detector-level sky-corrected background lists.

See Also
--------
~lsst.pipe.drivers.SkyCorrectionTask.runDataRef()

Definition at line 398 of file skyCorrection.py.

398  def run(self, calExpArray, calBkgArray, skyCalibs, camera):
399  """Duplicate runDataRef method without ctrl_pool for Gen3.
400 
401  Parameters
402  ----------
403  calExpArray : `list` of `lsst.afw.image.Exposure`
404  Array of detector input calExp images for the exposure to
405  process.
406  calBkgArray : `list` of `lsst.afw.math.BackgroundList`
407  Array of detector input background lists matching the
408  calExps to process.
409  skyCalibs : `list` of `lsst.afw.image.Exposure`
410  Array of SKY calibrations for the input detectors to be
411  processed.
412  camera : `lsst.afw.cameraGeom.Camera`
413  Camera matching the input data to process.
414 
415  Returns
416  -------
417  results : `pipeBase.Struct` containing
418  calExpCamera : `lsst.afw.image.Exposure`
419  Full camera image of the sky-corrected data.
420  skyCorr : `list` of `lsst.afw.math.BackgroundList`
421  Detector-level sky-corrected background lists.
422 
423  See Also
424  --------
425  ~lsst.pipe.drivers.SkyCorrectionTask.runDataRef()
426  """
427  # To allow SkyCorrectionTask to run in the Gen3 butler
428  # environment, a new run() method was added that performs the
429  # same operations in a serial environment (pipetask processing
430  # does not support MPI processing as of 2019-05-03). Methods
431  # used in runDataRef() are used as appropriate in run(), but
432  # some have been rewritten in serial form. Please ensure that
433  # any updates to runDataRef() or the methods it calls with
434  # pool.mapToPrevious() are duplicated in run() and its
435  # methods.
436  #
437  # Variable names here should match those in runDataRef() as
438  # closely as possible. Variables matching data stored in the
439  # pool cache have a prefix indicating this. Variables that
440  # would be local to an MPI processing client have a prefix
441  # "node".
442  idList = [exp.getDetector().getId() for exp in calExpArray]
443 
444  # Construct arrays that match the cache in self.runDataRef() after
445  # self.loadImage() is map/reduced.
446  cacheExposures = []
447  cacheBgList = []
448  exposures = []
449  for calExp, calBgModel in zip(calExpArray, calBkgArray):
450  nodeExp, nodeBgList = self.loadImageRun(calExp, calBgModel)
451  cacheExposures.append(nodeExp)
452  cacheBgList.append(nodeBgList)
453  exposures.append(afwMath.binImage(nodeExp.getMaskedImage(), self.config.binning))
454 
455  if self.config.doBgModel:
456  # Generate focal plane background, updating backgrounds in the "cache".
457  exposures, newCacheBgList, cacheBgModel = self.focalPlaneBackgroundRun(
458  camera, cacheExposures, idList, self.config.bgModel
459  )
460  for cacheBg, newBg in zip(cacheBgList, newCacheBgList):
461  cacheBg.append(newBg)
462 
463  if self.config.doSky:
464  # Measure the sky frame scale on all inputs. Results in
465  # values equal to self.measureSkyFrame() and
466  # self.sky.solveScales() in runDataRef().
467  cacheSky = []
468  measScales = []
469  for cacheExp, skyCalib in zip(cacheExposures, skyCalibs):
470  skyExp = self.sky.exposureToBackground(skyCalib)
471  cacheSky.append(skyExp)
472  scale = self.sky.measureScale(cacheExp.getMaskedImage(), skyExp)
473  measScales.append(scale)
474 
475  scale = self.sky.solveScales(measScales)
476  self.log.info("Sky frame scale: %s" % (scale, ))
477 
478  # Subtract sky frame, as in self.subtractSkyFrame(), with
479  # appropriate scale from the "cache".
480  exposures = []
481  newBgList = []
482  for cacheExp, nodeSky, nodeBgList in zip(cacheExposures, cacheSky, cacheBgList):
483  self.sky.subtractSkyFrame(cacheExp.getMaskedImage(), nodeSky, scale, nodeBgList)
484  exposures.append(afwMath.binImage(cacheExp.getMaskedImage(), self.config.binning))
485 
486  if self.config.doBgModel2:
487  # As above, generate a focal plane background model and
488  # update the cache models.
489  exposures, newBgList, cacheBgModel = self.focalPlaneBackgroundRun(
490  camera, cacheExposures, idList, self.config.bgModel2
491  )
492  for cacheBg, newBg in zip(cacheBgList, newBgList):
493  cacheBg.append(newBg)
494 
495  # Generate camera-level image of calexp and return it along
496  # with the list of sky corrected background models.
497  image = makeCameraImage(camera, zip(idList, exposures))
498 
499  return pipeBase.Struct(
500  calExpCamera=image,
501  skyCorr=cacheBgList,
502  )
503 
def run(self, coaddExposures, bbox, wcs)
Definition: getTemplate.py:603
def makeCameraImage(camera, exposures, filename=None, binning=8)

◆ runDataRef()

def lsst.pipe.drivers.skyCorrection.SkyCorrectionTask.runDataRef (   self,
  expRef 
)
Perform sky correction on an exposure

We restore the original sky, and remove it again using multiple
algorithms. We optionally apply:

1. A large-scale background model.
    This step removes very-large-scale sky such as moonlight.
2. A sky frame.
3. A medium-scale background model.
    This step removes residual sky (which is smooth on the focal plane).

Only the master node executes this method. The data is held on
the slave nodes, which do all the hard work.

Parameters
----------
expRef : `lsst.daf.persistence.ButlerDataRef`
    Data reference for exposure.

See Also
--------
~lsst.pipe.drivers.SkyCorrectionTask.run

Definition at line 254 of file skyCorrection.py.

def runDataRef(self, expRef):
    """Perform sky correction on an exposure

    We restore the original sky, and remove it again using multiple
    algorithms. We optionally apply:

    1. A large-scale background model.
        This step removes very-large-scale sky such as moonlight.
    2. A sky frame.
    3. A medium-scale background model.
        This step removes residual sky (which is smooth on the focal
        plane).

    Only the master node executes this method. The data is held on
    the slave nodes, which do all the hard work.

    Each optional step is enabled by the corresponding ``doBgModel``,
    ``doSky`` and ``doBgModel2`` configuration flag. The camera-level
    image built from the final per-CCD results is persisted as
    "calexp_camera", and ``self.write`` is then mapped over every CCD.

    Parameters
    ----------
    expRef : `lsst.daf.persistence.ButlerDataRef`
        Data reference for exposure.

    See Also
    --------
    ~lsst.pipe.drivers.SkyCorrectionTask.run
    """
    if DEBUG:
        # Suffix shared by all of the diagnostic camera images.
        extension = "-%(visit)d.fits" % expRef.dataId

    with self.logOperation("processing %s" % (expRef.dataId,)):
        pool = Pool()
        pool.cacheClear()
        pool.storeSet(butler=expRef.getButler())
        camera = expRef.get("camera")

        # Only process the CCDs for which the input dataset exists.
        dataIdList = [
            ccdRef.dataId
            for ccdRef in expRef.subItems("ccd")
            if ccdRef.datasetExists(self.config.calexpType)
        ]

        exposures = pool.map(self.loadImage, dataIdList)
        if DEBUG:
            makeCameraImage(camera, exposures, "restored" + extension)
            # The diagnostic products get their own names: rebinding
            # ``exposures`` here would leave it referring to the output
            # of ``collectMask``, which would then be persisted as
            # "calexp_camera" whenever all of the correction steps below
            # are disabled.
            originals = pool.mapToPrevious(self.collectOriginal, dataIdList)
            makeCameraImage(camera, originals, "original" + extension)
            masks = pool.mapToPrevious(self.collectMask, dataIdList)
            makeCameraImage(camera, masks, "mask" + extension)

        if self.config.doBgModel:
            exposures = self.focalPlaneBackground(camera, pool, dataIdList, self.config.bgModel)

        if self.config.doSky:
            # Per-CCD scale measurements are combined into the scale
            # that is applied to the sky frame on every CCD.
            measScales = pool.mapToPrevious(self.measureSkyFrame, dataIdList)
            scale = self.sky.solveScales(measScales)
            self.log.info("Sky frame scale: %s", scale)

            exposures = pool.mapToPrevious(self.subtractSkyFrame, dataIdList, scale)
            if DEBUG:
                makeCameraImage(camera, exposures, "skysub" + extension)
                calibs = pool.mapToPrevious(self.collectSky, dataIdList)
                makeCameraImage(camera, calibs, "sky" + extension)

        if self.config.doBgModel2:
            exposures = self.focalPlaneBackground(camera, pool, dataIdList, self.config.bgModel2)

        # Persist camera-level image of calexp
        image = makeCameraImage(camera, exposures)
        expRef.put(image, "calexp_camera")

        pool.mapToPrevious(self.write, dataIdList)
320 

◆ runQuantum()

def lsst.pipe.drivers.skyCorrection.SkyCorrectionTask.runQuantum (   self,
  butlerQC,
  inputRefs,
  outputRefs 
)

Definition at line 202 of file skyCorrection.py.

def runQuantum(self, butlerQC, inputRefs, outputRefs):
    """Align the per-detector references, run the task and store outputs.

    The ``skyCalibs`` and ``calBkgArray`` input references and the
    ``skyCorr`` output references are reordered (and padded, see
    ``reorderAndPadList``) so that they follow the detector order of
    ``inputRefs.calExpArray``.

    Parameters
    ----------
    butlerQC
        Object used to ``get`` the inputs and ``put`` the outputs.
    inputRefs
        Input references; ``skyCalibs`` and ``calBkgArray`` are replaced
        in place by their reordered versions.
    outputRefs
        Output references; ``skyCorr`` is replaced in place by its
        reordered version.
    """
    def detectorIds(refs):
        # Detector identifier of every reference, in list order.
        return [ref.dataId['detector'] for ref in refs]

    # The calexp references define the canonical detector order.
    targetOrder = detectorIds(inputRefs.calExpArray)

    perDetectorLists = (
        (inputRefs, "skyCalibs"),
        (inputRefs, "calBkgArray"),
        (outputRefs, "skyCorr"),
    )
    for refStruct, attrName in perDetectorLists:
        refs = getattr(refStruct, attrName)
        setattr(refStruct, attrName,
                reorderAndPadList(refs, detectorIds(refs), targetOrder))

    inputs = butlerQC.get(inputRefs)
    # "rawLinker" is dropped before calling run() -- presumably because
    # run() has no such parameter; verify against the connections class.
    inputs.pop("rawLinker", None)
    butlerQC.put(self.run(**inputs), outputRefs)
219 
def reorderAndPadList(inputList, inputKeys, outputKeys, padWith=None)

◆ subtractModel()

def lsst.pipe.drivers.skyCorrection.SkyCorrectionTask.subtractModel (   self,
  cache,
  dataId,
  bgModel 
)
Subtract background model

This method runs on the slave nodes.

Parameters
----------
cache : `lsst.pipe.base.Struct`
    Process pool cache.
dataId : `dict`
    Data identifier.
bgModel : `lsst.pipe.drivers.background.FocalPlaneBackground`
    Background model.

Returns
-------
exposure : `lsst.afw.image.Exposure`
    Resultant exposure.

Definition at line 643 of file skyCorrection.py.

def subtractModel(self, cache, dataId, bgModel):
    """Subtract a focal-plane background model from the cached exposure

    This method runs on the slave nodes.

    The model is realized for the cached exposure's detector and
    bounding box, stored as ``cache.bgModel``, subtracted in place from
    the exposure's masked image, and its first element is appended to
    ``cache.bgList``.

    Parameters
    ----------
    cache : `lsst.pipe.base.Struct`
        Process pool cache.
    dataId : `dict`
        Data identifier; must equal ``cache.dataId``.
    bgModel : `lsst.pipe.drivers.background.FocalPlaneBackground`
        Background model.

    Returns
    -------
    exposure : `lsst.afw.image.Exposure` or `None`
        Result of ``self.collect(cache)``, or `None` if a `RuntimeError`
        was raised while realizing or subtracting the model (the error
        is logged and ``cache.bgList`` is left untouched).
    """
    assert cache.dataId == dataId
    cachedExposure = cache.exposure
    maskedImage = cachedExposure.getMaskedImage()
    ccd = cachedExposure.getDetector()
    ccdBBox = maskedImage.getBBox()
    try:
        ccdBackground = bgModel.toCcdBackground(ccd, ccdBBox)
        cache.bgModel = ccdBackground
        maskedImage -= cache.bgModel.getImage()
    except RuntimeError:
        self.log.error(f"There was an error processing {dataId}, no calib file produced")
        return None
    # Only reached when the model was successfully subtracted.
    cache.bgList.append(cache.bgModel[0])
    return self.collect(cache)
675 

◆ subtractModelRun()

def lsst.pipe.drivers.skyCorrection.SkyCorrectionTask.subtractModelRun (   self,
  exposure,
  bgModel 
)
Serial implementation of self.subtractModel() for Gen3.

Realize the focal-plane background model for the exposure's detector and subtract it from the exposure in place.

Parameters
----------
exposure : `lsst.afw.image.Exposure`
    Exposure to subtract the background model from.
bgModel : `lsst.pipe.drivers.background.FocalPlaneBackground`
    Full camera level background model.

Returns
-------
exposure : `lsst.afw.image.Exposure`
    Background subtracted input exposure.
bgModelCcd : `lsst.afw.math.BackgroundList`
    Detector level realization of the full background model.
bgModelMaskedImage : `lsst.afw.image.MaskedImage`
    Background model from the bgModelCcd realization.

Definition at line 676 of file skyCorrection.py.

def subtractModelRun(self, exposure, bgModel):
    """Serial implementation of self.subtractModel() for Gen3.

    Realize the focal-plane background model for the exposure's
    detector and bounding box, and subtract it in place from the
    exposure's masked image.

    Parameters
    ----------
    exposure : `lsst.afw.image.Exposure`
        Exposure to subtract the background model from; modified in
        place.
    bgModel : `lsst.pipe.drivers.background.FocalPlaneBackground`
        Full camera level background model.

    Returns
    -------
    exposure : `lsst.afw.image.Exposure`
        Background subtracted input exposure (the same object that was
        passed in).
    bgModelCcd : `lsst.afw.math.BackgroundList`
        Detector level realization of the full background model.
    bgModelMaskedImage
        First element of ``bgModelCcd``.
    """
    maskedImage = exposure.getMaskedImage()
    ccdBackground = bgModel.toCcdBackground(exposure.getDetector(), maskedImage.getBBox())
    maskedImage -= ccdBackground.getImage()

    return exposure, ccdBackground, ccdBackground[0]
704 

◆ subtractSkyFrame()

def lsst.pipe.drivers.skyCorrection.SkyCorrectionTask.subtractSkyFrame (   self,
  cache,
  dataId,
  scale 
)
Subtract sky frame

This method runs on the slave nodes.

Parameters
----------
cache : `lsst.pipe.base.Struct`
    Process pool cache.
dataId : `dict`
    Data identifier.
scale : `float`
    Scale for sky frame.

Returns
-------
exposure : `lsst.afw.image.Exposure`
    Resultant exposure.

Definition at line 598 of file skyCorrection.py.

def subtractSkyFrame(self, cache, dataId, scale):
    """Subtract the scaled sky frame from the cached exposure

    This method runs on the slave nodes.

    The work is delegated to ``self.sky.subtractSkyFrame``, which is
    given the cached exposure's masked image, the cached sky frame, the
    scale and the cached background list.

    Parameters
    ----------
    cache : `lsst.pipe.base.Struct`
        Process pool cache.
    dataId : `dict`
        Data identifier; must equal ``cache.dataId``.
    scale : `float`
        Scale for sky frame.

    Returns
    -------
    exposure : `lsst.afw.image.Exposure`
        Resultant exposure, as returned by ``self.collect(cache)``.
    """
    assert cache.dataId == dataId
    skyTask = self.sky
    maskedImage = cache.exposure.getMaskedImage()
    skyTask.subtractSkyFrame(maskedImage, cache.sky, scale, cache.bgList)
    return self.collect(cache)
620 

◆ write()

def lsst.pipe.drivers.skyCorrection.SkyCorrectionTask.write (   self,
  cache,
  dataId 
)
Write resultant background list

This method runs on the slave nodes.

Parameters
----------
cache : `lsst.pipe.base.Struct`
    Process pool cache.
dataId : `dict`
    Data identifier.

Definition at line 838 of file skyCorrection.py.

def write(self, cache, dataId):
    """Persist the cached background list as the "skyCorr" dataset

    This method runs on the slave nodes.

    Parameters
    ----------
    cache : `lsst.pipe.base.Struct`
        Process pool cache; its ``butler`` is used to write its
        ``bgList``.
    dataId : `dict`
        Data identifier under which the background list is stored.
    """
    butler = cache.butler
    butler.put(cache.bgList, "skyCorr", dataId)
851 
void write(OutputArchiveHandle &handle) const override

Member Data Documentation

◆ ConfigClass

lsst.pipe.drivers.skyCorrection.SkyCorrectionTask.ConfigClass = SkyCorrectionConfig
static

Definition at line 199 of file skyCorrection.py.


The documentation for this class was generated from the following file: