LSSTApplications  8.0.0.0+107,8.0.0.1+13,9.1+18,9.2,master-g084aeec0a4,master-g0aced2eed8+6,master-g15627eb03c,master-g28afc54ef9,master-g3391ba5ea0,master-g3d0fb8ae5f,master-g4432ae2e89+36,master-g5c3c32f3ec+17,master-g60f1e072bb+1,master-g6a3ac32d1b,master-g76a88a4307+1,master-g7bce1f4e06+57,master-g8ff4092549+31,master-g98e65bf68e,master-ga6b77976b1+53,master-gae20e2b580+3,master-gb584cd3397+53,master-gc5448b162b+1,master-gc54cf9771d,master-gc69578ece6+1,master-gcbf758c456+22,master-gcec1da163f+63,master-gcf15f11bcc,master-gd167108223,master-gf44c96c709
LSSTDataManagementBasePackage
Public Member Functions | Private Member Functions | Private Attributes | List of all members
lsst::ap::io::CompressedFileWriter Class Reference

A sequential writer for compressed files that uses asynchronous IO to overlap IO with compression. More...

#include <FileIo.h>

Inheritance diagram for lsst::ap::io::CompressedFileWriter:
lsst::ap::io::SequentialWriter lsst::ap::io::SequentialIoBase

Public Member Functions

 CompressedFileWriter (std::string const &fileName, bool const overwrite=false, std::size_t const blockSize=262144)
 
virtual ~CompressedFileWriter ()
 
virtual void write (unsigned char const *const buf, std::size_t const size)
 Writes size bytes from buf to the underlying storage device. More...
 
virtual void finish ()
 Moves modified data to the underlying storage device and marks the SequentialWriter as finished. More...
 
std::size_t getBlockSize () const
 
- Public Member Functions inherited from lsst::ap::io::SequentialIoBase
 SequentialIoBase ()
 
virtual ~SequentialIoBase ()=0
 
bool finished () const
 Returns true if there are no more bytes available for reading. More...
 
bool failed () const
 Returns true if a read operation failed. More...
 
State getState () const
 Returns the state of the sequential reader or writer. More...
 

Private Member Functions

void cleanup ()
 
void cleanup (State const state)
 

Private Attributes

boost::scoped_array< unsigned
char > 
_memory
 
unsigned char * _buffers
 aligned output buffers More...
 
::z_stream _stream
 zlib state More...
 
::aiocb _request
 Outstanding IO request. More...
 
std::size_t const _blockSize
 write granularity More...
 
int _fd
 
bool _started
 

Additional Inherited Members

- Public Types inherited from lsst::ap::io::SequentialIoBase
enum  State { IN_PROGRESS = 0, FINISHED, FAILED }
 
- Protected Attributes inherited from lsst::ap::io::SequentialIoBase
State _state
 

Detailed Description

A sequential writer for compressed files that uses asynchronous IO to overlap IO with compression.

This class writes gzip-compatible files.

Definition at line 204 of file FileIo.h.

Constructor & Destructor Documentation

lsst::ap::io::CompressedFileWriter::CompressedFileWriter ( std::string const &  fileName,
bool const  overwrite = false,
std::size_t const  blockSize = 262144 
)
explicit

Definition at line 450 of file FileIo.cc.

