import os

import lsst.afw.table as afwTable
from lsst.ctrl.pool.parallel import BatchPoolTask
from lsst.ctrl.pool.pool import Pool, abortOnError
from lsst.meas.base.references import MultiBandReferencesTask
from lsst.meas.base.forcedPhotCoadd import ForcedPhotCoaddTask
from lsst.pex.config import Config, Field, ConfigurableField
from lsst.pipe.base import ArgumentParser, TaskRunner
from lsst.pipe.tasks.multiBand import (DetectCoaddSourcesTask,
                                       MergeDetectionsTask,
                                       DeblendCoaddSourcesTask,
                                       MeasureMergedCoaddSourcesTask,
                                       MergeMeasurementsTask,)
from lsst.pipe.drivers.utils import getDataRef, TractDataIdContainer
 

class MultiBandDriverConfig(Config):
    coaddName = Field(dtype=str, default="deep", doc="Name of coadd")
 
    doDetection = Field(dtype=bool, default=False,
                        doc="Re-run detection? (requires *Coadd dataset to have been written)")
 
    detectCoaddSources = ConfigurableField(target=DetectCoaddSourcesTask,
                                           doc="Detect sources on coadd")
 
    mergeCoaddDetections = ConfigurableField(
        target=MergeDetectionsTask, doc="Merge detections")
 
    deblendCoaddSources = ConfigurableField(
        target=DeblendCoaddSourcesTask, doc="Deblend merged detections")
 
    measureCoaddSources = ConfigurableField(target=MeasureMergedCoaddSourcesTask,
                                            doc="Measure merged and (optionally) deblended detections")
 
    mergeCoaddMeasurements = ConfigurableField(
        target=MergeMeasurementsTask, doc="Merge measurements")
 
    forcedPhotCoadd = ConfigurableField(target=ForcedPhotCoaddTask,
                                        doc="Forced measurement on coadded images")
 
    reprocessing = Field(
        dtype=bool, default=False,
        doc=("Are we reprocessing?\n\n"
             "This exists as a workaround for large deblender footprints causing large memory use "
             "and/or very slow processing.  We refuse to deblend those footprints when running on a cluster "
             "and return to reprocess on a machine with larger memory or more time "
             "if we consider those footprints important to recover."),
    )
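    # Illustrative only (not part of the original source): a config override
    # file for a cluster run could opt in to this workaround with
    #
    #     config.reprocessing = True
    #
    # after which a follow-up run on a large-memory node deblends the
    # footprints that the cluster run refused.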
 
    hasFakes = Field(
        dtype=bool, default=False,
        doc="Should be set to True if fakes were inserted into the data being processed.")

    def setDefaults(self):
        Config.setDefaults(self)
        self.forcedPhotCoadd.references.retarget(MultiBandReferencesTask)
 
    def validate(self):
        for subtask in ("mergeCoaddDetections", "deblendCoaddSources", "measureCoaddSources",
                        "mergeCoaddMeasurements", "forcedPhotCoadd"):
            coaddName = getattr(self, subtask).coaddName
            if coaddName != self.coaddName:
                raise RuntimeError("%s.coaddName (%s) doesn't match root coaddName (%s)" %
                                   (subtask, coaddName, self.coaddName))
 

class MultiBandDriverTaskRunner(TaskRunner):
    """TaskRunner for running MultiBandTask

    This is similar to the lsst.pipe.base.ButlerInitializedTaskRunner,
    except that we have a list of data references instead of a single
    data reference being passed to the Task.run, and we pass the results
    of the '--reuse-outputs-from' command option to the Task constructor.
    """

    def __init__(self, TaskClass, parsedCmd, doReturnResults=False):
        TaskRunner.__init__(self, TaskClass, parsedCmd, doReturnResults)
        self.reuse = parsedCmd.reuse
 
    def makeTask(self, parsedCmd=None, args=None):
        """A variant of the base version that passes a butler argument to the task's constructor.

        Either parsedCmd or args must be specified.
        """
        if parsedCmd is not None:
            butler = parsedCmd.butler
        elif args is not None:
            dataRefList, kwargs = args
            butler = dataRefList[0].butlerSubset.butler
        else:
            raise RuntimeError("parsedCmd or args must be specified")
        return self.TaskClass(config=self.config, log=self.log, butler=butler, reuse=self.reuse)
 

def unpickle(factory, args, kwargs):
    """Unpickle something by calling a factory"""
    return factory(*args, **kwargs)
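# Sketch of the round trip this enables: pickling a MultiBandDriverTask calls
# __reduce__ (defined below), which returns (unpickle, (cls, [], kwargs));
# unpickling then calls unpickle(cls, [], kwargs) to rebuild the task through
# its constructor.  A module-level factory is needed because pickle can only
# reference callables importable at module scope.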
 

class MultiBandDriverTask(BatchPoolTask):
    """Multi-node driver for multiband processing"""
    ConfigClass = MultiBandDriverConfig
    _DefaultName = "multiBandDriver"
    RunnerClass = MultiBandDriverTaskRunner
 
    def __init__(self, butler=None, schema=None, refObjLoader=None, reuse=tuple(), **kwargs):
        """!
        @param[in] butler: the butler can be used to retrieve schema or passed to the refObjLoader constructor
            in case it is needed.
        @param[in] schema: the schema of the source detection catalog used as input.
        @param[in] refObjLoader: an instance of LoadReferenceObjectsTask that supplies an external reference
            catalog.  May be None if the butler argument is provided or all steps requiring a reference
            catalog are disabled.
        """
        BatchPoolTask.__init__(self, **kwargs)
        if schema is None:
            assert butler is not None, "Butler not provided"
            schema = butler.get(self.config.coaddName +
                                "Coadd_det_schema", immediate=True).schema
        self.butler = butler
        self.reuse = tuple(reuse)
 
        self.makeSubtask("detectCoaddSources")
        self.makeSubtask("mergeCoaddDetections", schema=schema)
        if self.config.measureCoaddSources.inputCatalog.startswith("deblended"):
            # Check that the deblender can produce the catalog measurement wants to consume
            self.measurementInput = self.config.measureCoaddSources.inputCatalog
            self.deblenderOutput = ["deblendedFlux"]
            if self.measurementInput not in self.deblenderOutput:
                err = "Measurement input '{0}' is not in the list of deblender output catalogs '{1}'"
                raise ValueError(err.format(self.measurementInput, self.deblenderOutput))
            self.makeSubtask("deblendCoaddSources",
                             schema=afwTable.Schema(self.mergeCoaddDetections.schema),
                             peakSchema=afwTable.Schema(self.mergeCoaddDetections.merged.getPeakSchema()),
                             butler=butler)
            measureInputSchema = afwTable.Schema(self.deblendCoaddSources.schema)
        else:
            measureInputSchema = afwTable.Schema(self.mergeCoaddDetections.schema)
 
        self.makeSubtask("measureCoaddSources", schema=measureInputSchema,
                         peakSchema=afwTable.Schema(
                             self.mergeCoaddDetections.merged.getPeakSchema()),
                         refObjLoader=refObjLoader, butler=butler)
        self.makeSubtask("mergeCoaddMeasurements", schema=afwTable.Schema(
            self.measureCoaddSources.schema))
        self.makeSubtask("forcedPhotCoadd", refSchema=afwTable.Schema(
            self.mergeCoaddMeasurements.schema))
 
        if self.config.hasFakes:
            self.coaddType = "fakes_" + self.config.coaddName
        else:
            self.coaddType = self.config.coaddName
 
    def __reduce__(self):
        """Pickler"""
        return unpickle, (self.__class__, [], dict(config=self.config, name=self._name,
                                                   parentTask=self._parentTask, log=self.log,
                                                   butler=self.butler, reuse=self.reuse))
 
    @classmethod
    def _makeArgumentParser(cls, *args, **kwargs):
        kwargs.pop("doBatch", False)
        parser = ArgumentParser(name=cls._DefaultName, *args, **kwargs)
        parser.add_id_argument("--id", "deepCoadd", help="data ID, e.g. --id tract=12345 patch=1,2",
                               ContainerClass=TractDataIdContainer)
        parser.addReuseOption(["detectCoaddSources", "mergeCoaddDetections", "measureCoaddSources",
                               "mergeCoaddMeasurements", "forcedPhotCoadd", "deblendCoaddSources"])
        return parser
 
    @classmethod
    def batchWallTime(cls, time, parsedCmd, numCpus):
        """!Return walltime request for batch job

        @param time: Requested time per iteration
        @param parsedCmd: Results of argument parsing
        @param numCpus: Number of cores
        """
        numTargets = 0
        for refList in parsedCmd.id.refList:
            numTargets += len(refList)
        return time*numTargets/float(numCpus)
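        # Worked example (illustrative numbers only): time=600 s per target
        # and 120 patch references across all bands on numCpus=24 requests
        # 600*120/24 = 3000 s of walltime.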
 
    @abortOnError
    def runDataRef(self, patchRefList):
        """!Run multiband processing on coadds

        Only the master node runs this method.

        No real MPI communication (scatter/gather) takes place: all I/O goes
        through the disk. We want the intermediate stages on disk, and the
        component Tasks are implemented around this, so we just follow suit.

        @param patchRefList:  Data references to run measurement
        """
        for patchRef in patchRefList:
            if patchRef:
                butler = patchRef.getButler()
                break
        else:
            raise RuntimeError("No valid patches")
        pool = Pool("all")
        pool.cacheClear()
        pool.storeSet(butler=butler)
 
        # Re-run detection if explicitly requested (e.g. because fakes were
        # added to the coadds after the original detection run)
        if self.config.doDetection:
            detectionList = []
            for patchRef in patchRefList:
                if ("detectCoaddSources" in self.reuse and
                        patchRef.datasetExists(self.coaddType + "Coadd_calexp", write=True)):
                    self.log.info("Skipping detectCoaddSources for %s; output already exists." %
                                  patchRef.dataId)
                    continue
                if not patchRef.datasetExists(self.coaddType + "Coadd"):
                    self.log.debug("Not processing %s; required input %sCoadd missing." %
                                   (patchRef.dataId, self.config.coaddName))
                    continue
                detectionList.append(patchRef)
            pool.map(self.runDetection, detectionList)
 
        patchRefList = [patchRef for patchRef in patchRefList if
                        patchRef.datasetExists(self.coaddType + "Coadd_calexp") and
                        patchRef.datasetExists(self.config.coaddName + "Coadd_det",
                                               write=self.config.doDetection)]
        dataIdList = [patchRef.dataId for patchRef in patchRefList]
 
        # Group the data identifiers by patch
        patches = {}
        tract = None
        for patchRef in patchRefList:
            dataId = patchRef.dataId
            if tract is None:
                tract = dataId["tract"]
            else:
                assert tract == dataId["tract"]
            patch = dataId["patch"]
            if patch not in patches:
                patches[patch] = []
            patches[patch].append(dataId)

        pool.map(self.runMergeDetections, patches.values())
 
        # Deblend merged detections, and test for large footprints that
        # require reprocessing
        reprocessed = pool.map(self.runDeblendMerged, patches.values())

        if self.config.reprocessing:
            patchReprocessing = {}
            for dataId, reprocess in zip(dataIdList, reprocessed):
                patchId = dataId["patch"]
                patchReprocessing[patchId] = patchReprocessing.get(
                    patchId, False) or reprocess
            # Persist the determination, to make error recovery easier
            reprocessDataset = self.config.coaddName + "Coadd_multibandReprocessing"
            for patchId in patchReprocessing:
                dataId = dict(tract=tract, patch=patchId)
                if patchReprocessing[patchId]:
                    filename = butler.get(
                        reprocessDataset + "_filename", dataId)[0]
                    open(filename, 'a').close()  # Touch file as a marker
                elif butler.datasetExists(reprocessDataset, dataId):
                    # A marker left by an earlier (interrupted) run: honour it
                    patchReprocessing[patchId] = True
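        # With the determination persisted, an interrupted job can be restarted
        # and recover it through the datasetExists branch above.  From here on,
        # when reprocessing is enabled, only the patches flagged in
        # patchReprocessing are re-run; the rest pass through unchanged.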
        pool.map(self.runMeasurements, [dataId1 for dataId1 in dataIdList
                                        if not self.config.reprocessing or
                                        patchReprocessing[dataId1["patch"]]])
        pool.map(self.runMergeMeasurements, [idList for patchId, idList in patches.items()
                                             if not self.config.reprocessing or
                                             patchReprocessing[patchId]])
        pool.map(self.runForcedPhot, [dataId1 for dataId1 in dataIdList
                                      if not self.config.reprocessing or
                                      patchReprocessing[dataId1["patch"]]])
 
        # Remove the persisted reprocessing determination now that the
        # reprocessed patches have been carried through all stages
        if self.config.reprocessing:
            for patchId in patchReprocessing:
                if not patchReprocessing[patchId]:
                    continue
                dataId = dict(tract=tract, patch=patchId)
                filename = butler.get(
                    reprocessDataset + "_filename", dataId)[0]
                os.unlink(filename)
 
    def runDetection(self, cache, patchRef):
        """!Run detection on a patch

        Only slave nodes execute this method.

        @param cache: Pool cache, containing butler
        @param patchRef: Patch on which to do detection
        """
        with self.logOperation("do detections on {}".format(patchRef.dataId)):
            idFactory = self.detectCoaddSources.makeIdFactory(patchRef)
            coadd = patchRef.get(self.coaddType + "Coadd", immediate=True)
            expId = int(patchRef.get(self.config.coaddName + "CoaddId"))
            self.detectCoaddSources.emptyMetadata()
            detResults = self.detectCoaddSources.run(coadd, idFactory, expId=expId)
            self.detectCoaddSources.write(detResults, patchRef)
            self.detectCoaddSources.writeMetadata(patchRef)
 
    def runMergeDetections(self, cache, dataIdList):
        """!Run detection merging on a patch

        Only slave nodes execute this method.

        @param cache: Pool cache, containing butler
        @param dataIdList: List of data identifiers for the patch in different filters
        """
        with self.logOperation("merge detections from %s" % (dataIdList,)):
            dataRefList = [getDataRef(cache.butler, dataId, self.coaddType + "Coadd_calexp") for
                           dataId in dataIdList]
            if ("mergeCoaddDetections" in self.reuse and
                    dataRefList[0].datasetExists(self.config.coaddName + "Coadd_mergeDet", write=True)):
                self.log.info("Skipping mergeCoaddDetections for %s; output already exists." %
                              dataRefList[0].dataId)
                return
            self.mergeCoaddDetections.runDataRef(dataRefList)
 
    def runDeblendMerged(self, cache, dataIdList):
        """Run the deblender on a list of dataIds

        Only slave nodes execute this method.

        Parameters
        ----------
        cache: Pool cache
            Pool cache with butler.
        dataIdList: list
            Data identifier for patch in each band.

        Returns
        -------
        result: bool
            Whether the patch requires reprocessing.
        """
        with self.logOperation("deblending %s" % (dataIdList,)):
            dataRefList = [getDataRef(cache.butler, dataId, self.coaddType + "Coadd_calexp") for
                           dataId in dataIdList]
            reprocessing = False  # Does this patch require reprocessing?
            if ("deblendCoaddSources" in self.reuse and
                all([dataRef.datasetExists(self.config.coaddName + "Coadd_" + self.measurementInput,
                                           write=True) for dataRef in dataRefList])):
                if not self.config.reprocessing:
                    self.log.info("Skipping deblendCoaddSources for %s; output already exists" % dataIdList)
                    return False

                # Footprints are the same in all bands, so check just the first
                catalog = dataRefList[0].get(self.config.coaddName + "Coadd_" + self.measurementInput)
                bigFlag = catalog["deblend_parentTooBig"]
                numOldBig = bigFlag.sum()
                if numOldBig == 0:
                    self.log.info("No large footprints in %s" % (dataRefList[0].dataId))
                    return False
                if self.config.deblendCoaddSources.simultaneous:
                    deblender = self.deblendCoaddSources.multiBandDeblend
                else:
                    deblender = self.deblendCoaddSources.singleBandDeblend
                # A different deblender config may now accept footprints that
                # were previously marked too big
                numNewBig = sum((deblender.isLargeFootprint(src.getFootprint()) for
                                 src in catalog[bigFlag]))
                if numNewBig == numOldBig:
                    self.log.info("All %d formerly large footprints continue to be large in %s" %
                                  (numOldBig, dataRefList[0].dataId,))
                    return False
                self.log.info("Found %d large footprints to be reprocessed in %s" %
                              (numOldBig - numNewBig, [dataRef.dataId for dataRef in dataRefList]))
                reprocessing = True
            self.deblendCoaddSources.runDataRef(dataRefList)
            return reprocessing
 
    def runMeasurements(self, cache, dataId):
        """Run measurement on a patch for a single filter

        Only slave nodes execute this method.

        Parameters
        ----------
        cache: Pool cache
            Pool cache, with butler.
        dataId: dataRef
            Data identifier for patch.
        """
        with self.logOperation("measurements on %s" % (dataId,)):
            dataRef = getDataRef(cache.butler, dataId,
                                 self.coaddType + "Coadd_calexp")
            if ("measureCoaddSources" in self.reuse and
                not self.config.reprocessing and
                    dataRef.datasetExists(self.config.coaddName + "Coadd_meas", write=True)):
                self.log.info("Skipping measureCoaddSources for %s; output already exists" % dataId)
                return
            self.measureCoaddSources.runDataRef(dataRef)
 
    def runMergeMeasurements(self, cache, dataIdList):
        """!Run measurement merging on a patch

        Only slave nodes execute this method.

        @param cache: Pool cache, containing butler
        @param dataIdList: List of data identifiers for the patch in different filters
        """
        with self.logOperation("merge measurements from %s" % (dataIdList,)):
            dataRefList = [getDataRef(cache.butler, dataId, self.coaddType + "Coadd_calexp") for
                           dataId in dataIdList]
            if ("mergeCoaddMeasurements" in self.reuse and
                not self.config.reprocessing and
                    dataRefList[0].datasetExists(self.config.coaddName + "Coadd_ref", write=True)):
                self.log.info("Skipping mergeCoaddMeasurements for %s; output already exists" %
                              dataRefList[0].dataId)
                return
            self.mergeCoaddMeasurements.runDataRef(dataRefList)
 
    def runForcedPhot(self, cache, dataId):
        """!Run forced photometry on a patch for a single filter

        Only slave nodes execute this method.

        @param cache: Pool cache, with butler
        @param dataId: Data identifier for patch
        """
        with self.logOperation("forced photometry on %s" % (dataId,)):
            dataRef = getDataRef(cache.butler, dataId,
                                 self.coaddType + "Coadd_calexp")
            if ("forcedPhotCoadd" in self.reuse and
                not self.config.reprocessing and
                    dataRef.datasetExists(self.config.coaddName + "Coadd_forced_src", write=True)):
                self.log.info("Skipping forcedPhotCoadd for %s; output already exists" % dataId)
                return
            self.forcedPhotCoadd.runDataRef(dataRef)
 
    def writeMetadata(self, dataRef):
        """We don't collect any metadata, so skip"""
        pass