LSSTApplications  8.0.0.0+107,8.0.0.1+13,9.1+18,9.2,master-g084aeec0a4,master-g0aced2eed8+6,master-g15627eb03c,master-g28afc54ef9,master-g3391ba5ea0,master-g3d0fb8ae5f,master-g4432ae2e89+36,master-g5c3c32f3ec+17,master-g60f1e072bb+1,master-g6a3ac32d1b,master-g76a88a4307+1,master-g7bce1f4e06+57,master-g8ff4092549+31,master-g98e65bf68e,master-ga6b77976b1+53,master-gae20e2b580+3,master-gb584cd3397+53,master-gc5448b162b+1,master-gc54cf9771d,master-gc69578ece6+1,master-gcbf758c456+22,master-gcec1da163f+63,master-gcf15f11bcc,master-gd167108223,master-gf44c96c709
LSSTDataManagementBasePackage
Public Member Functions | Private Member Functions | Private Attributes | List of all members
lsst::ap::io::CompressedFileReader Class Reference

A sequential reader for compressed files that uses asynchronous IO to overlap IO with decompression. More...

#include <FileIo.h>

Inheritance diagram for lsst::ap::io::CompressedFileReader:
lsst::ap::io::SequentialReader lsst::ap::io::SequentialIoBase

Public Member Functions

 CompressedFileReader (std::string const &fileName, std::size_t const blockSize=262144)
 
virtual ~CompressedFileReader ()
 
virtual std::size_t read (unsigned char *const buf, std::size_t const size)
 
std::size_t getBlockSize () const
 
- Public Member Functions inherited from lsst::ap::io::SequentialIoBase
 SequentialIoBase ()
 
virtual ~SequentialIoBase ()=0
 
bool finished () const
 Returns true if there are no more bytes available for reading. More...
 
bool failed () const
 Returns true if a read operation failed. More...
 
State getState () const
 Returns the state of the SequentialReader. More...
 

Private Member Functions

void cleanup ()
 
void cleanup (State const state)
 

Private Attributes

boost::scoped_array< unsigned char > _memory
 
unsigned char * _buffers
 aligned input buffers More...
 
::z_stream _stream
 zlib state More...
 
::aiocb _request
 Outstanding IO request. More...
 
std::size_t const _blockSize
 read granularity More...
 
std::size_t _fileSize
 Size of the file being read. More...
 
std::size_t _remaining
 Bytes that haven't yet been read. More...
 
int _fd
 file descriptor More...
 

Additional Inherited Members

- Public Types inherited from lsst::ap::io::SequentialIoBase
enum  State { IN_PROGRESS = 0, FINISHED, FAILED }
 
- Protected Attributes inherited from lsst::ap::io::SequentialIoBase
State _state
 

Detailed Description

A sequential reader for compressed files that uses asynchronous IO to overlap IO with decompression.

Gzip-compatible files, or files written with zlib compression, can be read by this class.

Definition at line 165 of file FileIo.h.

Constructor & Destructor Documentation

lsst::ap::io::CompressedFileReader::CompressedFileReader ( std::string const &  fileName,
std::size_t const  blockSize = 262144 
)
explicit

Definition at line 225 of file FileIo.cc.