454  :
455  _memory(0),
456  _buffers(0),
457  _blockSize(blockSize),
458  _fd(-1),
459  _started(false)
460 {
461  if (blockSize < 1024 || blockSize > 16777216) {
462  throw LSST_EXCEPT(ex::InvalidParameterError,
463  "I/O block size must be a power of 2 between 2^10 and 2^24 bytes");
464  }
465 
466  std::memset(&_stream, 0, sizeof(::z_stream));
467  std::memset(&_request, 0, sizeof(::aiocb));
468  _request.aio_fildes = -1;
469 
470  // allocate IO buffers, store pointer to 8k aligned location inside buffer
471  boost::scoped_array<unsigned char> mem(new unsigned char[2*blockSize + 8192]);
472  std::size_t buf = reinterpret_cast<std::size_t>(mem.get());
473  _buffers = reinterpret_cast<unsigned char *>((buf + 8191) & ~static_cast<std::size_t>(8191));
474 
475  // open file
476  int const fd = openFile(
477  fileName,
478  O_WRONLY | O_CREAT | O_APPEND | (overwrite ? O_TRUNC : O_EXCL),
479  S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH
480  );
481  assert(fd != -1);
482  ScopeGuard fdGuard(boost::bind(::close, fd));
483 
484  // setup zlib (15 window bits, add 16 to indicate a gzip compatible header is desired)
485  if (deflateInit2(&_stream, 1, Z_DEFLATED, MAX_WBITS + 16, MAX_MEM_LEVEL, Z_DEFAULT_STRATEGY) != Z_OK) {
486  throw LSST_EXCEPT(ex::RuntimeError,
487  "[zlib] deflateInit2() failed to initialize compression stream");
488  }
489 
490  _request.aio_fildes = fd;
491  _request.aio_buf = _buffers;
492  _request.aio_nbytes = blockSize;
493  _fd = fd;
494  using std::swap;
495  swap(_memory, mem);
496  fdGuard.dismiss();
497 }
::z_stream _stream
zlib state
Definition: FileIo.h:225
void swap(Ellipse< DataT > &a, Ellipse< DataT > &b)
Definition: EllipseTypes.h:90
boost::scoped_array< unsigned char > _memory
Definition: FileIo.h:223
void swap(ImageBase< PixelT > &a, ImageBase< PixelT > &b)
Definition: Image.cc:287
::aiocb _request
Outstanding IO request.
Definition: FileIo.h:226
#define LSST_EXCEPT(type,...)
Definition: Exception.h:46
std::size_t const _blockSize
write granularity
Definition: FileIo.h:227
unsigned char * _buffers
aligned output buffers
Definition: FileIo.h:224
lsst::ap::io::CompressedFileWriter::~CompressedFileWriter ( )
virtual

Definition at line 500 of file FileIo.cc.

Member Function Documentation

void lsst::ap::io::CompressedFileWriter::cleanup ( )
private

Definition at line 503 of file FileIo.cc.

503  {
504  if (_fd != -1) {
505  if (_started) {
506  ::aio_cancel(_fd, 0);
507  }
508  ::close(_fd);
509  ::deflateEnd(&_stream);
510  _fd = -1;
511  }
512 }
::z_stream _stream
zlib state
Definition: FileIo.h:225
void lsst::ap::io::CompressedFileWriter::cleanup ( State const  state)
inlineprivate

Definition at line 232 of file FileIo.h.

232  {
233  cleanup();
234  _state = state;
235  }
void lsst::ap::io::CompressedFileWriter::finish ( )
virtual

Moves modified data to the underlying storage device and marks the SequentialWriter as finished.

Implements lsst::ap::io::SequentialWriter.

Definition at line 606 of file FileIo.cc.

606  {
607 
608  if (_state != IN_PROGRESS) {
609  throw LSST_EXCEPT(ex::IoError,
610  "finish() called on a finished or failed CompressedFileWriter");
611  }
612 
613  if (_stream.avail_out == 0) {
614  unsigned char * buf = static_cast<unsigned char *>(const_cast<void *>(_request.aio_buf));
615  _stream.next_out = (buf == _buffers) ? buf + _blockSize : _buffers;
616  _stream.avail_out = _blockSize;
617  }
618 
619  while (true) {
620 
621  int const zret = ::deflate(&_stream, Z_FINISH);
622  if (zret != Z_STREAM_END && zret != Z_OK) {
623  cleanup(FAILED);
624  throw LSST_EXCEPT(ex::RuntimeError,
625  (boost::format("[zlib] deflate() failed, return code: %1%") % zret).str());
626  }
627  if (_started) {
628  while (::aio_error(&_request) == EINPROGRESS) {
629  TimeSpec timeout;
630  ::aiocb * const list = &_request;
631  timeout.tv_sec = 10; // wait up to 10 seconds
632  if (::aio_suspend(&list, 1, &timeout) != 0) {
633  if (errno != EINTR) {
634  if (errno == EAGAIN) {
635  cleanup(FAILED);
636  throw LSST_EXCEPT(ex::TimeoutError, "aio_write() timed out");
637  } else {
638  cleanup(FAILED);
639  throw LSST_EXCEPT(ex::IoError,
640  (boost::format("aio_suspend() failed, errno: %1%") % errno).str());
641  }
642  }
643  errno = 0;
644  }
645  }
646  // check status of the write that just completed
647  if (::aio_return(&_request) != static_cast<int>(_blockSize)) {
648  cleanup(FAILED);
649  throw LSST_EXCEPT(ex::IoError,
650  (boost::format("aio_write() failed, errno: %1%") %
651  ::aio_error(&_request)).str());
652  }
653  }
654  if (zret == Z_STREAM_END) {
655  break;
656  }
657 
658  unsigned char * buf = static_cast<unsigned char *>(const_cast<void *>(_request.aio_buf));
659  _stream.next_out = buf;
660  _stream.avail_out = _blockSize;
661  // issue asynchronous write of the deflated data
662  _request.aio_buf = (buf == _buffers) ? buf + _blockSize : _buffers;
663  if (::aio_write(&_request) != 0) {
664  cleanup(FAILED);
665  throw LSST_EXCEPT(ex::IoError,
666  (boost::format("aio_write() failed to enqueue IO request") % errno).str());
667  }
668  _started = true;
669 
670  }
671 
672  // no outstanding IO remains, perform final (blocking) write
673  std::size_t bytesRemaining = _blockSize - _stream.avail_out;
674  unsigned char * buf = _stream.next_out - bytesRemaining;
675  while (bytesRemaining > 0) {
676  ::ssize_t nb = ::write(_fd, buf, bytesRemaining);
677  if (nb < 0) {
678  cleanup(FAILED);
679  throw LSST_EXCEPT(ex::IoError,
680  (boost::format("write() failed, errno: %1%") % errno).str());
681  }
682  bytesRemaining -= static_cast<std::size_t>(nb);
683  buf += nb;
684  }
685  // flush both userland and kernel buffers
686  while (::fsync(_fd) != 0) {
687  if (errno != EINTR) {
688  cleanup(FAILED);
689  throw LSST_EXCEPT(ex::IoError,
690  (boost::format("fsync() failed, errno: %1%") % errno).str());
691  }
692  errno = 0;
693  }
694  cleanup(FINISHED);
695 }
::z_stream _stream
zlib state
Definition: FileIo.h:225
virtual void write(unsigned char const *const buf, std::size_t const size)
Writes size bytes from buf to the underlying storage device.
Definition: FileIo.cc:515
::aiocb _request
Outstanding IO request.
Definition: FileIo.h:226
#define LSST_EXCEPT(type,...)
Definition: Exception.h:46
std::size_t const _blockSize
write granularity
Definition: FileIo.h:227
unsigned char * _buffers
aligned output buffers
Definition: FileIo.h:224
std::size_t lsst::ap::io::CompressedFileWriter::getBlockSize ( ) const
inline

Definition at line 219 of file FileIo.h.

219 { return _blockSize; }
std::size_t const _blockSize
write granularity
Definition: FileIo.h:227
void lsst::ap::io::CompressedFileWriter::write ( unsigned char const *const  buf,
std::size_t const  len 
)
virtual

Writes len bytes from buf to the underlying storage device.

Implements lsst::ap::io::SequentialWriter.

Definition at line 515 of file FileIo.cc.

