LSSTApplications  18.0.0+106,18.0.0+50,19.0.0,19.0.0+1,19.0.0+10,19.0.0+11,19.0.0+13,19.0.0+17,19.0.0+2,19.0.0-1-g20d9b18+6,19.0.0-1-g425ff20,19.0.0-1-g5549ca4,19.0.0-1-g580fafe+6,19.0.0-1-g6fe20d0+1,19.0.0-1-g7011481+9,19.0.0-1-g8c57eb9+6,19.0.0-1-gb5175dc+11,19.0.0-1-gdc0e4a7+9,19.0.0-1-ge272bc4+6,19.0.0-1-ge3aa853,19.0.0-10-g448f008b,19.0.0-12-g6990b2c,19.0.0-2-g0d9f9cd+11,19.0.0-2-g3d9e4fb2+11,19.0.0-2-g5037de4,19.0.0-2-gb96a1c4+3,19.0.0-2-gd955cfd+15,19.0.0-3-g2d13df8,19.0.0-3-g6f3c7dc,19.0.0-4-g725f80e+11,19.0.0-4-ga671dab3b+1,19.0.0-4-gad373c5+3,19.0.0-5-ga2acb9c+2,19.0.0-5-gfe96e6c+2,w.2020.01
LSSTDataManagementBasePackage
coaddDriver.py
Go to the documentation of this file.
1 import lsst.afw.image as afwImage
2 import lsst.afw.geom as afwGeom
3 from lsst.afw.fits.fitsLib import FitsError
4 import lsst.geom as geom
5 from lsst.ctrl.pool.parallel import BatchPoolTask
6 from lsst.ctrl.pool.pool import Pool, abortOnError, NODE
7 import lsst.sphgeom
8 from lsst.pex.config import Config, Field, ConfigurableField
9 from lsst.pipe.base import Struct, ArgumentParser, ConfigDatasetType
10 from lsst.pipe.tasks.coaddBase import CoaddTaskRunner
11 from lsst.pipe.tasks.makeCoaddTempExp import MakeCoaddTempExpTask
12 from lsst.pipe.tasks.multiBand import DetectCoaddSourcesTask
13 from lsst.pipe.tasks.selectImages import WcsSelectImagesTask
14 from lsst.pipe.tasks.assembleCoadd import SafeClipAssembleCoaddTask
15 from lsst.pipe.drivers.utils import getDataRef, NullSelectImagesTask, TractDataIdContainer
16 
17 
18 class CoaddDriverConfig(Config):
19  coaddName = Field(dtype=str, default="deep", doc="Name for coadd")
20  select = ConfigurableField(
21  target=WcsSelectImagesTask, doc="Select images to process")
22  makeCoaddTempExp = ConfigurableField(
23  target=MakeCoaddTempExpTask, doc="Warp images to sky")
24  doBackgroundReference = Field(
25  dtype=bool, default=False, doc="Build background reference?")
26  backgroundReference = ConfigurableField(
27  target=NullSelectImagesTask, doc="Build background reference")
28  assembleCoadd = ConfigurableField(
29  target=SafeClipAssembleCoaddTask, doc="Assemble warps into coadd")
30  doDetection = Field(dtype=bool, default=True,
31  doc="Run detection on the coaddition product")
32  detectCoaddSources = ConfigurableField(
33  target=DetectCoaddSourcesTask, doc="Detect sources on coadd")
34  hasFakes = Field(dtype=bool, default=False,
35  doc="Should be set to True if fake sources were added to the data before processing.")
36  calexpType = Field(dtype=str, default="calexp",
37  doc="Should be set to fakes_calexp if you want to process calexps with fakes in.")
38 
39  def setDefaults(self):
40  self.makeCoaddTempExp.select.retarget(NullSelectImagesTask)
41  self.assembleCoadd.select.retarget(NullSelectImagesTask)
42  self.assembleCoadd.doWrite = False
43 
44  def validate(self):
45  if self.makeCoaddTempExp.coaddName != self.coaddName:
46  raise RuntimeError(
47  "makeCoaddTempExp.coaddName and coaddName don't match")
48  if self.assembleCoadd.coaddName != self.coaddName:
49  raise RuntimeError(
50  "assembleCoadd.coaddName and coaddName don't match")
51  if self.assembleCoadd.matchingKernelSize != self.makeCoaddTempExp.matchingKernelSize:
52  message = ("assembleCoadd.matchingKernelSize (%s) and makeCoaddTempExp.matchingKernelSize (%s)"
53  " don't match" % (self.assembleCoadd.matchingKernelSize,
54  self.makeCoaddTempExp.matchingKernelSize))
55  raise RuntimeError(message)
56 
57 
59 
60  def __init__(self, TaskClass, parsedCmd, doReturnResults=False):
61  CoaddTaskRunner.__init__(self, TaskClass, parsedCmd, doReturnResults)
62  self.reuse = parsedCmd.reuse
63 
64  def makeTask(self, parsedCmd=None, args=None):
65  return self.TaskClass(config=self.config, log=self.log, reuse=self.reuse)
66 
67  @staticmethod
68  def getTargetList(parsedCmd, **kwargs):
69  """!Get bare butler into Task
70 
71  @param parsedCmd results of parsing command input
72  """
73  kwargs["butler"] = parsedCmd.butler
74  kwargs["selectIdList"] = [
75  ref.dataId for ref in parsedCmd.selectId.refList]
76  return [(parsedCmd.id.refList, kwargs), ]
77 
78 
79 def unpickle(factory, args, kwargs):
80  """Unpickle something by calling a factory"""
81  return factory(*args, **kwargs)
82 
83 
85  ConfigClass = CoaddDriverConfig
86  _DefaultName = "coaddDriver"
87  RunnerClass = CoaddDriverTaskRunner
88 
89  def __init__(self, reuse=tuple(), **kwargs):
90  BatchPoolTask.__init__(self, **kwargs)
91  self.reuse = reuse
92  self.makeSubtask("select")
93  self.makeSubtask("makeCoaddTempExp", reuse=("makeCoaddTempExp" in self.reuse))
94  self.makeSubtask("backgroundReference")
95  self.makeSubtask("assembleCoadd")
96  self.makeSubtask("detectCoaddSources")
97  if self.config.hasFakes:
98  self.calexpType = "fakes_calexp"
99  else:
100  self.calexpType = "calexp"
101 
102  def __reduce__(self):
103  """Pickler"""
104  return unpickle, (self.__class__, [], dict(config=self.config, name=self._name,
105  parentTask=self._parentTask, log=self.log,
106  reuse=self.reuse))
107 
108  @classmethod
109  def _makeArgumentParser(cls, **kwargs):
110  """!Build argument parser
111 
112  Selection references are not cheap (reads Wcs), so are generated
113  only if we're not doing a batch submission.
114  """
115  parser = ArgumentParser(name=cls._DefaultName)
116  parser.add_id_argument("--id", "deepCoadd", help="data ID, e.g. --id tract=12345 patch=1,2",
117  ContainerClass=TractDataIdContainer)
118  datasetType = ConfigDatasetType(name="calexpType")
119  parser.add_id_argument("--selectId", datasetType=datasetType,
120  help="data ID, e.g. --selectId visit=6789 ccd=0..9")
121  parser.addReuseOption(["makeCoaddTempExp", "assembleCoadd", "detectCoaddSources"])
122  return parser
123 
124  @classmethod
125  def batchWallTime(cls, time, parsedCmd, numCores):
126  """!
127  Return walltime request for batch job
128 
129  @param time: Requested time per iteration
130  @param parsedCmd: Results of argument parsing
131  @param numCores: Number of cores
132  @return float walltime request length
133  """
134  numTargets = len(parsedCmd.selectId.refList)
135  return time*numTargets/float(numCores)
136 
137  @abortOnError
138  def runDataRef(self, tractPatchRefList, butler, selectIdList=[]):
139  """!Determine which tracts are non-empty before processing
140 
141  @param tractPatchRefList: List of tracts and patches to include in the coaddition
142  @param butler: butler reference object
143  @param selectIdList: List of data Ids (i.e. visit, ccd) to consider when making the coadd
144  @return list of references to sel.runTract function evaluation for each tractPatchRefList member
145  """
146  pool = Pool("tracts")
147  pool.storeSet(butler=butler, skymap=butler.get(
148  self.config.coaddName + "Coadd_skyMap"))
149  tractIdList = []
150  for patchRefList in tractPatchRefList:
151  tractSet = set([patchRef.dataId["tract"]
152  for patchRef in patchRefList])
153  assert len(tractSet) == 1
154  tractIdList.append(tractSet.pop())
155 
156  selectDataList = [data for data in pool.mapNoBalance(self.readSelection, selectIdList) if
157  data is not None]
158  nonEmptyList = pool.mapNoBalance(
159  self.checkTract, tractIdList, selectDataList)
160  tractPatchRefList = [patchRefList for patchRefList, nonEmpty in
161  zip(tractPatchRefList, nonEmptyList) if nonEmpty]
162  self.log.info("Non-empty tracts (%d): %s" % (len(tractPatchRefList),
163  [patchRefList[0].dataId["tract"] for patchRefList in
164  tractPatchRefList]))
165  # Install the dataRef in the selectDataList
166  for data in selectDataList:
167  data.dataRef = getDataRef(butler, data.dataId, self.calexpType)
168 
169  # Process the non-empty tracts
170  return [self.run(patchRefList, butler, selectDataList) for patchRefList in tractPatchRefList]
171 
172  @abortOnError
173  def run(self, patchRefList, butler, selectDataList=[]):
174  """!Run stacking on a tract
175 
176  This method only runs on the master node.
177 
178  @param patchRefList: List of patch data references for tract
179  @param butler: Data butler
180  @param selectDataList: List of SelectStruct for inputs
181  """
182  pool = Pool("stacker")
183  pool.cacheClear()
184  pool.storeSet(butler=butler, warpType=self.config.coaddName + "Coadd_directWarp",
185  coaddType=self.config.coaddName + "Coadd")
186  patchIdList = [patchRef.dataId for patchRef in patchRefList]
187 
188  selectedData = pool.map(self.warp, patchIdList, selectDataList)
189  if self.config.doBackgroundReference:
190  self.backgroundReference.runDataRef(patchRefList, selectDataList)
191 
192  def refNamer(patchRef):
193  return tuple(map(int, patchRef.dataId["patch"].split(",")))
194 
195  lookup = dict(zip(map(refNamer, patchRefList), selectedData))
196  coaddData = [Struct(patchId=patchRef.dataId, selectDataList=lookup[refNamer(patchRef)]) for
197  patchRef in patchRefList]
198  pool.map(self.coadd, coaddData)
199 
200  def readSelection(self, cache, selectId):
201  """!Read Wcs of selected inputs
202 
203  This method only runs on slave nodes.
204  This method is similar to SelectDataIdContainer.makeDataRefList,
205  creating a Struct like a SelectStruct, except with a dataId instead
206  of a dataRef (to ease MPI).
207 
208  @param cache: Pool cache
209  @param selectId: Data identifier for selected input
210  @return a SelectStruct with a dataId instead of dataRef
211  """
212  try:
213  ref = getDataRef(cache.butler, selectId, self.calexpType)
214  self.log.info("Reading Wcs from %s" % (selectId,))
215  md = ref.get("calexp_md", immediate=True)
216  wcs = afwGeom.makeSkyWcs(md)
217  data = Struct(dataId=selectId, wcs=wcs, bbox=afwImage.bboxFromMetadata(md))
218  except FitsError:
219  self.log.warn("Unable to construct Wcs from %s" % (selectId,))
220  return None
221  return data
222 
223  def checkTract(self, cache, tractId, selectIdList):
224  """!Check whether a tract has any overlapping inputs
225 
226  This method only runs on slave nodes.
227 
228  @param cache: Pool cache
229  @param tractId: Data identifier for tract
230  @param selectDataList: List of selection data
231  @return whether tract has any overlapping inputs
232  """
233  def makePolygon(wcs, bbox):
234  """Return a polygon for the image, given Wcs and bounding box"""
235  boxPixelCorners = geom.Box2D(bbox).getCorners()
236  boxSkyCorners = wcs.pixelToSky(boxPixelCorners)
237  return lsst.sphgeom.ConvexPolygon.convexHull([coord.getVector() for coord in boxSkyCorners])
238 
239  skymap = cache.skymap
240  tract = skymap[tractId]
241  tractWcs = tract.getWcs()
242  tractPoly = makePolygon(tractWcs, tract.getBBox())
243 
244  for selectData in selectIdList:
245  if not hasattr(selectData, "poly"):
246  selectData.poly = makePolygon(selectData.wcs, selectData.bbox)
247  if tractPoly.intersects(selectData.poly):
248  return True
249  return False
250 
251  def warp(self, cache, patchId, selectDataList):
252  """!Warp all images for a patch
253 
254  Only slave nodes execute this method.
255 
256  Because only one argument may be passed, it is expected to
257  contain multiple elements, which are:
258 
259  @param patchRef: data reference for patch
260  @param selectDataList: List of SelectStruct for inputs
261  @return selectDataList with non-overlapping elements removed
262  """
263  patchRef = getDataRef(cache.butler, patchId, cache.coaddType)
264  selectDataList = self.selectExposures(patchRef, selectDataList)
265  with self.logOperation("warping %s" % (patchRef.dataId,), catch=True):
266  self.makeCoaddTempExp.runDataRef(patchRef, selectDataList)
267  return selectDataList
268 
269  def coadd(self, cache, data):
270  """!Construct coadd for a patch and measure
271 
272  Only slave nodes execute this method.
273 
274  Because only one argument may be passed, it is expected to
275  contain multiple elements, which are:
276 
277  @param patchRef: data reference for patch
278  @param selectDataList: List of SelectStruct for inputs
279  """
280  patchRef = getDataRef(cache.butler, data.patchId, cache.coaddType)
281  selectDataList = data.selectDataList
282  coadd = None
283 
284  # We skip the assembleCoadd step if either the *Coadd dataset exists
285  # or we aren't configured to write it, we're supposed to reuse
286  # detectCoaddSources outputs too, and those outputs already exist.
287  canSkipDetection = (
288  "detectCoaddSources" in self.reuse and
289  patchRef.datasetExists(self.detectCoaddSources.config.coaddName+"Coadd_det", write=True)
290  )
291  if "assembleCoadd" in self.reuse:
292  if patchRef.datasetExists(cache.coaddType, write=True):
293  self.log.info("%s: Skipping assembleCoadd for %s; outputs already exist." %
294  (NODE, patchRef.dataId))
295  coadd = patchRef.get(cache.coaddType, immediate=True)
296  elif not self.config.assembleCoadd.doWrite and self.config.doDetection and canSkipDetection:
297  self.log.info(
298  "%s: Skipping assembleCoadd and detectCoaddSources for %s; outputs already exist." %
299  (NODE, patchRef.dataId)
300  )
301  return
302  if coadd is None:
303  with self.logOperation("coadding %s" % (patchRef.dataId,), catch=True):
304  coaddResults = self.assembleCoadd.runDataRef(patchRef, selectDataList)
305  if coaddResults is not None:
306  coadd = coaddResults.coaddExposure
307  canSkipDetection = False # can't skip it because coadd may have changed
308  if coadd is None:
309  return
310 
311  # The section of code below determines if the detection task should be
312  # run. If detection is run, then the products are written out as
313  # deepCoadd_calexp. If detection is not run, then the outputs of the
314  # assemble task are written out as deepCoadd.
315  if self.config.doDetection:
316  if canSkipDetection:
317  self.log.info("%s: Skipping detectCoaddSources for %s; outputs already exist." %
318  (NODE, patchRef.dataId))
319  return
320  with self.logOperation("detection on {}".format(patchRef.dataId),
321  catch=True):
322  idFactory = self.detectCoaddSources.makeIdFactory(patchRef)
323  expId = int(patchRef.get(self.config.coaddName + "CoaddId"))
324  # This includes background subtraction, so do it before writing
325  # the coadd
326  detResults = self.detectCoaddSources.run(coadd, idFactory, expId=expId)
327  self.detectCoaddSources.write(detResults, patchRef)
328  else:
329  if self.config.hasFakes:
330  patchRef.put(coadd, "fakes_" + self.assembleCoadd.config.coaddName + "Coadd")
331  else:
332  patchRef.put(coadd, self.assembleCoadd.config.coaddName + "Coadd")
333 
334  def selectExposures(self, patchRef, selectDataList):
335  """!Select exposures to operate upon, via the SelectImagesTask
336 
337  This is very similar to CoaddBaseTask.selectExposures, except we return
338  a list of SelectStruct (same as the input), so we can plug the results into
339  future uses of SelectImagesTask.
340 
341  @param patchRef data reference to a particular patch
342  @param selectDataList list of references to specific data products (i.e. visit, ccd)
343  @return filtered list of SelectStruct
344  """
345  def key(dataRef):
346  return tuple(dataRef.dataId[k] for k in sorted(dataRef.dataId))
347  inputs = dict((key(select.dataRef), select)
348  for select in selectDataList)
349  skyMap = patchRef.get(self.config.coaddName + "Coadd_skyMap")
350  tract = skyMap[patchRef.dataId["tract"]]
351  patch = tract[(tuple(int(i)
352  for i in patchRef.dataId["patch"].split(",")))]
353  bbox = patch.getOuterBBox()
354  wcs = tract.getWcs()
355  cornerPosList = geom.Box2D(bbox).getCorners()
356  coordList = [wcs.pixelToSky(pos) for pos in cornerPosList]
357  dataRefList = self.select.runDataRef(
358  patchRef, coordList, selectDataList=selectDataList).dataRefList
359  return [inputs[key(dataRef)] for dataRef in dataRefList]
360 
361  def writeMetadata(self, dataRef):
362  pass
def write(self, patchRef, catalog)
Write the output.
def batchWallTime(cls, time, parsedCmd, numCores)
Return walltime request for batch job.
Definition: coaddDriver.py:125
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)
Definition: history.py:174
def unpickle(factory, args, kwargs)
Definition: coaddDriver.py:79
def makeSubtask(self, name, keyArgs)
Definition: task.py:275
A floating-point coordinate rectangle geometry.
Definition: Box.h:413
def selectExposures(self, patchRef, selectDataList)
Select exposures to operate upon, via the SelectImagesTask.
Definition: coaddDriver.py:334
daf::base::PropertySet * set
Definition: fits.cc:902
def makeTask(self, parsedCmd=None, args=None)
Definition: coaddDriver.py:64
def getDataRef(butler, dataId, datasetType="raw")
Definition: utils.py:16
def __init__(self, reuse=tuple(), kwargs)
Definition: coaddDriver.py:89
static ConvexPolygon convexHull(std::vector< UnitVector3d > const &points)
convexHull returns the convex hull of the given set of points if it exists and throws an exception ot...
Definition: ConvexPolygon.h:65
def coadd(self, cache, data)
Construct coadd for a patch and measure.
Definition: coaddDriver.py:269
Key< U > key
Definition: Schema.cc:281
def runDataRef(self, tractPatchRefList, butler, selectIdList=[])
Determine which tracts are non-empty before processing.
Definition: coaddDriver.py:138
def warp(self, cache, patchId, selectDataList)
Warp all images for a patch.
Definition: coaddDriver.py:251
def logOperation(self, operation, catch=False, trace=True)
Provide a context manager for logging an operation.
Definition: parallel.py:497
def getTargetList(parsedCmd, kwargs)
Get bare butler into Task.
Definition: coaddDriver.py:68
std::shared_ptr< SkyWcs > makeSkyWcs(TransformPoint2ToPoint2 const &pixelsToFieldAngle, lsst::geom::Angle const &orientation, bool flipX, lsst::geom::SpherePoint const &boresight, std::string const &projection="TAN")
Construct a FITS SkyWcs from camera geometry.
Definition: SkyWcs.cc:516
def readSelection(self, cache, selectId)
Read Wcs of selected inputs.
Definition: coaddDriver.py:200
def __init__(self, TaskClass, parsedCmd, doReturnResults=False)
Definition: coaddDriver.py:60
def checkTract(self, cache, tractId, selectIdList)
Check whether a tract has any overlapping inputs.
Definition: coaddDriver.py:223
Backwards-compatibility support for depersisting the old Calib (FluxMag0/FluxMag0Err) objects...
lsst::geom::Box2I bboxFromMetadata(daf::base::PropertySet &metadata)
Determine the image bounding box from its metadata (FITS header)
Definition: Image.cc:694
def run(self, patchRefList, butler, selectDataList=[])
Run stacking on a tract.
Definition: coaddDriver.py:173