LSST Applications g070148d5b3+33e5256705,g0d53e28543+25c8b88941,g0da5cf3356+2dd1178308,g1081da9e2a+62d12e78cb,g17e5ecfddb+7e422d6136,g1c76d35bf8+ede3a706f7,g295839609d+225697d880,g2e2c1a68ba+cc1f6f037e,g2ffcdf413f+853cd4dcde,g38293774b4+62d12e78cb,g3b44f30a73+d953f1ac34,g48ccf36440+885b902d19,g4b2f1765b6+7dedbde6d2,g5320a0a9f6+0c5d6105b6,g56b687f8c9+ede3a706f7,g5c4744a4d9+ef6ac23297,g5ffd174ac0+0c5d6105b6,g6075d09f38+66af417445,g667d525e37+2ced63db88,g670421136f+2ced63db88,g71f27ac40c+2ced63db88,g774830318a+463cbe8d1f,g7876bc68e5+1d137996f1,g7985c39107+62d12e78cb,g7fdac2220c+0fd8241c05,g96f01af41f+368e6903a7,g9ca82378b8+2ced63db88,g9d27549199+ef6ac23297,gabe93b2c52+e3573e3735,gb065e2a02a+3dfbe639da,gbc3249ced9+0c5d6105b6,gbec6a3398f+0c5d6105b6,gc9534b9d65+35b9f25267,gd01420fc67+0c5d6105b6,geee7ff78d7+a14128c129,gf63283c776+ede3a706f7,gfed783d017+0c5d6105b6,w.2022.47
LSST Data Management Base Package
Loading...
Searching...
No Matches
coaddDriver.py
Go to the documentation of this file.
1import lsst.afw.image as afwImage
2import lsst.afw.geom as afwGeom
3from lsst.afw.fits import FitsError
4from lsst.coadd.utils.getGen3CoaddExposureId import getGen3CoaddExposureId
5import lsst.geom as geom
6from lsst.ctrl.pool.parallel import BatchPoolTask
7from lsst.ctrl.pool.pool import Pool, abortOnError, NODE
8import lsst.sphgeom
9from lsst.pex.config import Config, Field, ConfigurableField
10from lsst.pipe.base import Struct, ArgumentParser, ConfigDatasetType
11from lsst.pipe.tasks.coaddBase import CoaddTaskRunner
12from lsst.pipe.tasks.makeCoaddTempExp import MakeCoaddTempExpTask
13from lsst.pipe.tasks.multiBand import DetectCoaddSourcesTask
14from lsst.pipe.tasks.selectImages import WcsSelectImagesTask
15from lsst.pipe.tasks.assembleCoadd import SafeClipAssembleCoaddTask
16from lsst.pipe.drivers.utils import getDataRef, NullSelectImagesTask, TractDataIdContainer
17
18
20 coaddName = Field(dtype=str, default="deep", doc="Name for coadd")
22 target=WcsSelectImagesTask, doc="Select images to process")
23 makeCoaddTempExp = ConfigurableField(
24 target=MakeCoaddTempExpTask, doc="Warp images to sky")
25 doBackgroundReference = Field(
26 dtype=bool, default=False, doc="Build background reference?")
27 backgroundReference = ConfigurableField(
28 target=NullSelectImagesTask, doc="Build background reference")
29 assembleCoadd = ConfigurableField(
30 target=SafeClipAssembleCoaddTask, doc="Assemble warps into coadd")
31 doDetection = Field(dtype=bool, default=True,
32 doc="Run detection on the coaddition product")
33 detectCoaddSources = ConfigurableField(
34 target=DetectCoaddSourcesTask, doc="Detect sources on coadd")
35 hasFakes = Field(dtype=bool, default=False,
36 doc="Should be set to True if fake sources were added to the data before processing.")
37 calexpType = Field(dtype=str, default="calexp",
38 doc="Should be set to fakes_calexp if you want to process calexps with fakes in.")
39
40 def setDefaults(self):
41 self.makeCoaddTempExp.select.retarget(NullSelectImagesTask)
42 self.assembleCoadd.select.retarget(NullSelectImagesTask)
43 self.assembleCoadd.doWrite = False
44
45 def validate(self):
46 if self.makeCoaddTempExp.coaddName != self.coaddName:
47 raise RuntimeError(
48 "makeCoaddTempExp.coaddName and coaddName don't match")
49 if self.assembleCoadd.coaddName != self.coaddName:
50 raise RuntimeError(
51 "assembleCoadd.coaddName and coaddName don't match")
52 if self.assembleCoadd.matchingKernelSize != self.makeCoaddTempExp.matchingKernelSize:
53 message = ("assembleCoadd.matchingKernelSize (%s) and makeCoaddTempExp.matchingKernelSize (%s)"
54 " don't match" % (self.assembleCoadd.matchingKernelSize,
55 self.makeCoaddTempExp.matchingKernelSize))
56 raise RuntimeError(message)
57
58
59class CoaddDriverTaskRunner(CoaddTaskRunner):
60
61 def __init__(self, TaskClass, parsedCmd, doReturnResults=False):
62 CoaddTaskRunner.__init__(self, TaskClass, parsedCmd, doReturnResults)
63 self.reuse = parsedCmd.reuse
64
65 def makeTask(self, parsedCmd=None, args=None):
66 return self.TaskClass(config=self.config, log=self.log, reuse=self.reuse)
67
68 @staticmethod
69 def getTargetList(parsedCmd, **kwargs):
70 """!Get bare butler into Task
71
72 @param parsedCmd results of parsing command input
73 """
74 kwargs["butler"] = parsedCmd.butler
75 kwargs["selectIdList"] = [
76 ref.dataId for ref in parsedCmd.selectId.refList]
77 return [(parsedCmd.id.refList, kwargs), ]
78
79
80def unpickle(factory, args, kwargs):
81 """Unpickle something by calling a factory"""
82 return factory(*args, **kwargs)
83
84
86 ConfigClass = CoaddDriverConfig
87 _DefaultName = "coaddDriver"
88 RunnerClass = CoaddDriverTaskRunner
89
90 def __init__(self, reuse=tuple(), **kwargs):
91 BatchPoolTask.__init__(self, **kwargs)
92 self.reuse = reuse
93 self.makeSubtask("select")
94 self.makeSubtask("makeCoaddTempExp", reuse=("makeCoaddTempExp" in self.reuse))
95 self.makeSubtask("backgroundReference")
96 self.makeSubtask("assembleCoadd")
97 self.makeSubtask("detectCoaddSources")
98 if self.config.hasFakes:
99 self.calexpType = "fakes_calexp"
100 else:
101 self.calexpType = "calexp"
102
103 def __reduce__(self):
104 """Pickler"""
105 return unpickle, (self.__class__, [], dict(config=self.config, name=self._name,
106 parentTask=self._parentTask, log=self.log,
107 reuse=self.reuse))
108
109 @classmethod
110 def _makeArgumentParser(cls, **kwargs):
111 """!Build argument parser
112
113 Selection references are not cheap (reads Wcs), so are generated
114 only if we're not doing a batch submission.
115 """
116 parser = ArgumentParser(name=cls._DefaultName)
117 parser.add_id_argument("--id", "deepCoadd", help="data ID, e.g. --id tract=12345 patch=1,2",
118 ContainerClass=TractDataIdContainer)
119 datasetType = ConfigDatasetType(name="calexpType")
120 parser.add_id_argument("--selectId", datasetType=datasetType,
121 help="data ID, e.g. --selectId visit=6789 ccd=0..9")
122 parser.addReuseOption(["makeCoaddTempExp", "assembleCoadd", "detectCoaddSources"])
123 return parser
124
125 @classmethod
126 def batchWallTime(cls, time, parsedCmd, numCores):
127 """!
128 Return walltime request for batch job
129
130 @param time: Requested time per iteration
131 @param parsedCmd: Results of argument parsing
132 @param numCores: Number of cores
133 @return float walltime request length
134 """
135 numTargets = len(parsedCmd.selectId.refList)
136 return time*numTargets/float(numCores)
137
138 @abortOnError
139 def runDataRef(self, tractPatchRefList, butler, selectIdList=[]):
140 """!Determine which tracts are non-empty before processing
141
142 @param tractPatchRefList: List of tracts and patches to include in the coaddition
143 @param butler: butler reference object
144 @param selectIdList: List of data Ids (i.e. visit, ccd) to consider when making the coadd
145 @return list of references to sel.runTract function evaluation for each tractPatchRefList member
146 """
147 pool = Pool("tracts")
148 pool.storeSet(butler=butler, skymap=butler.get(
149 self.config.coaddName + "Coadd_skyMap"))
150 tractIdList = []
151 for patchRefList in tractPatchRefList:
152 tractSet = set([patchRef.dataId["tract"]
153 for patchRef in patchRefList])
154 assert len(tractSet) == 1
155 tractIdList.append(tractSet.pop())
156
157 selectDataList = [data for data in pool.mapNoBalance(self.readSelection, selectIdList) if
158 data is not None]
159 nonEmptyList = pool.mapNoBalance(
160 self.checkTract, tractIdList, selectDataList)
161 tractPatchRefList = [patchRefList for patchRefList, nonEmpty in
162 zip(tractPatchRefList, nonEmptyList) if nonEmpty]
163 self.log.info("Non-empty tracts (%d): %s" % (len(tractPatchRefList),
164 [patchRefList[0].dataId["tract"] for patchRefList in
165 tractPatchRefList]))
166 # Install the dataRef in the selectDataList
167 for data in selectDataList:
168 data.dataRef = getDataRef(butler, data.dataId, self.calexpType)
169
170 # Process the non-empty tracts
171 return [self.run(patchRefList, butler, selectDataList) for patchRefList in tractPatchRefList]
172
173 @abortOnError
174 def run(self, patchRefList, butler, selectDataList=[]):
175 """!Run stacking on a tract
176
177 This method only runs on the master node.
178
179 @param patchRefList: List of patch data references for tract
180 @param butler: Data butler
181 @param selectDataList: List of SelectStruct for inputs
182 """
183 pool = Pool("stacker")
184 pool.cacheClear()
185 pool.storeSet(butler=butler, warpType=self.config.coaddName + "Coadd_directWarp",
186 coaddType=self.config.coaddName + "Coadd")
187 patchIdList = [patchRef.dataId for patchRef in patchRefList]
188
189 selectedData = pool.map(self.warp, patchIdList, selectDataList)
190 if self.config.doBackgroundReference:
191 self.backgroundReference.runDataRef(patchRefList, selectDataList)
192
193 def refNamer(patchRef):
194 return tuple(map(int, patchRef.dataId["patch"].split(",")))
195
196 lookup = dict(zip(map(refNamer, patchRefList), selectedData))
197 coaddData = [Struct(patchId=patchRef.dataId, selectDataList=lookup[refNamer(patchRef)]) for
198 patchRef in patchRefList]
199 pool.map(self.coadd, coaddData)
200
201 def readSelection(self, cache, selectId):
202 """!Read Wcs of selected inputs
203
204 This method only runs on slave nodes.
205 This method is similar to SelectDataIdContainer.makeDataRefList,
206 creating a Struct like a SelectStruct, except with a dataId instead
207 of a dataRef (to ease MPI).
208
209 @param cache: Pool cache
210 @param selectId: Data identifier for selected input
211 @return a SelectStruct with a dataId instead of dataRef
212 """
213 try:
214 ref = getDataRef(cache.butler, selectId, self.calexpType)
215 self.log.info("Reading Wcs from %s" % (selectId,))
216 md = ref.get("calexp_md", immediate=True)
217 wcs = afwGeom.makeSkyWcs(md)
218 data = Struct(dataId=selectId, wcs=wcs, bbox=afwImage.bboxFromMetadata(md))
219 except FitsError:
220 self.log.warn("Unable to construct Wcs from %s" % (selectId,))
221 return None
222 return data
223
224 def checkTract(self, cache, tractId, selectIdList):
225 """!Check whether a tract has any overlapping inputs
226
227 This method only runs on slave nodes.
228
229 @param cache: Pool cache
230 @param tractId: Data identifier for tract
231 @param selectDataList: List of selection data
232 @return whether tract has any overlapping inputs
233 """
234 def makePolygon(wcs, bbox):
235 """Return a polygon for the image, given Wcs and bounding box"""
236 boxPixelCorners = geom.Box2D(bbox).getCorners()
237 boxSkyCorners = wcs.pixelToSky(boxPixelCorners)
238 return lsst.sphgeom.ConvexPolygon.convexHull([coord.getVector() for coord in boxSkyCorners])
239
240 skymap = cache.skymap
241 tract = skymap[tractId]
242 tractWcs = tract.getWcs()
243 tractPoly = makePolygon(tractWcs, tract.getBBox())
244
245 for selectData in selectIdList:
246 if not hasattr(selectData, "poly"):
247 selectData.poly = makePolygon(selectData.wcs, selectData.bbox)
248 if tractPoly.intersects(selectData.poly):
249 return True
250 return False
251
252 def warp(self, cache, patchId, selectDataList):
253 """!Warp all images for a patch
254
255 Only slave nodes execute this method.
256
257 Because only one argument may be passed, it is expected to
258 contain multiple elements, which are:
259
260 @param patchRef: data reference for patch
261 @param selectDataList: List of SelectStruct for inputs
262 @return selectDataList with non-overlapping elements removed
263 """
264 patchRef = getDataRef(cache.butler, patchId, cache.coaddType)
265 selectDataList = self.selectExposures(patchRef, selectDataList)
266 with self.logOperation("warping %s" % (patchRef.dataId,), catch=True):
267 self.makeCoaddTempExp.runDataRef(patchRef, selectDataList)
268 return selectDataList
269
270 def coadd(self, cache, data):
271 """!Construct coadd for a patch and measure
272
273 Only slave nodes execute this method.
274
275 Because only one argument may be passed, it is expected to
276 contain multiple elements, which are:
277
278 @param patchRef: data reference for patch
279 @param selectDataList: List of SelectStruct for inputs
280 """
281 patchRef = getDataRef(cache.butler, data.patchId, cache.coaddType)
282 selectDataList = data.selectDataList
283 coadd = None
284
285 # We skip the assembleCoadd step if either the *Coadd dataset exists
286 # or we aren't configured to write it, we're supposed to reuse
287 # detectCoaddSources outputs too, and those outputs already exist.
288 canSkipDetection = (
289 "detectCoaddSources" in self.reuse and
290 patchRef.datasetExists(self.detectCoaddSources.config.coaddName+"Coadd_det", write=True)
291 )
292 if "assembleCoadd" in self.reuse:
293 if patchRef.datasetExists(cache.coaddType, write=True):
294 self.log.info("%s: Skipping assembleCoadd for %s; outputs already exist." %
295 (NODE, patchRef.dataId))
296 coadd = patchRef.get(cache.coaddType, immediate=True)
297 elif not self.config.assembleCoadd.doWrite and self.config.doDetection and canSkipDetection:
298 self.log.info(
299 "%s: Skipping assembleCoadd and detectCoaddSources for %s; outputs already exist." %
300 (NODE, patchRef.dataId)
301 )
302 return
303 if coadd is None:
304 with self.logOperation("coadding %s" % (patchRef.dataId,), catch=True):
305 coaddResults = self.assembleCoadd.runDataRef(patchRef, selectDataList)
306 if coaddResults is not None:
307 coadd = coaddResults.coaddExposure
308 canSkipDetection = False # can't skip it because coadd may have changed
309 if coadd is None:
310 return
311
312 # The section of code below determines if the detection task should be
313 # run. If detection is run, then the products are written out as
314 # deepCoadd_calexp. If detection is not run, then the outputs of the
315 # assemble task are written out as deepCoadd.
316 if self.config.doDetection:
317 if canSkipDetection:
318 self.log.info("%s: Skipping detectCoaddSources for %s; outputs already exist." %
319 (NODE, patchRef.dataId))
320 return
321 with self.logOperation("detection on {}".format(patchRef.dataId),
322 catch=True):
323 idFactory = self.detectCoaddSources.makeIdFactory(patchRef)
324 expId = getGen3CoaddExposureId(
325 patchRef, coaddName=self.detectCoaddSources.config.coaddName, log=self.log)
326 # This includes background subtraction, so do it before writing
327 # the coadd
328 detResults = self.detectCoaddSources.run(coadd, idFactory, expId=expId)
329 self.detectCoaddSources.write(detResults, patchRef)
330 else:
331 if self.config.hasFakes:
332 patchRef.put(coadd, "fakes_" + self.assembleCoadd.config.coaddName + "Coadd")
333 else:
334 patchRef.put(coadd, self.assembleCoadd.config.coaddName + "Coadd")
335
336 def selectExposures(self, patchRef, selectDataList):
337 """!Select exposures to operate upon, via the SelectImagesTask
338
339 This is very similar to CoaddBaseTask.selectExposures, except we return
340 a list of SelectStruct (same as the input), so we can plug the results into
341 future uses of SelectImagesTask.
342
343 @param patchRef data reference to a particular patch
344 @param selectDataList list of references to specific data products (i.e. visit, ccd)
345 @return filtered list of SelectStruct
346 """
347 def key(dataRef):
348 return tuple(dataRef.dataId[k] for k in sorted(dataRef.dataId))
349 inputs = dict((key(select.dataRef), select)
350 for select in selectDataList)
351 skyMap = patchRef.get(self.config.coaddName + "Coadd_skyMap")
352 tract = skyMap[patchRef.dataId["tract"]]
353 patch = tract[(tuple(int(i)
354 for i in patchRef.dataId["patch"].split(",")))]
355 bbox = patch.getOuterBBox()
356 wcs = tract.getWcs()
357 cornerPosList = geom.Box2D(bbox).getCorners()
358 coordList = [wcs.pixelToSky(pos) for pos in cornerPosList]
359 dataRefList = self.select.runDataRef(
360 patchRef, coordList, selectDataList=selectDataList).dataRefList
361 return [inputs[key(dataRef)] for dataRef in dataRefList]
362
363 def writeMetadata(self, dataRef):
364 pass
def logOperation(self, operation, catch=False, trace=True)
Provide a context manager for logging an operation.
Definition: parallel.py:502
A floating-point coordinate rectangle geometry.
Definition: Box.h:413
def readSelection(self, cache, selectId)
Read Wcs of selected inputs.
Definition: coaddDriver.py:201
def coadd(self, cache, data)
Construct coadd for a patch and measure.
Definition: coaddDriver.py:270
def runDataRef(self, tractPatchRefList, butler, selectIdList=[])
Determine which tracts are non-empty before processing.
Definition: coaddDriver.py:139
def batchWallTime(cls, time, parsedCmd, numCores)
Return walltime request for batch job.
Definition: coaddDriver.py:126
def checkTract(self, cache, tractId, selectIdList)
Check whether a tract has any overlapping inputs.
Definition: coaddDriver.py:224
def selectExposures(self, patchRef, selectDataList)
Select exposures to operate upon, via the SelectImagesTask.
Definition: coaddDriver.py:336
def warp(self, cache, patchId, selectDataList)
Warp all images for a patch.
Definition: coaddDriver.py:252
def run(self, patchRefList, butler, selectDataList=[])
Run stacking on a tract.
Definition: coaddDriver.py:174
def __init__(self, reuse=tuple(), **kwargs)
Definition: coaddDriver.py:90
def getTargetList(parsedCmd, **kwargs)
Get bare butler into Task.
Definition: coaddDriver.py:69
def __init__(self, TaskClass, parsedCmd, doReturnResults=False)
Definition: coaddDriver.py:61
def makeTask(self, parsedCmd=None, args=None)
Definition: coaddDriver.py:65
static ConvexPolygon convexHull(std::vector< UnitVector3d > const &points)
convexHull returns the convex hull of the given set of points if it exists and throws an exception ot...
Definition: ConvexPolygon.h:65
daf::base::PropertySet * set
Definition: fits.cc:927
std::shared_ptr< SkyWcs > makeSkyWcs(daf::base::PropertySet &metadata, bool strip=false)
Construct a SkyWcs from FITS keywords.
Definition: SkyWcs.cc:521
lsst::geom::Box2I bboxFromMetadata(daf::base::PropertySet &metadata)
Determine the image bounding box from its metadata (FITS header)
Definition: Image.cc:721
def unpickle(factory, args, kwargs)
Definition: coaddDriver.py:80