518  {
519  if (len == 0) {
520  return;
521  }
522  if (buf == 0) {
523  throw LSST_EXCEPT(ex::InvalidParameterError,
524  "null pointer to read destination");
525  }
526  if (_state != IN_PROGRESS) {
527  throw LSST_EXCEPT(ex::IoError,
528  "write() called on a finished or failed CompressedFileWriter");
529  }
530 
531  _stream.next_in = const_cast<Bytef *>(buf);
532  _stream.avail_in = len;
533  if (_stream.avail_out == 0) {
534  unsigned char * buf = static_cast<unsigned char *>(const_cast<void *>(_request.aio_buf));
535  _stream.next_out = (buf == _buffers) ? buf + _blockSize : _buffers;
536  _stream.avail_out = _blockSize;
537  }
538 
539  // deflate/write until the buffer passed in by the user has been consumed
540  do {
541 
542  int const zret = ::deflate(&_stream, Z_NO_FLUSH);
543  if (zret != Z_OK) {
544  cleanup(FAILED);
545  throw LSST_EXCEPT(ex::RuntimeError,
546  (boost::format("[zlib] deflate() failed, return code: %1%") % zret).str());
547  }
548  if (_stream.avail_out != 0) {
549  if (_stream.avail_in != 0) {
550  cleanup(FAILED);
551  throw LSST_EXCEPT(ex::RuntimeError,
552  "[zlib] deflate() failed to consume input");
553  }
554  break;
555  }
556 
557  if (_stream.avail_out == 0) {
558  if (_started) {
559  // wait for pending write to finish
560  while (::aio_error(&_request) == EINPROGRESS) {
561  TimeSpec timeout;
562  ::aiocb * const list = &_request;
563  timeout.tv_sec = 10; // wait up to 10 seconds
564  if (::aio_suspend(&list, 1, &timeout) != 0) {
565  if (errno != EINTR) {
566  if (errno == EAGAIN) {
567  cleanup(FAILED);
568  throw LSST_EXCEPT(ex::TimeoutError, "aio_write() timed out");
569  } else {
570  cleanup(FAILED);
571  throw LSST_EXCEPT(ex::IoError,
572  (boost::format("aio_suspend() failed, errno: %1%") % errno).str());
573  }
574  }
575  errno = 0;
576  }
577  }
578  // check status of the write that just completed
579  if (::aio_return(&_request) != static_cast<int>(_blockSize)) {
580  cleanup(FAILED);
581  throw LSST_EXCEPT(ex::IoError,
582  (boost::format("aio_write() failed, errno: %1%") %
583  ::aio_error(&_request)).str());
584  }
585  }
586  unsigned char * buf = static_cast<unsigned char *>(const_cast<void *>(_request.aio_buf));
587  _request.aio_buf = _stream.next_out - _blockSize;
588  _stream.next_out = buf;
589  _stream.avail_out = _blockSize;
590  // issue asynchronous write
591  if (aio_write(&_request) != 0) {
592  cleanup(FAILED);
593  throw LSST_EXCEPT(ex::IoError,
594  (boost::format("aio_write() failed to enqueue IO request") % errno).str());
595  }
596  _started = true;
597  } else if (_stream.avail_in != 0) {
598  throw LSST_EXCEPT(ex::RuntimeError,
599  "[zlib] deflate() failed to consume input");
600  }
601 
602  } while(_stream.avail_in != 0);
603 }
::z_stream _stream
zlib state
Definition: FileIo.h:225
::aiocb _request
Outstanding IO request.
Definition: FileIo.h:226
#define LSST_EXCEPT(type,...)
Definition: Exception.h:46
std::size_t const _blockSize
write granularity
Definition: FileIo.h:227
unsigned char * _buffers
aligned output buffers
Definition: FileIo.h:224

Member Data Documentation

std::size_t const lsst::ap::io::CompressedFileWriter::_blockSize
private

write granularity

Definition at line 227 of file FileIo.h.

unsigned char* lsst::ap::io::CompressedFileWriter::_buffers
private

aligned output buffers

Definition at line 224 of file FileIo.h.

int lsst::ap::io::CompressedFileWriter::_fd
private

Definition at line 228 of file FileIo.h.

boost::scoped_array<unsigned char> lsst::ap::io::CompressedFileWriter::_memory
private

Definition at line 223 of file FileIo.h.

::aiocb lsst::ap::io::CompressedFileWriter::_request
private

Outstanding IO request.

Definition at line 226 of file FileIo.h.

bool lsst::ap::io::CompressedFileWriter::_started
private

Definition at line 229 of file FileIo.h.

::z_stream lsst::ap::io::CompressedFileWriter::_stream
private

zlib state

Definition at line 225 of file FileIo.h.


The documentation for this class was generated from the following files: