33 #ifndef LSST_AP_CHUNK_CC
34 #define LSST_AP_CHUNK_CC
40 #include "boost/scoped_array.hpp"
41 #include "boost/scoped_ptr.hpp"
52 template <
int MaxBlocksPerChunk>
66 _interestedParties.clear();
67 std::memset(_blocks, 0,
sizeof(_blocks));
74 template <
typename AllocatorT,
typename DataT,
typename TraitsT>
75 void lsst::ap::ChunkRef<AllocatorT, DataT, TraitsT>::reserve(
int const n) {
78 throw LSST_EXCEPT(lsst::pex::exceptions::LengthError,
79 "Requested chunk capacity is too large");
81 int nb = (n + ((1 << ENTRIES_PER_BLOCK_LOG2) - 1)) >> ENTRIES_PER_BLOCK_LOG2;
82 int b = _descriptor->_numBlocks;
83 _allocator->allocate(&(_descriptor->_blocks[b]), nb - b);
87 map(_descriptor->_blocks[b] + (
sizeof(
ChunkEntryFlag) << ENTRIES_PER_BLOCK_LOG2)),
89 sizeof(DataT) << ENTRIES_PER_BLOCK_LOG2
92 _descriptor->_numBlocks = nb;
98 template <
typename AllocatorT,
typename DataT,
typename TraitsT>
99 void lsst::ap::ChunkRef<AllocatorT, DataT, TraitsT>::insert(
103 int const block = _descriptor->_nextBlock;
104 std::size_t off = _descriptor->_curBlockOffset;
105 int i = _descriptor->_index;
107 if (block == 0 || i >= (1 << ENTRIES_PER_BLOCK_LOG2)) {
109 if (block >= _descriptor->_numBlocks) {
110 if (block >= MAX_BLOCKS) {
111 throw LSST_EXCEPT(lsst::pex::exceptions::LengthError,
112 "maximum number of blocks per chunk exceeded");
114 off = _allocator->allocate();
119 sizeof(DataT) << ENTRIES_PER_BLOCK_LOG2
121 _descriptor->_blocks[block] = off;
122 _descriptor->_numBlocks = block + 1;
124 off = _descriptor->_blocks[block];
126 _descriptor->_nextBlock = block + 1;
127 _descriptor->_curBlockOffset = off;
132 std::size_t
const addr = off + i*
sizeof(DataT) +
134 new (
reinterpret_cast<DataT *
>(map(addr))) DataT(data);
136 _descriptor->_index = i + 1;
137 ++_descriptor->_size;
148 template <
typename AllocatorT,
typename DataT,
typename TraitsT>
149 bool lsst::ap::ChunkRef<AllocatorT, DataT, TraitsT>::pack(
int const i) {
151 static int const fb = (
sizeof(
ChunkEntryFlag) << ENTRIES_PER_BLOCK_LOG2);
153 assert (i >= 0 && i < _descriptor->_size);
155 int delta = 0x7fffffff;
156 int sz = _descriptor->_size;
157 int dBlk = i >> ENTRIES_PER_BLOCK_LOG2;
158 int ib = i & ((1 << ENTRIES_PER_BLOCK_LOG2) - 1);
159 std::size_t
d = _descriptor->_blocks[dBlk];
160 std::size_t dEnd = d + BLOCK_SIZE;
163 d += fb + ib*
sizeof(DataT);
167 std::size_t sEnd = dEnd;
171 for ( ; s < sEnd; s +=
sizeof(DataT), sf++) {
172 if ((*sf & DELETED) != 0) {
176 if ((*sf & IN_DELTA) != 0 && delta == 0x7fffffff) {
178 delta = (dBlk << ENTRIES_PER_BLOCK_LOG2) +
179 (d - _descriptor->_blocks[dBlk] - fb)/
sizeof(DataT);
183 std::memcpy(map(d), map(s),
sizeof(DataT));
189 d = _descriptor->_blocks[dBlk];
190 dEnd = d + BLOCK_SIZE;
196 if (sBlk >= _descriptor->_nextBlock) {
199 s = _descriptor->_blocks[sBlk];
200 sEnd = s + BLOCK_SIZE;
205 if (sz < _descriptor->_size) {
206 std::size_t
const cb = _descriptor->_blocks[dBlk];
207 _descriptor->_curBlockOffset = cb;
208 _descriptor->_nextBlock = dBlk + 1;
210 _descriptor->_index = d/
sizeof(DataT);
211 _descriptor->_size = sz;
212 _descriptor->_delta =
std::min(sz, delta);
227 template <
typename AllocatorT,
typename DataT,
typename TraitsT>
228 void lsst::ap::ChunkRef<AllocatorT, DataT, TraitsT>::setFlags(
234 assert(i >= 0 && n >= 0 && i + n >= 0 && i + n <= (1 << ENTRIES_PER_BLOCK_LOG2));
237 std::memset(df, flags, n);
246 template <
typename AllocatorT,
typename DataT,
typename TraitsT>
247 bool lsst::ap::ChunkRef<AllocatorT, DataT, TraitsT>::rollback() {
250 for (
int b = 0; b < _descriptor->_nextBlock; ++
b) {
252 std::size_t
const off = _descriptor->_blocks[
b];
255 for (
int i = 0, e = entries(b); i < e; ++i) {
257 if ((f & INSERTED) != 0) {
259 _descriptor->_nextBlock = b + 1;
260 _descriptor->_curBlockOffset = off;
261 _descriptor->_index = i;
262 _descriptor->_size = i + (b << ENTRIES_PER_BLOCK_LOG2);
264 }
else if ((f & (UNCOMMITTED | DELETED)) == (UNCOMMITTED | DELETED)) {
266 flags[i] = f & ~(UNCOMMITTED | DELETED);
281 template <
typename AllocatorT,
typename DataT,
typename TraitsT>
282 void lsst::ap::ChunkRef<AllocatorT, DataT, TraitsT>::commit(
bool clearDelta) {
290 for (
int b = 0; b < _descriptor->_nextBlock; ++
b) {
291 std::size_t
const off = _descriptor->_blocks[
b];
293 int const e = entries(b);
295 for (
int i = 0; i < e; ++i) {
301 _descriptor->_delta = _descriptor->_size;
314 template <
typename AllocatorT,
typename DataT,
typename TraitsT>
315 void lsst::ap::ChunkRef<AllocatorT, DataT, TraitsT>::applyDeletes(
316 int const *
const deletes,
317 int const numDeletes,
321 for (
int i = 0; i < numDeletes; ++i) {
322 if (deletes[i] < 0 || deletes[i] >= end) {
324 "Binary chunk delta file contains an invalid delete marker - delta not applied"
330 for (
int i = 0; i < numDeletes; ++i) {
333 _descriptor->_blocks[d >> ENTRIES_PER_BLOCK_LOG2] +
334 (d & ((1 << ENTRIES_PER_BLOCK_LOG2) - 1)) *
sizeof(
ChunkEntryFlag)
341 namespace lsst {
namespace ap {
namespace {
343 void doRead(io::SequentialReader & reader,
unsigned char * dst, std::size_t dstlen) {
345 std::size_t nb = reader.read(dst, dstlen);
346 assert(nb <= dstlen);
348 throw LSST_EXCEPT(lsst::pex::exceptions::IoError,
"Unexpected end of file");
355 std::string
const badChunkMessage =
356 "Binary chunk file failed sanity checks - file is of the wrong format, "
357 "of the wrong record type, or corrupted. This error indicates that one "
358 "of the following situations has occurred:\n"
359 " - the file being read was not a chunk file\n"
360 " - the chunk file being read was generated with an incompatible\n"
361 " version of the association pipeline code\n"
362 " - the chunk file being read was generated on a machine with\n"
363 " either a different word size (32 vs. 64 bit) or endianness from\n"
364 " the runtime machine(s).\n"
365 "Please regenerate the chunk on a machine matching the characteristics "
366 "of the runtime machine(s), check your policy files, and try again.";
378 template <
typename AllocatorT,
typename DataT,
typename TraitsT>
379 void lsst::ap::ChunkRef<AllocatorT, DataT, TraitsT>::read(
380 std::string
const & name,
381 bool const compressed
384 boost::scoped_ptr<io::SequentialReader> reader;
386 reader.reset(
new io::CompressedFileReader(name));
388 reader.reset(
new io::SequentialFileReader(name));
391 if (reader->finished()) {
397 BinChunkHeader header;
398 doRead(*reader, reinterpret_cast<unsigned char *>(&header),
sizeof(BinChunkHeader));
399 if (!header.isValid() || header._numDeletes != 0 || header._recordSize !=
sizeof(DataT)) {
400 throw LSST_EXCEPT(lsst::pex::exceptions::IoError, badChunkMessage);
403 int nr = header._numRecords;
414 nd =
std::min((1 << ENTRIES_PER_BLOCK_LOG2), nr);
416 doRead(*reader, reinterpret_cast<unsigned char *>(getBlock(b)), nd *
sizeof(DataT));
417 setFlags(b, 0, 0, nd);
422 _descriptor->_nextBlock =
b;
423 _descriptor->_curBlockOffset = _descriptor->_blocks[b - 1];
424 _descriptor->_index = nd;
425 int sz = nd + ((b - 1) << ENTRIES_PER_BLOCK_LOG2);
426 _descriptor->_size = sz;
427 _descriptor->_delta = sz;
439 template <
typename AllocatorT,
typename DataT,
typename TraitsT>
440 void lsst::ap::ChunkRef<AllocatorT, DataT, TraitsT>::readDelta(
441 std::string
const & name,
442 bool const compressed
444 boost::scoped_ptr<io::SequentialReader> reader;
446 reader.reset(
new io::CompressedFileReader(name));
448 reader.reset(
new io::SequentialFileReader(name));
451 if (reader->finished()) {
457 BinChunkHeader header;
458 doRead(*reader, reinterpret_cast<unsigned char *>(&header),
sizeof(BinChunkHeader));
459 if (!header.isValid() || header._recordSize !=
sizeof(DataT)) {
460 throw LSST_EXCEPT(lsst::pex::exceptions::IoError, badChunkMessage);
464 boost::scoped_array<int> deletes(header._numDeletes > 0 ?
new int[header._numDeletes] : 0);
465 doRead(*reader, reinterpret_cast<unsigned char *>(deletes.get()),
sizeof(
int) * header._numDeletes);
468 int nr = header._numRecords;
470 applyDeletes(deletes.get(), header._numDeletes, _descriptor->_size);
473 reserve(nr + _descriptor->_size);
476 int b = _descriptor->_nextBlock;
480 int i = _descriptor->_index;
481 int nd =
std::min((1 << ENTRIES_PER_BLOCK_LOG2) - i, nr);
483 doRead(*reader, reinterpret_cast<unsigned char *>(&getBlock(b)[i]), nd *
sizeof(DataT));
484 setFlags(b, IN_DELTA, i, nd);
490 nd =
std::min((1 << ENTRIES_PER_BLOCK_LOG2), nr);
492 doRead(*reader, reinterpret_cast<unsigned char *>(getBlock(b)), nd *
sizeof(DataT));
493 setFlags(b, IN_DELTA, 0, nd);
497 int const sz = ((b - 1) << ENTRIES_PER_BLOCK_LOG2) + nd;
498 applyDeletes(deletes.get(), header._numDeletes, sz);
501 _descriptor->_nextBlock =
b;
502 _descriptor->_curBlockOffset = _descriptor->_blocks[b - 1];
503 _descriptor->_index = nd;
504 _descriptor->_size = sz;
520 template <
typename AllocatorT,
typename DataT,
typename TraitsT>
521 void lsst::ap::ChunkRef<AllocatorT, DataT, TraitsT>::write(
522 std::string
const & name,
523 bool const overwrite,
524 bool const compressed,
527 boost::scoped_ptr<io::SequentialWriter> writer;
529 writer.reset(
new io::CompressedFileWriter(name, overwrite));
531 writer.reset(
new io::SequentialFileWriter(name, overwrite));
535 int nr = withDelta ? size() : delta();
536 BinChunkHeader header;
537 header._numRecords = nr;
538 header._recordSize =
sizeof(DataT);
540 writer->write(reinterpret_cast<unsigned char *>(&header),
sizeof(BinChunkHeader));
541 for (
int b = 0; nr > 0; ++
b) {
542 int const nd =
std::min((1 << ENTRIES_PER_BLOCK_LOG2), nr);
543 writer->write(reinterpret_cast<unsigned char const *>(getBlock(b)), nd *
sizeof(DataT));
562 template <
typename AllocatorT,
typename DataT,
typename TraitsT>
563 void lsst::ap::ChunkRef<AllocatorT, DataT, TraitsT>::writeDelta(
564 std::string
const & name,
565 bool const overwrite,
566 bool const compressed
568 boost::scoped_ptr<io::SequentialWriter> writer;
570 writer.reset(
new io::CompressedFileWriter(name, overwrite));
572 writer.reset(
new io::SequentialFileWriter(name, overwrite));
576 std::vector<int> deletes;
578 int const nb = _descriptor->_nextBlock;
579 for (
int b = 0; b < nb; ++
b) {
581 int const nd = (b < nb - 1) ? (1 << ENTRIES_PER_BLOCK_LOG2) : _descriptor->_index;
582 for (
int i = 0; i < nd; ++i) {
583 if ((f[i] & DELETED) != 0) {
584 deletes.push_back(i + (b << ENTRIES_PER_BLOCK_LOG2));
590 int fd = _descriptor->_delta;
591 BinChunkHeader header;
592 header._numRecords = _descriptor->_size - fd;
593 header._numDeletes = deletes.size();
594 header._recordSize =
sizeof(DataT);
595 writer->write(reinterpret_cast<unsigned char *>(&header),
sizeof(BinChunkHeader));
598 if (!deletes.empty()) {
599 writer->write(reinterpret_cast<unsigned char *>(&deletes.front()),
600 deletes.size() *
sizeof(int));
604 if (fd < _descriptor->_size) {
605 int b = fd >> ENTRIES_PER_BLOCK_LOG2;
606 fd &= (1 << ENTRIES_PER_BLOCK_LOG2) - 1;
607 int nd = (b < nb - 1) ? (1 << ENTRIES_PER_BLOCK_LOG2) : _descriptor->_index;
608 writer->write(reinterpret_cast<unsigned char const *>(&getBlock(b)[fd]), (nd - fd) *
sizeof(DataT));
609 for (++b; b < nb; ++
b) {
610 nd = (b < nb - 1) ? (1 << ENTRIES_PER_BLOCK_LOG2) : _descriptor->_index;
611 writer->write(reinterpret_cast<unsigned char const *>(getBlock(b)), nd *
sizeof(DataT));
620 #endif // LSST_AP_CHUNK_CC
Low-level sequential file IO classes.
unsigned char ChunkEntryFlag
Classes for holding spatial chunks of things in memory.
#define LSST_EXCEPT(type,...)
Compile time constants related to data types to be stored in chunks.
afw::table::Key< double > b
Include files required for standard LSST Exception handling.