Commit 32163e33 authored by Leif Walsh's avatar Leif Walsh Committed by Yoni Fogel

refs #5351 fixes to circular buffer

git-svn-id: file:///svn/toku/tokudb@48718 c7de825b-a66e-492c-adef-691d508d4ae1
parent 0fd3fcc3
...@@ -9,20 +9,26 @@ namespace toku { ...@@ -9,20 +9,26 @@ namespace toku {
template<typename T> template<typename T>
void circular_buffer<T>::init(T * const array, size_t cap) { void circular_buffer<T>::init(T * const array, size_t cap) {
invariant_notnull(array); invariant_notnull(array);
toku_mutex_init(&m_lock, nullptr);
toku_cond_init(&m_cond, nullptr);
m_array = array; m_array = array;
m_cap = cap; m_cap = cap;
m_begin = 0; m_begin = 0;
m_limit = 0; m_limit = 0;
toku_mutex_init(&m_lock, nullptr);
toku_cond_init(&m_push_cond, nullptr);
toku_cond_init(&m_pop_cond, nullptr);
m_push_waiters = 0;
m_pop_waiters = 0;
} }
template<typename T> template<typename T>
void circular_buffer<T>::deinit(void) { void circular_buffer<T>::deinit(void) {
lock(); lock();
invariant(this->is_empty()); invariant(is_empty());
invariant_zero(m_push_waiters);
invariant_zero(m_pop_waiters);
unlock(); unlock();
toku_cond_destroy(&m_cond); toku_cond_destroy(&m_pop_cond);
toku_cond_destroy(&m_push_cond);
toku_mutex_destroy(&m_lock); toku_mutex_destroy(&m_lock);
} }
...@@ -70,11 +76,10 @@ namespace toku { ...@@ -70,11 +76,10 @@ namespace toku {
void circular_buffer<T>::push_and_maybe_signal_unlocked(const T &elt) { void circular_buffer<T>::push_and_maybe_signal_unlocked(const T &elt) {
toku_mutex_assert_locked(&m_lock); toku_mutex_assert_locked(&m_lock);
invariant(!is_full()); invariant(!is_full());
bool will_signal = is_empty();
size_t location = m_limit++; size_t location = m_limit++;
*get_addr(location) = elt; *get_addr(location) = elt;
if (will_signal) { if (m_pop_waiters > 0) {
toku_cond_signal(&m_cond); toku_cond_signal(&m_pop_cond);
} }
} }
...@@ -82,7 +87,9 @@ namespace toku { ...@@ -82,7 +87,9 @@ namespace toku {
void circular_buffer<T>::push(const T &elt) { void circular_buffer<T>::push(const T &elt) {
lock(); lock();
while (is_full()) { while (is_full()) {
toku_cond_wait(&m_cond, &m_lock); ++m_push_waiters;
toku_cond_wait(&m_push_cond, &m_lock);
--m_push_waiters;
} }
push_and_maybe_signal_unlocked(elt); push_and_maybe_signal_unlocked(elt);
unlock(); unlock();
...@@ -92,7 +99,7 @@ namespace toku { ...@@ -92,7 +99,7 @@ namespace toku {
bool circular_buffer<T>::trypush(const T &elt) { bool circular_buffer<T>::trypush(const T &elt) {
bool pushed = false; bool pushed = false;
lock(); lock();
if (!is_full()) { if (!is_full() && m_push_waiters == 0) {
push_and_maybe_signal_unlocked(elt); push_and_maybe_signal_unlocked(elt);
pushed = true; pushed = true;
} }
...@@ -104,11 +111,10 @@ namespace toku { ...@@ -104,11 +111,10 @@ namespace toku {
T circular_buffer<T>::pop_and_maybe_signal_unlocked(void) { T circular_buffer<T>::pop_and_maybe_signal_unlocked(void) {
toku_mutex_assert_locked(&m_lock); toku_mutex_assert_locked(&m_lock);
invariant(!is_empty()); invariant(!is_empty());
bool will_signal = is_full();
T ret = *get_addr(m_begin); T ret = *get_addr(m_begin);
++m_begin; ++m_begin;
if (will_signal) { if (m_push_waiters > 0) {
toku_cond_signal(&m_cond); toku_cond_signal(&m_push_cond);
} }
return ret; return ret;
} }
...@@ -117,7 +123,9 @@ namespace toku { ...@@ -117,7 +123,9 @@ namespace toku {
T circular_buffer<T>::pop(void) { T circular_buffer<T>::pop(void) {
lock(); lock();
while (is_empty()) { while (is_empty()) {
toku_cond_wait(&m_cond, &m_lock); ++m_pop_waiters;
toku_cond_wait(&m_pop_cond, &m_lock);
--m_pop_waiters;
} }
T ret = pop_and_maybe_signal_unlocked(); T ret = pop_and_maybe_signal_unlocked();
unlock(); unlock();
...@@ -129,7 +137,7 @@ namespace toku { ...@@ -129,7 +137,7 @@ namespace toku {
bool popped = false; bool popped = false;
invariant_notnull(eltp); invariant_notnull(eltp);
lock(); lock();
if (!is_empty()) { if (!is_empty() && m_pop_waiters == 0) {
*eltp = pop_and_maybe_signal_unlocked(); *eltp = pop_and_maybe_signal_unlocked();
popped = true; popped = true;
} }
......
...@@ -47,11 +47,13 @@ private: ...@@ -47,11 +47,13 @@ private:
T pop_and_maybe_signal_unlocked(void); T pop_and_maybe_signal_unlocked(void);
toku_mutex_t m_lock;
toku_cond_t m_cond;
T *m_array; T *m_array;
size_t m_cap; size_t m_cap;
size_t m_begin, m_limit; size_t m_begin, m_limit;
toku_mutex_t m_lock;
toku_cond_t m_push_cond;
toku_cond_t m_pop_cond;
int m_push_waiters, m_pop_waiters;
}; };
} }
......
...@@ -13,18 +13,20 @@ ...@@ -13,18 +13,20 @@
#include "toku_assert.h" #include "toku_assert.h"
#include "circular_buffer.h" #include "circular_buffer.h"
#include "memory.h" #include "memory.h"
#include "toku_time.h"
#include "test.h" #include "test.h"
static int verbose = 0; static int verbose = 0;
static volatile bool running; static volatile bool running;
static volatile bool producers_joined;
static void *producer(void *extra) { static void *producer(void *extra) {
toku::circular_buffer<uint32_t> *buf = static_cast<toku::circular_buffer<uint32_t> *>(extra); toku::circular_buffer<uint32_t> *buf = static_cast<toku::circular_buffer<uint32_t> *>(extra);
while (running) { while (running) {
buf->push(random()); buf->push(random());
usleep(random() % 10000); if (running) {
usleep(random() % 1000);
}
} }
return nullptr; return nullptr;
...@@ -38,15 +40,15 @@ struct consumer_extra { ...@@ -38,15 +40,15 @@ struct consumer_extra {
static void *consumer(void *extra) { static void *consumer(void *extra) {
struct consumer_extra *e = static_cast<struct consumer_extra *>(extra); struct consumer_extra *e = static_cast<struct consumer_extra *>(extra);
while (running) { while (!producers_joined) {
e->xorsum ^= e->buf->pop(); e->xorsum ^= e->buf->pop();
usleep(random() % 1000); if (running) {
usleep(random() % 100);
}
} }
uint32_t x; uint32_t x;
bool popped = e->buf->trypop(&x); while (e->buf->trypop(&x)) {
while (popped) {
e->xorsum ^= x; e->xorsum ^= x;
popped = e->buf->trypop(&x);
} }
return nullptr; return nullptr;
...@@ -83,6 +85,8 @@ static void test_with_threads(void) { ...@@ -83,6 +85,8 @@ static void test_with_threads(void) {
r = toku_pthread_join(producer_thds[i], nullptr); r = toku_pthread_join(producer_thds[i], nullptr);
invariant_zero(r); invariant_zero(r);
} }
swapped = __sync_bool_compare_and_swap(&producers_joined, false, true);
invariant(swapped);
r = toku_pthread_join(consumer_thd, nullptr); r = toku_pthread_join(consumer_thd, nullptr);
invariant_zero(r); invariant_zero(r);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment