LSSTApplications  1.1.2+25,10.0+13,10.0+132,10.0+133,10.0+224,10.0+41,10.0+8,10.0-1-g0f53050+14,10.0-1-g4b7b172+19,10.0-1-g61a5bae+98,10.0-1-g7408a83+3,10.0-1-gc1e0f5a+19,10.0-1-gdb4482e+14,10.0-11-g3947115+2,10.0-12-g8719d8b+2,10.0-15-ga3f480f+1,10.0-2-g4f67435,10.0-2-gcb4bc6c+26,10.0-28-gf7f57a9+1,10.0-3-g1bbe32c+14,10.0-3-g5b46d21,10.0-4-g027f45f+5,10.0-4-g86f66b5+2,10.0-4-gc4fccf3+24,10.0-40-g4349866+2,10.0-5-g766159b,10.0-5-gca2295e+25,10.0-6-g462a451+1
LSSTDataManagementBasePackage
Chunk.cc
Go to the documentation of this file.
1 // -*- lsst-c++ -*-
2 
3 /*
4  * LSST Data Management System
5  * Copyright 2008, 2009, 2010 LSST Corporation.
6  *
7  * This product includes software developed by the
8  * LSST Project (http://www.lsst.org/).
9  *
10  * This program is free software: you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation, either version 3 of the License, or
13  * (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18  * GNU General Public License for more details.
19  *
20  * You should have received a copy of the LSST License Statement and
21  * the GNU General Public License along with this program. If not,
22  * see <http://www.lsstcorp.org/LegalNotices/>.
23  */
24 
25 
33 #ifndef LSST_AP_CHUNK_CC
34 #define LSST_AP_CHUNK_CC
35 
36 #include <cstring>
37 #include <stdexcept>
38 #include <vector>
39 
40 #include "boost/scoped_array.hpp"
41 #include "boost/scoped_ptr.hpp"
42 
43 #include "lsst/pex/exceptions.h"
44 
45 #include "DataTraits.h"
46 #include "Chunk.h"
47 #include "io/FileIo.h"
48 
49 
50 // -- ChunkDescriptor ----------------
51 
52 template <int MaxBlocksPerChunk>
54 
55  _chunkId = -1;
56  _visitId = -1;
57  _nextChunk = -1;
58  _usable = false;
59  _numBlocks = 0;
60  _nextBlock = 0;
61  _index = 0;
62  _size = 0;
63  _delta = 0;
64  _curBlockOffset = 0;
65 
66  _interestedParties.clear();
67  std::memset(_blocks, 0, sizeof(_blocks));
68 }
69 
70 
71 // -- ChunkRef ----------------
72 
74 template <typename AllocatorT, typename DataT, typename TraitsT>
75 void lsst::ap::ChunkRef<AllocatorT, DataT, TraitsT>::reserve(int const n) {
76  if (n > capacity()) {
77  if (n > 0x3fffffff) {
78  throw LSST_EXCEPT(lsst::pex::exceptions::LengthError,
79  "Requested chunk capacity is too large");
80  }
81  int nb = (n + ((1 << ENTRIES_PER_BLOCK_LOG2) - 1)) >> ENTRIES_PER_BLOCK_LOG2;
82  int b = _descriptor->_numBlocks;
83  _allocator->allocate(&(_descriptor->_blocks[b]), nb - b);
84  // zero out the newly allocated blocks
85  for (; b < nb; ++b) {
86  std::memset(
87  map(_descriptor->_blocks[b] + (sizeof(ChunkEntryFlag) << ENTRIES_PER_BLOCK_LOG2)),
88  0,
89  sizeof(DataT) << ENTRIES_PER_BLOCK_LOG2
90  );
91  }
92  _descriptor->_numBlocks = nb;
93  }
94 }
95 
96 
98 template <typename AllocatorT, typename DataT, typename TraitsT>
99 void lsst::ap::ChunkRef<AllocatorT, DataT, TraitsT>::insert(
100  DataT const & data,
101  ChunkEntryFlag const flags
102 ) {
103  int const block = _descriptor->_nextBlock;
104  std::size_t off = _descriptor->_curBlockOffset;
105  int i = _descriptor->_index;
106 
107  if (block == 0 || i >= (1 << ENTRIES_PER_BLOCK_LOG2)) {
108  // no current block, or current block is full
109  if (block >= _descriptor->_numBlocks) {
110  if (block >= MAX_BLOCKS) {
111  throw LSST_EXCEPT(lsst::pex::exceptions::LengthError,
112  "maximum number of blocks per chunk exceeded");
113  }
114  off = _allocator->allocate();
115  // zero out the newly allocated block
116  std::memset(
117  map(off + (sizeof(ChunkEntryFlag) << ENTRIES_PER_BLOCK_LOG2)),
118  0,
119  sizeof(DataT) << ENTRIES_PER_BLOCK_LOG2
120  );
121  _descriptor->_blocks[block] = off;
122  _descriptor->_numBlocks = block + 1;
123  } else {
124  off = _descriptor->_blocks[block];
125  }
126  _descriptor->_nextBlock = block + 1;
127  _descriptor->_curBlockOffset = off;
128  i = 0;
129  }
130 
131  // copy in data
132  std::size_t const addr = off + i*sizeof(DataT) +
133  (sizeof(ChunkEntryFlag) << ENTRIES_PER_BLOCK_LOG2);
134  new (reinterpret_cast<DataT *>(map(addr))) DataT(data);
135  *reinterpret_cast<ChunkEntryFlag *>(map(off + i*sizeof(ChunkEntryFlag))) = flags;
136  _descriptor->_index = i + 1;
137  ++_descriptor->_size;
138 }
139 
140 
148 template <typename AllocatorT, typename DataT, typename TraitsT>
149 bool lsst::ap::ChunkRef<AllocatorT, DataT, TraitsT>::pack(int const i) {
150 
151  static int const fb = (sizeof(ChunkEntryFlag) << ENTRIES_PER_BLOCK_LOG2);
152 
153  assert (i >= 0 && i < _descriptor->_size);
154 
155  int delta = 0x7fffffff;
156  int sz = _descriptor->_size;
157  int dBlk = i >> ENTRIES_PER_BLOCK_LOG2;
158  int ib = i & ((1 << ENTRIES_PER_BLOCK_LOG2) - 1);
159  std::size_t d = _descriptor->_blocks[dBlk];
160  std::size_t dEnd = d + BLOCK_SIZE;
161 
162  ChunkEntryFlag * df = map(d + ib*sizeof(ChunkEntryFlag));
163  d += fb + ib*sizeof(DataT);
164 
165  int sBlk = dBlk;
166  std::size_t s = d;
167  std::size_t sEnd = dEnd;
168  ChunkEntryFlag const * sf = df;
169 
170  while (true) {
171  for ( ; s < sEnd; s += sizeof(DataT), sf++) {
172  if ((*sf & DELETED) != 0) {
173  --sz;
174  continue;
175  }
176  if ((*sf & IN_DELTA) != 0 && delta == 0x7fffffff) {
177  // set delta
178  delta = (dBlk << ENTRIES_PER_BLOCK_LOG2) +
179  (d - _descriptor->_blocks[dBlk] - fb)/sizeof(DataT);
180  }
181  if (df != sf) {
182  *df = *sf;
183  std::memcpy(map(d), map(s), sizeof(DataT));
184  }
185  df++;
186  d += sizeof(DataT);
187  if (d == dEnd) {
188  ++dBlk;
189  d = _descriptor->_blocks[dBlk];
190  dEnd = d + BLOCK_SIZE;
191  df = map(d);
192  d += fb;
193  }
194  }
195  ++sBlk;
196  if (sBlk >= _descriptor->_nextBlock) {
197  break;
198  }
199  s = _descriptor->_blocks[sBlk];
200  sEnd = s + BLOCK_SIZE;
201  sf = map(s);
202  s += fb;
203  }
204 
205  if (sz < _descriptor->_size) {
206  std::size_t const cb = _descriptor->_blocks[dBlk];
207  _descriptor->_curBlockOffset = cb;
208  _descriptor->_nextBlock = dBlk + 1;
209  d -= cb + fb;
210  _descriptor->_index = d/sizeof(DataT);
211  _descriptor->_size = sz;
212  _descriptor->_delta = std::min(sz, delta);
213  return true;
214  }
215  return false;
216 }
217 
218 
227 template <typename AllocatorT, typename DataT, typename TraitsT>
228 void lsst::ap::ChunkRef<AllocatorT, DataT, TraitsT>::setFlags(
229  int const b,
230  ChunkEntryFlag const flags,
231  int const i,
232  int const n
233 ) {
234  assert(i >= 0 && n >= 0 && i + n >= 0 && i + n <= (1 << ENTRIES_PER_BLOCK_LOG2));
235 
236  ChunkEntryFlag * df = getFlagBlock(b) + i;
237  std::memset(df, flags, n);
238 }
239 
240 
246 template <typename AllocatorT, typename DataT, typename TraitsT>
247 bool lsst::ap::ChunkRef<AllocatorT, DataT, TraitsT>::rollback() {
248  bool mod = false;
249 
250  for (int b = 0; b < _descriptor->_nextBlock; ++b) {
251 
252  std::size_t const off = _descriptor->_blocks[b];
253  ChunkEntryFlag * const flags = map(off);
254 
255  for (int i = 0, e = entries(b); i < e; ++i) {
256  ChunkEntryFlag f = flags[i];
257  if ((f & INSERTED) != 0) {
258  // remove all newly inserted entries
259  _descriptor->_nextBlock = b + 1;
260  _descriptor->_curBlockOffset = off;
261  _descriptor->_index = i;
262  _descriptor->_size = i + (b << ENTRIES_PER_BLOCK_LOG2);
263  return true;
264  } else if ((f & (UNCOMMITTED | DELETED)) == (UNCOMMITTED | DELETED)) {
265  // undo uncommitted deletes
266  flags[i] = f & ~(UNCOMMITTED | DELETED);
267  mod = true;
268  }
269  }
270  }
271  return mod;
272 }
273 
274 
281 template <typename AllocatorT, typename DataT, typename TraitsT>
282 void lsst::ap::ChunkRef<AllocatorT, DataT, TraitsT>::commit(bool clearDelta) {
283 
284  ChunkEntryFlag mask = UNCOMMITTED | INSERTED;
285  if (clearDelta) {
286  mask |= IN_DELTA;
287  }
288  mask = ~mask;
289 
290  for (int b = 0; b < _descriptor->_nextBlock; ++b) {
291  std::size_t const off = _descriptor->_blocks[b];
292  ChunkEntryFlag * const flags = map(off);
293  int const e = entries(b);
294 
295  for (int i = 0; i < e; ++i) {
296  flags[i] &= mask;
297  }
298  }
299 
300  if (clearDelta) {
301  _descriptor->_delta = _descriptor->_size;
302  }
303 }
304 
305 
314 template <typename AllocatorT, typename DataT, typename TraitsT>
315 void lsst::ap::ChunkRef<AllocatorT, DataT, TraitsT>::applyDeletes(
316  int const * const deletes,
317  int const numDeletes,
318  int const end
319 ) {
320  // check that all delete indexes are within the specified range
321  for (int i = 0; i < numDeletes; ++i) {
322  if (deletes[i] < 0 || deletes[i] >= end) {
323  throw LSST_EXCEPT(lsst::pex::exceptions::IoError,
324  "Binary chunk delta file contains an invalid delete marker - delta not applied"
325  );
326  }
327  }
328 
329  // ok - apply deletes.
330  for (int i = 0; i < numDeletes; ++i) {
331  int d = deletes[i];
332  ChunkEntryFlag * f = reinterpret_cast<ChunkEntryFlag *>(map(
333  _descriptor->_blocks[d >> ENTRIES_PER_BLOCK_LOG2] +
334  (d & ((1 << ENTRIES_PER_BLOCK_LOG2) - 1)) * sizeof(ChunkEntryFlag)
335  ));
336  *f |= DELETED;
337  }
338 }
339 
340 
341 namespace lsst { namespace ap { namespace {
342 
343 void doRead(io::SequentialReader & reader, unsigned char * dst, std::size_t dstlen) {
344  while (dstlen > 0) {
345  std::size_t nb = reader.read(dst, dstlen);
346  assert(nb <= dstlen);
347  if (nb == 0) {
348  throw LSST_EXCEPT(lsst::pex::exceptions::IoError, "Unexpected end of file");
349  }
350  dst += nb;
351  dstlen -= nb;
352  }
353 }
354 
355 std::string const badChunkMessage =
356  "Binary chunk file failed sanity checks - file is of the wrong format, "
357  "of the wrong record type, or corrupted. This error indicates that one "
358  "of the following situations has occurred:\n"
359  " - the file being read was not a chunk file\n"
360  " - the chunk file being read was generated with an incompatible\n"
361  " version of the association pipeline code\n"
362  " - the chunk file being read was generated on a machine with\n"
363  " either a different word size (32 vs. 64 bit) or endianness from\n"
364  " the runtime machine(s).\n"
365  "Please regenerate the chunk on a machine matching the characteristics "
366  "of the runtime machine(s), check your policy files, and try again.";
367 
368 }}} // end of namespace lsst::ap::<anonymous>
369 
370 
378 template <typename AllocatorT, typename DataT, typename TraitsT>
379 void lsst::ap::ChunkRef<AllocatorT, DataT, TraitsT>::read(
380  std::string const & name,
381  bool const compressed
382 ) {
383  clear();
384  boost::scoped_ptr<io::SequentialReader> reader;
385  if (compressed) {
386  reader.reset(new io::CompressedFileReader(name));
387  } else {
388  reader.reset(new io::SequentialFileReader(name));
389  }
390 
391  if (reader->finished()) {
392  // A non-existant chunk file is equivalent to an empty chunk file
393  return;
394  }
395 
396  // read in the header
397  BinChunkHeader header;
398  doRead(*reader, reinterpret_cast<unsigned char *>(&header), sizeof(BinChunkHeader));
399  if (!header.isValid() || header._numDeletes != 0 || header._recordSize != sizeof(DataT)) {
400  throw LSST_EXCEPT(lsst::pex::exceptions::IoError, badChunkMessage);
401  }
402 
403  int nr = header._numRecords;
404  if (nr == 0) {
405  return; // nothing to read in
406  }
407  reserve(nr);
408 
409  int b = 0;
410  int nd;
411 
412  // read in one memory block at a time
413  do {
414  nd = std::min((1 << ENTRIES_PER_BLOCK_LOG2), nr);
415  nr -= nd;
416  doRead(*reader, reinterpret_cast<unsigned char *>(getBlock(b)), nd * sizeof(DataT));
417  setFlags(b, 0, 0, nd);
418  ++b;
419  } while (nr > 0);
420 
421  // update chunk size
422  _descriptor->_nextBlock = b;
423  _descriptor->_curBlockOffset = _descriptor->_blocks[b - 1];
424  _descriptor->_index = nd;
425  int sz = nd + ((b - 1) << ENTRIES_PER_BLOCK_LOG2);
426  _descriptor->_size = sz;
427  _descriptor->_delta = sz;
428 }
429 
430 
439 template <typename AllocatorT, typename DataT, typename TraitsT>
440 void lsst::ap::ChunkRef<AllocatorT, DataT, TraitsT>::readDelta(
441  std::string const & name,
442  bool const compressed
443 ) {
444  boost::scoped_ptr<io::SequentialReader> reader;
445  if (compressed) {
446  reader.reset(new io::CompressedFileReader(name));
447  } else {
448  reader.reset(new io::SequentialFileReader(name));
449  }
450 
451  if (reader->finished()) {
452  // A non-existant chunk delta file is equivalent to an empty chunk delta file
453  return;
454  }
455 
456  // read in the header
457  BinChunkHeader header;
458  doRead(*reader, reinterpret_cast<unsigned char *>(&header), sizeof(BinChunkHeader));
459  if (!header.isValid() || header._recordSize != sizeof(DataT)) {
460  throw LSST_EXCEPT(lsst::pex::exceptions::IoError, badChunkMessage);
461  }
462 
463  // read in indexes of records to delete
464  boost::scoped_array<int> deletes(header._numDeletes > 0 ? new int[header._numDeletes] : 0);
465  doRead(*reader, reinterpret_cast<unsigned char *>(deletes.get()), sizeof(int) * header._numDeletes);
466 
467  // read in records to append
468  int nr = header._numRecords;
469  if (nr == 0) {
470  applyDeletes(deletes.get(), header._numDeletes, _descriptor->_size);
471  return; // nothing more to read in
472  }
473  reserve(nr + _descriptor->_size);
474 
475  // fill up the current block (or the first block if there is no current block)
476  int b = _descriptor->_nextBlock;
477  if (b > 0) {
478  --b;
479  }
480  int i = _descriptor->_index;
481  int nd = std::min((1 << ENTRIES_PER_BLOCK_LOG2) - i, nr);
482  nr -= nd;
483  doRead(*reader, reinterpret_cast<unsigned char *>(&getBlock(b)[i]), nd * sizeof(DataT));
484  setFlags(b, IN_DELTA, i, nd);
485  nd += i;
486  ++b;
487 
488  // fill remaining blocks
489  while (nr > 0) {
490  nd = std::min((1 << ENTRIES_PER_BLOCK_LOG2), nr);
491  nr -= nd;
492  doRead(*reader, reinterpret_cast<unsigned char *>(getBlock(b)), nd * sizeof(DataT));
493  setFlags(b, IN_DELTA, 0, nd);
494  ++b;
495  }
496 
497  int const sz = ((b - 1) << ENTRIES_PER_BLOCK_LOG2) + nd;
498  applyDeletes(deletes.get(), header._numDeletes, sz);
499 
500  // update chunk state to reflect additions
501  _descriptor->_nextBlock = b;
502  _descriptor->_curBlockOffset = _descriptor->_blocks[b - 1];
503  _descriptor->_index = nd;
504  _descriptor->_size = sz;
505 }
506 
507 
520 template <typename AllocatorT, typename DataT, typename TraitsT>
521 void lsst::ap::ChunkRef<AllocatorT, DataT, TraitsT>::write(
522  std::string const & name,
523  bool const overwrite,
524  bool const compressed,
525  bool const withDelta
526 ) const {
527  boost::scoped_ptr<io::SequentialWriter> writer;
528  if (compressed) {
529  writer.reset(new io::CompressedFileWriter(name, overwrite));
530  } else {
531  writer.reset(new io::SequentialFileWriter(name, overwrite));
532  }
533 
534  // create header
535  int nr = withDelta ? size() : delta();
536  BinChunkHeader header;
537  header._numRecords = nr;
538  header._recordSize = sizeof(DataT);
539 
540  writer->write(reinterpret_cast<unsigned char *>(&header), sizeof(BinChunkHeader));
541  for (int b = 0; nr > 0; ++b) {
542  int const nd = std::min((1 << ENTRIES_PER_BLOCK_LOG2), nr);
543  writer->write(reinterpret_cast<unsigned char const *>(getBlock(b)), nd * sizeof(DataT));
544  nr -= nd;
545  }
546  writer->finish();
547 }
548 
549 
562 template <typename AllocatorT, typename DataT, typename TraitsT>
563 void lsst::ap::ChunkRef<AllocatorT, DataT, TraitsT>::writeDelta(
564  std::string const & name,
565  bool const overwrite,
566  bool const compressed
567 ) const {
568  boost::scoped_ptr<io::SequentialWriter> writer;
569  if (compressed) {
570  writer.reset(new io::CompressedFileWriter(name, overwrite));
571  } else {
572  writer.reset(new io::SequentialFileWriter(name, overwrite));
573  }
574 
575  // collect all deletes (there are likely to be very few, if any)
576  std::vector<int> deletes;
577 
578  int const nb = _descriptor->_nextBlock;
579  for (int b = 0; b < nb; ++b) {
580  ChunkEntryFlag const * const f = getFlagBlock(b);
581  int const nd = (b < nb - 1) ? (1 << ENTRIES_PER_BLOCK_LOG2) : _descriptor->_index;
582  for (int i = 0; i < nd; ++i) {
583  if ((f[i] & DELETED) != 0) {
584  deletes.push_back(i + (b << ENTRIES_PER_BLOCK_LOG2));
585  }
586  }
587  }
588 
589  // create and write header
590  int fd = _descriptor->_delta;
591  BinChunkHeader header;
592  header._numRecords = _descriptor->_size - fd;
593  header._numDeletes = deletes.size();
594  header._recordSize = sizeof(DataT);
595  writer->write(reinterpret_cast<unsigned char *>(&header), sizeof(BinChunkHeader));
596 
597  // write array of delete indexes
598  if (!deletes.empty()) {
599  writer->write(reinterpret_cast<unsigned char *>(&deletes.front()),
600  deletes.size() * sizeof(int));
601  }
602 
603  // write out IN_DELTA chunk entries
604  if (fd < _descriptor->_size) {
605  int b = fd >> ENTRIES_PER_BLOCK_LOG2;
606  fd &= (1 << ENTRIES_PER_BLOCK_LOG2) - 1;
607  int nd = (b < nb - 1) ? (1 << ENTRIES_PER_BLOCK_LOG2) : _descriptor->_index;
608  writer->write(reinterpret_cast<unsigned char const *>(&getBlock(b)[fd]), (nd - fd) * sizeof(DataT));
609  for (++b; b < nb; ++b) {
610  nd = (b < nb - 1) ? (1 << ENTRIES_PER_BLOCK_LOG2) : _descriptor->_index;
611  writer->write(reinterpret_cast<unsigned char const *>(getBlock(b)), nd * sizeof(DataT));
612  }
613  }
614 
615  // all done
616  writer->finish();
617 }
618 
619 
620 #endif // LSST_AP_CHUNK_CC
Low-level sequential file IO classes.
unsigned char ChunkEntryFlag
Definition: Chunk.h:142
double min
Definition: attributes.cc:216
int d
Definition: KDTree.cc:89
Classes for holding spatial chunks of things in memory.
#define LSST_EXCEPT(type,...)
Definition: Exception.h:46
Compile time constants related to data types to be stored in chunks.
afw::table::Key< double > b
Include files required for standard LSST Exception handling.