1 from __future__
import absolute_import, division, print_function
3 from builtins
import zip
4 from builtins
import map
13 from lsst.pex.config
import Config, Field, ConfigurableField
14 from lsst.pipe.base import Struct, ArgumentParser, ConfigDatasetType
# Configuration fields for the coadd driver.
# NOTE(review): these read as class-body attributes of the config class that a
# later line assigns to ``ConfigClass`` (CoaddDriverConfig); the ``class``
# header is not visible in this excerpt -- confirm and indent to match.

# Prefix used elsewhere in this file to build dataset names such as
# ``<coaddName>Coadd`` and ``<coaddName>Coadd_skyMap``.
coaddName = Field(dtype=str, default="deep", doc="Name for coadd")

# Sub-task that selects the input images.
select = ConfigurableField(
    target=WcsSelectImagesTask,
    doc="Select images to process",
)

# Sub-task invoked as ``self.makeCoaddTempExp.runDataRef(...)`` when warping.
makeCoaddTempExp = ConfigurableField(
    target=MakeCoaddTempExpTask,
    doc="Warp images to sky",
)

# When true, ``self.backgroundReference.runDataRef(...)`` is called after
# warping; disabled by default.
doBackgroundReference = Field(
    dtype=bool,
    default=False,
    doc="Build background reference?",
)
backgroundReference = ConfigurableField(
    target=NullSelectImagesTask,
    doc="Build background reference",
)

# Sub-task invoked as ``self.assembleCoadd.runDataRef(...)`` when coadding.
assembleCoadd = ConfigurableField(
    target=SafeClipAssembleCoaddTask,
    doc="Assemble warps into coadd",
)

# Detection on the assembled coadd is enabled by default.
doDetection = Field(
    dtype=bool,
    default=True,
    doc="Run detection on the coaddition product",
)
detectCoaddSources = ConfigurableField(
    target=DetectCoaddSourcesTask,
    doc="Detect sources on coadd",
)

