LSST Applications  21.0.0-172-gfb10e10a+18fedfabac,22.0.0+297cba6710,22.0.0+80564b0ff1,22.0.0+8d77f4f51a,22.0.0+a28f4c53b1,22.0.0+dcf3732eb2,22.0.1-1-g7d6de66+2a20fdde0d,22.0.1-1-g8e32f31+297cba6710,22.0.1-1-geca5380+7fa3b7d9b6,22.0.1-12-g44dc1dc+2a20fdde0d,22.0.1-15-g6a90155+515f58c32b,22.0.1-16-g9282f48+790f5f2caa,22.0.1-2-g92698f7+dcf3732eb2,22.0.1-2-ga9b0f51+7fa3b7d9b6,22.0.1-2-gd1925c9+bf4f0e694f,22.0.1-24-g1ad7a390+a9625a72a8,22.0.1-25-g5bf6245+3ad8ecd50b,22.0.1-25-gb120d7b+8b5510f75f,22.0.1-27-g97737f7+2a20fdde0d,22.0.1-32-gf62ce7b1+aa4237961e,22.0.1-4-g0b3f228+2a20fdde0d,22.0.1-4-g243d05b+871c1b8305,22.0.1-4-g3a563be+32dcf1063f,22.0.1-4-g44f2e3d+9e4ab0f4fa,22.0.1-42-gca6935d93+ba5e5ca3eb,22.0.1-5-g15c806e+85460ae5f3,22.0.1-5-g58711c4+611d128589,22.0.1-5-g75bb458+99c117b92f,22.0.1-6-g1c63a23+7fa3b7d9b6,22.0.1-6-g50866e6+84ff5a128b,22.0.1-6-g8d3140d+720564cf76,22.0.1-6-gd805d02+cc5644f571,22.0.1-8-ge5750ce+85460ae5f3,master-g6e05de7fdc+babf819c66,master-g99da0e417a+8d77f4f51a,w.2021.48
LSST Data Management Base Package
Public Member Functions | Static Public Attributes | List of all members
lsst.pipe.drivers.ingestDriver.PoolIngestTask Class Reference
Inheritance diagram for lsst.pipe.drivers.ingestDriver.PoolIngestTask:
lsst.ctrl.pool.parallel.BatchCmdLineTask lsst.pipe.tasks.ingest.IngestTask

Public Member Functions

def batchWallTime (cls, time, parsedCmd, numCores)
 Return walltime request for batch job. More...
 
def parseAndRun (cls, *args, **kwargs)
 
def runFileWrapper (self, struct, args)
 
def run (self, args)
 
def writeConfig (self, *args, **kwargs)
 
def writeMetadata (self, *args, **kwargs)
 
def parseAndSubmit (cls, args=None, **kwargs)
 
def batchCommand (cls, args)
 Return command to run CmdLineTask. More...
 
def logOperation (self, operation, catch=False, trace=True)
 Provide a context manager for logging an operation. More...
 
def parseAndRun (cls)
 
def prepareTask (cls, root=None, dryrun=False, mode="move", create=False, ignoreIngested=False)
 
def ingest (self, infile, outfile, mode="move", dryrun=False)
 
def isBadFile (self, filename, badFileList)
 
def isBadId (self, info, badIdList)
 
def expandFiles (self, fileNameList)
 Expand a set of filenames and globs, returning a list of filenames. More...
 
def runFile (self, infile, registry, args, pos)
 Examine and ingest a single file. More...
 
def ingestFiles (self, fileList)
 

Static Public Attributes

 ConfigClass = IngestConfig
 
 ArgumentParser = IngestArgumentParser
 

Detailed Description

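Parallel version of IngestTask: input files are read and ingested in parallel using an MPI process pool, and the registry database is then updated serially.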

Definition at line 7 of file ingestDriver.py.

Member Function Documentation

◆ batchCommand()

def lsst.ctrl.pool.parallel.BatchCmdLineTask.batchCommand (   cls,
  args 
)
inherited

Return command to run CmdLineTask.

    @param cls: Class
    @param args: Parsed batch job arguments (from BatchArgumentParser)

Definition at line 476 of file parallel.py.

476  def batchCommand(cls, args):
477  """!Return command to run CmdLineTask
478 
479  @param cls: Class
480  @param args: Parsed batch job arguments (from BatchArgumentParser)
481  """
482  job = args.job if args.job is not None else "job"
483  module = cls.__module__
484  script = ("import os; os.umask(%#05o); " +
485  "import lsst.base; lsst.base.disableImplicitThreading(); " +
486  "import lsst.ctrl.pool.log; lsst.ctrl.pool.log.jobLog(\"%s\"); ") % (UMASK, job)
487 
488  if args.batchStats:
489  script += ("import lsst.ctrl.pool.parallel; import atexit; " +
490  "atexit.register(lsst.ctrl.pool.parallel.printProcessStats); ")
491 
492  script += "import %s; %s.%s.parseAndRun();" % (module, module, cls.__name__)
493 
494  profilePre = "import cProfile; import os; cProfile.run(\"\"\""
495  profilePost = "\"\"\", filename=\"profile-" + job + "-%s-%d.dat\" % (os.uname()[1], os.getpid()))"
496 
497  return ("python -c '" + (profilePre if args.batchProfile else "") + script +
498  (profilePost if args.batchProfile else "") + "' " + shCommandFromArgs(args.leftover) +
499  " --noExit")
500 
def shCommandFromArgs(args)
Definition: parallel.py:42

◆ batchWallTime()

def lsst.pipe.drivers.ingestDriver.PoolIngestTask.batchWallTime (   cls,
  time,
  parsedCmd,
  numCores 
)

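Return walltime request for batch job.

    PoolIngestTask computes this as the requested time per iteration multiplied by the number of input files, divided by the number of cores.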

    Subclasses should override if the walltime should be calculated
    differently (e.g., addition of some serial time).

    @param cls: Class
    @param time: Requested time per iteration
    @param parsedCmd: Results of argument parsing
    @param numCores: Number of cores

Reimplemented from lsst.ctrl.pool.parallel.BatchCmdLineTask.

Definition at line 10 of file ingestDriver.py.

10  def batchWallTime(cls, time, parsedCmd, numCores):
11  return float(time)*len(parsedCmd.files)/numCores
12 

◆ expandFiles()

def lsst.pipe.tasks.ingest.IngestTask.expandFiles (   self,
  fileNameList 
)
inherited

Expand a set of filenames and globs, returning a list of filenames.

    @param fileNameList A list of files and glob patterns

    N.b. a pattern that matches no files is logged as a warning and skipped; unlike POSIX shell globbing, it is not returned unchanged

Definition at line 557 of file ingest.py.

557  def expandFiles(self, fileNameList):
558  """!Expand a set of filenames and globs, returning a list of filenames
559 
560  @param fileNameList A list of files and glob patterns
561 
562  N.b. globs obey Posix semantics, so a pattern that matches nothing is returned unchanged
563  """
564  filenameList = []
565  for globPattern in fileNameList:
566  files = glob(globPattern)
567 
568  if not files: # posix behaviour is to return pattern unchanged
569  self.log.warning("%s doesn't match any file", globPattern)
570  continue
571 
572  filenameList.extend(files)
573 
574  return filenameList
575 

◆ ingest()

def lsst.pipe.tasks.ingest.IngestTask.ingest (   self,
  infile,
  outfile,
  mode = "move",
  dryrun = False 
)
inherited
Ingest a file into the image repository.

@param infile  Name of input file
@param outfile Name of output file (file in repository)
@param mode    Mode of ingest (copy/link/move/skip)
@param dryrun  Only report what would occur?
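@return True on success (including mode "skip" and dry runs); False otherwise, e.g. when the ingest fails and config.allowError is True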

Definition at line 478 of file ingest.py.

478  def ingest(self, infile, outfile, mode="move", dryrun=False):
479  """Ingest a file into the image repository.
480 
481  @param infile Name of input file
482  @param outfile Name of output file (file in repository)
483  @param mode Mode of ingest (copy/link/move/skip)
484  @param dryrun Only report what would occur?
485  @param Success boolean
486  """
487  if mode == "skip":
488  return True
489  if dryrun:
490  self.log.info("Would %s from %s to %s", mode, infile, outfile)
491  return True
492  try:
493  outdir = os.path.dirname(outfile)
494  if not os.path.isdir(outdir):
495  try:
496  os.makedirs(outdir)
497  except OSError as exc:
498  # Silently ignore mkdir failures due to race conditions
499  if not os.path.isdir(outdir):
500  raise RuntimeError(f"Failed to create directory {outdir}") from exc
501  if os.path.lexists(outfile):
502  if self.config.clobber:
503  os.unlink(outfile)
504  else:
505  raise RuntimeError("File %s already exists; consider --config clobber=True" % outfile)
506 
507  if mode == "copy":
508  assertCanCopy(infile, outfile)
509  shutil.copyfile(infile, outfile)
510  elif mode == "link":
511  if os.path.exists(outfile):
512  if os.path.samefile(infile, outfile):
513  self.log.debug("Already linked %s to %s: ignoring", infile, outfile)
514  else:
515  self.log.warning("%s already has a file at the target location (%s): ignoring "
516  "(set clobber=True to overwrite)", infile, outfile)
517  return False
518  os.symlink(os.path.abspath(infile), outfile)
519  elif mode == "move":
520  assertCanCopy(infile, outfile)
521  shutil.move(infile, outfile)
522  else:
523  raise AssertionError("Unknown mode: %s" % mode)
524  self.log.info("%s --<%s>--> %s", infile, mode, outfile)
525  except Exception as e:
526  self.log.warning("Failed to %s %s to %s: %s", mode, infile, outfile, e)
527  if not self.config.allowError:
528  raise RuntimeError(f"Failed to {mode} {infile} to {outfile}") from e
529  return False
530  return True
531 
def assertCanCopy(fromPath, toPath)
Definition: ingest.py:650

◆ ingestFiles()

def lsst.pipe.tasks.ingest.IngestTask.ingestFiles (   self,
  fileList 
)
inherited
Ingest specified file or list of files and add them to the registry.

This method can only be called if `prepareTask` was used; otherwise a RuntimeError is raised.

Parameters
----------
fileList : `str` or `list` [`str`]
    Pathname or list of pathnames of files to ingest.

Definition at line 632 of file ingest.py.

632  def ingestFiles(self, fileList):
633  """Ingest specified file or list of files and add them to the registry.
634 
635  This method can only be called if `prepareTask` was used.
636 
637  Parameters
638  ----------
639  fileList : `str` or `list` [`str`]
640  Pathname or list of pathnames of files to ingest.
641  """
642  if not hasattr(self, "_args"):
643  raise RuntimeError("Task not created with prepareTask")
644  if isinstance(fileList, str):
645  fileList = [fileList]
646  self._args.files = fileList
647  self.run(self._args)
648 
649 

◆ isBadFile()

def lsst.pipe.tasks.ingest.IngestTask.isBadFile (   self,
  filename,
  badFileList 
)
inherited
Return whether the file qualifies as bad.

The file's basename is matched (using fnmatch) against the list of bad file patterns; if the list is empty or None, no file is considered bad.

Definition at line 532 of file ingest.py.

532  def isBadFile(self, filename, badFileList):
533  """Return whether the file qualifies as bad
534 
535  We match against the list of bad file patterns.
536  """
537  filename = os.path.basename(filename)
538  if not badFileList:
539  return False
540  for badFile in badFileList:
541  if fnmatch(filename, badFile):
542  return True
543  return False
544 

◆ isBadId()

def lsst.pipe.tasks.ingest.IngestTask.isBadId (   self,
  info,
  badIdList 
)
inherited
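Return whether the file information qualifies as bad.

We match against the list of bad data identifiers: the file is bad if, for any bad identifier, every key/value pair in that identifier equals the corresponding entry in the file information. If the list is empty or None, nothing is considered bad.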

Definition at line 545 of file ingest.py.

545  def isBadId(self, info, badIdList):
546  """Return whether the file information qualifies as bad
547 
548  We match against the list of bad data identifiers.
549  """
550  if not badIdList:
551  return False
552  for badId in badIdList:
553  if all(info[key] == value for key, value in badId.items()):
554  return True
555  return False
556 
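bool all(CoordinateExpr< N > const &expr) noexcept
Return true if all elements are true.
(Note: this cross-reference is a Doxygen artifact. The `all` used in isBadId is Python's built-in `all()`, not this C++ function.)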

◆ logOperation()

def lsst.ctrl.pool.parallel.BatchCmdLineTask.logOperation (   self,
  operation,
  catch = False,
  trace = True 
)
inherited

Provide a context manager for logging an operation.

    @param operation: description of operation (string)
    @param catch: Catch all exceptions?
    @param trace: Log a traceback of caught exception?

    Note that if 'catch' is True, all exceptions (subclasses of Exception) are logged and then
    swallowed, so code following the block may see side-effects such as undefined variables.

Definition at line 502 of file parallel.py.

502  def logOperation(self, operation, catch=False, trace=True):
503  """!Provide a context manager for logging an operation
504 
505  @param operation: description of operation (string)
506  @param catch: Catch all exceptions?
507  @param trace: Log a traceback of caught exception?
508 
509  Note that if 'catch' is True, all exceptions are swallowed, but there may
510  be other side-effects such as undefined variables.
511  """
512  self.log.info("%s: Start %s" % (NODE, operation))
513  try:
514  yield
515  except Exception:
516  if catch:
517  cls, e, _ = sys.exc_info()
518  self.log.warn("%s: Caught %s while %s: %s" % (NODE, cls.__name__, operation, e))
519  if trace:
520  self.log.info("%s: Traceback:\n%s" % (NODE, traceback.format_exc()))
521  return
522  raise
523  finally:
524  self.log.info("%s: Finished %s" % (NODE, operation))
525 
526 

◆ parseAndRun() [1/2]

def lsst.pipe.tasks.ingest.IngestTask.parseAndRun (   cls)
inherited
Parse the command-line arguments and run the Task.

Definition at line 428 of file ingest.py.

428  def parseAndRun(cls):
429  """Parse the command-line arguments and run the Task."""
430  task, args = cls._parse()
431  task.run(args)
432 

◆ parseAndRun() [2/2]

def lsst.pipe.drivers.ingestDriver.PoolIngestTask.parseAndRun (   cls,
*args,
**  kwargs 
)
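Run with an MPI process pool.

The `*args` and `**kwargs` parameters are not used; arguments are parsed by the task's ArgumentParser.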

Definition at line 24 of file ingestDriver.py.

24  def parseAndRun(cls, *args, **kwargs):
25  """Run with a MPI process pool"""
26  pool = startPool()
27  config = cls.ConfigClass()
28  parser = cls.ArgumentParser(name=cls._DefaultName)
29  args = parser.parse_args(config)
30  task = cls(config=args.config)
31  task.run(args)
32  pool.exit()
33 
def startPool(comm=None, root=0, killSlaves=True)
Start a process pool.
Definition: pool.py:1216

◆ parseAndSubmit()

def lsst.ctrl.pool.parallel.BatchCmdLineTask.parseAndSubmit (   cls,
  args = None,
**  kwargs 
)
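inherited

Parse the command-line arguments and either run or submit the task.

If no batch system is requested, the batch arguments are removed from sys.argv and the task is run directly via parseAndRun. Otherwise, the walltime is taken from the walltime argument (if positive) or computed with batchWallTime, and the command produced by batchCommand is submitted to the batch system.

Definition at line 435 of file parallel.py.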

435  def parseAndSubmit(cls, args=None, **kwargs):
436  taskParser = cls._makeArgumentParser(doBatch=True, add_help=False)
437  batchParser = BatchArgumentParser(parent=taskParser)
438  batchArgs = batchParser.parse_args(config=cls.ConfigClass(), args=args, override=cls.applyOverrides,
439  **kwargs)
440 
441  if not cls.RunnerClass(cls, batchArgs.parent).precall(batchArgs.parent): # Write config, schema
442  taskParser.error("Error in task preparation")
443 
444  setBatchType(batchArgs.batch)
445 
446  if batchArgs.batch is None: # don't use a batch system
447  sys.argv = [sys.argv[0]] + batchArgs.leftover # Remove all batch arguments
448 
449  return cls.parseAndRun()
450  else:
451  if batchArgs.walltime > 0:
452  walltime = batchArgs.walltime
453  else:
454  numCores = batchArgs.cores if batchArgs.cores > 0 else batchArgs.nodes*batchArgs.procs
455  walltime = cls.batchWallTime(batchArgs.time, batchArgs.parent, numCores)
456 
457  command = cls.batchCommand(batchArgs)
458  batchArgs.batch.run(command, walltime=walltime)
459 
def setBatchType(batchType)
Definition: pool.py:101

◆ prepareTask()

def lsst.pipe.tasks.ingest.IngestTask.prepareTask (   cls,
  root = None,
  dryrun = False,
  mode = "move",
  create = False,
  ignoreIngested = False 
)
inherited
Prepare for running the task repeatedly with `ingestFiles`.

Saves the parsed arguments, including the Butler and log, as a
private instance variable.

Parameters
----------
root : `str`, optional
    Repository root pathname.  If None, run the Task using the
    command line arguments, ignoring all other arguments below.
dryrun : `bool`, optional
    If True, don't perform any action; log what would have happened.
mode : `str`, optional
    How files are delivered to their destination.  Default is "move",
    unlike the command-line default of "link".
create : `bool`, optional
    If True, create a new registry, clobbering any old one present.
ignoreIngested : `bool`, optional
    If True, do not complain if the file is already present in the
    registry (and do nothing else).

Returns
-------
task : `IngestTask`
    If `root` was provided, the IngestTask instance

Definition at line 434 of file ingest.py.

435  ignoreIngested=False):
436  """Prepare for running the task repeatedly with `ingestFiles`.
437 
438  Saves the parsed arguments, including the Butler and log, as a
439  private instance variable.
440 
441  Parameters
442  ----------
443  root : `str`, optional
444  Repository root pathname. If None, run the Task using the
445  command line arguments, ignoring all other arguments below.
446  dryrun : `bool`, optional
447  If True, don't perform any action; log what would have happened.
448  mode : `str`, optional
449  How files are delivered to their destination. Default is "move",
450  unlike the command-line default of "link".
451  create : `bool`, optional
452  If True, create a new registry, clobbering any old one present.
453  ignoreIngested : `bool`, optional
454  If True, do not complain if the file is already present in the
455  registry (and do nothing else).
456 
457  Returns
458  -------
459  task : `IngestTask`
460  If `root` was provided, the IngestTask instance
461  """
462  sys.argv = ["IngestTask"]
463  sys.argv.append(root)
464  if dryrun:
465  sys.argv.append("--dry-run")
466  sys.argv.append("--mode")
467  sys.argv.append(mode)
468  if create:
469  sys.argv.append("--create")
470  if ignoreIngested:
471  sys.argv.append("--ignore-ingested")
472  sys.argv.append("__fakefile__") # needed for parsing, not used
473 
474  task, args = cls._parse()
475  task._args = args
476  return task
477 

◆ run()

def lsst.pipe.drivers.ingestDriver.PoolIngestTask.run (   self,
  args 
)
Run ingest.

The files are read and ingested in parallel, and the
resulting information is then added to the registry database serially.

Reimplemented from lsst.pipe.tasks.ingest.IngestTask.

Definition at line 60 of file ingestDriver.py.

60  def run(self, args):
61  """Run ingest
62 
63  We read and ingest the files in parallel, and then
64  stuff the registry database in serial.
65  """
66  # Parallel
67  pool = Pool(None)
68  filenameList = self.expandFiles(args.files)
69  dataList = [Struct(filename=filename, position=ii) for ii, filename in enumerate(filenameList)]
70  infoList = pool.map(self.runFileWrapper, dataList, args)
71 
72  # Serial
73  root = args.input
74  context = self.register.openRegistry(root, create=args.create, dryrun=args.dryrun)
75  with context as registry:
76  for hduInfoList in infoList:
77  if hduInfoList is None:
78  continue
79  for info in hduInfoList:
80  self.register.addRow(registry, info, dryrun=args.dryrun, create=args.create)
81 
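def run(self, coaddExposures, bbox, wcs)
Definition: getTemplate.py:603
(Note: this cross-reference is a Doxygen artifact pointing to an unrelated `run` method; it does not describe PoolIngestTask.run.)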

◆ runFile()

def lsst.pipe.tasks.ingest.IngestTask.runFile (   self,
  infile,
  registry,
  args,
  pos 
)
inherited

Examine and ingest a single file.

    @param infile: File to process
    @param registry: Registry into which to insert Butler metadata, or None
    @param args: Parsed command-line arguments
    @param pos: Position number of this file in the input list
    @return parsed information from FITS HDUs if registry is None and the file was ingested; otherwise None

Definition at line 576 of file ingest.py.

576  def runFile(self, infile, registry, args, pos):
577  """!Examine and ingest a single file
578 
579  @param infile: File to process
580  @param registry: Registry into which to insert Butler metadata, or None
581  @param args: Parsed command-line arguments
582  @param pos: Position number of this file in the input list
583  @return parsed information from FITS HDUs if registry is None; or None
584  """
585  if self.isBadFile(infile, args.badFile):
586  self.log.info("Skipping declared bad file %s", infile)
587  return None
588  try:
589  fileInfo, hduInfoList = self.parse.getInfo(infile)
590  except Exception as e:
591  if not self.config.allowError:
592  raise RuntimeError(f"Error parsing {infile}") from e
593  self.log.warning("Error parsing %s (%s); skipping", infile, e)
594  return None
595  if self.isBadId(fileInfo, args.badId.idList):
596  self.log.info("Skipping declared bad file %s: %s", infile, fileInfo)
597  return None
598  if registry is not None and self.register.check(registry, fileInfo):
599  if args.ignoreIngested:
600  return None
601  self.log.warning("%s: already ingested: %s", infile, fileInfo)
602  outfile = self.parse.getDestination(args.butler, fileInfo, infile)
603  if not self.ingest(infile, outfile, mode=args.mode, dryrun=args.dryrun):
604  return None
605  if hduInfoList is None:
606  return None
607  if registry is None:
608  return hduInfoList
609  for info in hduInfoList:
610  try:
611  self.register.addRow(registry, info, dryrun=args.dryrun, create=args.create)
612  except Exception as exc:
613  raise IngestError(f"Failed to register file {infile}", infile, pos) from exc
614  return None # No further registration should be performed
615 

◆ runFileWrapper()

def lsst.pipe.drivers.ingestDriver.PoolIngestTask.runFileWrapper (   self,
  struct,
  args 
)
Run ingest on one file

This is a wrapper method for calling ``runFile``.

Parameters
----------
struct : `lsst.pipe.base.Struct`
    Structure containing ``filename`` (`str`) and ``position`` (`int`).
args : `argparse.Namespace`
    Parsed command-line arguments.

Returns
-------
hduInfoList : `list` of `dict`
    Parsed information from FITS HDUs, or ``None``.

Definition at line 34 of file ingestDriver.py.

34  def runFileWrapper(self, struct, args):
35  """Run ingest on one file
36 
37  This is a wrapper method for calling ``runFile``.
38 
39  Parameters
40  ----------
41  struct : `lsst.pipe.base.Struct`
42  Structure containing ``filename`` (`str`) and ``position`` (`int`).
43  args : `argparse.Namespace`
44  Parsed command-line arguments.
45 
46  Returns
47  -------
48  hduInfoList : `list` of `dict`
49  Parsed information from FITS HDUs, or ``None``.
50  """
51  filename = struct.filename
52  position = struct.position
53  try:
54  return self.runFile(filename, None, args, position)
55  except IngestError as exc:
56  self.log.warn(f"Unable to ingest {filename}: {exc}")
57  return None
58 

◆ writeConfig()

def lsst.pipe.drivers.ingestDriver.PoolIngestTask.writeConfig (   self,
*args,
**  kwargs 
)

Definition at line 82 of file ingestDriver.py.

82  def writeConfig(self, *args, **kwargs):
83  pass
84 

◆ writeMetadata()

def lsst.pipe.drivers.ingestDriver.PoolIngestTask.writeMetadata (   self,
*args,
**  kwargs 
)

Definition at line 85 of file ingestDriver.py.

85  def writeMetadata(self, *args, **kwargs):
86  pass
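def writeMetadata(self, dataRefList)
No metadata to write, and not sure how to write it for a list of dataRefs.
(Note: this cross-reference is a Doxygen artifact pointing to an unrelated `writeMetadata(self, dataRefList)`; PoolIngestTask.writeMetadata simply does nothing.)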

Member Data Documentation

◆ ArgumentParser

lsst.pipe.tasks.ingest.IngestTask.ArgumentParser = IngestArgumentParser
staticinherited

Definition at line 409 of file ingest.py.

◆ ConfigClass

lsst.pipe.tasks.ingest.IngestTask.ConfigClass = IngestConfig
staticinherited

Definition at line 408 of file ingest.py.


The documentation for this class was generated from the following file: