LSST Applications g070148d5b3+33e5256705,g0d53e28543+25c8b88941,g0da5cf3356+2dd1178308,g1081da9e2a+62d12e78cb,g17e5ecfddb+7e422d6136,g1c76d35bf8+ede3a706f7,g295839609d+225697d880,g2e2c1a68ba+cc1f6f037e,g2ffcdf413f+853cd4dcde,g38293774b4+62d12e78cb,g3b44f30a73+d953f1ac34,g48ccf36440+885b902d19,g4b2f1765b6+7dedbde6d2,g5320a0a9f6+0c5d6105b6,g56b687f8c9+ede3a706f7,g5c4744a4d9+ef6ac23297,g5ffd174ac0+0c5d6105b6,g6075d09f38+66af417445,g667d525e37+2ced63db88,g670421136f+2ced63db88,g71f27ac40c+2ced63db88,g774830318a+463cbe8d1f,g7876bc68e5+1d137996f1,g7985c39107+62d12e78cb,g7fdac2220c+0fd8241c05,g96f01af41f+368e6903a7,g9ca82378b8+2ced63db88,g9d27549199+ef6ac23297,gabe93b2c52+e3573e3735,gb065e2a02a+3dfbe639da,gbc3249ced9+0c5d6105b6,gbec6a3398f+0c5d6105b6,gc9534b9d65+35b9f25267,gd01420fc67+0c5d6105b6,geee7ff78d7+a14128c129,gf63283c776+ede3a706f7,gfed783d017+0c5d6105b6,w.2022.47
LSST Data Management Base Package
Loading...
Searching...
No Matches
demoTask.py
Go to the documentation of this file.
1import math
2import collections
3import operator
4from lsst.ctrl.pool.parallel import BatchPoolTask
5from lsst.ctrl.pool.pool import Pool
6from lsst.pipe.base import ArgumentParser
7from lsst.pex.config import Config
8
9__all__ = ["DemoTask", ]
10
11
13 """Task for demonstrating the BatchPoolTask functionality"""
14 ConfigClass = Config
15 _DefaultName = "demo"
16
17 @classmethod
18 def _makeArgumentParser(cls, *args, **kwargs):
19 kwargs.pop('doBatch', False) # Unused
20 parser = ArgumentParser(name="demo", *args, **kwargs)
21 parser.add_id_argument("--id", datasetType="raw", level="visit",
22 help="data ID, e.g. --id visit=12345")
23 return parser
24
25 @classmethod
26 def batchWallTime(cls, time, parsedCmd, numCores):
27 """Return walltime request for batch job
28
29 Subclasses should override if the walltime should be calculated
30 differently (e.g., addition of some serial time).
31
32 @param time: Requested time per iteration
33 @param parsedCmd: Results of argument parsing
34 @param numCores: Number of cores
35 """
36 numTargets = [sum(1 for ccdRef in visitRef.subItems("ccd") if ccdRef.datasetExists("raw")) for
37 visitRef in parsedCmd.id.refList]
38 return time*sum(math.ceil(tt/numCores) for tt in numTargets)
39
40 def runDataRef(self, visitRef):
41 """Main entry-point
42
43 Only the master node runs this method. It will dispatch jobs to the
44 slave nodes.
45 """
46 pool = Pool("test")
47
48 # Less overhead to transfer the butler once rather than in each dataRef
49 dataIdList = dict([(ccdRef.get("ccdExposureId"), ccdRef.dataId)
50 for ccdRef in visitRef.subItems("ccd") if ccdRef.datasetExists("raw")])
51 dataIdList = collections.OrderedDict(sorted(dataIdList.items()))
52
53 with self.logOperation("master"):
54 total = pool.reduce(operator.add, self.run, list(dataIdList.values()),
55 butler=visitRef.getButler())
56 self.log.info("Total number of pixels read: %d" % (total,))
57
58 def run(self, cache, dataId, butler=None):
59 """Read image and return number of pixels
60
61 Only the slave nodes run this method.
62 """
63 assert butler is not None
64 with self.logOperation("read %s" % (dataId,)):
65 raw = butler.get("raw", dataId, immediate=True)
66 dims = raw.getDimensions()
67 num = dims.getX()*dims.getY()
68 self.log.info("Read %d pixels for %s" % (num, dataId,))
69 return num
70
71 def _getConfigName(self):
72 return None
73
74 def _getMetadataName(self):
75 return None
76
77 def _getEupsVersionsName(self):
78 return None
def logOperation(self, operation, catch=False, trace=True)
Provide a context manager for logging an operation.
Definition: parallel.py:502
def batchWallTime(cls, time, parsedCmd, numCores)
Definition: demoTask.py:26
def runDataRef(self, visitRef)
Definition: demoTask.py:40
def run(self, cache, dataId, butler=None)
Definition: demoTask.py:58
daf::base::PropertyList * list
Definition: fits.cc:928