LSSTApplications  18.1.0
LSSTDataManagementBasePackage
coaddDriver.py
Go to the documentation of this file.
1 from __future__ import absolute_import, division, print_function
2 
3 from builtins import zip
4 from builtins import map
5 
6 import lsst.afw.image as afwImage
7 import lsst.afw.geom as afwGeom
8 from lsst.afw.fits.fitsLib import FitsError
9 from lsst.ctrl.pool.parallel import BatchPoolTask
10 from lsst.ctrl.pool.pool import Pool, abortOnError, NODE
11 import lsst.sphgeom
12 from lsst.pex.config import Config, Field, ConfigurableField
13 from lsst.pipe.base import Struct, ArgumentParser, ConfigDatasetType
14 from lsst.pipe.tasks.coaddBase import CoaddTaskRunner
15 from lsst.pipe.tasks.makeCoaddTempExp import MakeCoaddTempExpTask
16 from lsst.pipe.tasks.multiBand import DetectCoaddSourcesTask
17 from lsst.pipe.tasks.selectImages import WcsSelectImagesTask
18 from lsst.pipe.tasks.assembleCoadd import SafeClipAssembleCoaddTask
19 from lsst.pipe.drivers.utils import getDataRef, NullSelectImagesTask, TractDataIdContainer
20 
21 
23  coaddName = Field(dtype=str, default="deep", doc="Name for coadd")
25  target=WcsSelectImagesTask, doc="Select images to process")
26  makeCoaddTempExp = ConfigurableField(
27  target=MakeCoaddTempExpTask, doc="Warp images to sky")
28  doBackgroundReference = Field(
29  dtype=bool, default=False, doc="Build background reference?")
30  backgroundReference = ConfigurableField(
31  target=NullSelectImagesTask, doc="Build background reference")
32  assembleCoadd = ConfigurableField(
33  target=SafeClipAssembleCoaddTask, doc="Assemble warps into coadd")
34  doDetection = Field(dtype=bool, default=True,
35  doc="Run detection on the coaddition product")
36  detectCoaddSources = ConfigurableField(
37  target=DetectCoaddSourcesTask, doc="Detect sources on coadd")
38  hasFakes = Field(dtype=bool, default=False,
39  doc="Should be set to True if fake sources were added to the data before processing.")
40  calexpType = Field(dtype=str, default="calexp",
41  doc="Should be set to fakes_calexp if you want to process calexps with fakes in.")
42 
43  def setDefaults(self):
44  self.makeCoaddTempExp.select.retarget(NullSelectImagesTask)
45  self.assembleCoadd.select.retarget(NullSelectImagesTask)
46  self.assembleCoadd.doWrite = False
47 
48  def validate(self):
49  if self.makeCoaddTempExp.coaddName != self.coaddName:
50  raise RuntimeError(
51  "makeCoaddTempExp.coaddName and coaddName don't match")
52  if self.assembleCoadd.coaddName != self.coaddName:
53  raise RuntimeError(
54  "assembleCoadd.coaddName and coaddName don't match")
55  if self.assembleCoadd.matchingKernelSize != self.makeCoaddTempExp.matchingKernelSize:
56  message = ("assembleCoadd.matchingKernelSize (%s) and makeCoaddTempExp.matchingKernelSize (%s)"
57  " don't match" % (self.assembleCoadd.matchingKernelSize,
58  self.makeCoaddTempExp.matchingKernelSize))
59  raise RuntimeError(message)
60 
61 
63 
64  def __init__(self, TaskClass, parsedCmd, doReturnResults=False):
65  CoaddTaskRunner.__init__(self, TaskClass, parsedCmd, doReturnResults)
66  self.reuse = parsedCmd.reuse
67 
68  def makeTask(self, parsedCmd=None, args=None):
69  return self.TaskClass(config=self.config, log=self.log, reuse=self.reuse)
70 
71  @staticmethod
72  def getTargetList(parsedCmd, **kwargs):
73  """!Get bare butler into Task
74 
75  @param parsedCmd results of parsing command input
76  """
77  kwargs["butler"] = parsedCmd.butler
78  kwargs["selectIdList"] = [
79  ref.dataId for ref in parsedCmd.selectId.refList]
80  return [(parsedCmd.id.refList, kwargs), ]
81 
82 
83 def unpickle(factory, args, kwargs):
84  """Unpickle something by calling a factory"""
85  return factory(*args, **kwargs)
86 
87 
89  ConfigClass = CoaddDriverConfig
90  _DefaultName = "coaddDriver"
91  RunnerClass = CoaddDriverTaskRunner
92 
93  def __init__(self, reuse=tuple(), **kwargs):
94  BatchPoolTask.__init__(self, **kwargs)
95  self.reuse = reuse
96  self.makeSubtask("select")
97  self.makeSubtask("makeCoaddTempExp", reuse=("makeCoaddTempExp" in self.reuse))
98  self.makeSubtask("backgroundReference")
99  self.makeSubtask("assembleCoadd")
100  self.makeSubtask("detectCoaddSources")
101  if self.config.hasFakes:
102  self.calexpType = "fakes_calexp"
103  else:
104  self.calexpType = "calexp"
105 
106  def __reduce__(self):
107  """Pickler"""
108  return unpickle, (self.__class__, [], dict(config=self.config, name=self._name,
109  parentTask=self._parentTask, log=self.log,
110  reuse=self.reuse))
111 
112  @classmethod
113  def _makeArgumentParser(cls, **kwargs):
114  """!Build argument parser
115 
116  Selection references are not cheap (reads Wcs), so are generated
117  only if we're not doing a batch submission.
118  """
119  parser = ArgumentParser(name=cls._DefaultName)
120  parser.add_id_argument("--id", "deepCoadd", help="data ID, e.g. --id tract=12345 patch=1,2",
121  ContainerClass=TractDataIdContainer)
122  datasetType = ConfigDatasetType(name="calexpType")
123  parser.add_id_argument("--selectId", datasetType=datasetType,
124  help="data ID, e.g. --selectId visit=6789 ccd=0..9")
125  parser.addReuseOption(["makeCoaddTempExp", "assembleCoadd", "detectCoaddSources"])
126  return parser
127 
128  @classmethod
129  def batchWallTime(cls, time, parsedCmd, numCores):
130  """!
131  Return walltime request for batch job
132 
133  @param time: Requested time per iteration
134  @param parsedCmd: Results of argument parsing
135  @param numCores: Number of cores
136  @return float walltime request length
137  """
138  numTargets = len(parsedCmd.selectId.refList)
139  return time*numTargets/float(numCores)
140 
141  @abortOnError
142  def runDataRef(self, tractPatchRefList, butler, selectIdList=[]):
143  """!Determine which tracts are non-empty before processing
144 
145  @param tractPatchRefList: List of tracts and patches to include in the coaddition
146  @param butler: butler reference object
147  @param selectIdList: List of data Ids (i.e. visit, ccd) to consider when making the coadd
148  @return list of references to sel.runTract function evaluation for each tractPatchRefList member
149  """
150  pool = Pool("tracts")
151  pool.storeSet(butler=butler, skymap=butler.get(
152  self.config.coaddName + "Coadd_skyMap"))
153  tractIdList = []
154  for patchRefList in tractPatchRefList:
155  tractSet = set([patchRef.dataId["tract"]
156  for patchRef in patchRefList])
157  assert len(tractSet) == 1
158  tractIdList.append(tractSet.pop())
159 
160  selectDataList = [data for data in pool.mapNoBalance(self.readSelection, selectIdList) if
161  data is not None]
162  nonEmptyList = pool.mapNoBalance(
163  self.checkTract, tractIdList, selectDataList)
164  tractPatchRefList = [patchRefList for patchRefList, nonEmpty in
165  zip(tractPatchRefList, nonEmptyList) if nonEmpty]
166  self.log.info("Non-empty tracts (%d): %s" % (len(tractPatchRefList),
167  [patchRefList[0].dataId["tract"] for patchRefList in
168  tractPatchRefList]))
169  # Install the dataRef in the selectDataList
170  for data in selectDataList:
171  data.dataRef = getDataRef(butler, data.dataId, self.calexpType)
172 
173  # Process the non-empty tracts
174  return [self.run(patchRefList, butler, selectDataList) for patchRefList in tractPatchRefList]
175 
176  @abortOnError
177  def run(self, patchRefList, butler, selectDataList=[]):
178  """!Run stacking on a tract
179 
180  This method only runs on the master node.
181 
182  @param patchRefList: List of patch data references for tract
183  @param butler: Data butler
184  @param selectDataList: List of SelectStruct for inputs
185  """
186  pool = Pool("stacker")
187  pool.cacheClear()
188  pool.storeSet(butler=butler, warpType=self.config.coaddName + "Coadd_directWarp",
189  coaddType=self.config.coaddName + "Coadd")
190  patchIdList = [patchRef.dataId for patchRef in patchRefList]
191 
192  selectedData = pool.map(self.warp, patchIdList, selectDataList)
193  if self.config.doBackgroundReference:
194  self.backgroundReference.runDataRef(patchRefList, selectDataList)
195 
196  def refNamer(patchRef):
197  return tuple(map(int, patchRef.dataId["patch"].split(",")))
198 
199  lookup = dict(zip(map(refNamer, patchRefList), selectedData))
200  coaddData = [Struct(patchId=patchRef.dataId, selectDataList=lookup[refNamer(patchRef)]) for
201  patchRef in patchRefList]
202  pool.map(self.coadd, coaddData)
203 
204  def readSelection(self, cache, selectId):
205  """!Read Wcs of selected inputs
206 
207  This method only runs on slave nodes.
208  This method is similar to SelectDataIdContainer.makeDataRefList,
209  creating a Struct like a SelectStruct, except with a dataId instead
210  of a dataRef (to ease MPI).
211 
212  @param cache: Pool cache
213  @param selectId: Data identifier for selected input
214  @return a SelectStruct with a dataId instead of dataRef
215  """
216  try:
217  ref = getDataRef(cache.butler, selectId, self.calexpType)
218  self.log.info("Reading Wcs from %s" % (selectId,))
219  md = ref.get("calexp_md", immediate=True)
220  wcs = afwGeom.makeSkyWcs(md)
221  data = Struct(dataId=selectId, wcs=wcs, bbox=afwImage.bboxFromMetadata(md))
222  except FitsError:
223  self.log.warn("Unable to construct Wcs from %s" % (selectId,))
224  return None
225  return data
226 
227  def checkTract(self, cache, tractId, selectIdList):
228  """!Check whether a tract has any overlapping inputs
229 
230  This method only runs on slave nodes.
231 
232  @param cache: Pool cache
233  @param tractId: Data identifier for tract
234  @param selectDataList: List of selection data
235  @return whether tract has any overlapping inputs
236  """
237  def makePolygon(wcs, bbox):
238  """Return a polygon for the image, given Wcs and bounding box"""
239  boxPixelCorners = afwGeom.Box2D(bbox).getCorners()
240  boxSkyCorners = wcs.pixelToSky(boxPixelCorners)
241  return lsst.sphgeom.ConvexPolygon.convexHull([coord.getVector() for coord in boxSkyCorners])
242 
243  skymap = cache.skymap
244  tract = skymap[tractId]
245  tractWcs = tract.getWcs()
246  tractPoly = makePolygon(tractWcs, tract.getBBox())
247 
248  for selectData in selectIdList:
249  if not hasattr(selectData, "poly"):
250  selectData.poly = makePolygon(selectData.wcs, selectData.bbox)
251  if tractPoly.intersects(selectData.poly):
252  return True
253  return False
254 
255  def warp(self, cache, patchId, selectDataList):
256  """!Warp all images for a patch
257 
258  Only slave nodes execute this method.
259 
260  Because only one argument may be passed, it is expected to
261  contain multiple elements, which are:
262 
263  @param patchRef: data reference for patch
264  @param selectDataList: List of SelectStruct for inputs
265  @return selectDataList with non-overlapping elements removed
266  """
267  patchRef = getDataRef(cache.butler, patchId, cache.coaddType)
268  selectDataList = self.selectExposures(patchRef, selectDataList)
269  with self.logOperation("warping %s" % (patchRef.dataId,), catch=True):
270  self.makeCoaddTempExp.runDataRef(patchRef, selectDataList)
271  return selectDataList
272 
273  def coadd(self, cache, data):
274  """!Construct coadd for a patch and measure
275 
276  Only slave nodes execute this method.
277 
278  Because only one argument may be passed, it is expected to
279  contain multiple elements, which are:
280 
281  @param patchRef: data reference for patch
282  @param selectDataList: List of SelectStruct for inputs
283  """
284  patchRef = getDataRef(cache.butler, data.patchId, cache.coaddType)
285  selectDataList = data.selectDataList
286  coadd = None
287 
288  # We skip the assembleCoadd step if either the *Coadd dataset exists
289  # or we aren't configured to write it, we're supposed to reuse
290  # detectCoaddSources outputs too, and those outputs already exist.
291  canSkipDetection = (
292  "detectCoaddSources" in self.reuse and
293  patchRef.datasetExists(self.detectCoaddSources.config.coaddName+"Coadd_det", write=True)
294  )
295  if "assembleCoadd" in self.reuse:
296  if patchRef.datasetExists(cache.coaddType, write=True):
297  self.log.info("%s: Skipping assembleCoadd for %s; outputs already exist." %
298  (NODE, patchRef.dataId))
299  coadd = patchRef.get(cache.coaddType, immediate=True)
300  elif not self.config.assembleCoadd.doWrite and self.config.doDetection and canSkipDetection:
301  self.log.info(
302  "%s: Skipping assembleCoadd and detectCoaddSources for %s; outputs already exist." %
303  (NODE, patchRef.dataId)
304  )
305  return
306  if coadd is None:
307  with self.logOperation("coadding %s" % (patchRef.dataId,), catch=True):
308  coaddResults = self.assembleCoadd.runDataRef(patchRef, selectDataList)
309  if coaddResults is not None:
310  coadd = coaddResults.coaddExposure
311  canSkipDetection = False # can't skip it because coadd may have changed
312  if coadd is None:
313  return
314 
315  # The section of code below determines if the detection task should be
316  # run. If detection is run, then the products are written out as
317  # deepCoadd_calexp. If detection is not run, then the outputs of the
318  # assemble task are written out as deepCoadd.
319  if self.config.doDetection:
320  if canSkipDetection:
321  self.log.info("%s: Skipping detectCoaddSources for %s; outputs already exist." %
322  (NODE, patchRef.dataId))
323  return
324  with self.logOperation("detection on {}".format(patchRef.dataId),
325  catch=True):
326  idFactory = self.detectCoaddSources.makeIdFactory(patchRef)
327  expId = int(patchRef.get(self.config.coaddName + "CoaddId"))
328  # This includes background subtraction, so do it before writing
329  # the coadd
330  detResults = self.detectCoaddSources.run(coadd, idFactory, expId=expId)
331  self.detectCoaddSources.write(detResults, patchRef)
332  else:
333  if self.config.hasFakes:
334  patchRef.put(coadd, "fakes_" + self.assembleCoadd.config.coaddName + "Coadd")
335  else:
336  patchRef.put(coadd, self.assembleCoadd.config.coaddName + "Coadd")
337 
338  def selectExposures(self, patchRef, selectDataList):
339  """!Select exposures to operate upon, via the SelectImagesTask
340 
341  This is very similar to CoaddBaseTask.selectExposures, except we return
342  a list of SelectStruct (same as the input), so we can plug the results into
343  future uses of SelectImagesTask.
344 
345  @param patchRef data reference to a particular patch
346  @param selectDataList list of references to specific data products (i.e. visit, ccd)
347  @return filtered list of SelectStruct
348  """
349  def key(dataRef):
350  return tuple(dataRef.dataId[k] for k in sorted(dataRef.dataId))
351  inputs = dict((key(select.dataRef), select)
352  for select in selectDataList)
353  skyMap = patchRef.get(self.config.coaddName + "Coadd_skyMap")
354  tract = skyMap[patchRef.dataId["tract"]]
355  patch = tract[(tuple(int(i)
356  for i in patchRef.dataId["patch"].split(",")))]
357  bbox = patch.getOuterBBox()
358  wcs = tract.getWcs()
359  cornerPosList = afwGeom.Box2D(bbox).getCorners()
360  coordList = [wcs.pixelToSky(pos) for pos in cornerPosList]
361  dataRefList = self.select.runDataRef(
362  patchRef, coordList, selectDataList=selectDataList).dataRefList
363  return [inputs[key(dataRef)] for dataRef in dataRefList]
364 
365  def writeMetadata(self, dataRef):
366  pass
def batchWallTime(cls, time, parsedCmd, numCores)
Return walltime request for batch job.
Definition: coaddDriver.py:129
def unpickle(factory, args, kwargs)
Definition: coaddDriver.py:83
def makeSubtask(self, name, keyArgs)
Definition: task.py:275
A floating-point coordinate rectangle geometry.
Definition: Box.h:305
def selectExposures(self, patchRef, selectDataList)
Select exposures to operate upon, via the SelectImagesTask.
Definition: coaddDriver.py:338
daf::base::PropertySet * set
Definition: fits.cc:884
def makeTask(self, parsedCmd=None, args=None)
Definition: coaddDriver.py:68
def getDataRef(butler, dataId, datasetType="raw")
Definition: utils.py:17
def __init__(self, reuse=tuple(), kwargs)
Definition: coaddDriver.py:93
static ConvexPolygon convexHull(std::vector< UnitVector3d > const &points)
convexHull returns the convex hull of the given set of points if it exists and throws an exception ot...
Definition: ConvexPolygon.h:65
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)
Definition: history.py:168
def coadd(self, cache, data)
Construct coadd for a patch and measure.
Definition: coaddDriver.py:273
def runDataRef(self, tractPatchRefList, butler, selectIdList=[])
Determine which tracts are non-empty before processing.
Definition: coaddDriver.py:142
def warp(self, cache, patchId, selectDataList)
Warp all images for a patch.
Definition: coaddDriver.py:255
def logOperation(self, operation, catch=False, trace=True)
Provide a context manager for logging an operation.
Definition: parallel.py:497
def getTargetList(parsedCmd, kwargs)
Get bare butler into Task.
Definition: coaddDriver.py:72
Key< U > key
Definition: Schema.cc:281
std::shared_ptr< SkyWcs > makeSkyWcs(TransformPoint2ToPoint2 const &pixelsToFieldAngle, lsst::geom::Angle const &orientation, bool flipX, lsst::geom::SpherePoint const &boresight, std::string const &projection="TAN")
Construct a FITS SkyWcs from camera geometry.
Definition: SkyWcs.cc:496
def readSelection(self, cache, selectId)
Read Wcs of selected inputs.
Definition: coaddDriver.py:204
def __init__(self, TaskClass, parsedCmd, doReturnResults=False)
Definition: coaddDriver.py:64
def checkTract(self, cache, tractId, selectIdList)
Check whether a tract has any overlapping inputs.
Definition: coaddDriver.py:227
Backwards-compatibility support for depersisting the old Calib (FluxMag0/FluxMag0Err) objects...
lsst::geom::Box2I bboxFromMetadata(daf::base::PropertySet &metadata)
Determine the image bounding box from its metadata (FITS header)
Definition: Image.cc:709
def run(self, patchRefList, butler, selectDataList=[])
Run stacking on a tract.
Definition: coaddDriver.py:177