LSSTApplications  16.0-10-g0ee56ad+5,16.0-11-ga33d1f2+5,16.0-12-g3ef5c14+3,16.0-12-g71e5ef5+18,16.0-12-gbdf3636+3,16.0-13-g118c103+3,16.0-13-g8f68b0a+3,16.0-15-gbf5c1cb+4,16.0-16-gfd17674+3,16.0-17-g7c01f5c+3,16.0-18-g0a50484+1,16.0-20-ga20f992+8,16.0-21-g0e05fd4+6,16.0-21-g15e2d33+4,16.0-22-g62d8060+4,16.0-22-g847a80f+4,16.0-25-gf00d9b8+1,16.0-28-g3990c221+4,16.0-3-gf928089+3,16.0-32-g88a4f23+5,16.0-34-gd7987ad+3,16.0-37-gc7333cb+2,16.0-4-g10fc685+2,16.0-4-g18f3627+26,16.0-4-g5f3a788+26,16.0-5-gaf5c3d7+4,16.0-5-gcc1f4bb+1,16.0-6-g3b92700+4,16.0-6-g4412fcd+3,16.0-6-g7235603+4,16.0-69-g2562ce1b+2,16.0-8-g14ebd58+4,16.0-8-g2df868b+1,16.0-8-g4cec79c+6,16.0-8-gadf6c7a+1,16.0-8-gfc7ad86,16.0-82-g59ec2a54a+1,16.0-9-g5400cdc+2,16.0-9-ge6233d7+5,master-g2880f2d8cf+3,v17.0.rc1
LSSTDataManagementBasePackage
coaddDriver.py
Go to the documentation of this file.
1 from __future__ import absolute_import, division, print_function
2 
3 from builtins import zip
4 from builtins import map
5 
6 import lsst.afw.image as afwImage
7 import lsst.afw.geom as afwGeom
8 from lsst.afw.fits.fitsLib import FitsError
9 from lsst.ctrl.pool.parallel import BatchPoolTask
10 from lsst.ctrl.pool.pool import Pool, abortOnError, NODE
11 import lsst.sphgeom
12 from lsst.pex.config import Config, Field, ConfigurableField
13 from lsst.pipe.base import Struct, ArgumentParser
14 from lsst.pipe.tasks.coaddBase import CoaddTaskRunner
15 from lsst.pipe.tasks.makeCoaddTempExp import MakeCoaddTempExpTask
16 from lsst.pipe.tasks.multiBand import DetectCoaddSourcesTask
17 from lsst.pipe.tasks.selectImages import WcsSelectImagesTask
18 from lsst.pipe.tasks.assembleCoadd import SafeClipAssembleCoaddTask
19 from lsst.pipe.drivers.utils import getDataRef, NullSelectImagesTask, TractDataIdContainer
20 
21 
23  coaddName = Field(dtype=str, default="deep", doc="Name for coadd")
25  target=WcsSelectImagesTask, doc="Select images to process")
26  makeCoaddTempExp = ConfigurableField(
27  target=MakeCoaddTempExpTask, doc="Warp images to sky")
28  doBackgroundReference = Field(
29  dtype=bool, default=False, doc="Build background reference?")
30  backgroundReference = ConfigurableField(
31  target=NullSelectImagesTask, doc="Build background reference")
32  assembleCoadd = ConfigurableField(
33  target=SafeClipAssembleCoaddTask, doc="Assemble warps into coadd")
34  doDetection = Field(dtype=bool, default=True,
35  doc="Run detection on the coaddition product")
36  detectCoaddSources = ConfigurableField(
37  target=DetectCoaddSourcesTask, doc="Detect sources on coadd")
38 
39  def setDefaults(self):
40  self.makeCoaddTempExp.select.retarget(NullSelectImagesTask)
41  self.assembleCoadd.select.retarget(NullSelectImagesTask)
42  self.assembleCoadd.doWrite = False
43 
44  def validate(self):
45  if self.makeCoaddTempExp.coaddName != self.coaddName:
46  raise RuntimeError(
47  "makeCoaddTempExp.coaddName and coaddName don't match")
48  if self.assembleCoadd.coaddName != self.coaddName:
49  raise RuntimeError(
50  "assembleCoadd.coaddName and coaddName don't match")
51  if self.assembleCoadd.matchingKernelSize != self.makeCoaddTempExp.matchingKernelSize:
52  message = ("assembleCoadd.matchingKernelSize (%s) and makeCoaddTempExp.matchingKernelSize (%s)"
53  " don't match" % (self.assembleCoadd.matchingKernelSize,
54  self.makeCoaddTempExp.matchingKernelSize))
55  raise RuntimeError(message)
56 
57 
59 
60  def __init__(self, TaskClass, parsedCmd, doReturnResults=False):
61  CoaddTaskRunner.__init__(self, TaskClass, parsedCmd, doReturnResults)
62  self.reuse = parsedCmd.reuse
63 
64  def makeTask(self, parsedCmd=None, args=None):
65  return self.TaskClass(config=self.config, log=self.log, reuse=self.reuse)
66 
67  @staticmethod
68  def getTargetList(parsedCmd, **kwargs):
69  """!Get bare butler into Task
70 
71  @param parsedCmd results of parsing command input
72  """
73  kwargs["butler"] = parsedCmd.butler
74  kwargs["selectIdList"] = [
75  ref.dataId for ref in parsedCmd.selectId.refList]
76  return [(parsedCmd.id.refList, kwargs), ]
77 
78 
79 def unpickle(factory, args, kwargs):
80  """Unpickle something by calling a factory"""
81  return factory(*args, **kwargs)
82 
83 
85  ConfigClass = CoaddDriverConfig
86  _DefaultName = "coaddDriver"
87  RunnerClass = CoaddDriverTaskRunner
88 
89  def __init__(self, reuse=tuple(), **kwargs):
90  BatchPoolTask.__init__(self, **kwargs)
91  self.reuse = reuse
92  self.makeSubtask("select")
93  self.makeSubtask("makeCoaddTempExp", reuse=("makeCoaddTempExp" in self.reuse))
94  self.makeSubtask("backgroundReference")
95  self.makeSubtask("assembleCoadd")
96  self.makeSubtask("detectCoaddSources")
97 
98  def __reduce__(self):
99  """Pickler"""
100  return unpickle, (self.__class__, [], dict(config=self.config, name=self._name,
101  parentTask=self._parentTask, log=self.log,
102  reuse=self.reuse))
103 
104  @classmethod
105  def _makeArgumentParser(cls, **kwargs):
106  """!Build argument parser
107 
108  Selection references are not cheap (reads Wcs), so are generated
109  only if we're not doing a batch submission.
110  """
111  parser = ArgumentParser(name=cls._DefaultName)
112  parser.add_id_argument("--id", "deepCoadd", help="data ID, e.g. --id tract=12345 patch=1,2",
113  ContainerClass=TractDataIdContainer)
114  parser.add_id_argument(
115  "--selectId", "calexp", help="data ID, e.g. --selectId visit=6789 ccd=0..9")
116  parser.addReuseOption(["makeCoaddTempExp", "assembleCoadd", "detectCoaddSources"])
117  return parser
118 
119  @classmethod
120  def batchWallTime(cls, time, parsedCmd, numCores):
121  """!
122  Return walltime request for batch job
123 
124  @param time: Requested time per iteration
125  @param parsedCmd: Results of argument parsing
126  @param numCores: Number of cores
127  @return float walltime request length
128  """
129  numTargets = len(parsedCmd.selectId.refList)
130  return time*numTargets/float(numCores)
131 
132  @abortOnError
133  def runDataRef(self, tractPatchRefList, butler, selectIdList=[]):
134  """!Determine which tracts are non-empty before processing
135 
136  @param tractPatchRefList: List of tracts and patches to include in the coaddition
137  @param butler: butler reference object
138  @param selectIdList: List of data Ids (i.e. visit, ccd) to consider when making the coadd
139  @return list of references to sel.runTract function evaluation for each tractPatchRefList member
140  """
141  pool = Pool("tracts")
142  pool.storeSet(butler=butler, skymap=butler.get(
143  self.config.coaddName + "Coadd_skyMap"))
144  tractIdList = []
145  for patchRefList in tractPatchRefList:
146  tractSet = set([patchRef.dataId["tract"]
147  for patchRef in patchRefList])
148  assert len(tractSet) == 1
149  tractIdList.append(tractSet.pop())
150 
151  selectDataList = [data for data in pool.mapNoBalance(self.readSelection, selectIdList) if
152  data is not None]
153  nonEmptyList = pool.mapNoBalance(
154  self.checkTract, tractIdList, selectDataList)
155  tractPatchRefList = [patchRefList for patchRefList, nonEmpty in
156  zip(tractPatchRefList, nonEmptyList) if nonEmpty]
157  self.log.info("Non-empty tracts (%d): %s" % (len(tractPatchRefList),
158  [patchRefList[0].dataId["tract"] for patchRefList in
159  tractPatchRefList]))
160 
161  # Install the dataRef in the selectDataList
162  for data in selectDataList:
163  data.dataRef = getDataRef(butler, data.dataId, "calexp")
164 
165  # Process the non-empty tracts
166  return [self.run(patchRefList, butler, selectDataList) for patchRefList in tractPatchRefList]
167 
168  @abortOnError
169  def run(self, patchRefList, butler, selectDataList=[]):
170  """!Run stacking on a tract
171 
172  This method only runs on the master node.
173 
174  @param patchRefList: List of patch data references for tract
175  @param butler: Data butler
176  @param selectDataList: List of SelectStruct for inputs
177  """
178  pool = Pool("stacker")
179  pool.cacheClear()
180  pool.storeSet(butler=butler, warpType=self.config.coaddName + "Coadd_directWarp",
181  coaddType=self.config.coaddName + "Coadd")
182  patchIdList = [patchRef.dataId for patchRef in patchRefList]
183 
184  selectedData = pool.map(self.warp, patchIdList, selectDataList)
185  if self.config.doBackgroundReference:
186  self.backgroundReference.runDataRef(patchRefList, selectDataList)
187 
188  def refNamer(patchRef):
189  return tuple(map(int, patchRef.dataId["patch"].split(",")))
190 
191  lookup = dict(zip(map(refNamer, patchRefList), selectedData))
192  coaddData = [Struct(patchId=patchRef.dataId, selectDataList=lookup[refNamer(patchRef)]) for
193  patchRef in patchRefList]
194  pool.map(self.coadd, coaddData)
195 
196  def readSelection(self, cache, selectId):
197  """!Read Wcs of selected inputs
198 
199  This method only runs on slave nodes.
200  This method is similar to SelectDataIdContainer.makeDataRefList,
201  creating a Struct like a SelectStruct, except with a dataId instead
202  of a dataRef (to ease MPI).
203 
204  @param cache: Pool cache
205  @param selectId: Data identifier for selected input
206  @return a SelectStruct with a dataId instead of dataRef
207  """
208  try:
209  ref = getDataRef(cache.butler, selectId, "calexp")
210  self.log.info("Reading Wcs from %s" % (selectId,))
211  md = ref.get("calexp_md", immediate=True)
212  wcs = afwGeom.makeSkyWcs(md)
213  data = Struct(dataId=selectId, wcs=wcs, bbox=afwImage.bboxFromMetadata(md))
214  except FitsError:
215  self.log.warn("Unable to construct Wcs from %s" % (selectId,))
216  return None
217  return data
218 
219  def checkTract(self, cache, tractId, selectIdList):
220  """!Check whether a tract has any overlapping inputs
221 
222  This method only runs on slave nodes.
223 
224  @param cache: Pool cache
225  @param tractId: Data identifier for tract
226  @param selectDataList: List of selection data
227  @return whether tract has any overlapping inputs
228  """
229  def makePolygon(wcs, bbox):
230  """Return a polygon for the image, given Wcs and bounding box"""
231  boxPixelCorners = afwGeom.Box2D(bbox).getCorners()
232  boxSkyCorners = wcs.pixelToSky(boxPixelCorners)
233  return lsst.sphgeom.ConvexPolygon.convexHull([coord.getVector() for coord in boxSkyCorners])
234 
235  skymap = cache.skymap
236  tract = skymap[tractId]
237  tractWcs = tract.getWcs()
238  tractPoly = makePolygon(tractWcs, tract.getBBox())
239 
240  for selectData in selectIdList:
241  if not hasattr(selectData, "poly"):
242  selectData.poly = makePolygon(selectData.wcs, selectData.bbox)
243  if tractPoly.intersects(selectData.poly):
244  return True
245  return False
246 
247  def warp(self, cache, patchId, selectDataList):
248  """!Warp all images for a patch
249 
250  Only slave nodes execute this method.
251 
252  Because only one argument may be passed, it is expected to
253  contain multiple elements, which are:
254 
255  @param patchRef: data reference for patch
256  @param selectDataList: List of SelectStruct for inputs
257  @return selectDataList with non-overlapping elements removed
258  """
259  patchRef = getDataRef(cache.butler, patchId, cache.coaddType)
260  selectDataList = self.selectExposures(patchRef, selectDataList)
261  with self.logOperation("warping %s" % (patchRef.dataId,), catch=True):
262  self.makeCoaddTempExp.runDataRef(patchRef, selectDataList)
263  return selectDataList
264 
265  def coadd(self, cache, data):
266  """!Construct coadd for a patch and measure
267 
268  Only slave nodes execute this method.
269 
270  Because only one argument may be passed, it is expected to
271  contain multiple elements, which are:
272 
273  @param patchRef: data reference for patch
274  @param selectDataList: List of SelectStruct for inputs
275  """
276  patchRef = getDataRef(cache.butler, data.patchId, cache.coaddType)
277  selectDataList = data.selectDataList
278  coadd = None
279 
280  # We skip the assembleCoadd step if either the *Coadd dataset exists
281  # or we aren't configured to write it, we're supposed to reuse
282  # detectCoaddSources outputs too, and those outputs already exist.
283  canSkipDetection = (
284  "detectCoaddSources" in self.reuse and
285  patchRef.datasetExists(self.detectCoaddSources.config.coaddName+"Coadd_det", write=True)
286  )
287  if "assembleCoadd" in self.reuse:
288  if patchRef.datasetExists(cache.coaddType, write=True):
289  self.log.info("%s: Skipping assembleCoadd for %s; outputs already exist." %
290  (NODE, patchRef.dataId))
291  coadd = patchRef.get(cache.coaddType, immediate=True)
292  elif not self.config.assembleCoadd.doWrite and self.config.doDetection and canSkipDetection:
293  self.log.info(
294  "%s: Skipping assembleCoadd and detectCoaddSources for %s; outputs already exist." %
295  (NODE, patchRef.dataId)
296  )
297  return
298  if coadd is None:
299  with self.logOperation("coadding %s" % (patchRef.dataId,), catch=True):
300  coaddResults = self.assembleCoadd.runDataRef(patchRef, selectDataList)
301  if coaddResults is not None:
302  coadd = coaddResults.coaddExposure
303  canSkipDetection = False # can't skip it because coadd may have changed
304  if coadd is None:
305  return
306 
307  # The section of code below determines if the detection task should be
308  # run. If detection is run, then the products are written out as
309  # deepCoadd_calexp. If detection is not run, then the outputs of the
310  # assemble task are written out as deepCoadd.
311  if self.config.doDetection:
312  if canSkipDetection:
313  self.log.info("%s: Skipping detectCoaddSources for %s; outputs already exist." %
314  (NODE, patchRef.dataId))
315  return
316  with self.logOperation("detection on {}".format(patchRef.dataId),
317  catch=True):
318  idFactory = self.detectCoaddSources.makeIdFactory(patchRef)
319  expId = int(patchRef.get(self.config.coaddName + "CoaddId"))
320  # This includes background subtraction, so do it before writing
321  # the coadd
322  detResults = self.detectCoaddSources.run(coadd, idFactory, expId=expId)
323  self.detectCoaddSources.write(detResults, patchRef)
324  else:
325  patchRef.put(coadd, self.assembleCoadd.config.coaddName+"Coadd")
326 
327  def selectExposures(self, patchRef, selectDataList):
328  """!Select exposures to operate upon, via the SelectImagesTask
329 
330  This is very similar to CoaddBaseTask.selectExposures, except we return
331  a list of SelectStruct (same as the input), so we can plug the results into
332  future uses of SelectImagesTask.
333 
334  @param patchRef data reference to a particular patch
335  @param selectDataList list of references to specific data products (i.e. visit, ccd)
336  @return filtered list of SelectStruct
337  """
338  def key(dataRef):
339  return tuple(dataRef.dataId[k] for k in sorted(dataRef.dataId))
340  inputs = dict((key(select.dataRef), select)
341  for select in selectDataList)
342  skyMap = patchRef.get(self.config.coaddName + "Coadd_skyMap")
343  tract = skyMap[patchRef.dataId["tract"]]
344  patch = tract[(tuple(int(i)
345  for i in patchRef.dataId["patch"].split(",")))]
346  bbox = patch.getOuterBBox()
347  wcs = tract.getWcs()
348  cornerPosList = afwGeom.Box2D(bbox).getCorners()
349  coordList = [wcs.pixelToSky(pos) for pos in cornerPosList]
350  dataRefList = self.select.runDataRef(
351  patchRef, coordList, selectDataList=selectDataList).dataRefList
352  return [inputs[key(dataRef)] for dataRef in dataRefList]
353 
354  def writeMetadata(self, dataRef):
355  pass
def batchWallTime(cls, time, parsedCmd, numCores)
Return walltime request for batch job.
Definition: coaddDriver.py:120
def unpickle(factory, args, kwargs)
Definition: coaddDriver.py:79
def makeSubtask(self, name, keyArgs)
Definition: task.py:275
A floating-point coordinate rectangle geometry.
Definition: Box.h:294
def selectExposures(self, patchRef, selectDataList)
Select exposures to operate upon, via the SelectImagesTask.
Definition: coaddDriver.py:327
daf::base::PropertySet * set
Definition: fits.cc:832
def makeTask(self, parsedCmd=None, args=None)
Definition: coaddDriver.py:64
def getDataRef(butler, dataId, datasetType="raw")
Definition: utils.py:17
def __init__(self, reuse=tuple(), kwargs)
Definition: coaddDriver.py:89
static ConvexPolygon convexHull(std::vector< UnitVector3d > const &points)
convexHull returns the convex hull of the given set of points if it exists and throws an exception ot...
Definition: ConvexPolygon.h:65
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)
Definition: history.py:168
def coadd(self, cache, data)
Construct coadd for a patch and measure.
Definition: coaddDriver.py:265
def runDataRef(self, tractPatchRefList, butler, selectIdList=[])
Determine which tracts are non-empty before processing.
Definition: coaddDriver.py:133
def warp(self, cache, patchId, selectDataList)
Warp all images for a patch.
Definition: coaddDriver.py:247
def logOperation(self, operation, catch=False, trace=True)
Provide a context manager for logging an operation.
Definition: parallel.py:497
def getTargetList(parsedCmd, kwargs)
Get bare butler into Task.
Definition: coaddDriver.py:68
Key< U > key
Definition: Schema.cc:281
std::shared_ptr< SkyWcs > makeSkyWcs(TransformPoint2ToPoint2 const &pixelsToFieldAngle, lsst::geom::Angle const &orientation, bool flipX, lsst::geom::SpherePoint const &boresight, std::string const &projection="TAN")
Construct a FITS SkyWcs from camera geometry.
Definition: SkyWcs.cc:486
def readSelection(self, cache, selectId)
Read Wcs of selected inputs.
Definition: coaddDriver.py:196
def __init__(self, TaskClass, parsedCmd, doReturnResults=False)
Definition: coaddDriver.py:60
def checkTract(self, cache, tractId, selectIdList)
Check whether a tract has any overlapping inputs.
Definition: coaddDriver.py:219
lsst::geom::Box2I bboxFromMetadata(daf::base::PropertySet &metadata)
Determine the image bounding box from its metadata (FITS header)
Definition: Image.cc:703
def run(self, patchRefList, butler, selectDataList=[])
Run stacking on a tract.
Definition: coaddDriver.py:169