from __future__ import absolute_import, division, print_function
import os

from argparse import ArgumentError

from builtins import zip

import lsst.afw.table as afwTable
from lsst.coadd.utils import CoaddDataIdContainer
from lsst.ctrl.pool.parallel import BatchPoolTask
from lsst.ctrl.pool.pool import Pool, abortOnError
from lsst.meas.base.references import MultiBandReferencesTask
from lsst.meas.base.forcedPhotCoadd import ForcedPhotCoaddTask
from lsst.pex.config import Config, Field, ConfigurableField
from lsst.pipe.base import ArgumentParser, TaskRunner
from lsst.pipe.drivers.utils import getDataRef
from lsst.pipe.tasks.multiBand import (DetectCoaddSourcesTask,
                                       MergeDetectionsTask,
                                       DeblendCoaddSourcesTask,
                                       MeasureMergedCoaddSourcesTask,
                                       MergeMeasurementsTask,)
class TractDataIdContainer(CoaddDataIdContainer):

    def makeDataRefList(self, namespace):
        """!Make self.refList from self.idList

        It's difficult to make a data reference that merely points to an entire
        tract: there is no data product solely at the tract level.  Instead, we
        generate a list of data references for patches within the tract.

        @param namespace namespace object that is the result of an argument parser
        """
        datasetType = namespace.config.coaddName + "Coadd_calexp"

        def getPatchRefList(tract):
            return [namespace.butler.dataRef(datasetType=datasetType,
                                             tract=tract.getId(),
                                             filter=dataId["filter"],
                                             patch="%d,%d" % patch.getIndex())
                    for patch in tract]

        tractRefs = {}  # Data references for each tract
        for dataId in self.idList:
            if "filter" not in dataId:
                raise ArgumentError(None, "--id must include 'filter'")

            skymap = self.getSkymap(namespace, datasetType)

            if "tract" in dataId:
                tractId = dataId["tract"]
                if tractId not in tractRefs:
                    tractRefs[tractId] = []
                if "patch" in dataId:
                    tractRefs[tractId].append(namespace.butler.dataRef(datasetType=datasetType,
                                                                       tract=tractId,
                                                                       filter=dataId["filter"],
                                                                       patch=dataId["patch"]))
                else:
                    tractRefs[tractId] += getPatchRefList(skymap[tractId])
            else:
                tractRefs = dict((tract.getId(), tractRefs.get(tract.getId(), []) + getPatchRefList(tract))
                                 for tract in skymap)

        self.refList = list(tractRefs.values())
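
# Example (sketch, not part of the pipeline): the expansion above turns a
# tract-level data ID into one ID per patch. The _Patch stand-in below is
# hypothetical and only mimics the piece of the skymap interface used here
# (iterating a tract yields patches whose getIndex() returns an (x, y) tuple).
def _examplePatchExpansion():
    class _Patch(object):
        def __init__(self, x, y):
            self._index = (x, y)

        def getIndex(self):
            return self._index

    patches = [_Patch(x, y) for x in range(2) for y in range(2)]
    dataId = dict(tract=0, filter="HSC-I")
    # Mirrors getPatchRefList: yields patch="0,0", "0,1", "1,0", "1,1"
    return [dict(dataId, patch="%d,%d" % patch.getIndex()) for patch in patches]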
class MultiBandDriverConfig(Config):
    coaddName = Field(dtype=str, default="deep", doc="Name of coadd")
    doDetection = Field(dtype=bool, default=False,
                        doc="Re-run detection? (requires *Coadd dataset to have been written)")
    detectCoaddSources = ConfigurableField(target=DetectCoaddSourcesTask,
                                           doc="Detect sources on coadd")
    mergeCoaddDetections = ConfigurableField(
        target=MergeDetectionsTask, doc="Merge detections")
    deblendCoaddSources = ConfigurableField(target=DeblendCoaddSourcesTask,
                                            doc="Deblend merged detections")
    measureCoaddSources = ConfigurableField(target=MeasureMergedCoaddSourcesTask,
                                            doc="Measure merged and (optionally) deblended detections")
    mergeCoaddMeasurements = ConfigurableField(
        target=MergeMeasurementsTask, doc="Merge measurements")
    forcedPhotCoadd = ConfigurableField(target=ForcedPhotCoaddTask,
                                        doc="Forced measurement on coadded images")
    reprocessing = Field(
        dtype=bool, default=False,
        doc=("Are we reprocessing?\n\n"
             "This exists as a workaround for large deblender footprints causing large memory use "
             "and/or very slow processing.  We refuse to deblend those footprints when running on a cluster "
             "and return to reprocess on a machine with larger memory or more time "
             "if we consider those footprints important to recover."),
    )
    def setDefaults(self):
        Config.setDefaults(self)
        self.forcedPhotCoadd.references.retarget(MultiBandReferencesTask)

    def validate(self):
        for subtask in ("mergeCoaddDetections", "deblendCoaddSources", "measureCoaddSources",
                        "mergeCoaddMeasurements", "forcedPhotCoadd"):
            coaddName = getattr(self, subtask).coaddName
            if coaddName != self.coaddName:
                raise RuntimeError("%s.coaddName (%s) doesn't match root coaddName (%s)" %
                                   (subtask, coaddName, self.coaddName))
class MultiBandDriverTaskRunner(TaskRunner):
    """TaskRunner for running MultiBandTask

    This is similar to the lsst.pipe.base.ButlerInitializedTaskRunner,
    except that we have a list of data references instead of a single
    data reference being passed to the Task.run, and we pass the results
    of the '--reuse-outputs-from' command option to the Task constructor.
    """

    def __init__(self, TaskClass, parsedCmd, doReturnResults=False):
        TaskRunner.__init__(self, TaskClass, parsedCmd, doReturnResults)
        self.reuse = parsedCmd.reuse

    def makeTask(self, parsedCmd=None, args=None):
        """A variant of the base version that passes a butler argument to the task's constructor

        parsedCmd or args must be specified.
        """
        if parsedCmd is not None:
            butler = parsedCmd.butler
        elif args is not None:
            dataRefList, kwargs = args
            butler = dataRefList[0].butlerSubset.butler
        else:
            raise RuntimeError("parsedCmd or args must be specified")
        return self.TaskClass(config=self.config, log=self.log, butler=butler, reuse=self.reuse)
def unpickle(factory, args, kwargs):
    """Unpickle something by calling a factory"""
    return factory(*args, **kwargs)
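
# Example (sketch): unpickle() lets __reduce__ (see MultiBandDriverTask below)
# rebuild an object from keyword arguments, which the default pickle protocol
# cannot pass to a constructor. Calling it directly with a stand-in class
# (Widget is hypothetical, not a stack class) shows what pickle.loads
# effectively executes:
def _exampleUnpickleFactory():
    class Widget(object):
        def __init__(self, name="w", reuse=tuple()):
            self.name = name
            self.reuse = reuse

    copy = unpickle(Widget, [], dict(name="demo", reuse=("detectCoaddSources",)))
    assert copy.name == "demo" and copy.reuse == ("detectCoaddSources",)
    return copy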
class MultiBandDriverTask(BatchPoolTask):
    """Multi-node driver for multiband processing"""
    ConfigClass = MultiBandDriverConfig
    _DefaultName = "multiBandDriver"
    RunnerClass = MultiBandDriverTaskRunner
    def __init__(self, butler=None, schema=None, refObjLoader=None, reuse=tuple(), **kwargs):
        """!
        @param[in] butler: the butler can be used to retrieve schema or passed to the refObjLoader constructor
            in case it is needed.
        @param[in] schema: the schema of the source detection catalog used as input.
        @param[in] refObjLoader: an instance of LoadReferenceObjectsTasks that supplies an external reference
            catalog.  May be None if the butler argument is provided or all steps requiring a reference
            catalog are disabled.
        """
        BatchPoolTask.__init__(self, **kwargs)
        if schema is None:
            assert butler is not None, "Butler not provided"
            schema = butler.get(self.config.coaddName +
                                "Coadd_det_schema", immediate=True).schema
        self.butler = butler
        self.reuse = tuple(reuse)
        self.makeSubtask("detectCoaddSources")
        self.makeSubtask("mergeCoaddDetections", schema=schema)
        if self.config.measureCoaddSources.inputCatalog.startswith("deblended"):
            # Ensure that the output from deblendCoaddSources matches the input to measureCoaddSources
            self.measurementInput = self.config.measureCoaddSources.inputCatalog
            self.deblenderOutput = []
            if self.config.deblendCoaddSources.simultaneous:
                if self.config.deblendCoaddSources.multiBandDeblend.conserveFlux:
                    self.deblenderOutput.append("deblendedFlux")
                if self.config.deblendCoaddSources.multiBandDeblend.saveTemplates:
                    self.deblenderOutput.append("deblendedModel")
            else:
                self.deblenderOutput.append("deblendedFlux")
            if self.measurementInput not in self.deblenderOutput:
                err = "Measurement input '{0}' is not in the list of deblender output catalogs '{1}'"
                raise ValueError(err.format(self.measurementInput, self.deblenderOutput))

            self.makeSubtask("deblendCoaddSources",
                             schema=afwTable.Schema(self.mergeCoaddDetections.schema),
                             peakSchema=afwTable.Schema(self.mergeCoaddDetections.merged.getPeakSchema()),
                             butler=butler)
            measureInputSchema = afwTable.Schema(self.deblendCoaddSources.schema)
        else:
            measureInputSchema = afwTable.Schema(self.mergeCoaddDetections.schema)
        self.makeSubtask("measureCoaddSources", schema=measureInputSchema,
                         peakSchema=afwTable.Schema(
                             self.mergeCoaddDetections.merged.getPeakSchema()),
                         refObjLoader=refObjLoader, butler=butler)
        self.makeSubtask("mergeCoaddMeasurements", schema=afwTable.Schema(
            self.measureCoaddSources.schema))
        self.makeSubtask("forcedPhotCoadd", refSchema=afwTable.Schema(
            self.mergeCoaddMeasurements.schema))
    def __reduce__(self):
        """Pickler"""
        return unpickle, (self.__class__, [], dict(config=self.config, name=self._name,
                                                   parentTask=self._parentTask, log=self.log,
                                                   butler=self.butler, reuse=self.reuse))
    @classmethod
    def _makeArgumentParser(cls, *args, **kwargs):
        kwargs.pop("doBatch", False)
        parser = ArgumentParser(name=cls._DefaultName, *args, **kwargs)
        parser.add_id_argument("--id", "deepCoadd", help="data ID, e.g. --id tract=12345 patch=1,2",
                               ContainerClass=TractDataIdContainer)
        parser.addReuseOption(["detectCoaddSources", "mergeCoaddDetections", "measureCoaddSources",
                               "mergeCoaddMeasurements", "forcedPhotCoadd", "deblendCoaddSources"])
        return parser
    @classmethod
    def batchWallTime(cls, time, parsedCmd, numCpus):
        """!Return walltime request for batch job

        @param time: Requested time per iteration
        @param parsedCmd: Results of argument parsing
        @param numCpus: Number of CPUs
        """
        numTargets = 0
        for refList in parsedCmd.id.refList:
            numTargets += len(refList)
        return time*numTargets/float(numCpus)
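
    # Worked example (sketch, illustrative numbers only): with time=3600 s per
    # iteration, 48 patch references in total and numCpus=12, the request is
    # 3600*48/12 = 14400 s, i.e. four hours of walltime.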
    @abortOnError
    def runDataRef(self, patchRefList):
        """!Run multiband processing on coadds

        Only the master node runs this method.

        No real MPI communication (scatter/gather) takes place: all I/O goes
        through the disk. We want the intermediate stages on disk, and the
        component Tasks are implemented around this, so we just follow suit.

        @param patchRefList:  Data references to run measurement
        """
        for patchRef in patchRefList:
            if patchRef:
                butler = patchRef.getButler()
                break
        else:
            raise RuntimeError("No valid patches")
        pool = Pool("all")
        pool.storeSet(butler=butler)
        # Only re-run detection when doDetection is explicitly requested;
        # otherwise the detections already present in the repo are used.
        if self.config.doDetection:
            detectionList = []
            for patchRef in patchRefList:
                if ("detectCoaddSources" in self.reuse and
                        patchRef.datasetExists(self.config.coaddName + "Coadd_calexp", write=True)):
                    self.log.info("Skipping detectCoaddSources for %s; output already exists." %
                                  patchRef.dataId)
                    continue
                if not patchRef.datasetExists(self.config.coaddName + "Coadd"):
                    self.log.debug("Not processing %s; required input %sCoadd missing." %
                                   (patchRef.dataId, self.config.coaddName))
                    continue
                detectionList.append(patchRef)
            pool.map(self.runDetection, detectionList)
        patchRefList = [patchRef for patchRef in patchRefList if
                        patchRef.datasetExists(self.config.coaddName + "Coadd_calexp") and
                        patchRef.datasetExists(self.config.coaddName + "Coadd_det",
                                               write=self.config.doDetection)]
        dataIdList = [patchRef.dataId for patchRef in patchRefList]

        # Group by patch
        patches = {}
        tract = None
        for patchRef in patchRefList:
            dataId = patchRef.dataId
            if tract is None:
                tract = dataId["tract"]
            else:
                assert tract == dataId["tract"]
            patch = dataId["patch"]
            if patch not in patches:
                patches[patch] = []
            patches[patch].append(dataId)
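
        # Example (sketch): the loop above is equivalent to
        #     for dataId in dataIdList:
        #         patches.setdefault(dataId["patch"], []).append(dataId)
        # producing e.g. {"1,1": [{..., "filter": "HSC-G"}, {..., "filter": "HSC-R"}]},
        # so each downstream merge step sees all bands of a patch together.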
        pool.map(self.runMergeDetections, patches.values())

        # Deblend merged detections; each slave reports whether its patch has
        # large footprints that were skipped and require reprocessing.
        reprocessed = pool.map(self.runDeblendMerged, patches.values())

        if self.config.reprocessing:
            patchReprocessing = {}
            for dataId, reprocess in zip(dataIdList, reprocessed):
                patchId = dataId["patch"]
                patchReprocessing[patchId] = patchReprocessing.get(
                    patchId, False) or reprocess
            # Persist the determination, to make error recovery easier
            reprocessDataset = self.config.coaddName + "Coadd_multibandReprocessing"
            for patchId in patchReprocessing:
                if not patchReprocessing[patchId]:
                    continue
                dataId = dict(tract=tract, patch=patchId)
                if patchReprocessing[patchId]:
                    filename = butler.get(
                        reprocessDataset + "_filename", dataId)[0]
                    open(filename, 'a').close()  # Touch file
                elif butler.datasetExists(reprocessDataset, dataId):
                    # User has deleted the file; they probably want to reprocess
                    patchReprocessing[patchId] = True

        pool.map(self.runMeasurements, [dataId1 for dataId1 in dataIdList
                                        if not self.config.reprocessing or
                                        patchReprocessing[dataId1["patch"]]])
        pool.map(self.runMergeMeasurements, [idList for patchId, idList in patches.items() if
                                             not self.config.reprocessing or patchReprocessing[patchId]])
        pool.map(self.runForcedPhot, [dataId1 for dataId1 in dataIdList if not self.config.reprocessing or
                                      patchReprocessing[dataId1["patch"]]])

        # Remove persisted reprocessing determination
        if self.config.reprocessing:
            for patchId in patchReprocessing:
                if not patchReprocessing[patchId]:
                    continue
                dataId = dict(tract=tract, patch=patchId)
                filename = butler.get(
                    reprocessDataset + "_filename", dataId)[0]
                os.unlink(filename)
    def runDetection(self, cache, patchRef):
        """!Run detection on a patch

        Only slave nodes execute this method.

        @param cache: Pool cache, containing butler
        @param patchRef: Patch on which to do detection
        """
        with self.logOperation("do detections on {}".format(patchRef.dataId)):
            idFactory = self.detectCoaddSources.makeIdFactory(patchRef)
            coadd = patchRef.get(self.config.coaddName + "Coadd",
                                 immediate=True)
            expId = int(patchRef.get(self.config.coaddName + "CoaddId"))
            detResults = self.detectCoaddSources.run(coadd, idFactory, expId=expId)
            self.detectCoaddSources.write(detResults, patchRef)
    def runMergeDetections(self, cache, dataIdList):
        """!Run detection merging on a patch

        Only slave nodes execute this method.

        @param cache: Pool cache, containing butler
        @param dataIdList: List of data identifiers for the patch in different filters
        """
        with self.logOperation("merge detections from %s" % (dataIdList,)):
            dataRefList = [getDataRef(cache.butler, dataId, self.config.coaddName + "Coadd_calexp") for
                           dataId in dataIdList]
            if ("mergeCoaddDetections" in self.reuse and
                    dataRefList[0].datasetExists(self.config.coaddName + "Coadd_mergeDet", write=True)):
                self.log.info("Skipping mergeCoaddDetections for %s; output already exists." %
                              dataRefList[0].dataId)
                return
            self.mergeCoaddDetections.runDataRef(dataRefList)
    def runDeblendMerged(self, cache, dataIdList):
        """Run the deblender on a list of dataId's

        Only slave nodes execute this method.

        Parameters
        ----------
        cache: Pool cache
            Pool cache with butler.
        dataIdList: list
            Data identifier for patch in each band.

        Returns
        -------
        result: bool
            whether the patch requires reprocessing.
        """
        with self.logOperation("deblending %s" % (dataIdList,)):
            dataRefList = [getDataRef(cache.butler, dataId, self.config.coaddName + "Coadd_calexp") for
                           dataId in dataIdList]
            reprocessing = False  # Does this patch require reprocessing?
            if ("deblendCoaddSources" in self.reuse and
                all([dataRef.datasetExists(self.config.coaddName + "Coadd_" + self.measurementInput,
                                           write=True) for dataRef in dataRefList])):
                if not self.config.reprocessing:
                    self.log.info("Skipping deblendCoaddSources for %s; output already exists" % dataIdList)
                    return False

                # Footprints are the same in all bands, so check just the first
                catalog = dataRefList[0].get(self.config.coaddName + "Coadd_" + self.measurementInput)
                bigFlag = catalog["deblend_parentTooBig"]
                # Footprints marked too large by the previous deblender run
                numOldBig = bigFlag.sum()
                if numOldBig == 0:
                    self.log.info("No large footprints in %s" % (dataRefList[0].dataId))
                    return False

                if self.config.deblendCoaddSources.simultaneous:
                    deblender = self.deblendCoaddSources.multiBandDeblend
                else:
                    deblender = self.deblendCoaddSources.singleBandDeblend

                numNewBig = sum((deblender.isLargeFootprint(src.getFootprint()) for
                                 src in catalog[bigFlag]))
                if numNewBig == numOldBig:
                    self.log.info("All %d formerly large footprints continue to be large in %s" %
                                  (numOldBig, dataRefList[0].dataId,))
                    return False
                self.log.info("Found %d large footprints to be reprocessed in %s" %
                              (numOldBig - numNewBig, [dataRef.dataId for dataRef in dataRefList]))
                reprocessing = True

            self.deblendCoaddSources.runDataRef(dataRefList)
            return reprocessing
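
    # Example (sketch) of the decision above: if the previous run flagged
    # numOldBig=3 footprints as too big and the current configuration would
    # still flag all 3 (numNewBig=3), nothing new is deblendable and the patch
    # is skipped; if numNewBig=2, one footprint has become tractable, so the
    # patch is deblended again and reported for reprocessing.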
    def runMeasurements(self, cache, dataId):
        """Run measurement on a patch for a single filter

        Only slave nodes execute this method.

        Parameters
        ----------
        cache: Pool cache
            Pool cache, with butler
        dataId: dataRef
            Data identifier for patch
        """
        with self.logOperation("measurements on %s" % (dataId,)):
            dataRef = getDataRef(cache.butler, dataId,
                                 self.config.coaddName + "Coadd_calexp")
            if ("measureCoaddSources" in self.reuse and
                not self.config.reprocessing and
                    dataRef.datasetExists(self.config.coaddName + "Coadd_meas", write=True)):
                self.log.info("Skipping measureCoaddSources for %s; output already exists" % dataId)
                return
            self.measureCoaddSources.runDataRef(dataRef)
    def runMergeMeasurements(self, cache, dataIdList):
        """!Run measurement merging on a patch

        Only slave nodes execute this method.

        @param cache: Pool cache, containing butler
        @param dataIdList: List of data identifiers for the patch in different filters
        """
        with self.logOperation("merge measurements from %s" % (dataIdList,)):
            dataRefList = [getDataRef(cache.butler, dataId, self.config.coaddName + "Coadd_calexp") for
                           dataId in dataIdList]
            if ("mergeCoaddMeasurements" in self.reuse and
                not self.config.reprocessing and
                    dataRefList[0].datasetExists(self.config.coaddName + "Coadd_ref", write=True)):
                self.log.info("Skipping mergeCoaddMeasurements for %s; output already exists" %
                              dataRefList[0].dataId)
                return
            self.mergeCoaddMeasurements.runDataRef(dataRefList)
    def runForcedPhot(self, cache, dataId):
        """!Run forced photometry on a patch for a single filter

        Only slave nodes execute this method.

        @param cache: Pool cache, with butler
        @param dataId: Data identifier for patch
        """
        with self.logOperation("forced photometry on %s" % (dataId,)):
            dataRef = getDataRef(cache.butler, dataId,
                                 self.config.coaddName + "Coadd_calexp")
            if ("forcedPhotCoadd" in self.reuse and
                not self.config.reprocessing and
                    dataRef.datasetExists(self.config.coaddName + "Coadd_forced_src", write=True)):
                self.log.info("Skipping forcedPhotCoadd for %s; output already exists" % dataId)
                return
            self.forcedPhotCoadd.runDataRef(dataRef)
    def writeMetadata(self, dataRef):
        """We don't collect any metadata, so skip"""
        pass
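
# Example (sketch, standalone): the reprocessing marker handled in runDataRef
# is just an empty file that is touched, tested and finally unlinked. The same
# protocol with a tempfile path in place of a butler-resolved filename:
def _exampleReprocessingMarker():
    import tempfile

    filename = os.path.join(tempfile.mkdtemp(), "Coadd_multibandReprocessing")
    open(filename, 'a').close()      # touch: this patch needs reprocessing
    needsReprocessing = os.path.exists(filename)  # survives a crashed run
    os.unlink(filename)              # cleared once reprocessing completes
    return needsReprocessing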