LSSTApplications  17.0+124,17.0+14,17.0+73,18.0.0+37,18.0.0+80,18.0.0-4-g68ffd23+4,18.1.0-1-g0001055+12,18.1.0-1-g03d53ef+5,18.1.0-1-g1349e88+55,18.1.0-1-g2505f39+44,18.1.0-1-g5315e5e+4,18.1.0-1-g5e4b7ea+14,18.1.0-1-g7e8fceb+4,18.1.0-1-g85f8cd4+48,18.1.0-1-g8ff0b9f+4,18.1.0-1-ga2c679d+1,18.1.0-1-gd55f500+35,18.1.0-10-gb58edde+2,18.1.0-11-g0997b02+4,18.1.0-13-gfe4edf0b+12,18.1.0-14-g259bd21+21,18.1.0-19-gdb69f3f+2,18.1.0-2-g5f9922c+24,18.1.0-2-gd3b74e5+11,18.1.0-2-gfbf3545+32,18.1.0-26-g728bddb4+5,18.1.0-27-g6ff7ca9+2,18.1.0-3-g52aa583+25,18.1.0-3-g8ea57af+9,18.1.0-3-gb69f684+42,18.1.0-3-gfcaddf3+6,18.1.0-32-gd8786685a,18.1.0-4-gf3f9b77+6,18.1.0-5-g1dd662b+2,18.1.0-5-g6dbcb01+41,18.1.0-6-gae77429+3,18.1.0-7-g9d75d83+9,18.1.0-7-gae09a6d+30,18.1.0-9-gc381ef5+4,w.2019.45
LSSTDataManagementBasePackage
coaddDriver.py
Go to the documentation of this file.
1 from __future__ import absolute_import, division, print_function
2 
3 from builtins import zip
4 from builtins import map
5 
6 import lsst.afw.image as afwImage
7 import lsst.afw.geom as afwGeom
8 from lsst.afw.fits.fitsLib import FitsError
9 import lsst.geom as geom
10 from lsst.ctrl.pool.parallel import BatchPoolTask
11 from lsst.ctrl.pool.pool import Pool, abortOnError, NODE
12 import lsst.sphgeom
13 from lsst.pex.config import Config, Field, ConfigurableField
14 from lsst.pipe.base import Struct, ArgumentParser, ConfigDatasetType
15 from lsst.pipe.tasks.coaddBase import CoaddTaskRunner
16 from lsst.pipe.tasks.makeCoaddTempExp import MakeCoaddTempExpTask
17 from lsst.pipe.tasks.multiBand import DetectCoaddSourcesTask
18 from lsst.pipe.tasks.selectImages import WcsSelectImagesTask
19 from lsst.pipe.tasks.assembleCoadd import SafeClipAssembleCoaddTask
20 from lsst.pipe.drivers.utils import getDataRef, NullSelectImagesTask, TractDataIdContainer
21 
22 
23 class CoaddDriverConfig(Config):
24  coaddName = Field(dtype=str, default="deep", doc="Name for coadd")
25  select = ConfigurableField(
26  target=WcsSelectImagesTask, doc="Select images to process")
27  makeCoaddTempExp = ConfigurableField(
28  target=MakeCoaddTempExpTask, doc="Warp images to sky")
29  doBackgroundReference = Field(
30  dtype=bool, default=False, doc="Build background reference?")
31  backgroundReference = ConfigurableField(
32  target=NullSelectImagesTask, doc="Build background reference")
33  assembleCoadd = ConfigurableField(
34  target=SafeClipAssembleCoaddTask, doc="Assemble warps into coadd")
35  doDetection = Field(dtype=bool, default=True,
36  doc="Run detection on the coaddition product")
37  detectCoaddSources = ConfigurableField(
38  target=DetectCoaddSourcesTask, doc="Detect sources on coadd")
39  hasFakes = Field(dtype=bool, default=False,
40  doc="Should be set to True if fake sources were added to the data before processing.")
41  calexpType = Field(dtype=str, default="calexp",
42  doc="Should be set to fakes_calexp if you want to process calexps with fakes in.")
43 
44  def setDefaults(self):
45  self.makeCoaddTempExp.select.retarget(NullSelectImagesTask)
46  self.assembleCoadd.select.retarget(NullSelectImagesTask)
47  self.assembleCoadd.doWrite = False
48 
49  def validate(self):
50  if self.makeCoaddTempExp.coaddName != self.coaddName:
51  raise RuntimeError(
52  "makeCoaddTempExp.coaddName and coaddName don't match")
53  if self.assembleCoadd.coaddName != self.coaddName:
54  raise RuntimeError(
55  "assembleCoadd.coaddName and coaddName don't match")
56  if self.assembleCoadd.matchingKernelSize != self.makeCoaddTempExp.matchingKernelSize:
57  message = ("assembleCoadd.matchingKernelSize (%s) and makeCoaddTempExp.matchingKernelSize (%s)"
58  " don't match" % (self.assembleCoadd.matchingKernelSize,
59  self.makeCoaddTempExp.matchingKernelSize))
60  raise RuntimeError(message)
61 
62 
64 
65  def __init__(self, TaskClass, parsedCmd, doReturnResults=False):
66  CoaddTaskRunner.__init__(self, TaskClass, parsedCmd, doReturnResults)
67  self.reuse = parsedCmd.reuse
68 
69  def makeTask(self, parsedCmd=None, args=None):
70  return self.TaskClass(config=self.config, log=self.log, reuse=self.reuse)
71 
72  @staticmethod
73  def getTargetList(parsedCmd, **kwargs):
74  """!Get bare butler into Task
75 
76  @param parsedCmd results of parsing command input
77  """
78  kwargs["butler"] = parsedCmd.butler
79  kwargs["selectIdList"] = [
80  ref.dataId for ref in parsedCmd.selectId.refList]
81  return [(parsedCmd.id.refList, kwargs), ]
82 
83 
84 def unpickle(factory, args, kwargs):
85  """Unpickle something by calling a factory"""
86  return factory(*args, **kwargs)
87 
88 
90  ConfigClass = CoaddDriverConfig
91  _DefaultName = "coaddDriver"
92  RunnerClass = CoaddDriverTaskRunner
93 
94  def __init__(self, reuse=tuple(), **kwargs):
95  BatchPoolTask.__init__(self, **kwargs)
96  self.reuse = reuse
97  self.makeSubtask("select")
98  self.makeSubtask("makeCoaddTempExp", reuse=("makeCoaddTempExp" in self.reuse))
99  self.makeSubtask("backgroundReference")
100  self.makeSubtask("assembleCoadd")
101  self.makeSubtask("detectCoaddSources")
102  if self.config.hasFakes:
103  self.calexpType = "fakes_calexp"
104  else:
105  self.calexpType = "calexp"
106 
107  def __reduce__(self):
108  """Pickler"""
109  return unpickle, (self.__class__, [], dict(config=self.config, name=self._name,
110  parentTask=self._parentTask, log=self.log,
111  reuse=self.reuse))
112 
113  @classmethod
114  def _makeArgumentParser(cls, **kwargs):
115  """!Build argument parser
116 
117  Selection references are not cheap (reads Wcs), so are generated
118  only if we're not doing a batch submission.
119  """
120  parser = ArgumentParser(name=cls._DefaultName)
121  parser.add_id_argument("--id", "deepCoadd", help="data ID, e.g. --id tract=12345 patch=1,2",
122  ContainerClass=TractDataIdContainer)
123  datasetType = ConfigDatasetType(name="calexpType")
124  parser.add_id_argument("--selectId", datasetType=datasetType,
125  help="data ID, e.g. --selectId visit=6789 ccd=0..9")
126  parser.addReuseOption(["makeCoaddTempExp", "assembleCoadd", "detectCoaddSources"])
127  return parser
128 
129  @classmethod
130  def batchWallTime(cls, time, parsedCmd, numCores):
131  """!
132  Return walltime request for batch job
133 
134  @param time: Requested time per iteration
135  @param parsedCmd: Results of argument parsing
136  @param numCores: Number of cores
137  @return float walltime request length
138  """
139  numTargets = len(parsedCmd.selectId.refList)
140  return time*numTargets/float(numCores)
141 
142  @abortOnError
143  def runDataRef(self, tractPatchRefList, butler, selectIdList=[]):
144  """!Determine which tracts are non-empty before processing
145 
146  @param tractPatchRefList: List of tracts and patches to include in the coaddition
147  @param butler: butler reference object
148  @param selectIdList: List of data Ids (i.e. visit, ccd) to consider when making the coadd
149  @return list of references to sel.runTract function evaluation for each tractPatchRefList member
150  """
151  pool = Pool("tracts")
152  pool.storeSet(butler=butler, skymap=butler.get(
153  self.config.coaddName + "Coadd_skyMap"))
154  tractIdList = []
155  for patchRefList in tractPatchRefList:
156  tractSet = set([patchRef.dataId["tract"]
157  for patchRef in patchRefList])
158  assert len(tractSet) == 1
159  tractIdList.append(tractSet.pop())
160 
161  selectDataList = [data for data in pool.mapNoBalance(self.readSelection, selectIdList) if
162  data is not None]
163  nonEmptyList = pool.mapNoBalance(
164  self.checkTract, tractIdList, selectDataList)
165  tractPatchRefList = [patchRefList for patchRefList, nonEmpty in
166  zip(tractPatchRefList, nonEmptyList) if nonEmpty]
167  self.log.info("Non-empty tracts (%d): %s" % (len(tractPatchRefList),
168  [patchRefList[0].dataId["tract"] for patchRefList in
169  tractPatchRefList]))
170  # Install the dataRef in the selectDataList
171  for data in selectDataList:
172  data.dataRef = getDataRef(butler, data.dataId, self.calexpType)
173 
174  # Process the non-empty tracts
175  return [self.run(patchRefList, butler, selectDataList) for patchRefList in tractPatchRefList]
176 
177  @abortOnError
178  def run(self, patchRefList, butler, selectDataList=[]):
179  """!Run stacking on a tract
180 
181  This method only runs on the master node.
182 
183  @param patchRefList: List of patch data references for tract
184  @param butler: Data butler
185  @param selectDataList: List of SelectStruct for inputs
186  """
187  pool = Pool("stacker")
188  pool.cacheClear()
189  pool.storeSet(butler=butler, warpType=self.config.coaddName + "Coadd_directWarp",
190  coaddType=self.config.coaddName + "Coadd")
191  patchIdList = [patchRef.dataId for patchRef in patchRefList]
192 
193  selectedData = pool.map(self.warp, patchIdList, selectDataList)
194  if self.config.doBackgroundReference:
195  self.backgroundReference.runDataRef(patchRefList, selectDataList)
196 
197  def refNamer(patchRef):
198  return tuple(map(int, patchRef.dataId["patch"].split(",")))
199 
200  lookup = dict(zip(map(refNamer, patchRefList), selectedData))
201  coaddData = [Struct(patchId=patchRef.dataId, selectDataList=lookup[refNamer(patchRef)]) for
202  patchRef in patchRefList]
203  pool.map(self.coadd, coaddData)
204 
205  def readSelection(self, cache, selectId):
206  """!Read Wcs of selected inputs
207 
208  This method only runs on slave nodes.
209  This method is similar to SelectDataIdContainer.makeDataRefList,
210  creating a Struct like a SelectStruct, except with a dataId instead
211  of a dataRef (to ease MPI).
212 
213  @param cache: Pool cache
214  @param selectId: Data identifier for selected input
215  @return a SelectStruct with a dataId instead of dataRef
216  """
217  try:
218  ref = getDataRef(cache.butler, selectId, self.calexpType)
219  self.log.info("Reading Wcs from %s" % (selectId,))
220  md = ref.get("calexp_md", immediate=True)
221  wcs = afwGeom.makeSkyWcs(md)
222  data = Struct(dataId=selectId, wcs=wcs, bbox=afwImage.bboxFromMetadata(md))
223  except FitsError:
224  self.log.warn("Unable to construct Wcs from %s" % (selectId,))
225  return None
226  return data
227 
228  def checkTract(self, cache, tractId, selectIdList):
229  """!Check whether a tract has any overlapping inputs
230 
231  This method only runs on slave nodes.
232 
233  @param cache: Pool cache
234  @param tractId: Data identifier for tract
235  @param selectDataList: List of selection data
236  @return whether tract has any overlapping inputs
237  """
238  def makePolygon(wcs, bbox):
239  """Return a polygon for the image, given Wcs and bounding box"""
240  boxPixelCorners = geom.Box2D(bbox).getCorners()
241  boxSkyCorners = wcs.pixelToSky(boxPixelCorners)
242  return lsst.sphgeom.ConvexPolygon.convexHull([coord.getVector() for coord in boxSkyCorners])
243 
244  skymap = cache.skymap
245  tract = skymap[tractId]
246  tractWcs = tract.getWcs()
247  tractPoly = makePolygon(tractWcs, tract.getBBox())
248 
249  for selectData in selectIdList:
250  if not hasattr(selectData, "poly"):
251  selectData.poly = makePolygon(selectData.wcs, selectData.bbox)
252  if tractPoly.intersects(selectData.poly):
253  return True
254  return False
255 
256  def warp(self, cache, patchId, selectDataList):
257  """!Warp all images for a patch
258 
259  Only slave nodes execute this method.
260 
261  Because only one argument may be passed, it is expected to
262  contain multiple elements, which are:
263 
264  @param patchRef: data reference for patch
265  @param selectDataList: List of SelectStruct for inputs
266  @return selectDataList with non-overlapping elements removed
267  """
268  patchRef = getDataRef(cache.butler, patchId, cache.coaddType)
269  selectDataList = self.selectExposures(patchRef, selectDataList)
270  with self.logOperation("warping %s" % (patchRef.dataId,), catch=True):
271  self.makeCoaddTempExp.runDataRef(patchRef, selectDataList)
272  return selectDataList
273 
274  def coadd(self, cache, data):
275  """!Construct coadd for a patch and measure
276 
277  Only slave nodes execute this method.
278 
279  Because only one argument may be passed, it is expected to
280  contain multiple elements, which are:
281 
282  @param patchRef: data reference for patch
283  @param selectDataList: List of SelectStruct for inputs
284  """
285  patchRef = getDataRef(cache.butler, data.patchId, cache.coaddType)
286  selectDataList = data.selectDataList
287  coadd = None
288 
289  # We skip the assembleCoadd step if either the *Coadd dataset exists
290  # or we aren't configured to write it, we're supposed to reuse
291  # detectCoaddSources outputs too, and those outputs already exist.
292  canSkipDetection = (
293  "detectCoaddSources" in self.reuse and
294  patchRef.datasetExists(self.detectCoaddSources.config.coaddName+"Coadd_det", write=True)
295  )
296  if "assembleCoadd" in self.reuse:
297  if patchRef.datasetExists(cache.coaddType, write=True):
298  self.log.info("%s: Skipping assembleCoadd for %s; outputs already exist." %
299  (NODE, patchRef.dataId))
300  coadd = patchRef.get(cache.coaddType, immediate=True)
301  elif not self.config.assembleCoadd.doWrite and self.config.doDetection and canSkipDetection:
302  self.log.info(
303  "%s: Skipping assembleCoadd and detectCoaddSources for %s; outputs already exist." %
304  (NODE, patchRef.dataId)
305  )
306  return
307  if coadd is None:
308  with self.logOperation("coadding %s" % (patchRef.dataId,), catch=True):
309  coaddResults = self.assembleCoadd.runDataRef(patchRef, selectDataList)
310  if coaddResults is not None:
311  coadd = coaddResults.coaddExposure
312  canSkipDetection = False # can't skip it because coadd may have changed
313  if coadd is None:
314  return
315 
316  # The section of code below determines if the detection task should be
317  # run. If detection is run, then the products are written out as
318  # deepCoadd_calexp. If detection is not run, then the outputs of the
319  # assemble task are written out as deepCoadd.
320  if self.config.doDetection:
321  if canSkipDetection:
322  self.log.info("%s: Skipping detectCoaddSources for %s; outputs already exist." %
323  (NODE, patchRef.dataId))
324  return
325  with self.logOperation("detection on {}".format(patchRef.dataId),
326  catch=True):
327  idFactory = self.detectCoaddSources.makeIdFactory(patchRef)
328  expId = int(patchRef.get(self.config.coaddName + "CoaddId"))
329  # This includes background subtraction, so do it before writing
330  # the coadd
331  detResults = self.detectCoaddSources.run(coadd, idFactory, expId=expId)
332  self.detectCoaddSources.write(detResults, patchRef)
333  else:
334  if self.config.hasFakes:
335  patchRef.put(coadd, "fakes_" + self.assembleCoadd.config.coaddName + "Coadd")
336  else:
337  patchRef.put(coadd, self.assembleCoadd.config.coaddName + "Coadd")
338 
339  def selectExposures(self, patchRef, selectDataList):
340  """!Select exposures to operate upon, via the SelectImagesTask
341 
342  This is very similar to CoaddBaseTask.selectExposures, except we return
343  a list of SelectStruct (same as the input), so we can plug the results into
344  future uses of SelectImagesTask.
345 
346  @param patchRef data reference to a particular patch
347  @param selectDataList list of references to specific data products (i.e. visit, ccd)
348  @return filtered list of SelectStruct
349  """
350  def key(dataRef):
351  return tuple(dataRef.dataId[k] for k in sorted(dataRef.dataId))
352  inputs = dict((key(select.dataRef), select)
353  for select in selectDataList)
354  skyMap = patchRef.get(self.config.coaddName + "Coadd_skyMap")
355  tract = skyMap[patchRef.dataId["tract"]]
356  patch = tract[(tuple(int(i)
357  for i in patchRef.dataId["patch"].split(",")))]
358  bbox = patch.getOuterBBox()
359  wcs = tract.getWcs()
360  cornerPosList = geom.Box2D(bbox).getCorners()
361  coordList = [wcs.pixelToSky(pos) for pos in cornerPosList]
362  dataRefList = self.select.runDataRef(
363  patchRef, coordList, selectDataList=selectDataList).dataRefList
364  return [inputs[key(dataRef)] for dataRef in dataRefList]
365 
366  def writeMetadata(self, dataRef):
367  pass
def write(self, patchRef, catalog)
Write the output.
def batchWallTime(cls, time, parsedCmd, numCores)
Return walltime request for batch job.
Definition: coaddDriver.py:130
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)
Definition: history.py:174
def unpickle(factory, args, kwargs)
Definition: coaddDriver.py:84
def makeSubtask(self, name, keyArgs)
Definition: task.py:275
A floating-point coordinate rectangle geometry.
Definition: Box.h:413
def selectExposures(self, patchRef, selectDataList)
Select exposures to operate upon, via the SelectImagesTask.
Definition: coaddDriver.py:339
daf::base::PropertySet * set
Definition: fits.cc:902
def makeTask(self, parsedCmd=None, args=None)
Definition: coaddDriver.py:69
def getDataRef(butler, dataId, datasetType="raw")
Definition: utils.py:17
def __init__(self, reuse=tuple(), kwargs)
Definition: coaddDriver.py:94
static ConvexPolygon convexHull(std::vector< UnitVector3d > const &points)
convexHull returns the convex hull of the given set of points if it exists and throws an exception ot...
Definition: ConvexPolygon.h:65
def coadd(self, cache, data)
Construct coadd for a patch and measure.
Definition: coaddDriver.py:274
Key< U > key
Definition: Schema.cc:281
def runDataRef(self, tractPatchRefList, butler, selectIdList=[])
Determine which tracts are non-empty before processing.
Definition: coaddDriver.py:143
def warp(self, cache, patchId, selectDataList)
Warp all images for a patch.
Definition: coaddDriver.py:256
def logOperation(self, operation, catch=False, trace=True)
Provide a context manager for logging an operation.
Definition: parallel.py:497
def getTargetList(parsedCmd, kwargs)
Get bare butler into Task.
Definition: coaddDriver.py:73
std::shared_ptr< SkyWcs > makeSkyWcs(TransformPoint2ToPoint2 const &pixelsToFieldAngle, lsst::geom::Angle const &orientation, bool flipX, lsst::geom::SpherePoint const &boresight, std::string const &projection="TAN")
Construct a FITS SkyWcs from camera geometry.
Definition: SkyWcs.cc:516
def readSelection(self, cache, selectId)
Read Wcs of selected inputs.
Definition: coaddDriver.py:205
def __init__(self, TaskClass, parsedCmd, doReturnResults=False)
Definition: coaddDriver.py:65
def checkTract(self, cache, tractId, selectIdList)
Check whether a tract has any overlapping inputs.
Definition: coaddDriver.py:228
Backwards-compatibility support for depersisting the old Calib (FluxMag0/FluxMag0Err) objects...
lsst::geom::Box2I bboxFromMetadata(daf::base::PropertySet &metadata)
Determine the image bounding box from its metadata (FITS header)
Definition: Image.cc:694
def run(self, patchRefList, butler, selectDataList=[])
Run stacking on a tract.
Definition: coaddDriver.py:178