3 from lsst.pex.config 
import Config, Field, ConfigurableField
 
    7                                        DeblendCoaddSourcesTask,
 
    8                                        MeasureMergedCoaddSourcesTask,
 
    9                                        MergeMeasurementsTask,)
 
   20     coaddName = Field(dtype=str, default=
"deep", doc=
"Name of coadd")
 
   21     doDetection = Field(dtype=bool, default=
False,
 
   22                         doc=
"Re-run detection? (requires *Coadd dataset to have been written)")
 
   23     detectCoaddSources = ConfigurableField(target=DetectCoaddSourcesTask,
 
   24                                            doc=
"Detect sources on coadd")
 
   25     mergeCoaddDetections = ConfigurableField(
 
   26         target=MergeDetectionsTask, doc=
"Merge detections")
 
   27     deblendCoaddSources = ConfigurableField(target=DeblendCoaddSourcesTask, doc=
"Deblend merged detections")
 
   28     measureCoaddSources = ConfigurableField(target=MeasureMergedCoaddSourcesTask,
 
   29                                             doc=
"Measure merged and (optionally) deblended detections")
 
   30     mergeCoaddMeasurements = ConfigurableField(
 
   31         target=MergeMeasurementsTask, doc=
"Merge measurements")
 
   32     forcedPhotCoadd = ConfigurableField(target=ForcedPhotCoaddTask,
 
   33                                         doc=
"Forced measurement on coadded images")
 
   35         dtype=bool, default=
False,
 
   36         doc=(
"Are we reprocessing?\n\n" 
   37              "This exists as a workaround for large deblender footprints causing large memory use " 
   38              "and/or very slow processing.  We refuse to deblend those footprints when running on a cluster " 
   39              "and return to reprocess on a machine with larger memory or more time " 
   40              "if we consider those footprints important to recover."),
 
   46         doc=
"Should be set to True if fakes were inserted into the data being processed." 
   50         Config.setDefaults(self)
 
   55         for subtask 
