20 #ifndef TPIE_COMPRESSED_BUFFER_H
21 #define TPIE_COMPRESSED_BUFFER_H
90 memory_size_type m_size;
92 stream_size_type m_readOffset;
93 memory_size_type m_blockSize;
100 , m_readOffset(1111111111111111111ull)
101 , m_blockSize(std::numeric_limits<memory_size_type>::max())
116 tp_assert(!(m_state != from),
"compressor_buffer: invalid state transition");
128 tp_assert(
false,
"is_busy: compressor_buffer in invalid state");
136 return m_storage.
get();
142 const char *
get()
const {
143 return m_storage.
get();
149 memory_size_type
size()
const {
157 return m_storage.
size();
171 m_storage.
resize(capacity);
181 m_readOffset = 1111111111111111111ull;
182 m_blockSize = std::numeric_limits<memory_size_type>::max();
185 memory_size_type get_block_size() {
return m_blockSize; }
186 stream_size_type get_read_offset() {
return m_readOffset; }
187 void set_block_size(memory_size_type s) { m_blockSize = s; }
188 void set_read_offset(stream_size_type s) { m_readOffset = s; }
213 typedef std::shared_ptr<compressor_buffer> buffer_t;
/// Declaration only; the definition is not part of this header section.
/// Returns a buffer_t handle.
/// NOTE(review): presumably the counterpart of release_own_buffer() --
/// confirm against the implementation.
218 buffer_t allocate_own_buffer();
/// Declaration only. Takes the handle by non-const reference, so the
/// callee is able to modify or reset the caller's handle.
/// NOTE(review): presumably gives back a buffer obtained from
/// allocate_own_buffer() -- confirm against the implementation.
219 void release_own_buffer(buffer_t &);
/// Declaration only. Boolean query with no arguments.
/// NOTE(review): presumably tells whether take_shared_buffer() may be
/// called right now -- confirm against the implementation.
221 bool can_take_shared_buffer();
/// Declaration only. Returns a buffer_t handle.
/// NOTE(review): presumably should only be called after
/// can_take_shared_buffer() returned true -- confirm.
222 buffer_t take_shared_buffer();
/// Declaration only. Takes the handle by non-const reference, so the
/// callee is able to modify or reset the caller's handle.
/// NOTE(review): presumably gives back a buffer obtained from
/// take_shared_buffer() -- confirm against the implementation.
223 void release_shared_buffer(buffer_t &);
246 typedef std::shared_ptr<compressor_buffer> buffer_t;
248 const static memory_size_type OWN_BUFFERS = 1;
251 : m_blockSize(blockSize)
258 log_debug() <<
"ERROR: ~stream_buffers: not empty!" << std::endl;
262 static memory_size_type memory_usage(memory_size_type blockSize) {
267 if (!(m_ownBuffers < OWN_BUFFERS || can_take_shared_buffer())) {
269 buffermapit target = m_buffers.find(blockNumber);
270 if (target != m_buffers.end())
return target->second;
275 buffermapit i = m_buffers.begin();
276 while (i != m_buffers.end() && !i->second.unique()) ++i;
277 if (i == m_buffers.end()) {
278 compressor().wait_for_request_done(lock);
288 m_buffers.insert(std::make_pair(blockNumber, b));
293 std::pair<buffermapit, bool> res
294 = m_buffers.insert(std::make_pair(blockNumber, buffer_t()));
295 buffermapit & target = res.first;
296 bool & inserted = res.second;
297 if (!inserted)
return target->second;
309 buffermapit i = m_buffers.begin();
310 while (i != m_buffers.end() && !i->second.unique()) ++i;
312 if (i == m_buffers.end()) {
314 if (m_ownBuffers < OWN_BUFFERS) {
315 target->second = allocate_own_buffer();
316 }
else if (can_take_shared_buffer()) {
317 target->second = take_shared_buffer();
321 tp_assert(
false,
"get_buffer: Could not get a new buffer "
322 "contrary to previous checks");
326 target->second.swap(i->second);
331 buffer_t result = target->second;
339 return m_buffers.empty();
343 buffermapit i = m_buffers.begin();
344 while (i != m_buffers.end()) {
346 if (j->second.get() == 0) {
350 throw exception(
"stream_buffers: j->second.get() == 0");
351 }
else if (j->second.unique()) {
352 if (shared_buffers() > 0) {
353 release_shared_buffer(j->second);
355 release_own_buffer(j->second);
363 memory_size_type own_buffers() {
367 memory_size_type shared_buffers() {
368 return m_buffers.size() - m_ownBuffers;
371 void release_shared_buffer(buffer_t & b) {
375 void release_own_buffer(buffer_t & b) {
380 bool can_take_shared_buffer() {
384 buffer_t take_shared_buffer() {
388 buffer_t allocate_own_buffer() {
394 return the_compressor_thread();
397 memory_size_type block_size()
const {
401 memory_size_type m_blockSize;
403 typedef std::map<stream_size_type, buffer_t> buffermap_t;
404 typedef buffermap_t::iterator buffermapit;
405 buffermap_t m_buffers;
408 memory_size_type m_ownBuffers;
413 #endif // TPIE_COMPRESSED_BUFFER_H
Defines the tp_assert macro.
stream_buffer_pool & the_stream_buffer_pool()
Get the stream buffer pool singleton.
T * get()
Return a raw pointer to the array content.
void finish_stream_buffer_pool()
Used by tpie::finish to free stream buffer pool.
The buffer is different from the contents on the disk.
void set_capacity(memory_size_type capacity)
Resize internal buffer, clearing all elements.
void unused(const T &x)
Declare that a variable is unused on purpose.
memory_size_type size() const
Get number of bytes used to store items.
Buffer manager for a single stream.
Generic internal array with known memory requirements.
Interface to the compressor thread.
memory_size_type capacity() const
Get maximal byte size of buffer.
The buffer will soon change to reflect the contents on the disk.
A buffer for elements belonging to a specific stream block.
logstream & log_debug()
Return logstream for writing debug log messages.
void set_size(memory_size_type size)
Set number of bytes used to store items.
The buffer is equal to the contents on the disk.
void resize(size_t size, const T &elm)
Change the size of the array.
The different states of a compressor buffer.
size_type size() const
Return the size of the array.
void init_stream_buffer_pool()
Used by tpie::init to initialize stream buffer pool.
#define tp_assert(condition, message)
void reset()
Return buffer to a newly constructed state.
The buffer will soon be written to disk.