LSST Applications g070148d5b3+33e5256705,g0d53e28543+25c8b88941,g0da5cf3356+2dd1178308,g1081da9e2a+62d12e78cb,g17e5ecfddb+7e422d6136,g1c76d35bf8+ede3a706f7,g295839609d+225697d880,g2e2c1a68ba+cc1f6f037e,g2ffcdf413f+853cd4dcde,g38293774b4+62d12e78cb,g3b44f30a73+d953f1ac34,g48ccf36440+885b902d19,g4b2f1765b6+7dedbde6d2,g5320a0a9f6+0c5d6105b6,g56b687f8c9+ede3a706f7,g5c4744a4d9+ef6ac23297,g5ffd174ac0+0c5d6105b6,g6075d09f38+66af417445,g667d525e37+2ced63db88,g670421136f+2ced63db88,g71f27ac40c+2ced63db88,g774830318a+463cbe8d1f,g7876bc68e5+1d137996f1,g7985c39107+62d12e78cb,g7fdac2220c+0fd8241c05,g96f01af41f+368e6903a7,g9ca82378b8+2ced63db88,g9d27549199+ef6ac23297,gabe93b2c52+e3573e3735,gb065e2a02a+3dfbe639da,gbc3249ced9+0c5d6105b6,gbec6a3398f+0c5d6105b6,gc9534b9d65+35b9f25267,gd01420fc67+0c5d6105b6,geee7ff78d7+a14128c129,gf63283c776+ede3a706f7,gfed783d017+0c5d6105b6,w.2022.47
LSST Data Management Base Package
Loading...
Searching...
No Matches
ingestDriver.py
Go to the documentation of this file.
1from lsst.ctrl.pool.pool import Pool, startPool, abortOnError
2from lsst.ctrl.pool.parallel import BatchCmdLineTask
3from lsst.pipe.base import Struct
4from lsst.pipe.tasks.ingest import IngestTask, IngestError
5
6
8 """Parallel version of IngestTask"""
@classmethod
def batchWallTime(cls, time, parsedCmd, numCores):
    """Return walltime request for batch job

    The request is ``time`` multiplied by the number of entries in
    ``parsedCmd.files`` and divided by ``numCores``.

    Parameters
    ----------
    time : number or `str`
        Value converted with `float`; presumably the time required per
        file (TODO confirm units against the batch system).
    parsedCmd : `argparse.Namespace`
        Parsed command-line arguments; only ``files`` is read.
    numCores : `int`
        Number of cores the work is divided across.

    Returns
    -------
    walltime : `float`
        ``float(time)*len(parsedCmd.files)/numCores``.
    """
    timePerFile = float(time)
    numFiles = len(parsedCmd.files)
    # Multiply before dividing, so the operation order is unchanged.
    totalTime = timePerFile*numFiles
    return totalTime/numCores
12
@classmethod
def _makeArgumentParser(cls, *args, **kwargs):
    """Build an ArgumentParser

    Removes the batch-specific parts (the ``doBatch`` and ``add_help``
    keyword arguments) and forwards everything else to
    ``cls.ArgumentParser`` with ``name="ingest"``.

    Parameters
    ----------
    *args
        Positional arguments passed through to ``cls.ArgumentParser``.
    **kwargs
        Keyword arguments passed through to ``cls.ArgumentParser`` after
        ``doBatch`` and ``add_help`` have been discarded.

    Returns
    -------
    parser
        Whatever ``cls.ArgumentParser`` returns.
    """
    # These keys are dropped whether or not the caller supplied them.
    for batchOnlyKey in ("doBatch", "add_help"):
        kwargs.pop(batchOnlyKey, None)
    return cls.ArgumentParser(*args, name="ingest", **kwargs)
22
@classmethod
def parseAndRun(cls, *args, **kwargs):
    """Run with a MPI process pool

    Starts the process pool, parses the command line into a fresh
    ``cls.ConfigClass`` instance, constructs the task and calls its
    ``run`` method with the parsed arguments.

    The pool is told to exit even when parsing, construction or ``run``
    raises, so the other nodes are not left waiting for a command that
    never comes.

    Parameters
    ----------
    *args, **kwargs
        Accepted for interface compatibility; not used by this
        implementation.
    """
    pool = startPool()
    try:
        config = cls.ConfigClass()
        parser = cls.ArgumentParser(name=cls._DefaultName)
        # Use a distinct name so the ``*args`` parameter is not shadowed.
        parsedCmd = parser.parse_args(config)
        task = cls(config=parsedCmd.config)
        task.run(parsedCmd)
    finally:
        pool.exit()
33
def runFileWrapper(self, struct, args):
    """Run ingest on one file

    This is a wrapper method for calling ``runFile``. An `IngestError`
    raised by ``runFile`` is logged as a warning and turned into a
    ``None`` return value instead of propagating.

    Parameters
    ----------
    struct : `lsst.pipe.base.Struct`
        Structure containing ``filename`` (`str`) and ``position`` (`int`).
    args : `argparse.Namespace`
        Parsed command-line arguments.

    Returns
    -------
    hduInfoList : `list` of `dict`
        Parsed information from FITS HDUs, or ``None``.
    """
    filename = struct.filename
    position = struct.position
    try:
        # The second positional argument of ``runFile`` is always None here.
        return self.runFile(filename, None, args, position)
    except IngestError as exc:
        # ``warn`` is a deprecated alias of ``warning``; lazy %-style
        # arguments produce the same message text.
        self.log.warning("Unable to ingest %s: %s", filename, exc)
        return None
58
@abortOnError
def run(self, args):
    """Run ingest

    The files are read and ingested in parallel through the process
    pool; the registry database is then filled serially from the
    gathered results.

    Parameters
    ----------
    args : `argparse.Namespace`
        Parsed command-line arguments. The attributes read here are
        ``files``, ``input``, ``create`` and ``dryrun``.
    """
    # Parallel stage: one Struct per file, carrying its list position.
    workers = Pool(None)
    jobList = [
        Struct(filename=name, position=index)
        for index, name in enumerate(self.expandFiles(args.files))
    ]
    resultList = workers.map(self.runFileWrapper, jobList, args)

    # Serial stage: add every parsed HDU record to the registry.
    root = args.input
    with self.register.openRegistry(root, create=args.create, dryrun=args.dryrun) as registry:
        # ``runFileWrapper`` yields None for files that failed to ingest.
        succeeded = (hduInfoList for hduInfoList in resultList if hduInfoList is not None)
        for hduInfoList in succeeded:
            for info in hduInfoList:
                self.register.addRow(registry, info, dryrun=args.dryrun, create=args.create)
81
def writeConfig(self, *args, **kwargs):
    """Do nothing

    Accepts and ignores any arguments. Presumably this overrides a
    base-class hook so that no configuration is written; confirm against
    the parent task class.
    """
    return None
84
def writeMetadata(self, *args, **kwargs):
    """Do nothing

    Accepts and ignores any arguments. Presumably this overrides a
    base-class hook so that no metadata is written; confirm against the
    parent task class.
    """
    return None
def writeConfig(self, *args, **kwargs)
Definition: ingestDriver.py:82
def batchWallTime(cls, time, parsedCmd, numCores)
Return walltime request for batch job.
Definition: ingestDriver.py:10
def writeMetadata(self, *args, **kwargs)
Definition: ingestDriver.py:85