in (
"mergeCoaddDetections", 
"deblendCoaddSources", 
"measureCoaddSources",
 
   56                         "mergeCoaddMeasurements", 
"forcedPhotCoadd"):
 
   57             coaddName = getattr(self, subtask).coaddName
 
   59                 raise RuntimeError(
"%s.coaddName (%s) doesn't match root coaddName (%s)" %
 
   64     """TaskRunner for running MultiBandTask 
   66     This is similar to the lsst.pipe.base.ButlerInitializedTaskRunner, 
   67     except that we have a list of data references instead of a single 
   68     data reference being passed to the Task.run, and we pass the results 
   69     of the '--reuse-outputs-from' command option to the Task constructor. 
   72     def __init__(self, TaskClass, parsedCmd, doReturnResults=False):
 
   73         TaskRunner.__init__(self, TaskClass, parsedCmd, doReturnResults)
 
   77         """A variant of the base version that passes a butler argument to the task's constructor 
   78         parsedCmd or args must be specified. 
   80         if parsedCmd 
is not None:
 
   81             butler = parsedCmd.butler
 
   82         elif args 
is not None:
 
   83             dataRefList, kwargs = args
 
   84             butler = dataRefList[0].butlerSubset.butler
 
   86             raise RuntimeError(
"parsedCmd or args must be specified")
 
   91     """Unpickle something by calling a factory""" 
   92     return factory(*args, **kwargs)
 
   96     """Multi-node driver for multiband processing""" 
   97     ConfigClass = MultiBandDriverConfig
 
   98     _DefaultName = 
"multiBandDriver" 
   99     RunnerClass = MultiBandDriverTaskRunner
 
  101     def __init__(self, butler=None, schema=None, refObjLoader=None, reuse=tuple(), **kwargs):
 
  103         @param[in] butler: the butler can be used to retrieve schema or passed to the refObjLoader constructor 
  104             in case it is needed. 
  105         @param[in] schema: the schema of the source detection catalog used as input. 
  106         @param[in] refObjLoader: an instance of LoadReferenceObjectsTasks that supplies an external reference 
  107             catalog.  May be None if the butler argument is provided or all steps requiring a reference 
  108             catalog are disabled. 
  110         BatchPoolTask.__init__(self, **kwargs)
 
  112             assert butler 
is not None, 
"Butler not provided" 
  113             schema = butler.get(self.
config.coaddName +
 
  114                                 "Coadd_det_schema", immediate=
True).schema
 
  118         self.
makeSubtask(
"mergeCoaddDetections", schema=schema)
 
  119         if self.
config.measureCoaddSources.inputCatalog.startswith(
"deblended"):
 
  123             if self.
config.deblendCoaddSources.simultaneous:
 
  128                 err = 
"Measurement input '{0}' is not in the list of deblender output catalogs '{1}'" 
  133                              peakSchema=
afwTable.Schema(self.mergeCoaddDetections.merged.getPeakSchema()),
 
  138         self.
makeSubtask(
"measureCoaddSources", schema=measureInputSchema,
 
  140                              self.mergeCoaddDetections.merged.getPeakSchema()),
 
  141                          refObjLoader=refObjLoader, butler=butler)
 
  143             self.measureCoaddSources.schema))
 
  145             self.mergeCoaddMeasurements.schema))
 
  153         return unpickle, (self.__class__, [], dict(config=self.
config, name=self.
_name,
 
  158     def _makeArgumentParser(cls, *args, **kwargs):
 
  159         kwargs.pop(
"doBatch", 
False)
 
  161         parser.add_id_argument(
"--id", 
"deepCoadd", help=
"data ID, e.g. --id tract=12345 patch=1,2",
 
  162                                ContainerClass=TractDataIdContainer)
 
  163         parser.addReuseOption([
"detectCoaddSources", 
"mergeCoaddDetections", 
"measureCoaddSources",
 
  164                                "mergeCoaddMeasurements", 
"forcedPhotCoadd", 
"deblendCoaddSources"])
 
  169         """!Return walltime request for batch job 
  171         @param time: Requested time per iteration 
  172         @param parsedCmd: Results of argument parsing 
  173         @param numCores: Number of cores 
  176         for refList 
in parsedCmd.id.refList:
 
  177             numTargets += len(refList)
 
  178         return time*numTargets/float(numCpus)
 
  182         """!Run multiband processing on coadds 
  184         Only the master node runs this method. 
  186         No real MPI communication (scatter/gather) takes place: all I/O goes 
  187         through the disk. We want the intermediate stages on disk, and the 
  188         component Tasks are implemented around this, so we just follow suit. 
  190         @param patchRefList:  Data references to run measurement 
  192         for patchRef 
in patchRefList:
 
  194                 butler = patchRef.getButler()
 
  197             raise RuntimeError(
"No valid patches")
 
  200         pool.storeSet(butler=butler)
 
  214         if self.
config.doDetection:
 
  216             for patchRef 
in patchRefList:
 
  217                 if (
"detectCoaddSources" in self.
reuse and 
  218                         patchRef.datasetExists(self.
coaddType + 
"Coadd_calexp", write=
True)):
 
  219                     self.
log.
info(
"Skipping detectCoaddSources for %s; output already exists." %
 
  222                 if not patchRef.datasetExists(self.
coaddType + 
"Coadd"):
 
  223                     self.
log.
debug(
"Not processing %s; required input %sCoadd missing." %
 
  224                                    (patchRef.dataId, self.
config.coaddName))
 
  226                 detectionList.append(patchRef)
 
  230         patchRefList = [patchRef 
for patchRef 
in patchRefList 
if 
  231                         patchRef.datasetExists(self.
coaddType + 
"Coadd_calexp") 
and 
  232                         patchRef.datasetExists(self.
config.coaddName + 
"Coadd_det",
 
  233                                                write=self.
config.doDetection)]
 
  234         dataIdList = [patchRef.dataId 
for patchRef 
in patchRefList]
 
  239         for patchRef 
in patchRefList:
 
  240             dataId = patchRef.dataId
 
  242                 tract = dataId[
"tract"]
 
  244                 assert tract == dataId[
"tract"]
 
  246             patch = dataId[
"patch"]
 
  247             if patch 
not in patches:
 
  249             patches[patch].
append(dataId)
 
  278             if self.
config.reprocessing:
 
  279                 patchReprocessing = {}
 
  280                 for dataId, reprocess 
in zip(dataIdList, reprocessed):
 
  281                     patchId = dataId[
"patch"]
 
  282                     patchReprocessing[patchId] = patchReprocessing.get(
 
  283                         patchId, 
False) 
or reprocess
 
  285                 reprocessDataset = self.
config.coaddName + 
"Coadd_multibandReprocessing" 
  286                 for patchId 
in patchReprocessing:
 
  287                     if not patchReprocessing[patchId]:
 
  289                     dataId = dict(tract=tract, patch=patchId)
 
  290                     if patchReprocessing[patchId]:
 
  291                         filename = butler.get(
 
  292                             reprocessDataset + 
"_filename", dataId)[0]
 
  293                         open(filename, 
'a').close()  
 
  294                     elif butler.datasetExists(reprocessDataset, dataId):
 
  297                         patchReprocessing[patchId] = 
True 
  301                                         patchReprocessing[dataId1[
"patch"]]])
 
  303                                              not self.
config.reprocessing 
or patchReprocessing[patchId]])
 
  304         pool.map(self.
runForcedPhot, [dataId1 
for dataId1 
in dataIdList 
if not self.
config.reprocessing 
or 
  305                                       patchReprocessing[dataId1[
"patch"]]])
 
  308         if self.
config.reprocessing:
 
  309             for patchId 