# Fake-source handling.
# NOTE(review): the code that reads these two options is not fully visible in
# this excerpt; see the ``doc`` strings for the intended meaning.
hasFakes = Field(
    dtype=bool,
    default=False,
    doc="Should be set to True if fake sources were added to the data before processing.",
)
calexpType = Field(
    dtype=str,
    default="calexp",
    doc="Should be set to fakes_calexp if you want to process calexps with fakes in.",
)
52 "makeCoaddTempExp.coaddName and coaddName don't match")
55 "assembleCoadd.coaddName and coaddName don't match")
57 message = (
"assembleCoadd.matchingKernelSize (%s) and makeCoaddTempExp.matchingKernelSize (%s)" 60 raise RuntimeError(message)
65 def __init__(self, TaskClass, parsedCmd, doReturnResults=False):
66 CoaddTaskRunner.__init__(self, TaskClass, parsedCmd, doReturnResults)
70 return self.TaskClass(config=self.config, log=self.log, reuse=self.
reuse)
74 """!Get bare butler into Task 76 @param parsedCmd results of parsing command input 78 kwargs[
"butler"] = parsedCmd.butler
79 kwargs[
"selectIdList"] = [
80 ref.dataId
for ref
in parsedCmd.selectId.refList]
81 return [(parsedCmd.id.refList, kwargs), ]
85 """Unpickle something by calling a factory""" 86 return factory(*args, **kwargs)
# Task wiring: the configuration class, the default task name and the runner
# class used for this driver.
# NOTE(review): presumably class attributes of the driver task; the ``class``
# header is not visible in this excerpt -- confirm and indent to match.
ConfigClass = CoaddDriverConfig
_DefaultName = "coaddDriver"
RunnerClass = CoaddDriverTaskRunner
95 BatchPoolTask.__init__(self, **kwargs)
98 self.
makeSubtask(
"makeCoaddTempExp", reuse=(
"makeCoaddTempExp" in self.
reuse))
109 return unpickle, (self.__class__, [], dict(config=self.
config, name=self.
_name,
114 def _makeArgumentParser(cls, **kwargs):
115 """!Build argument parser 117 Selection references are not cheap (reads Wcs), so are generated 118 only if we're not doing a batch submission. 121 parser.add_id_argument(
"--id",
"deepCoadd", help=
"data ID, e.g. --id tract=12345 patch=1,2",
122 ContainerClass=TractDataIdContainer)
124 parser.add_id_argument(
"--selectId", datasetType=datasetType,
125 help=
"data ID, e.g. --selectId visit=6789 ccd=0..9")
126 parser.addReuseOption([
"makeCoaddTempExp",
"assembleCoadd",
"detectCoaddSources"])
132 Return walltime request for batch job 134 @param time: Requested time per iteration 135 @param parsedCmd: Results of argument parsing 136 @param numCores: Number of cores 137 @return float walltime request length 139 numTargets = len(parsedCmd.selectId.refList)
140 return time*numTargets/float(numCores)
143 def runDataRef(self, tractPatchRefList, butler, selectIdList=[]):
144 """!Determine which tracts are non-empty before processing 146 @param tractPatchRefList: List of tracts and patches to include in the coaddition 147 @param butler: butler reference object 148 @param selectIdList: List of data Ids (i.e. visit, ccd) to consider when making the coadd 149 @return list of references to sel.runTract function evaluation for each tractPatchRefList member 151 pool =
Pool(
"tracts")
152 pool.storeSet(butler=butler, skymap=butler.get(
153 self.
config.coaddName +
"Coadd_skyMap"))
155 for patchRefList
in tractPatchRefList:
156 tractSet =
set([patchRef.dataId[
"tract"]
157 for patchRef
in patchRefList])
158 assert len(tractSet) == 1
159 tractIdList.append(tractSet.pop())
161 selectDataList = [data
for data
in pool.mapNoBalance(self.
readSelection, selectIdList)
if 163 nonEmptyList = pool.mapNoBalance(
165 tractPatchRefList = [patchRefList
for patchRefList, nonEmpty
in 166 zip(tractPatchRefList, nonEmptyList)
if nonEmpty]
167 self.
log.
info(
"Non-empty tracts (%d): %s" % (len(tractPatchRefList),
168 [patchRefList[0].dataId[
"tract"]
for patchRefList
in 171 for data
in selectDataList:
175 return [self.
run(patchRefList, butler, selectDataList)
for patchRefList
in tractPatchRefList]
178 def run(self, patchRefList, butler, selectDataList=[]):
179 """!Run stacking on a tract 181 This method only runs on the master node. 183 @param patchRefList: List of patch data references for tract 184 @param butler: Data butler 185 @param selectDataList: List of SelectStruct for inputs 187 pool =
Pool(
"stacker")
189 pool.storeSet(butler=butler, warpType=self.
config.coaddName +
"Coadd_directWarp",
190 coaddType=self.
config.coaddName +
"Coadd")
191 patchIdList = [patchRef.dataId
for patchRef
in patchRefList]
193 selectedData = pool.map(self.
warp, patchIdList, selectDataList)
194 if self.
config.doBackgroundReference:
195 self.backgroundReference.
runDataRef(patchRefList, selectDataList)
def refNamer(patchRef):
    """Return the patch index of ``patchRef`` as a tuple of ints.

    The ``"patch"`` entry of ``patchRef.dataId`` is a comma-separated string
    (e.g. ``"1,2"``); each component is converted with ``int``. The resulting
    tuple is hashable, so it can be used as a dictionary key.
    """
    return tuple(map(int, patchRef.dataId["patch"].split(",")))
200 lookup = dict(zip(map(refNamer, patchRefList), selectedData))
201 coaddData = [
Struct(patchId=patchRef.dataId, selectDataList=lookup[refNamer(patchRef)])
for 202 patchRef
in patchRefList]
203 pool.map(self.
coadd, coaddData)
206 """!Read Wcs of selected inputs 208 This method only runs on slave nodes. 209 This method is similar to SelectDataIdContainer.makeDataRefList, 210 creating a Struct like a SelectStruct, except with a dataId instead 211 of a dataRef (to ease MPI). 213 @param cache: Pool cache 214 @param selectId: Data identifier for selected input 215 @return a SelectStruct with a dataId instead of dataRef 219 self.
log.
info(
"Reading Wcs from %s" % (selectId,))
220 md = ref.get(
"calexp_md", immediate=
True)
224 self.
log.
warn(
"Unable to construct Wcs from %s" % (selectId,))
229 """!Check whether a tract has any overlapping inputs 231 This method only runs on slave nodes. 233 @param cache: Pool cache 234 @param tractId: Data identifier for tract 235 @param selectDataList: List of selection data 236 @return whether tract has any overlapping inputs 238 def makePolygon(wcs, bbox):
239 """Return a polygon for the image, given Wcs and bounding box""" 240 boxPixelCorners =
geom.Box2D(bbox).getCorners()
241 boxSkyCorners = wcs.pixelToSky(boxPixelCorners)
244 skymap = cache.skymap
245 tract = skymap[tractId]
246 tractWcs = tract.getWcs()
247 tractPoly = makePolygon(tractWcs, tract.getBBox())
249 for selectData
in selectIdList:
250 if not hasattr(selectData,
"poly"):
251 selectData.poly = makePolygon(selectData.wcs, selectData.bbox)
252 if tractPoly.intersects(selectData.poly):
256 def warp(self, cache, patchId, selectDataList):
257 """!Warp all images for a patch 259 Only slave nodes execute this method. 261 Because only one argument may be passed, it is expected to 262 contain multiple elements, which are: 264 @param patchRef: data reference for patch 265 @param selectDataList: List of SelectStruct for inputs 266 @return selectDataList with non-overlapping elements removed 268 patchRef =
getDataRef(cache.butler, patchId, cache.coaddType)
270 with self.
logOperation(
"warping %s" % (patchRef.dataId,), catch=
True):
271 self.makeCoaddTempExp.
runDataRef(patchRef, selectDataList)
272 return selectDataList
275 """!Construct coadd for a patch and measure 277 Only slave nodes execute this method. 279 Because only one argument may be passed, it is expected to 280 contain multiple elements, which are: 282 @param patchRef: data reference for patch 283 @param selectDataList: List of SelectStruct for inputs 285 patchRef =
getDataRef(cache.butler, data.patchId, cache.coaddType)
286 selectDataList = data.selectDataList
293 "detectCoaddSources" in self.
reuse and 294 patchRef.datasetExists(self.detectCoaddSources.config.coaddName+
"Coadd_det", write=
True)
296 if "assembleCoadd" in self.
reuse:
297 if patchRef.datasetExists(cache.coaddType, write=
True):
298 self.
log.
info(
"%s: Skipping assembleCoadd for %s; outputs already exist." %
299 (NODE, patchRef.dataId))
300 coadd = patchRef.get(cache.coaddType, immediate=
True)
301 elif not self.
config.assembleCoadd.doWrite
and self.
config.doDetection
and canSkipDetection:
303 "%s: Skipping assembleCoadd and detectCoaddSources for %s; outputs already exist." %
304 (NODE, patchRef.dataId)
308 with self.
logOperation(
"coadding %s" % (patchRef.dataId,), catch=
True):
309 coaddResults = self.assembleCoadd.
runDataRef(patchRef, selectDataList)
310 if coaddResults
is not None:
311 coadd = coaddResults.coaddExposure
312 canSkipDetection =
False 320 if self.
config.doDetection:
322 self.
log.
info(
"%s: Skipping detectCoaddSources for %s; outputs already exist." %
323 (NODE, patchRef.dataId))
327 idFactory = self.detectCoaddSources.makeIdFactory(patchRef)
328 expId = int(patchRef.get(self.
config.coaddName +
"CoaddId"))
331 detResults = self.detectCoaddSources.
run(coadd, idFactory, expId=expId)
332 self.detectCoaddSources.
write(detResults, patchRef)
335 patchRef.put(coadd,
"fakes_" + self.assembleCoadd.config.coaddName +
"Coadd")
337 patchRef.put(coadd, self.assembleCoadd.config.coaddName +
"Coadd")
340 """!Select exposures to operate upon, via the SelectImagesTask 342 This is very similar to CoaddBaseTask.selectExposures, except we return 343 a list of SelectStruct (same as the input), so we can plug the results into 344 future uses of SelectImagesTask. 346 @param patchRef data reference to a particular patch 347 @param selectDataList list of references to specific data products (i.e. visit, ccd) 348 @return filtered list of SelectStruct 351 return tuple(dataRef.dataId[k]
for k
in sorted(dataRef.dataId))
352 inputs = dict((
key(select.dataRef), select)
353 for select
in selectDataList)
354 skyMap = patchRef.get(self.
config.coaddName +
"Coadd_skyMap")
355 tract = skyMap[patchRef.dataId[
"tract"]]
356 patch = tract[(tuple(int(i)
357 for i
in patchRef.dataId[
"patch"].split(
",")))]
358 bbox = patch.getOuterBBox()
361 coordList = [wcs.pixelToSky(pos)
for pos
in cornerPosList]
363 patchRef, coordList, selectDataList=selectDataList).dataRefList
364 return [inputs[
key(dataRef)]
for dataRef
in dataRefList]
def write(self, patchRef, catalog)
Write the output.
def batchWallTime(cls, time, parsedCmd, numCores)
Return walltime request for batch job.
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)
def unpickle(factory, args, kwargs)
def makeSubtask(self, name, keyArgs)
A floating-point coordinate rectangle geometry.
def selectExposures(self, patchRef, selectDataList)
Select exposures to operate upon, via the SelectImagesTask.
daf::base::PropertySet * set
def makeTask(self, parsedCmd=None, args=None)
def getDataRef(butler, dataId, datasetType="raw")
def __init__(self, reuse=tuple(), kwargs)
static ConvexPolygon convexHull(std::vector< UnitVector3d > const &points)
convexHull returns the convex hull of the given set of points if it exists and throws an exception otherwise.
def coadd(self, cache, data)
Construct coadd for a patch and measure.
def runDataRef(self, tractPatchRefList, butler, selectIdList=[])
Determine which tracts are non-empty before processing.
def warp(self, cache, patchId, selectDataList)
Warp all images for a patch.
def logOperation(self, operation, catch=False, trace=True)
Provide a context manager for logging an operation.
def getTargetList(parsedCmd, kwargs)
Get bare butler into Task.
std::shared_ptr< SkyWcs > makeSkyWcs(TransformPoint2ToPoint2 const &pixelsToFieldAngle, lsst::geom::Angle const &orientation, bool flipX, lsst::geom::SpherePoint const &boresight, std::string const &projection="TAN")
Construct a FITS SkyWcs from camera geometry.
def readSelection(self, cache, selectId)
Read Wcs of selected inputs.
def __init__(self, TaskClass, parsedCmd, doReturnResults=False)
def checkTract(self, cache, tractId, selectIdList)
Check whether a tract has any overlapping inputs.
def writeMetadata(self, dataRef)
Backwards-compatibility support for depersisting the old Calib (FluxMag0/FluxMag0Err) objects...
lsst::geom::Box2I bboxFromMetadata(daf::base::PropertySet &metadata)
Determine the image bounding box from its metadata (FITS header)
def run(self, patchRefList, butler, selectDataList=[])
Run stacking on a tract.