10from lsst.pipe.base import Struct, ArgumentParser, ConfigDatasetType
20 coaddName =
Field(dtype=str, default=
"deep", doc=
"Name for coadd")
22 target=WcsSelectImagesTask, doc=
"Select images to process")
24 target=MakeCoaddTempExpTask, doc=
"Warp images to sky")
26 dtype=bool, default=
False, doc=
"Build background reference?")
28 target=NullSelectImagesTask, doc=
"Build background reference")
30 target=SafeClipAssembleCoaddTask, doc=
"Assemble warps into coadd")
31 doDetection =
Field(dtype=bool, default=
True,
32 doc=
"Run detection on the coaddition product")
34 target=DetectCoaddSourcesTask, doc=
"Detect sources on coadd")
35 hasFakes =
Field(dtype=bool, default=
False,
36 doc=
"Should be set to True if fake sources were added to the data before processing.")
37 calexpType =
Field(dtype=str, default=
"calexp",
38 doc=
"Should be set to fakes_calexp if you want to process calexps with fakes in.")
42 self.
assembleCoaddassembleCoadd.select.retarget(NullSelectImagesTask)
48 "makeCoaddTempExp.coaddName and coaddName don't match")
51 "assembleCoadd.coaddName and coaddName don't match")
53 message = (
"assembleCoadd.matchingKernelSize (%s) and makeCoaddTempExp.matchingKernelSize (%s)"
54 " don't match" % (self.
assembleCoaddassembleCoadd.matchingKernelSize,
56 raise RuntimeError(message)
def __init__(self, TaskClass, parsedCmd, doReturnResults=False):
    """Initialise the runner and remember which stages may reuse outputs.

    @param TaskClass: task class to run; forwarded to CoaddTaskRunner.__init__
    @param parsedCmd: results of parsing command input; its ``reuse``
        attribute is stored on the runner
    @param doReturnResults: forwarded to CoaddTaskRunner.__init__
    """
    CoaddTaskRunner.__init__(self, TaskClass, parsedCmd, doReturnResults)
    # Kept on the runner so it can be handed to the task as ``reuse=...``.
    self.reuse = parsedCmd.reuse
66 return self.TaskClass(config=self.config, log=self.log, reuse=self.
reusereuse)
70 """!Get bare butler into Task
72 @param parsedCmd results of parsing command input
74 kwargs["butler"] = parsedCmd.butler
75 kwargs[
"selectIdList"] = [
76 ref.dataId
for ref
in parsedCmd.selectId.refList]
77 return [(parsedCmd.id.refList, kwargs), ]
81 """Unpickle something by calling a factory"""
82 return factory(*args, **kwargs)
86 ConfigClass = CoaddDriverConfig
87 _DefaultName =
"coaddDriver"
88 RunnerClass = CoaddDriverTaskRunner
91 BatchPoolTask.__init__(self, **kwargs)
93 self.makeSubtask(
"select")
94 self.makeSubtask(
"makeCoaddTempExp", reuse=(
"makeCoaddTempExp" in self.
reusereuse))
95 self.makeSubtask(
"backgroundReference")
96 self.makeSubtask(
"assembleCoadd")
97 self.makeSubtask(
"detectCoaddSources")
98 if self.config.hasFakes:
105 return unpickle, (self.__class__, [], dict(config=self.config, name=self._name,
106 parentTask=self._parentTask, log=self.log,
107 reuse=self.
reusereuse))
110 def _makeArgumentParser(cls, **kwargs):
111 """!Build argument parser
113 Selection references are not cheap (reads Wcs), so are generated
114 only
if we
're not doing a batch submission.
116 parser = ArgumentParser(name=cls._DefaultName_DefaultName)
117 parser.add_id_argument("--id",
"deepCoadd", help=
"data ID, e.g. --id tract=12345 patch=1,2",
118 ContainerClass=TractDataIdContainer)
119 datasetType = ConfigDatasetType(name=
"calexpType")
120 parser.add_id_argument(
"--selectId", datasetType=datasetType,
121 help=
"data ID, e.g. --selectId visit=6789 ccd=0..9")
122 parser.addReuseOption([
"makeCoaddTempExp",
"assembleCoadd",
"detectCoaddSources"])
128 Return walltime request for batch job
130 @param time: Requested time per iteration
131 @param parsedCmd: Results of argument parsing
132 @param numCores: Number of cores
133 @return float walltime request length
135 numTargets = len(parsedCmd.selectId.refList)
136 return time*numTargets/
float(numCores)
139 def runDataRef(self, tractPatchRefList, butler, selectIdList=[]):
140 """!Determine which tracts are non-empty before processing
142 @param tractPatchRefList: List of tracts
and patches to include
in the coaddition
143 @param butler: butler reference object
144 @param selectIdList: List of data Ids (i.e. visit, ccd) to consider when making the coadd
145 @return list of references to sel.runTract function evaluation
for each tractPatchRefList member
147 pool = Pool("tracts")
148 pool.storeSet(butler=butler, skymap=butler.get(
149 self.config.coaddName +
"Coadd_skyMap"))
151 for patchRefList
in tractPatchRefList:
152 tractSet =
set([patchRef.dataId[
"tract"]
153 for patchRef
in patchRefList])
154 assert len(tractSet) == 1
155 tractIdList.append(tractSet.pop())
157 selectDataList = [data
for data
in pool.mapNoBalance(self.
readSelectionreadSelection, selectIdList)
if
159 nonEmptyList = pool.mapNoBalance(
160 self.
checkTractcheckTract, tractIdList, selectDataList)
161 tractPatchRefList = [patchRefList
for patchRefList, nonEmpty
in
162 zip(tractPatchRefList, nonEmptyList)
if nonEmpty]
163 self.log.
info(
"Non-empty tracts (%d): %s" % (len(tractPatchRefList),
164 [patchRefList[0].dataId[
"tract"]
for patchRefList
in
167 for data
in selectDataList:
171 return [self.
runrun(patchRefList, butler, selectDataList)
for patchRefList
in tractPatchRefList]
def run(self, patchRefList, butler, selectDataList=None):
    """!Run stacking on a tract

    This method only runs on the master node.

    @param patchRefList: List of patch data references for tract
    @param butler: Data butler
    @param selectDataList: List of SelectStruct for inputs; omitted or None
        is treated as an empty list
    """
    # A fresh list per call avoids sharing one mutable default between calls.
    if selectDataList is None:
        selectDataList = []

    pool = Pool("stacker")
    # NOTE(review): the listing this block was recovered from skips its
    # line 184 at this point; confirm against the upstream file that no
    # statement is missing here.
    pool.storeSet(butler=butler, warpType=self.config.coaddName + "Coadd_directWarp",
                  coaddType=self.config.coaddName + "Coadd")
    patchIdList = [patchRef.dataId for patchRef in patchRefList]

    # Warp first; each result is the selection list returned for that patch.
    selectedData = pool.map(self.warp, patchIdList, selectDataList)
    if self.config.doBackgroundReference:
        self.backgroundReference.runDataRef(patchRefList, selectDataList)

    def refNamer(patchRef):
        """Return the patch index as a tuple of ints, e.g. "1,2" -> (1, 2)."""
        return tuple(map(int, patchRef.dataId["patch"].split(",")))

    # Pair every patch with the selection list produced by its warp step.
    lookup = dict(zip(map(refNamer, patchRefList), selectedData))
    coaddData = [Struct(patchId=patchRef.dataId, selectDataList=lookup[refNamer(patchRef)])
                 for patchRef in patchRefList]
    pool.map(self.coadd, coaddData)
202 """!Read Wcs of selected inputs
204 This method only runs on slave nodes.
205 This method is similar to SelectDataIdContainer.makeDataRefList,
206 creating a Struct like a SelectStruct,
except with a dataId instead
207 of a dataRef (to ease MPI).
209 @param cache: Pool cache
210 @param selectId: Data identifier
for selected input
211 @return a SelectStruct
with a dataId instead of dataRef
215 self.log.
info(
"Reading Wcs from %s" % (selectId,))
216 md = ref.get(
"calexp_md", immediate=
True)
220 self.log.
warn(
"Unable to construct Wcs from %s" % (selectId,))
225 """!Check whether a tract has any overlapping inputs
227 This method only runs on slave nodes.
229 @param cache: Pool cache
230 @param tractId: Data identifier
for tract
231 @param selectDataList: List of selection data
232 @return whether tract has any overlapping inputs
234 def makePolygon(wcs, bbox):
235 """Return a polygon for the image, given Wcs and bounding box"""
236 boxPixelCorners =
geom.Box2D(bbox).getCorners()
237 boxSkyCorners = wcs.pixelToSky(boxPixelCorners)
240 skymap = cache.skymap
241 tract = skymap[tractId]
242 tractWcs = tract.getWcs()
243 tractPoly = makePolygon(tractWcs, tract.getBBox())
245 for selectData
in selectIdList:
246 if not hasattr(selectData,
"poly"):
247 selectData.poly = makePolygon(selectData.wcs, selectData.bbox)
248 if tractPoly.intersects(selectData.poly):
def warp(self, cache, patchId, selectDataList):
    """!Warp all images for a patch

    Only slave nodes execute this method.

    @param cache: Pool cache; its ``butler`` and ``coaddType`` attributes are read
    @param patchId: data identifier for patch
    @param selectDataList: List of SelectStruct for inputs
    @return selectDataList with non-overlapping elements removed
    """
    patchRef = getDataRef(cache.butler, patchId, cache.coaddType)
    # Narrow the inputs to this patch before warping; the narrowed list is
    # also what gets returned to the caller.
    selectDataList = self.selectExposures(patchRef, selectDataList)
    with self.logOperation("warping %s" % (patchRef.dataId,), catch=True):
        self.makeCoaddTempExp.runDataRef(patchRef, selectDataList)
    return selectDataList
271 """!Construct coadd for a patch and measure
273 Only slave nodes execute this method.
275 Because only one argument may be passed, it is expected to
276 contain multiple elements, which are:
278 @param patchRef: data reference
for patch
279 @param selectDataList: List of SelectStruct
for inputs
281 patchRef = getDataRef(cache.butler, data.patchId, cache.coaddType)
282 selectDataList = data.selectDataList
289 "detectCoaddSources" in self.
reusereuse
and
290 patchRef.datasetExists(self.detectCoaddSources.config.coaddName+
"Coadd_det", write=
True)
292 if "assembleCoadd" in self.
reusereuse:
293 if patchRef.datasetExists(cache.coaddType, write=
True):
294 self.log.
info(
"%s: Skipping assembleCoadd for %s; outputs already exist." %
295 (NODE, patchRef.dataId))
296 coadd = patchRef.get(cache.coaddType, immediate=
True)
297 elif not self.config.assembleCoadd.doWrite
and self.config.doDetection
and canSkipDetection:
299 "%s: Skipping assembleCoadd and detectCoaddSources for %s; outputs already exist." %
300 (NODE, patchRef.dataId)
304 with self.
logOperationlogOperation(
"coadding %s" % (patchRef.dataId,), catch=
True):
305 coaddResults = self.assembleCoadd.
runDataRef(patchRef, selectDataList)
306 if coaddResults
is not None:
307 coadd = coaddResults.coaddExposure
308 canSkipDetection =
False
316 if self.config.doDetection:
318 self.log.
info(
"%s: Skipping detectCoaddSources for %s; outputs already exist." %
319 (NODE, patchRef.dataId))
323 idFactory = self.detectCoaddSources.makeIdFactory(patchRef)
325 patchRef, coaddName=self.detectCoaddSources.config.coaddName, log=self.log)
328 detResults = self.detectCoaddSources.
run(coadd, idFactory, expId=expId)
329 self.detectCoaddSources.
write(detResults, patchRef)
331 if self.config.hasFakes:
332 patchRef.put(coadd,
"fakes_" + self.assembleCoadd.config.coaddName +
"Coadd")
334 patchRef.put(coadd, self.assembleCoadd.config.coaddName +
"Coadd")
337 """!Select exposures to operate upon, via the SelectImagesTask
339 This is very similar to CoaddBaseTask.selectExposures,
except we
return
340 a list of SelectStruct (same
as the input), so we can plug the results into
341 future uses of SelectImagesTask.
343 @param patchRef data reference to a particular patch
344 @param selectDataList list of references to specific data products (i.e. visit, ccd)
345 @return filtered list of SelectStruct
348 return tuple(dataRef.dataId[k]
for k
in sorted(dataRef.dataId))
349 inputs = dict((key(select.dataRef), select)
350 for select
in selectDataList)
351 skyMap = patchRef.get(self.config.coaddName +
"Coadd_skyMap")
352 tract = skyMap[patchRef.dataId[
"tract"]]
353 patch = tract[(tuple(
int(i)
354 for i
in patchRef.dataId[
"patch"].split(
",")))]
355 bbox = patch.getOuterBBox()
358 coordList = [wcs.pixelToSky(pos)
for pos
in cornerPosList]
360 patchRef, coordList, selectDataList=selectDataList).dataRefList
361 return [inputs[key(dataRef)]
for dataRef
in dataRefList]
def logOperation(self, operation, catch=False, trace=True)
Provide a context manager for logging an operation.
A floating-point coordinate rectangle geometry.
def readSelection(self, cache, selectId)
Read Wcs of selected inputs.
def coadd(self, cache, data)
Construct coadd for a patch and measure.
def runDataRef(self, tractPatchRefList, butler, selectIdList=[])
Determine which tracts are non-empty before processing.
def batchWallTime(cls, time, parsedCmd, numCores)
Return walltime request for batch job.
def writeMetadata(self, dataRef)
def checkTract(self, cache, tractId, selectIdList)
Check whether a tract has any overlapping inputs.
def selectExposures(self, patchRef, selectDataList)
Select exposures to operate upon, via the SelectImagesTask.
def warp(self, cache, patchId, selectDataList)
Warp all images for a patch.
def run(self, patchRefList, butler, selectDataList=[])
Run stacking on a tract.
def __init__(self, reuse=tuple(), **kwargs)
def getTargetList(parsedCmd, **kwargs)
Get bare butler into Task.
def __init__(self, TaskClass, parsedCmd, doReturnResults=False)
def makeTask(self, parsedCmd=None, args=None)
static ConvexPolygon convexHull(std::vector< UnitVector3d > const &points)
convexHull returns the convex hull of the given set of points if it exists and throws an exception otherwise.
daf::base::PropertySet * set
std::shared_ptr< SkyWcs > makeSkyWcs(daf::base::PropertySet &metadata, bool strip=false)
Construct a SkyWcs from FITS keywords.
Backwards-compatibility support for depersisting the old Calib (FluxMag0/FluxMag0Err) objects.
lsst::geom::Box2I bboxFromMetadata(daf::base::PropertySet &metadata)
Determine the image bounding box from its metadata (FITS header)
def getGen3CoaddExposureId(dataRef, coaddName="deep", includeBand=True, log=None)
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)
def unpickle(factory, args, kwargs)
def getDataRef(butler, dataId, datasetType="raw")
def write(self, patchRef, catalog)
Write the output.