33 #include <sys/types.h>
35 #include <sys/fcntl.h>
40 #if ZLIB_VERNUM < 0x123
41 # warning Older version of zlib detected, upgrading to version 1.2.3 or later is recommended
47 #include "boost/bind.hpp"
48 #include "boost/format.hpp"
57 namespace ex = lsst::pex::exceptions;
59 namespace lsst {
namespace ap {
namespace io {
namespace {
62 std::string
const & fileName,
64 ::mode_t
const mode = 0
66 int fd = ::open(fileName.c_str(), oflag, mode);
68 if (errno != ENOENT || (oflag & O_WRONLY) != 0) {
70 (
boost::format(
"open() failed on file %1%, flags: %2%, errno: %3%") %
71 fileName % oflag % errno).str());
91 std::string
const & fileName
// Open read-only. openFile() takes (fileName, oflag, mode); the previous call
// passed `false` as oflag and O_RDONLY as mode, which only worked because
// both happen to be 0. mode is left at its default.
95 int const fd = openFile(fileName, O_RDONLY);
116 unsigned char *
const buf,
117 std::size_t
const len
124 "null pointer to read destination");
126 if (_state == FAILED) {
128 "read() called on a failed SequentialFileReader");
129 }
else if (_state == FINISHED) {
133 ::ssize_t n = ::read(_fd, buf, len);
141 return static_cast<std::size_t
>(n);
148 std::string
const & fileName,
153 int const fd = openFile(
155 O_WRONLY | O_CREAT | O_APPEND | (overwrite ? O_TRUNC : O_EXCL),
156 S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH
174 unsigned char const *
const buf,
175 std::size_t
const len
182 "null pointer to bytes to write");
184 if (_state != IN_PROGRESS) {
186 "write() called on a finished or failed SequentialFileWriter");
189 unsigned char const * dst = buf;
190 std::size_t nb = len;
192 ::ssize_t n = ::write(_fd, dst, nb);
196 (
boost::format(
"write() failed, errno: %1%") % errno).str());
205 if (_state != IN_PROGRESS) {
207 "finish() called on a finished or failed SequentialFileWriter");
211 while (fsync(_fd) != 0) {
212 if (errno != EINTR) {
215 (
boost::format(
"fsync() failed, errno: %1%") % errno).str());
226 std::string
const & fileName,
227 std::size_t
const blockSize
231 _blockSize(blockSize),
236 if (blockSize < 1024 || blockSize > 16777216 || (blockSize & (blockSize - 1)) != 0) {
238 "I/O block size must be a power of 2 between 2^10 and 2^24 bytes");
241 std::memset(&
_stream, 0,
sizeof(::z_stream));
242 std::memset(&
_request, 0,
sizeof(::aiocb));
246 boost::scoped_array<unsigned char> mem(
new unsigned char[2*blockSize + 8192]);
247 std::size_t buf =
reinterpret_cast<std::size_t
>(mem.get());
248 _buffers =
reinterpret_cast<unsigned char *
>((buf + 8191) & ~static_cast<std::size_t>(8191));
251 int const fd = openFile(fileName, O_RDONLY);
260 if (::fstat(fd, &sb) != 0) {
262 (
boost::format(
"fstat() failed on file %1%, errno: %2%") % fileName % errno).str());
264 _fileSize =
static_cast<std::size_t
>(sb.st_size);
272 if (inflateInit2(&
_stream, MAX_WBITS + 32) != Z_OK) {
274 "[zlib] inflateInit2() failed to initialize decompression stream");
285 (
boost::format(
"aio_read() failed to enqueue IO request for file %1%, errno: %2%") %
286 fileName % errno).str());
303 if (_state == IN_PROGRESS) {
304 ::aio_cancel(_fd, 0);
307 ::inflateEnd(&_stream);
314 unsigned char *
const buf,
315 std::size_t
const len
321 throw LSST_EXCEPT(ex::InvalidParameterError,
"null pointer to read destination");
323 if (_state == FAILED) {
324 throw LSST_EXCEPT(ex::IoError,
"read() called on a failed CompressedFileReader");
325 }
else if (_state == FINISHED) {
331 _stream.next_out = buf;
332 _stream.avail_out = len;
333 if (_stream.avail_in > 0) {
334 int const zret = ::inflate(&_stream, Z_SYNC_FLUSH);
335 if (zret == Z_STREAM_END) {
336 if (_remaining != 0) {
339 "[zlib] inflate(): unexpected end of stream");
341 if (_stream.avail_in != 0) {
344 "[zlib] inflate() failed to consume input");
347 return len - _stream.avail_out;
348 }
else if (zret == Z_OK) {
349 if (_stream.avail_out == 0) {
351 }
else if (_remaining == 0) {
354 "[zlib] inflate(): expected end of stream");
355 }
else if (_stream.avail_in != 0) {
358 "[zlib] inflate() failed to consume input");
364 (
boost::format(
"[zlib] inflate() failed, return code: %1%") % zret).str());
369 while (::aio_error(&_request) == EINPROGRESS) {
371 ::aiocb *
const list = &_request;
373 if (::aio_suspend(&list, 1, &timeout) != 0) {
374 if (errno != EINTR) {
375 if (errno == EAGAIN) {
377 throw LSST_EXCEPT(ex::TimeoutError,
"aio_read() timed out");
381 (
boost::format(
"aio_suspend() failed, errno: %1%") % errno).str());
389 int const nb = ::aio_return(&_request);
393 (
boost::format(
"aio_read() failed, errno: %1%") % ::aio_error(&_request)).str());
394 }
else if (nb == 0) {
396 throw LSST_EXCEPT(ex::IoError,
"aio_read(): unexpected end of file reached");
399 unsigned char * ready =
static_cast<unsigned char *
>(
const_cast<void *
>(_request.aio_buf));
400 _remaining -=
static_cast<std::size_t
>(nb);
401 if (_remaining > 0) {
403 _request.aio_buf = (ready == _buffers) ? ready + _blockSize : _buffers;
404 _request.aio_offset = _fileSize - _remaining;
405 _request.aio_nbytes =
std::min(_blockSize, _remaining);
406 if (::aio_read(&_request) != 0) {
409 (
boost::format(
"aio_read() failed to enqueue IO request, errno: %1%") % errno).str());
414 _stream.next_in = ready;
415 _stream.avail_in = nb;
416 int const zret = ::inflate(&_stream, Z_SYNC_FLUSH);
417 if (zret == Z_STREAM_END) {
418 if (_remaining != 0) {
421 "[zlib] inflate(): unexpected end of stream");
423 if (_stream.avail_in != 0) {
426 "[zlib] inflate() failed to consume input");
429 }
else if (zret != Z_OK) {
432 (
boost::format(
"[zlib] inflate() failed, return code: %1%") % zret).str());
433 }
else if (_stream.avail_out != 0) {
434 if (_remaining == 0) {
437 "[zlib] inflate(): expected end of stream");
438 }
else if (_stream.avail_in != 0) {
441 "[zlib] inflate() failed to consume input");
444 return len - _stream.avail_out;
451 std::string
const & fileName,
452 bool const overwrite,
453 std::size_t
const blockSize
457 _blockSize(blockSize),
// Enforce the whole contract stated in the error message below (and already
// enforced by CompressedFileReader): a power of two within [2^10, 2^24].
461 if (blockSize < 1024 || blockSize > 16777216 || (blockSize & (blockSize - 1)) != 0) {
463 "I/O block size must be a power of 2 between 2^10 and 2^24 bytes");
466 std::memset(&
_stream, 0,
sizeof(::z_stream));
467 std::memset(&
_request, 0,
sizeof(::aiocb));
471 boost::scoped_array<unsigned char> mem(
new unsigned char[2*blockSize + 8192]);
472 std::size_t buf =
reinterpret_cast<std::size_t
>(mem.get());
473 _buffers =
reinterpret_cast<unsigned char *
>((buf + 8191) & ~static_cast<std::size_t>(8191));
476 int const fd = openFile(
478 O_WRONLY | O_CREAT | O_APPEND | (overwrite ? O_TRUNC : O_EXCL),
479 S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH
485 if (deflateInit2(&
_stream, 1, Z_DEFLATED, MAX_WBITS + 16, MAX_MEM_LEVEL, Z_DEFAULT_STRATEGY) != Z_OK) {
487 "[zlib] deflateInit2() failed to initialize compression stream");
506 ::aio_cancel(_fd, 0);
509 ::deflateEnd(&_stream);
516 unsigned char const *
const buf,
517 std::size_t
const len
// This is the write path: buf is the data source, not a read destination.
524 "null pointer to bytes to write");
526 if (_state != IN_PROGRESS) {
528 "write() called on a finished or failed CompressedFileWriter");
531 _stream.next_in =
const_cast<Bytef *
>(buf);
532 _stream.avail_in = len;
533 if (_stream.avail_out == 0) {
534 unsigned char * buf =
static_cast<unsigned char *
>(
const_cast<void *
>(_request.aio_buf));
535 _stream.next_out = (buf == _buffers) ? buf + _blockSize : _buffers;
536 _stream.avail_out = _blockSize;
542 int const zret = ::deflate(&_stream, Z_NO_FLUSH);
546 (
boost::format(
"[zlib] deflate() failed, return code: %1%") % zret).str());
548 if (_stream.avail_out != 0) {
549 if (_stream.avail_in != 0) {
552 "[zlib] deflate() failed to consume input");
557 if (_stream.avail_out == 0) {
560 while (::aio_error(&_request) == EINPROGRESS) {
562 ::aiocb *
const list = &_request;
564 if (::aio_suspend(&list, 1, &timeout) != 0) {
565 if (errno != EINTR) {
566 if (errno == EAGAIN) {
568 throw LSST_EXCEPT(ex::TimeoutError,
"aio_write() timed out");
572 (
boost::format(
"aio_suspend() failed, errno: %1%") % errno).str());
579 if (::aio_return(&_request) != static_cast<int>(_blockSize)) {
583 ::aio_error(&_request)).str());
586 unsigned char * buf =
static_cast<unsigned char *
>(
const_cast<void *
>(_request.aio_buf));
587 _request.aio_buf = _stream.next_out - _blockSize;
588 _stream.next_out = buf;
589 _stream.avail_out = _blockSize;
591 if (aio_write(&_request) != 0) {
594 (
boost::format(
// The format string needs a %1% directive for errno: feeding an argument to
// a format with no directives makes boost::format throw too_many_args,
// which would replace the intended I/O error.
"aio_write() failed to enqueue IO request, errno: %1%") % errno).str());
597 }
else if (_stream.avail_in != 0) {
599 "[zlib] deflate() failed to consume input");
602 }
while(_stream.avail_in != 0);
608 if (_state != IN_PROGRESS) {
610 "finish() called on a finished or failed CompressedFileWriter");
613 if (_stream.avail_out == 0) {
614 unsigned char * buf =
static_cast<unsigned char *
>(
const_cast<void *
>(_request.aio_buf));
615 _stream.next_out = (buf == _buffers) ? buf + _blockSize : _buffers;
616 _stream.avail_out = _blockSize;
621 int const zret = ::deflate(&_stream, Z_FINISH);
622 if (zret != Z_STREAM_END && zret != Z_OK) {
625 (
boost::format(
"[zlib] deflate() failed, return code: %1%") % zret).str());
628 while (::aio_error(&_request) == EINPROGRESS) {
630 ::aiocb *
const list = &_request;
632 if (::aio_suspend(&list, 1, &timeout) != 0) {
633 if (errno != EINTR) {
634 if (errno == EAGAIN) {
636 throw LSST_EXCEPT(ex::TimeoutError,
"aio_write() timed out");
640 (
boost::format(
"aio_suspend() failed, errno: %1%") % errno).str());
647 if (::aio_return(&_request) != static_cast<int>(_blockSize)) {
651 ::aio_error(&_request)).str());
654 if (zret == Z_STREAM_END) {
658 unsigned char * buf =
static_cast<unsigned char *
>(
const_cast<void *
>(_request.aio_buf));
659 _stream.next_out = buf;
660 _stream.avail_out = _blockSize;
662 _request.aio_buf = (buf == _buffers) ? buf + _blockSize : _buffers;
663 if (::aio_write(&_request) != 0) {
666 (
boost::format(
// The format string needs a %1% directive for errno: feeding an argument to
// a format with no directives makes boost::format throw too_many_args,
// which would replace the intended I/O error.
"aio_write() failed to enqueue IO request, errno: %1%") % errno).str());
673 std::size_t bytesRemaining = _blockSize - _stream.avail_out;
674 unsigned char * buf = _stream.next_out - bytesRemaining;
675 while (bytesRemaining > 0) {
676 ::ssize_t nb = ::write(_fd, buf, bytesRemaining);
680 (
boost::format(
"write() failed, errno: %1%") % errno).str());
682 bytesRemaining -=
static_cast<std::size_t
>(nb);
686 while (::fsync(_fd) != 0) {
687 if (errno != EINTR) {
690 (
boost::format(
"fsync() failed, errno: %1%") % errno).str());
::z_stream _stream
zlib state
Low-level sequential file IO classes.
void swap(Ellipse< DataT > &a, Ellipse< DataT > &b)
boost::scoped_array< unsigned char > _memory
Wraps the C library timespec struct.
void swap(ImageBase< PixelT > &a, ImageBase< PixelT > &b)
virtual ~SequentialIoBase()=0
virtual void write(unsigned char const *const buf, std::size_t const size)
Writes len bytes from buf to the underlying storage device.
CompressedFileReader(std::string const &fileName, std::size_t const blockSize=262144)
Utility class for automatically invoking a function when leaving a scope.
virtual void finish()
Moves modified data to the underlying storage device and marks the SequentialWriter as finished...
virtual std::size_t read(unsigned char *const buf, std::size_t const size)
virtual ~SequentialFileWriter()
::aiocb _request
Outstanding IO request.
virtual ~SequentialFileReader()
::aiocb _request
Outstanding IO request.
boost::scoped_array< unsigned char > _memory
Convenience wrapper for the C library timespec struct and a simple profiling class.
std::size_t _fileSize
Size of the file being read.
virtual ~CompressedFileReader()
Master header file for the association pipeline.
virtual std::size_t read(unsigned char *const buf, std::size_t const size)
#define LSST_EXCEPT(type,...)
virtual void finish()
Moves modified data to the underlying storage device and marks the SequentialWriter as finished...
virtual void write(unsigned char const *const buf, std::size_t const len)
Writes len bytes from buf to the underlying storage device.
unsigned char * _buffers
aligned input buffers
::z_stream _stream
zlib state
CompressedFileWriter(std::string const &fileName, bool const overwrite=false, std::size_t const blockSize=262144)
std::size_t _remaining
Bytes that haven't yet been read.
SequentialFileWriter(std::string const &fileName, bool const overwrite=false)
Include files required for standard LSST Exception handling.
unsigned char * _buffers
aligned output buffers
virtual ~CompressedFileWriter()
SequentialFileReader(std::string const &fileName)
Utility class for automatically invoking a function when leaving a scope.