8 from lsst.pex.config 
import Config, Field, ConfigurableField
 
    9 from lsst.pipe.base import Struct, ArgumentParser, ConfigDatasetType
 
   19     coaddName = Field(dtype=str, default=
"deep", doc=
"Name for coadd")
 
   20     select = ConfigurableField(
 
   21         target=WcsSelectImagesTask, doc=
"Select images to process")
 
   22     makeCoaddTempExp = ConfigurableField(
 
   23         target=MakeCoaddTempExpTask, doc=
"Warp images to sky")
 
   24     doBackgroundReference = Field(
 
   25         dtype=bool, default=
False, doc=
"Build background reference?")
 
   26     backgroundReference = ConfigurableField(
 
   27         target=NullSelectImagesTask, doc=
"Build background reference")
 
   28     assembleCoadd = ConfigurableField(
 
   29         target=SafeClipAssembleCoaddTask, doc=
"Assemble warps into coadd")
 
   30     doDetection = Field(dtype=bool, default=
True,
 
   31                         doc=
"Run detection on the coaddition product")
 
   32     detectCoaddSources = ConfigurableField(
 
   33         target=DetectCoaddSourcesTask, doc=
"Detect sources on coadd")
 
   34     hasFakes = Field(dtype=bool, default=
False,
 
   35                      doc=
"Should be set to True if fake sources were added to the data before processing.")
 
   36     calexpType = Field(dtype=str, default=
"calexp",
 
   37                        doc=
"Should be set to fakes_calexp if you want to process calexps with fakes in.")
 
   47                 "makeCoaddTempExp.coaddName and coaddName don't match")
 
   50                 "assembleCoadd.coaddName and coaddName don't match")
 
   52             message = (
"assembleCoadd.matchingKernelSize (%s) and makeCoaddTempExp.matchingKernelSize (%s)" 
   55             raise RuntimeError(message)
 
   60     def __init__(self, TaskClass, parsedCmd, doReturnResults=False):
 
   61         CoaddTaskRunner.__init__(self, TaskClass, parsedCmd, doReturnResults)
 
   65         return self.TaskClass(config=self.config, log=self.log, reuse=self.
reuse)
 
   69         """!Get bare butler into Task 
   71         @param parsedCmd results of parsing command input 
   73         kwargs[
"butler"] = parsedCmd.butler
 
   74         kwargs[
"selectIdList"] = [
 
   75             ref.dataId 
for ref 
in parsedCmd.selectId.refList]
 
   76         return [(parsedCmd.id.refList, kwargs), ]
 
   80     """Unpickle something by calling a factory""" 
   81     return factory(*args, **kwargs)
 
   85     ConfigClass = CoaddDriverConfig
 
   86     _DefaultName = 
"coaddDriver" 
   87     RunnerClass = CoaddDriverTaskRunner
 
   90         BatchPoolTask.__init__(self, **kwargs)
 
   93         self.
makeSubtask(
"makeCoaddTempExp", reuse=(
"makeCoaddTempExp" in self.
reuse))
 
  104         return unpickle, (self.__class__, [], dict(config=self.
config, name=self.
_name,
 
  109     def _makeArgumentParser(cls, **kwargs):
 
  110         """!Build argument parser 
  112         Selection references are not cheap (reads Wcs), so are generated 
  113         only if we're not doing a batch submission. 
  116         parser.add_id_argument(
"--id", 
"deepCoadd", help=
"data ID, e.g. --id tract=12345 patch=1,2",
 
  117                                ContainerClass=TractDataIdContainer)
 
  119         parser.add_id_argument(
"--selectId", datasetType=datasetType,
 
  120                                help=
"data ID, e.g. --selectId visit=6789 ccd=0..9")
 
  121         parser.addReuseOption([
"makeCoaddTempExp", 
"assembleCoadd", 
"detectCoaddSources"])
 
  127         Return walltime request for batch job 
  129         @param time: Requested time per iteration 
  130         @param parsedCmd: Results of argument parsing 
  131         @param numCores: Number of cores 
  132         @return float walltime request length 
  134         numTargets = len(parsedCmd.selectId.refList)
 
  135         return time*numTargets/float(numCores)
 
  138     def runDataRef(self, tractPatchRefList, butler, selectIdList=[]):
 
  139         """!Determine which tracts are non-empty before processing 
  141         @param tractPatchRefList: List of tracts and patches to include in the coaddition 
  142         @param butler: butler reference object 
  143         @param selectIdList: List of data Ids (i.e. visit, ccd) to consider when making the coadd 
  144         @return list of references to sel.runTract function evaluation for each tractPatchRefList member 
  146         pool = 
Pool(
"tracts")
 
  147         pool.storeSet(butler=butler, skymap=butler.get(
 
  148             self.
config.coaddName + 
"Coadd_skyMap"))
 
  150         for patchRefList 
in tractPatchRefList:
 
  151             tractSet = 
set([patchRef.dataId[
"tract"]
 
  152                             for patchRef 
in patchRefList])
 
  153             assert len(tractSet) == 1
 
  154             tractIdList.append(tractSet.pop())
 
  156         selectDataList = [data 
for data 
in pool.mapNoBalance(self.
readSelection, selectIdList) 
if 
  158         nonEmptyList = pool.mapNoBalance(
 
  160         tractPatchRefList = [patchRefList 
for patchRefList, nonEmpty 
in 
  161                              zip(tractPatchRefList, nonEmptyList) 
if nonEmpty]
 
  162         self.
log.
info(
"Non-empty tracts (%d): %s" % (len(tractPatchRefList),
 
  163                                                      [patchRefList[0].dataId[
"tract"] 
for patchRefList 
in 
  166         for data 
in selectDataList:
 
  170         return [self.
run(patchRefList, butler, selectDataList) 
for patchRefList 
in tractPatchRefList]
 
  173     def run(self, patchRefList, butler, selectDataList=[]):
 
  174         """!Run stacking on a tract 
  176         This method only runs on the master node. 
  178         @param patchRefList: List of patch data references for tract 
  179         @param butler: Data butler 
  180         @param selectDataList: List of SelectStruct for inputs 
  182         pool = 
Pool(
"stacker")
 
  184         pool.storeSet(butler=butler, warpType=self.
config.coaddName + 
"Coadd_directWarp",
 
  185                       coaddType=self.
config.coaddName + 
"Coadd")
 
  186         patchIdList = [patchRef.dataId 
for patchRef 
in patchRefList]
 
  188         selectedData = pool.map(self.
warp, patchIdList, selectDataList)
 
  189         if self.
config.doBackgroundReference:
 
  190             self.backgroundReference.
runDataRef(patchRefList, selectDataList)
 
  192         def refNamer(patchRef):
 
  193             return tuple(map(int, patchRef.dataId[
"patch"].split(
",")))
 
  195         lookup = dict(zip(map(refNamer, patchRefList), selectedData))
 
  196         coaddData = [
Struct(patchId=patchRef.dataId, selectDataList=lookup[refNamer(patchRef)]) 
for 
  197                      patchRef 
in patchRefList]
 
  198         pool.map(self.
coadd, coaddData)
 
  201         """!Read Wcs of selected inputs 
  203         This method only runs on slave nodes. 
  204         This method is similar to SelectDataIdContainer.makeDataRefList, 
  205         creating a Struct like a SelectStruct, except with a dataId instead 
  206         of a dataRef (to ease MPI). 
  208         @param cache: Pool cache 
  209         @param selectId: Data identifier for selected input 
  210         @return a SelectStruct with a dataId instead of dataRef 
  214             self.
log.
info(
"Reading Wcs from %s" % (selectId,))
 
  215             md = ref.get(
"calexp_md", immediate=
True)
 
  219             self.
log.
warn(
"Unable to construct Wcs from %s" % (selectId,))
 
  224         """!Check whether a tract has any overlapping inputs 
  226         This method only runs on slave nodes. 
  228         @param cache: Pool cache 
  229         @param tractId: Data identifier for tract 
  230         @param selectDataList: List of selection data 
  231         @return whether tract has any overlapping inputs 
  233         def makePolygon(wcs, bbox):
 
  234             """Return a polygon for the image, given Wcs and bounding box""" 
  235             boxPixelCorners = 
geom.Box2D(bbox).getCorners()
 
  236             boxSkyCorners = wcs.pixelToSky(boxPixelCorners)
 
  239         skymap = cache.skymap
 
  240         tract = skymap[tractId]
 
  241         tractWcs = tract.getWcs()
 
  242         tractPoly = makePolygon(tractWcs, tract.getBBox())
 
  244         for selectData 
in selectIdList:
 
  245             if not hasattr(selectData, 
"poly"):
 
  246                 selectData.poly = makePolygon(selectData.wcs, selectData.bbox)
 
  247             if tractPoly.intersects(selectData.poly):
 
  251     def warp(self, cache, patchId, selectDataList):
 
  252         """!Warp all images for a patch 
  254         Only slave nodes execute this method. 
  256         Because only one argument may be passed, it is expected to 
  257         contain multiple elements, which are: 
  259         @param patchRef: data reference for patch 
  260         @param selectDataList: List of SelectStruct for inputs 
  261         @return selectDataList with non-overlapping elements removed 
  263         patchRef = 
getDataRef(cache.butler, patchId, cache.coaddType)
 
  265         with self.
logOperation(
"warping %s" % (patchRef.dataId,), catch=
True):
 
  266             self.makeCoaddTempExp.
runDataRef(patchRef, selectDataList)
 
  267         return selectDataList
 
  270         """!Construct coadd for a patch and measure 
  272         Only slave nodes execute this method. 
  274         Because only one argument may be passed, it is expected to 
  275         contain multiple elements, which are: 
  277         @param patchRef: data reference for patch 
  278         @param selectDataList: List of SelectStruct for inputs 
  280         patchRef = 
getDataRef(cache.butler, data.patchId, cache.coaddType)
 
  281         selectDataList = data.selectDataList
 
  288             "detectCoaddSources" in self.
reuse and 
  289             patchRef.datasetExists(self.detectCoaddSources.config.coaddName+
"Coadd_det", write=
True)
 
  291         if "assembleCoadd" in self.
reuse:
 
  292             if patchRef.datasetExists(cache.coaddType, write=
True):
 
  293                 self.
log.
info(
"%s: Skipping assembleCoadd for %s; outputs already exist." %
 
  294                               (NODE, patchRef.dataId))
 
  295                 coadd = patchRef.get(cache.coaddType, immediate=
True)
 
  296             elif not self.
config.assembleCoadd.doWrite 
and self.
config.doDetection 
and canSkipDetection:
 
  298                     "%s: Skipping assembleCoadd and detectCoaddSources for %s; outputs already exist." %
 
  299                     (NODE, patchRef.dataId)
 
  303             with self.
logOperation(
"coadding %s" % (patchRef.dataId,), catch=
True):
 
  304                 coaddResults = self.assembleCoadd.
runDataRef(patchRef, selectDataList)
 
  305                 if coaddResults 
is not None:
 
  306                     coadd = coaddResults.coaddExposure
 
  307                     canSkipDetection = 
False   
  315         if self.
config.doDetection:
 
  317                 self.
log.
info(
"%s: Skipping detectCoaddSources for %s; outputs already exist." %
 
  318                               (NODE, patchRef.dataId))
 
  322                 idFactory = self.detectCoaddSources.makeIdFactory(patchRef)
 
  323                 expId = int(patchRef.get(self.
config.coaddName + 
"CoaddId"))
 
  326                 detResults = self.detectCoaddSources.
run(coadd, idFactory, expId=expId)
 
  327                 self.detectCoaddSources.
write(detResults, patchRef)
 
  330                 patchRef.put(coadd, 
"fakes_" + self.assembleCoadd.config.coaddName + 
"Coadd")
 
  332                 patchRef.put(coadd, self.assembleCoadd.config.coaddName + 
"Coadd")
 
  335         """!Select exposures to operate upon, via the SelectImagesTask 
  337         This is very similar to CoaddBaseTask.selectExposures, except we return 
  338         a list of SelectStruct (same as the input), so we can plug the results into 
  339         future uses of SelectImagesTask. 
  341         @param patchRef data reference to a particular patch 
  342         @param selectDataList list of references to specific data products (i.e. visit, ccd) 
  343         @return filtered list of SelectStruct 
  346             return tuple(dataRef.dataId[k] 
for k 
in sorted(dataRef.dataId))
 
  347         inputs = dict((
key(select.dataRef), select)
 
  348                       for select 
in selectDataList)
 
  349         skyMap = patchRef.get(self.
config.coaddName + 
"Coadd_skyMap")
 
  350         tract = skyMap[patchRef.dataId[
"tract"]]
 
  351         patch = tract[(tuple(int(i)
 
  352                              for i 
in patchRef.dataId[
"patch"].split(
",")))]
 
  353         bbox = patch.getOuterBBox()
 
  356         coordList = [wcs.pixelToSky(pos) 
for pos 
in cornerPosList]
 
  358             patchRef, coordList, selectDataList=selectDataList).dataRefList
 
  359         return [inputs[
key(dataRef)] 
for dataRef 
in dataRefList]