228  :
229  _memory(0),
230  _buffers(0),
231  _blockSize(blockSize),
232  _fileSize(0),
233  _remaining(0),
234  _fd(-1)
235 {
236  if (blockSize < 1024 || blockSize > 16777216 || (blockSize & (blockSize - 1)) != 0) {
237  throw LSST_EXCEPT(ex::InvalidParameterError,
238  "I/O block size must be a power of 2 between 2^10 and 2^24 bytes");
239  }
240 
241  std::memset(&_stream, 0, sizeof(::z_stream));
242  std::memset(&_request, 0, sizeof(::aiocb));
243  _request.aio_fildes = -1;
244 
245  // allocate IO buffers, store pointer to 8k aligned location inside buffer
246  boost::scoped_array<unsigned char> mem(new unsigned char[2*blockSize + 8192]);
247  std::size_t buf = reinterpret_cast<std::size_t>(mem.get());
248  _buffers = reinterpret_cast<unsigned char *>((buf + 8191) & ~static_cast<std::size_t>(8191));
249 
250  // open file
251  int const fd = openFile(fileName, O_RDONLY);
252  if (fd == -1) {
253  _state = FINISHED;
254  return;
255  }
256  ScopeGuard fdGuard(boost::bind(::close, fd));
257 
258  // determine file size
259  struct stat sb;
260  if (::fstat(fd, &sb) != 0) {
261  throw LSST_EXCEPT(ex::IoError,
262  (boost::format("fstat() failed on file %1%, errno: %2%") % fileName % errno).str());
263  }
264  _fileSize = static_cast<std::size_t>(sb.st_size);
265  if (_fileSize == 0) {
266  _state = FINISHED;
267  return;
268  }
270 
271  // setup zlib (specify auto-detection of zlib/gzip header)
272  if (inflateInit2(&_stream, MAX_WBITS + 32) != Z_OK) {
273  throw LSST_EXCEPT(ex::RuntimeError,
274  "[zlib] inflateInit2() failed to initialize decompression stream");
275  }
276  ScopeGuard zlibGuard(boost::bind(::inflateEnd, &_stream));
277 
278  // issue first async read
279  std::size_t const nb = std::min(blockSize, _fileSize);
280  _request.aio_fildes = fd;
281  _request.aio_buf = _buffers;
282  _request.aio_nbytes = nb;
283  if (::aio_read(&_request) != 0) {
284  throw LSST_EXCEPT(ex::IoError,
285  (boost::format("aio_read() failed to enqueue IO request for file %1%, errno: %2%") %
286  fileName % errno).str());
287  }
288  _fd = fd;
289  using std::swap;
290  swap(_memory, mem);
291  zlibGuard.dismiss();
292  fdGuard.dismiss();
293 }
void swap(Ellipse< DataT > &a, Ellipse< DataT > &b)
Definition: EllipseTypes.h:90
void swap(ImageBase< PixelT > &a, ImageBase< PixelT > &b)
Definition: Image.cc:287
int _fd
file descriptor
Definition: FileIo.h:188
double min
Definition: attributes.cc:216
::aiocb _request
Outstanding IO request.
Definition: FileIo.h:184
std::size_t const _blockSize
read granularity
Definition: FileIo.h:185
boost::scoped_array< unsigned char > _memory
Definition: FileIo.h:181
std::size_t _fileSize
Size of the file being read.
Definition: FileIo.h:186
#define LSST_EXCEPT(type,...)
Definition: Exception.h:46
unsigned char * _buffers
aligned input buffers
Definition: FileIo.h:182
::z_stream _stream
zlib state
Definition: FileIo.h:183
std::size_t _remaining
Bytes that haven't yet been read.
Definition: FileIo.h:187
lsst::ap::io::CompressedFileReader::~CompressedFileReader ( )
virtual

Definition at line 296 of file FileIo.cc.

296  {
297  cleanup();
298 }

Member Function Documentation

void lsst::ap::io::CompressedFileReader::cleanup ( )
private

Definition at line 301 of file FileIo.cc.

301  {
302  if (_fd != -1) {
303  if (_state == IN_PROGRESS) {
304  ::aio_cancel(_fd, 0);
305  }
306  ::close(_fd);
307  ::inflateEnd(&_stream);
308  _fd = -1;
309  }
310 }
int _fd
file descriptor
Definition: FileIo.h:188
::z_stream _stream
zlib state
Definition: FileIo.h:183
void lsst::ap::io::CompressedFileReader::cleanup ( State const  state)
inline private

Definition at line 191 of file FileIo.h.

191  {
192  cleanup();
193  _state = state;
194  }
std::size_t lsst::ap::io::CompressedFileReader::getBlockSize ( ) const
inline

Definition at line 177 of file FileIo.h.

177 { return _blockSize; }
std::size_t const _blockSize
read granularity
Definition: FileIo.h:185
std::size_t lsst::ap::io::CompressedFileReader::read ( unsigned char *const  buf,
std::size_t const  len 
)
virtual

Reads up to len bytes from an underlying storage device into buf and returns the number of bytes actually read.

Implements lsst::ap::io::SequentialReader.

Definition at line 313 of file FileIo.cc.

316  {
317  if (len == 0) {
318  return 0;
319  }
320  if (buf == 0) {
321  throw LSST_EXCEPT(ex::InvalidParameterError, "null pointer to read destination");
322  }
323  if (_state == FAILED) {
324  throw LSST_EXCEPT(ex::IoError, "read() called on a failed CompressedFileReader");
325  } else if (_state == FINISHED) {
326  return 0;
327  }
328 
329  // if the current block has not been fully transferred to the user,
330  // decompress as much as possible of what remains
331  _stream.next_out = buf;
332  _stream.avail_out = len;
333  if (_stream.avail_in > 0) {
334  int const zret = ::inflate(&_stream, Z_SYNC_FLUSH);
335  if (zret == Z_STREAM_END) {
336  if (_remaining != 0) {
337  cleanup(FAILED);
338  throw LSST_EXCEPT(ex::RuntimeError,
339  "[zlib] inflate(): unexpected end of stream");
340  }
341  if (_stream.avail_in != 0) {
342  cleanup(FAILED);
343  throw LSST_EXCEPT(ex::RuntimeError,
344  "[zlib] inflate() failed to consume input");
345  }
346  cleanup(FINISHED);
347  return len - _stream.avail_out;
348  } else if (zret == Z_OK) {
349  if (_stream.avail_out == 0) {
350  return len;
351  } else if (_remaining == 0) {
352  cleanup(FAILED);
353  throw LSST_EXCEPT(ex::RuntimeError,
354  "[zlib] inflate(): expected end of stream");
355  } else if (_stream.avail_in != 0) {
356  cleanup(FAILED);
357  throw LSST_EXCEPT(ex::RuntimeError,
358  "[zlib] inflate() failed to consume input");
359  }
360  // user has space for even more data
361  } else {
362  cleanup(FAILED);
363  throw LSST_EXCEPT(ex::RuntimeError,
364  (boost::format("[zlib] inflate() failed, return code: %1%") % zret).str());
365  }
366  }
367 
368  // Current block is consumed, so block on outstanding request
369  while (::aio_error(&_request) == EINPROGRESS) {
370  TimeSpec timeout;
371  ::aiocb * const list = &_request;
372  timeout.tv_sec = 10; // wait up to 10 seconds
373  if (::aio_suspend(&list, 1, &timeout) != 0) {
374  if (errno != EINTR) {
375  if (errno == EAGAIN) {
376  cleanup(FAILED);
377  throw LSST_EXCEPT(ex::TimeoutError, "aio_read() timed out");
378  } else {
379  cleanup(FAILED);
380  throw LSST_EXCEPT(ex::IoError,
381  (boost::format("aio_suspend() failed, errno: %1%") % errno).str());
382  }
383  }
384  errno = 0;
385  }
386  }
387 
388  // check status of the read that just completed
389  int const nb = ::aio_return(&_request);
390  if (nb < 0) {
391  cleanup(FAILED);
392  throw LSST_EXCEPT(ex::IoError,
393  (boost::format("aio_read() failed, errno: %1%") % ::aio_error(&_request)).str());
394  } else if (nb == 0) {
395  cleanup(FAILED);
396  throw LSST_EXCEPT(ex::IoError, "aio_read(): unexpected end of file reached");
397  }
398 
399  unsigned char * ready = static_cast<unsigned char *>(const_cast<void *>(_request.aio_buf));
400  _remaining -= static_cast<std::size_t>(nb);
401  if (_remaining > 0) {
402  // issue prefetch for the next block
403  _request.aio_buf = (ready == _buffers) ? ready + _blockSize : _buffers;
404  _request.aio_offset = _fileSize - _remaining;
405  _request.aio_nbytes = std::min(_blockSize, _remaining);
406  if (::aio_read(&_request) != 0) {
407  cleanup(FAILED);
408  throw LSST_EXCEPT(ex::IoError,
409  (boost::format("aio_read() failed to enqueue IO request, errno: %1%") % errno).str());
410  }
411  }
412 
413  // Decompress into user-supplied buffer
414  _stream.next_in = ready;
415  _stream.avail_in = nb;
416  int const zret = ::inflate(&_stream, Z_SYNC_FLUSH);
417  if (zret == Z_STREAM_END) {
418  if (_remaining != 0) {
419  cleanup(FAILED);
420  throw LSST_EXCEPT(ex::RuntimeError,
421  "[zlib] inflate(): unexpected end of stream");
422  }
423  if (_stream.avail_in != 0) {
424  cleanup(FAILED);
425  throw LSST_EXCEPT(ex::RuntimeError,
426  "[zlib] inflate() failed to consume input");
427  }
428  cleanup(FINISHED);
429  } else if (zret != Z_OK) {
430  cleanup(FAILED);
431  throw LSST_EXCEPT(ex::RuntimeError,
432  (boost::format("[zlib] inflate() failed, return code: %1%") % zret).str());
433  } else if (_stream.avail_out != 0) {
434  if (_remaining == 0) {
435  cleanup(FAILED);
436  throw LSST_EXCEPT(ex::RuntimeError,
437  "[zlib] inflate(): expected end of stream");
438  } else if (_stream.avail_in != 0) {
439  cleanup(FAILED);
440  throw LSST_EXCEPT(ex::RuntimeError,
441  "[zlib] inflate() failed to consume input");
442  }
443  }
444  return len - _stream.avail_out;
445 }
double min
Definition: attributes.cc:216
::aiocb _request
Outstanding IO request.
Definition: FileIo.h:184
std::size_t const _blockSize
read granularity
Definition: FileIo.h:185
std::size_t _fileSize
Size of the file being read.
Definition: FileIo.h:186
#define LSST_EXCEPT(type,...)
Definition: Exception.h:46
unsigned char * _buffers
aligned input buffers
Definition: FileIo.h:182
::z_stream _stream
zlib state
Definition: FileIo.h:183
std::size_t _remaining
Bytes that haven't yet been read.
Definition: FileIo.h:187

Member Data Documentation

std::size_t const lsst::ap::io::CompressedFileReader::_blockSize
private

read granularity

Definition at line 185 of file FileIo.h.

unsigned char* lsst::ap::io::CompressedFileReader::_buffers
private

aligned input buffers

Definition at line 182 of file FileIo.h.

int lsst::ap::io::CompressedFileReader::_fd
private

file descriptor

Definition at line 188 of file FileIo.h.

std::size_t lsst::ap::io::CompressedFileReader::_fileSize
private

Size of the file being read.

Definition at line 186 of file FileIo.h.

boost::scoped_array<unsigned char> lsst::ap::io::CompressedFileReader::_memory
private

Definition at line 181 of file FileIo.h.

std::size_t lsst::ap::io::CompressedFileReader::_remaining
private

Bytes that haven't yet been read.

Definition at line 187 of file FileIo.h.

::aiocb lsst::ap::io::CompressedFileReader::_request
private

Outstanding IO request.

Definition at line 184 of file FileIo.h.

::z_stream lsst::ap::io::CompressedFileReader::_stream
private

zlib state

Definition at line 183 of file FileIo.h.


The documentation for this class was generated from the following files: