Commit 3ee801c8 authored by Rich Prohaska, committed by Yoni Fogel

fix some data race problems detected with helgrind. addresses #1277

git-svn-id: file:///svn/toku/tokudb.1032b+1343@8628 c7de825b-a66e-492c-adef-691d508d4ae1
parent c02a0607
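For context: helgrind (valgrind --tool=helgrind) flags unsynchronized concurrent accesses to shared memory. The diff below addresses two such reports: the threadpool's busy/idle counter, which was read without synchronization while being updated from several threads, is removed outright (the cachetable now starts one writer thread per processor up front), and the cachetable tests' shared globals are wrapped in a dedicated test_mutex. A minimal sketch of the kind of race involved; the file name race_demo.c and the code are illustrative only, not part of this commit:

    // Sketch of the kind of race helgrind reports: two threads update a
    // shared counter.  Build with
    //     gcc -g -pthread race_demo.c -o race_demo
    // and run under
    //     valgrind --tool=helgrind ./race_demo
    #include <pthread.h>
    #include <stdio.h>

    static int shared_counter = 0;                        // shared between threads
    static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

    static void *worker(void *arg) {
        for (int i = 0; i < 100000; i++) {
            // racy version: shared_counter++;           // helgrind should flag this
            pthread_mutex_lock(&lock);                   // fixed version: guard the update
            shared_counter++;
            pthread_mutex_unlock(&lock);
        }
        return arg;
    }

    int main(void) {
        pthread_t a, b;
        pthread_create(&a, NULL, worker, NULL);
        pthread_create(&b, NULL, worker, NULL);
        pthread_join(a, NULL);
        pthread_join(b, NULL);
        printf("%d\n", shared_counter);
        return 0;
    }

With the mutex held around the update helgrind stays quiet; with the commented-out racy increment it should report a possible data race on shared_counter.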
@@ -163,7 +163,8 @@ int toku_create_cachetable(CACHETABLE *result, long size_limit, LSN initial_lsn,
     r = threadpool_create(&t->threadpool, nprocs); assert(r == 0);
 #if DO_WRITER_THREAD
-    threadpool_maybe_add(t->threadpool, cachetable_writer, t);
+    for (i=0; i < (u_int32_t)nprocs; i++)
+        threadpool_maybe_add(t->threadpool, cachetable_writer, t);
 #endif
     *result = t;
     return 0;
@@ -566,7 +567,6 @@ static void flush_and_remove (CACHETABLE ct, PAIR p, int write_me) {
         // evictions without a write can be run in the current thread
         cachetable_write_pair(ct, p);
     } else {
-        threadpool_maybe_add(ct->threadpool, cachetable_writer, ct);
         writequeue_enq(&ct->wq, p);
     }
 #else
@@ -1117,12 +1117,10 @@ static void *cachetable_writer(void *arg) {
     int r;
     cachetable_lock(ct);
     while (1) {
-        threadpool_set_thread_idle(ct->threadpool);
         PAIR p = 0;
         r = writequeue_deq(&ct->wq, &ct->mutex, &p);
         if (r != 0)
             break;
-        threadpool_set_thread_busy(ct->threadpool);
         cachetable_write_pair(ct, p);
     }
     cachetable_unlock(ct);
...
@@ -4,6 +4,28 @@
 #include "includes.h"
 #include "test.h"
+// this mutex is used by some of the tests to serialize access to some
+// global data, especially between the test thread and the cachetable
+// writeback threads
+toku_pthread_mutex_t test_mutex;
+
+static inline void test_mutex_init() {
+    int r = toku_pthread_mutex_init(&test_mutex, 0); assert(r == 0);
+}
+
+static inline void test_mutex_destroy() {
+    int r = toku_pthread_mutex_destroy(&test_mutex); assert(r == 0);
+}
+
+static inline void test_mutex_lock() {
+    int r = toku_pthread_mutex_lock(&test_mutex); assert(r == 0);
+}
+
+static inline void test_mutex_unlock() {
+    int r = toku_pthread_mutex_unlock(&test_mutex); assert(r == 0);
+}
+
 // hook my_malloc_always_fails into malloc to control malloc and verify
 // the correct recovery from malloc failures
 #if defined(__linux__)
@@ -517,7 +539,9 @@ static void test_size_flush_callback(CACHEFILE f,
     if (test_size_debug && verbose) printf("test_size_flush %p %" PRId64 " %p %ld %u %u\n", f, key.b, value, size, (unsigned)do_write, (unsigned)keep);
     if (keep) {
         assert(do_write != 0);
+        test_mutex_lock();
         test_size_flush_key = key;
+        test_mutex_unlock();
     }
 }
@@ -595,7 +619,9 @@ static void test_size_flush() {
     assert(r == 0);
 
     /* put 2*n keys into the table, ensure flushes occur in key order */
+    test_mutex_lock();
     test_size_flush_key = make_blocknum(-1);
+    test_mutex_unlock();
     int i;
     CACHEKEY expect_flush_key = make_blocknum(0);
@@ -623,11 +649,13 @@ static void test_size_flush() {
         assert(entry_value == value);
         assert(entry_size == size);
+        test_mutex_lock();
         if (test_size_flush_key.b != -1) {
             assert(test_size_flush_key.b == expect_flush_key.b);
             assert(expect_flush_key.b == i-n);
             expect_flush_key.b += 1;
         }
+        test_mutex_unlock();
         r = toku_cachetable_unpin(f, key, hkey, CACHETABLE_CLEAN, size);
         assert(r == 0);
@@ -663,6 +691,8 @@ test_main (int argc, const char *argv[]) {
 #endif
     }
 
+    test_mutex_init();
+
     // run tests
 #if !defined(_WIN32)
     test_multi_filehandles();
@@ -682,6 +712,8 @@ test_main (int argc, const char *argv[]) {
         test_size_resize();
         test_size_flush();
     }
+    test_mutex_destroy();
+
     toku_malloc_cleanup();
     if (verbose) printf("ok\n");
     return 0;
...
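The hunks above guard the test's shared global test_size_flush_key with test_mutex: the flush callback, which runs on a cachetable writeback thread, stores the flushed key under the lock, while the test thread takes the same lock to reset the key and to snapshot it before asserting on the flush order. A minimal sketch of that idiom, using hypothetical names (on_flush, last_flushed_key) rather than the ones in the test:

    // A worker-thread callback publishes a value; the test thread snapshots it
    // under the same mutex and asserts on the copy.
    #include <assert.h>
    #include <pthread.h>

    static pthread_mutex_t test_mutex = PTHREAD_MUTEX_INITIALIZER;
    static long last_flushed_key = -1;            // shared with the worker thread

    // called on the worker (writeback) thread
    static void on_flush(long key) {
        pthread_mutex_lock(&test_mutex);
        last_flushed_key = key;
        pthread_mutex_unlock(&test_mutex);
    }

    // called on the test thread
    static void check_flush_order(long expected) {
        pthread_mutex_lock(&test_mutex);
        long snapshot = last_flushed_key;         // copy under the lock, check the copy
        pthread_mutex_unlock(&test_mutex);
        if (snapshot != -1)
            assert(snapshot == expected);
    }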
@@ -48,8 +48,10 @@ static void print_ints(void) {
 }
 
 static void item_becomes_present(CACHEFILE cf, CACHEKEY key) {
-    while (n_present >= N_PRESENT_LIMIT) toku_pthread_yield();
     test_mutex_lock();
+    while (n_present >= N_PRESENT_LIMIT) {
+        test_mutex_unlock(); toku_pthread_yield(); test_mutex_lock();
+    }
     assert(n_present<N_PRESENT_LIMIT);
     present_items[n_present].cf = cf;
     present_items[n_present].key = key;
@@ -163,7 +165,10 @@ static void test_chaining (void) {
         //print_ints();
     }
     for (trial=0; trial<TRIALS; trial++) {
-        if (n_present>0) {
+        test_mutex_lock();
+        int my_n_present = n_present;
+        test_mutex_unlock();
+        if (my_n_present>0) {
             // First touch some random ones
             test_mutex_lock();
             int whichone = random()%n_present;
...
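In item_becomes_present above, the wait for a free slot now drops test_mutex around toku_pthread_yield() and reacquires it before re-reading n_present, so the writeback threads that decrement n_present can take the lock while the test thread is waiting. A condition variable is the more conventional way to express this kind of wait; a sketch of that alternative in plain pthreads, with illustrative names, not code from this tree:

    // Same "wait until n_present < limit" logic, expressed with a condition
    // variable instead of an unlock/yield/relock loop.
    #include <pthread.h>

    static pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t  c = PTHREAD_COND_INITIALIZER;
    static int n_present = 0;
    enum { N_PRESENT_LIMIT = 8 };

    static void item_becomes_present_cv(void) {
        pthread_mutex_lock(&m);
        while (n_present >= N_PRESENT_LIMIT)
            pthread_cond_wait(&c, &m);    // atomically releases m while waiting
        n_present++;                       // slot reserved while holding m
        pthread_mutex_unlock(&m);
    }

    static void item_becomes_absent_cv(void) {
        pthread_mutex_lock(&m);
        n_present--;
        pthread_cond_signal(&c);           // wake one waiter, if any
        pthread_mutex_unlock(&m);
    }

The yield-based loop in the test keeps the change small and avoids adding a condition variable to the test scaffolding; in both forms n_present is only ever read or written while the mutex is held.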
#include "includes.h" #include "includes.h"
// use gcc builtin fetch_and_add 0->no 1->yes
#define DO_ATOMIC_FETCH_AND_ADD 1
struct threadpool { struct threadpool {
int max_threads; int max_threads;
int current_threads; int current_threads;
int busy_threads;
toku_pthread_t pids[]; toku_pthread_t pids[];
}; };
@@ -17,7 +13,6 @@ int threadpool_create(THREADPOOL *threadpoolptr, int max_threads) {
         return ENOMEM;
     threadpool->max_threads = max_threads;
     threadpool->current_threads = 0;
-    threadpool->busy_threads = 0;
     int i;
     for (i=0; i<max_threads; i++)
         threadpool->pids[i] = 0;
@@ -38,31 +33,14 @@ void threadpool_destroy(THREADPOOL *threadpoolptr) {
 }
 
 void threadpool_maybe_add(THREADPOOL threadpool, void *(*f)(void *), void *arg) {
-    if ((threadpool->current_threads == 0 || threadpool->busy_threads < threadpool->current_threads) && threadpool->current_threads < threadpool->max_threads) {
+    if (threadpool->current_threads < threadpool->max_threads) {
         int r = toku_pthread_create(&threadpool->pids[threadpool->current_threads], 0, f, arg);
         if (r == 0) {
             threadpool->current_threads++;
-            threadpool_set_thread_busy(threadpool);
         }
     }
 }
 
-void threadpool_set_thread_busy(THREADPOOL threadpool) {
-#if DO_ATOMIC_FETCH_AND_ADD
-    (void) __sync_fetch_and_add(&threadpool->busy_threads, 1);
-#else
-    threadpool->busy_threads++;
-#endif
-}
-
-void threadpool_set_thread_idle(THREADPOOL threadpool) {
-#if DO_ATOMIC_FETCH_AND_ADD
-    (void) __sync_fetch_and_add(&threadpool->busy_threads, -1);
-#else
-    threadpool->busy_threads--;
-#endif
-}
-
 int threadpool_get_current_threads(THREADPOOL threadpool) {
     return threadpool->current_threads;
 }
@@ -34,16 +34,6 @@ void threadpool_destroy(THREADPOOL *threadpoolptr);
 void threadpool_maybe_add(THREADPOOL theadpool, void *(*f)(void *), void *arg);
 
-// Set the current thread busy
-// Effects: the threadpool keeps a count of the number of idle threads. It
-// uses this count to control the creation of additional threads.
-void threadpool_set_thread_busy(THREADPOOL);
-
-// Set the current thread idle
-void threadpool_set_thread_idle(THREADPOOL);
-
 // get the current number of threads
 int threadpool_get_current_threads(THREADPOOL);
...
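With busy/idle accounting removed, threadpool_maybe_add grows the pool only while current_threads < max_threads, and the cachetable creates all of its writer threads at startup (the loop in the first hunk), so nothing reads busy_threads concurrently with the workers any more. Had the counter been kept, the reads would have needed the same synchronization as the updates; a hedched sketch of what that could look like with the GCC __sync builtins the old code already used (illustrative only, not what this commit does):

    // Illustrative alternative (not taken here): keep busy_threads but make
    // every access atomic, including the read, so no plain load races with the
    // atomic increments.
    struct threadpool_alt {
        int max_threads;
        int current_threads;
        int busy_threads;
    };

    static void set_busy(struct threadpool_alt *tp) { (void) __sync_fetch_and_add(&tp->busy_threads,  1); }
    static void set_idle(struct threadpool_alt *tp) { (void) __sync_fetch_and_add(&tp->busy_threads, -1); }
    static int  get_busy(struct threadpool_alt *tp) { return __sync_fetch_and_add(&tp->busy_threads, 0); }  // atomic read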