in patchReprocessing:
 
  310                 if not patchReprocessing[patchId]:
 
  312                 dataId = dict(tract=tract, patch=patchId)
 
  313                 filename = butler.get(
 
  314                     reprocessDataset + 
"_filename", dataId)[0]
 
  318         """! Run detection on a patch 
  320         Only slave nodes execute this method. 
  322         @param cache: Pool cache, containing butler 
  323         @param patchRef: Patch on which to do detection 
  326             idFactory = self.detectCoaddSources.makeIdFactory(patchRef)
 
  327             coadd = patchRef.get(self.
coaddType + 
"Coadd", immediate=
True)
 
  328             expId = int(patchRef.get(self.
config.coaddName + 
"CoaddId"))
 
  330             detResults = self.detectCoaddSources.
run(coadd, idFactory, expId=expId)
 
  331             self.detectCoaddSources.
write(detResults, patchRef)
 
  335         """!Run detection merging on a patch 
  337         Only slave nodes execute this method. 
  339         @param cache: Pool cache, containing butler 
  340         @param dataIdList: List of data identifiers for the patch in different filters 
  342         with self.
logOperation(
"merge detections from %s" % (dataIdList,)):
 
  344                            dataId 
in dataIdList]
 
  345             if (
"mergeCoaddDetections" in self.
reuse and 
  346                     dataRefList[0].datasetExists(self.
config.coaddName + 
"Coadd_mergeDet", write=
True)):
 
  347                 self.
log.
info(
"Skipping mergeCoaddDetections for %s; output already exists." %
 
  348                               dataRefList[0].dataId)
 
  350             self.mergeCoaddDetections.
runDataRef(dataRefList)
 
  353         """Run the deblender on a list of dataId's 
  355         Only slave nodes execute this method. 
  360             Pool cache with butler. 
  362             Data identifier for patch in each band. 
  367             whether the patch requires reprocessing. 
  369         with self.
logOperation(
"deblending %s" % (dataIdList,)):
 
  371                            dataId 
in dataIdList]
 
  373             if (
"deblendCoaddSources" in self.
reuse and 
  375                                            write=
True) 
for dataRef 
in dataRefList])):
 
  376                 if not self.
config.reprocessing:
 
  377                     self.
log.
info(
"Skipping deblendCoaddSources for %s; output already exists" % dataIdList)
 
  382                 bigFlag = catalog[
"deblend_parentTooBig"]
 
  384                 numOldBig = bigFlag.sum()
 
  386                     self.
log.
info(
"No large footprints in %s" % (dataRefList[0].dataId))
 
  390                 if self.
config.deblendCoaddSources.simultaneous:
 
  391                     deblender = self.deblendCoaddSources.multiBandDeblend
 
  393                     deblender = self.deblendCoaddSources.singleBandDeblend
 
  398                 numNewBig = sum((deblender.isLargeFootprint(src.getFootprint()) 
for 
  399                                  src 
in catalog[bigFlag]))
 
  400                 if numNewBig == numOldBig:
 
  401                     self.
log.
info(
"All %d formerly large footprints continue to be large in %s" %
 
  402                                   (numOldBig, dataRefList[0].dataId,))
 
  404                 self.
log.
info(
"Found %d large footprints to be reprocessed in %s" %
 
  405                               (numOldBig - numNewBig, [dataRef.dataId 
for dataRef 
in dataRefList]))
 
  408             self.deblendCoaddSources.
runDataRef(dataRefList)
 
  412         """Run measurement on a patch for a single filter 
  414         Only slave nodes execute this method. 
  419             Pool cache, with butler 
  421             Data identifier for patch 
  423         with self.
logOperation(
"measurements on %s" % (dataId,)):
 
  425             if (
"measureCoaddSources" in self.
reuse and 
  426                 not self.
config.reprocessing 
and 
  427                     dataRef.datasetExists(self.
config.coaddName + 
"Coadd_meas", write=
True)):
 
  428                 self.
log.
info(
"Skipping measuretCoaddSources for %s; output already exists" % dataId)
 
  433         """!Run measurement merging on a patch 
  435         Only slave nodes execute this method. 
  437         @param cache: Pool cache, containing butler 
  438         @param dataIdList: List of data identifiers for the patch in different filters 
  440         with self.
logOperation(
"merge measurements from %s" % (dataIdList,)):
 
  442                            dataId 
in dataIdList]
 
  443             if (
"mergeCoaddMeasurements" in self.
reuse and 
  444                 not self.
config.reprocessing 
and 
  445                     dataRefList[0].datasetExists(self.
config.coaddName + 
"Coadd_ref", write=
True)):
 
  446                 self.
log.
info(
"Skipping mergeCoaddMeasurements for %s; output already exists" %
 
  447                               dataRefList[0].dataId)
 
  449             self.mergeCoaddMeasurements.
runDataRef(dataRefList)
 
  452         """!Run forced photometry on a patch for a single filter 
  454         Only slave nodes execute this method. 
  456         @param cache: Pool cache, with butler 
  457         @param dataId: Data identifier for patch 
  459         with self.
logOperation(
"forced photometry on %s" % (dataId,)):
 
  462             if (
"forcedPhotCoadd" in self.
reuse and 
  463                 not self.
config.reprocessing 
and 
  464                     dataRef.datasetExists(self.
config.coaddName + 
"Coadd_forced_src", write=
True)):
 
  465                 self.
log.
info(
"Skipping forcedPhotCoadd for %s; output already exists" % dataId)
 
  470         """We don't collect any metadata, so skip"""