9 __all__ = [
"DemoTask", ]
13 """Task for demonstrating the BatchPoolTask functionality""" 18 def _makeArgumentParser(cls, *args, **kwargs):
19 kwargs.pop(
'doBatch',
False)
21 parser.add_id_argument(
"--id", datasetType=
"raw", level=
"visit",
22 help=
"data ID, e.g. --id visit=12345")
27 """Return walltime request for batch job 29 Subclasses should override if the walltime should be calculated 30 differently (e.g., addition of some serial time). 32 @param time: Requested time per iteration 33 @param parsedCmd: Results of argument parsing 34 @param numCores: Number of cores 36 numTargets = [sum(1
for ccdRef
in visitRef.subItems(
"ccd")
if ccdRef.datasetExists(
"raw"))
for 37 visitRef
in parsedCmd.id.refList]
38 return time*sum(math.ceil(tt/numCores)
for tt
in numTargets)
def runDataRef(self, visitRef):
    """Process one visit, fanning CCD-level work out to the slave nodes.

    Only the master node runs this method. It will dispatch jobs to the
    slave nodes (one per CCD with raw data) and reduce their results.

    NOTE(review): the extract dropped the statement that creates ``pool``;
    ``Pool()`` below is a best-guess reconstruction — confirm upstream.

    @param visitRef: data reference for the visit to process
    """
    pool = Pool()  # TODO confirm pool construction arguments
    # Map ccdExposureId -> dataId for every CCD in this visit with raw data.
    dataIdList = dict([(ccdRef.get("ccdExposureId"), ccdRef.dataId)
                       for ccdRef in visitRef.subItems("ccd")
                       if ccdRef.datasetExists("raw")])
    # Deterministic processing order: sort by ccdExposureId.
    dataIdList = collections.OrderedDict(sorted(dataIdList.items()))
    # Fan self.run out over the data IDs and sum the per-CCD pixel counts.
    total = pool.reduce(operator.add, self.run, list(dataIdList.values()),
                        butler=visitRef.getButler())
    self.log.info("Total number of pixels read: %d" % (total,))
def run(self, cache, dataId, butler=None):
    """Read an image and return its number of pixels.

    Only the slave nodes run this method.

    @param cache: per-node cache (unused here; part of the Pool interface)
    @param dataId: data identifier of the raw exposure to read
    @param butler: data butler used to fetch the exposure; must not be None
    @return number of pixels in the exposure
    """
    assert butler is not None  # the master always forwards its butler
    raw = butler.get("raw", dataId, immediate=True)
    dims = raw.getDimensions()
    num = dims.getX()*dims.getY()
    self.log.info("Read %d pixels for %s" % (num, dataId,))
    # BUGFIX(extraction): runDataRef reduces these results with
    # operator.add, so the pixel count must be returned to the caller.
    return num
71 def _getConfigName(self):
74 def _getMetadataName(self):
77 def _getEupsVersionsName(self):
# --- Extraction residue: generated-documentation method summaries, kept for
# --- reference only (not executable code):
# def batchWallTime(cls, time, parsedCmd, numCores)
# def runDataRef(self, visitRef)
# def logOperation(self, operation, catch=False, trace=True)
#     Provide a context manager for logging an operation.
# def run(self, cache, dataId, butler=None)
# daf::base::PropertyList * list   <- stray C++ fragment from doc generation