Commit b09778dd authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

#2513 merge the cached threadpool to main refs[t:2513]

git-svn-id: file:///svn/toku/tokudb@24627 c7de825b-a66e-492c-adef-691d508d4ae1
parent cf5f49ee
...@@ -58,7 +58,30 @@ toku_os_get_number_processors(void) { ...@@ -58,7 +58,30 @@ toku_os_get_number_processors(void) {
int int
toku_os_get_number_active_processors(void) { toku_os_get_number_active_processors(void) {
return sysconf(_SC_NPROCESSORS_ONLN); int n = sysconf(_SC_NPROCESSORS_ONLN);
#define DO_AFFINITY 1
#if DO_AFFINITY
#include <sched.h>
cpu_set_t cpuset;
int r = sched_getaffinity(getpid(), sizeof cpuset, &cpuset);
assert(r == 0);
int nn = 0;
for (unsigned i = 0; i < 8 * sizeof cpuset; i++)
if (CPU_ISSET(i, &cpuset))
nn++;
assert(nn <= n);
n = nn;
#endif
#define DO_TOKU_NCPUS 1
#if DO_TOKU_NCPUS
char *toku_ncpus = getenv("TOKU_NCPUS");
if (toku_ncpus) {
int ncpus = atoi(toku_ncpus);
if (ncpus < n)
n = ncpus;
}
#endif
return n;
} }
int int
......
...@@ -78,8 +78,8 @@ BRT_SOURCES = \ ...@@ -78,8 +78,8 @@ BRT_SOURCES = \
sub_block \ sub_block \
ule \ ule \
threadpool \ threadpool \
toku_worker \
txn \ txn \
workqueue \
x1764 \ x1764 \
xids \ xids \
ybt \ ybt \
......
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
#include "includes.h" #include "includes.h"
#include "toku_atomic.h" #include "toku_atomic.h"
#include "threadpool.h"
static BRT_UPGRADE_STATUS_S upgrade_status; // accountability, used in backwards_x.c static BRT_UPGRADE_STATUS_S upgrade_status; // accountability, used in backwards_x.c
...@@ -13,9 +14,6 @@ toku_brt_get_upgrade_status (BRT_UPGRADE_STATUS s) { ...@@ -13,9 +14,6 @@ toku_brt_get_upgrade_status (BRT_UPGRADE_STATUS s) {
*s = upgrade_status; *s = upgrade_status;
} }
// performance tracing // performance tracing
#define DO_TOKU_TRACE 0 #define DO_TOKU_TRACE 0
#if DO_TOKU_TRACE #if DO_TOKU_TRACE
...@@ -30,15 +28,18 @@ static inline void do_toku_trace(const char *cp, int len) { ...@@ -30,15 +28,18 @@ static inline void do_toku_trace(const char *cp, int len) {
#endif #endif
static int num_cores = 0; // cache the number of cores for the parallelization static int num_cores = 0; // cache the number of cores for the parallelization
static struct toku_thread_pool *brt_pool = NULL;
int int
toku_brt_serialize_init(void) { toku_brt_serialize_init(void) {
num_cores = toku_os_get_number_processors(); num_cores = toku_os_get_number_active_processors();
int r = toku_thread_pool_create(&brt_pool, num_cores); assert(r == 0);
return 0; return 0;
} }
int int
toku_brt_serialize_destroy(void) { toku_brt_serialize_destroy(void) {
toku_thread_pool_destroy(&brt_pool);
return 0; return 0;
} }
...@@ -440,7 +441,7 @@ serialize_uncompressed_block_to_memory(char * uncompressed_buf, ...@@ -440,7 +441,7 @@ serialize_uncompressed_block_to_memory(char * uncompressed_buf,
// compress all of the sub blocks // compress all of the sub blocks
char *uncompressed_ptr = uncompressed_buf + node_header_overhead; char *uncompressed_ptr = uncompressed_buf + node_header_overhead;
char *compressed_ptr = compressed_buf + header_len; char *compressed_ptr = compressed_buf + header_len;
compressed_len = compress_all_sub_blocks(n_sub_blocks, sub_block, uncompressed_ptr, compressed_ptr, num_cores); compressed_len = compress_all_sub_blocks(n_sub_blocks, sub_block, uncompressed_ptr, compressed_ptr, num_cores, brt_pool);
//if (0) printf("Block %" PRId64 " Size before compressing %u, after compression %"PRIu64"\n", blocknum.b, calculated_size-node_header_overhead, (uint64_t) compressed_len); //if (0) printf("Block %" PRId64 " Size before compressing %u, after compression %"PRIu64"\n", blocknum.b, calculated_size-node_header_overhead, (uint64_t) compressed_len);
...@@ -596,6 +597,7 @@ deserialize_child_buffer_worker(void *arg) { ...@@ -596,6 +597,7 @@ deserialize_child_buffer_worker(void *arg) {
break; break;
deserialize_child_buffer(dw->node, dw->cnum, &dw->rb, &dw->local_fingerprint); deserialize_child_buffer(dw->node, dw->cnum, &dw->rb, &dw->local_fingerprint);
} }
workset_release_ref(ws);
return arg; return arg;
} }
...@@ -626,11 +628,11 @@ deserialize_all_child_buffers(BRTNODE result, struct rbuf *rbuf, struct sub_bloc ...@@ -626,11 +628,11 @@ deserialize_all_child_buffers(BRTNODE result, struct rbuf *rbuf, struct sub_bloc
// deserialize the fifos // deserialize the fifos
if (0) printf("%s:%d T=%d N=%d %d\n", __FUNCTION__, __LINE__, T, result->u.n.n_children, n_nonempty_fifos); if (0) printf("%s:%d T=%d N=%d %d\n", __FUNCTION__, __LINE__, T, result->u.n.n_children, n_nonempty_fifos);
toku_pthread_t tids[T]; toku_thread_pool_run(brt_pool, 0, &T, deserialize_child_buffer_worker, &ws);
threadset_create(tids, &T, deserialize_child_buffer_worker, &ws); workset_add_ref(&ws, T);
deserialize_child_buffer_worker(&ws); deserialize_child_buffer_worker(&ws);
workset_join(&ws);
threadset_join(tids, T);
// combine the fingerprints and update the buffer counts // combine the fingerprints and update the buffer counts
uint32_t check_local_fingerprint = 0; uint32_t check_local_fingerprint = 0;
for (int i = 0; i < result->u.n.n_children; i++) { for (int i = 0; i < result->u.n.n_children; i++) {
...@@ -915,7 +917,7 @@ decompress_from_raw_block_into_rbuf(u_int8_t *raw_block, size_t raw_block_size, ...@@ -915,7 +917,7 @@ decompress_from_raw_block_into_rbuf(u_int8_t *raw_block, size_t raw_block_size,
unsigned char *uncompressed_data = rb->buf + node_header_overhead; unsigned char *uncompressed_data = rb->buf + node_header_overhead;
// decompress all the compressed sub blocks into the uncompressed buffer // decompress all the compressed sub blocks into the uncompressed buffer
r = decompress_all_sub_blocks(n_sub_blocks, sub_block, compressed_data, uncompressed_data, num_cores); r = decompress_all_sub_blocks(n_sub_blocks, sub_block, compressed_data, uncompressed_data, num_cores, brt_pool);
assert(r == 0); assert(r == 0);
toku_trace("decompress done"); toku_trace("decompress done");
......
...@@ -2842,7 +2842,7 @@ static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progr ...@@ -2842,7 +2842,7 @@ static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progr
// compress and checksum the sub blocks // compress and checksum the sub blocks
int compressed_len = compress_all_sub_blocks(n_sub_blocks, sub_block, int compressed_len = compress_all_sub_blocks(n_sub_blocks, sub_block,
(char *) (lbuf->dbuf.buf + n_uncompressed_bytes_at_beginning), (char *) (lbuf->dbuf.buf + n_uncompressed_bytes_at_beginning),
(char *) (compressed_buf + header_len), 1); (char *) (compressed_buf + header_len), 1, NULL);
// cppy the uncompressed header to the compressed buffer // cppy the uncompressed header to the compressed buffer
memcpy(compressed_buf, lbuf->dbuf.buf, n_uncompressed_bytes_at_beginning); memcpy(compressed_buf, lbuf->dbuf.buf, n_uncompressed_bytes_at_beginning);
......
...@@ -15,7 +15,6 @@ ...@@ -15,7 +15,6 @@
#include "threadpool.h" #include "threadpool.h"
#include "cachetable.h" #include "cachetable.h"
#include "rwlock.h" #include "rwlock.h"
#include "toku_worker.h"
#include "log_header.h" #include "log_header.h"
#include "checkpoint.h" #include "checkpoint.h"
#include "minicron.h" #include "minicron.h"
...@@ -684,7 +683,7 @@ WORKQUEUE toku_cachetable_get_workqueue(CACHETABLE ct) { ...@@ -684,7 +683,7 @@ WORKQUEUE toku_cachetable_get_workqueue(CACHETABLE ct) {
void toku_cachefile_get_workqueue_load (CACHEFILE cf, int *n_in_queue, int *n_threads) { void toku_cachefile_get_workqueue_load (CACHEFILE cf, int *n_in_queue, int *n_threads) {
CACHETABLE ct = cf->cachetable; CACHETABLE ct = cf->cachetable;
*n_in_queue = workqueue_n_in_queue(&ct->wq, 1); *n_in_queue = workqueue_n_in_queue(&ct->wq, 1);
*n_threads = threadpool_get_current_threads(ct->threadpool); *n_threads = toku_thread_pool_get_current_threads(ct->threadpool);
} }
//Test-only function //Test-only function
......
...@@ -10,6 +10,7 @@ ...@@ -10,6 +10,7 @@
#include "toku_assert.h" #include "toku_assert.h"
#include "x1764.h" #include "x1764.h"
#include "threadpool.h"
#include "sub_block.h" #include "sub_block.h"
void void
...@@ -147,11 +148,12 @@ compress_worker(void *arg) { ...@@ -147,11 +148,12 @@ compress_worker(void *arg) {
break; break;
compress_sub_block(w->sub_block); compress_sub_block(w->sub_block);
} }
workset_release_ref(ws);
return arg; return arg;
} }
size_t size_t
compress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], char *uncompressed_ptr, char *compressed_ptr, int num_cores) { compress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], char *uncompressed_ptr, char *compressed_ptr, int num_cores, struct toku_thread_pool *pool) {
char *compressed_base_ptr = compressed_ptr; char *compressed_base_ptr = compressed_ptr;
size_t compressed_len; size_t compressed_len;
...@@ -186,12 +188,12 @@ compress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], char *un ...@@ -186,12 +188,12 @@ compress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], char *un
// compress the sub-blocks // compress the sub-blocks
if (0) printf("%s:%d T=%d N=%d\n", __FUNCTION__, __LINE__, T, n_sub_blocks); if (0) printf("%s:%d T=%d N=%d\n", __FUNCTION__, __LINE__, T, n_sub_blocks);
toku_pthread_t tids[T]; toku_thread_pool_run(pool, 0, &T, compress_worker, &ws);
threadset_create(tids, &T, compress_worker, &ws); workset_add_ref(&ws, T);
compress_worker(&ws); compress_worker(&ws);
// wait for all of the work to complete // wait for all of the work to complete
threadset_join(tids, T); workset_join(&ws);
// squeeze out the holes not used by the compress bound // squeeze out the holes not used by the compress bound
compressed_ptr = compressed_base_ptr + sub_block[0].compressed_size; compressed_ptr = compressed_base_ptr + sub_block[0].compressed_size;
...@@ -246,11 +248,12 @@ decompress_worker(void *arg) { ...@@ -246,11 +248,12 @@ decompress_worker(void *arg) {
break; break;
dw->error = decompress_sub_block(dw->compress_ptr, dw->compress_size, dw->uncompress_ptr, dw->uncompress_size, dw->xsum); dw->error = decompress_sub_block(dw->compress_ptr, dw->compress_size, dw->uncompress_ptr, dw->uncompress_size, dw->xsum);
} }
workset_release_ref(ws);
return arg; return arg;
} }
int int
decompress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], unsigned char *compressed_data, unsigned char *uncompressed_data, int num_cores) { decompress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], unsigned char *compressed_data, unsigned char *uncompressed_data, int num_cores, struct toku_thread_pool *pool) {
int r; int r;
if (n_sub_blocks == 1) { if (n_sub_blocks == 1) {
...@@ -281,12 +284,12 @@ decompress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], unsign ...@@ -281,12 +284,12 @@ decompress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], unsign
// decompress the sub-blocks // decompress the sub-blocks
if (0) printf("%s:%d Cores=%d Blocks=%d T=%d\n", __FUNCTION__, __LINE__, num_cores, n_sub_blocks, T); if (0) printf("%s:%d Cores=%d Blocks=%d T=%d\n", __FUNCTION__, __LINE__, num_cores, n_sub_blocks, T);
toku_pthread_t tids[T]; toku_thread_pool_run(pool, 0, &T, decompress_worker, &ws);
threadset_create(tids, &T, decompress_worker, &ws); workset_add_ref(&ws, T);
decompress_worker(&ws); decompress_worker(&ws);
// cleanup // cleanup
threadset_join(tids, T); workset_join(&ws);
workset_destroy(&ws); workset_destroy(&ws);
r = 0; r = 0;
......
...@@ -74,7 +74,7 @@ void * ...@@ -74,7 +74,7 @@ void *
compress_worker(void *arg); compress_worker(void *arg);
size_t size_t
compress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], char *uncompressed_ptr, char *compressed_ptr, int num_cores); compress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], char *uncompressed_ptr, char *compressed_ptr, int num_cores, struct toku_thread_pool *pool);
struct decompress_work { struct decompress_work {
struct work base; struct work base;
...@@ -104,10 +104,10 @@ decompress_worker(void *arg); ...@@ -104,10 +104,10 @@ decompress_worker(void *arg);
// decompress all sub blocks from the compressed_data buffer to the uncompressed_data buffer // decompress all sub blocks from the compressed_data buffer to the uncompressed_data buffer
// Returns 0 if success, otherwise an error // Returns 0 if success, otherwise an error
int int
decompress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], unsigned char *compressed_data, unsigned char *uncompressed_data, int num_cores); decompress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], unsigned char *compressed_data, unsigned char *uncompressed_data, int num_cores, struct toku_thread_pool *pool);
#if defined(__cplusplus) || defined(__cilkplusplus) #if defined(__cplusplus) || defined(__cilkplusplus)
}; }
#endif #endif
#endif #endif
...@@ -9,9 +9,8 @@ ...@@ -9,9 +9,8 @@
static TOKUTXN const null_txn = 0; static TOKUTXN const null_txn = 0;
static DB * const null_db = 0; static DB * const null_db = 0;
static void test_flat (void) { static void test_flat (u_int64_t limit) {
char fname[]= __FILE__ ".brt"; char fname[]= __FILE__ ".brt";
const u_int64_t limit=10000;
u_int64_t permute[limit]; u_int64_t permute[limit];
unlink(fname); unlink(fname);
CACHETABLE ct; CACHETABLE ct;
...@@ -67,7 +66,8 @@ static void test_flat (void) { ...@@ -67,7 +66,8 @@ static void test_flat (void) {
int int
test_main (int argc , const char *argv[]) { test_main (int argc , const char *argv[]) {
#define DO_AFFINITY 0 u_int64_t limit = 10000;
#define DO_AFFINITY 1
#if DO_AFFINITY == 0 #if DO_AFFINITY == 0
default_parse_args(argc, argv); default_parse_args(argc, argv);
#else #else
...@@ -83,6 +83,10 @@ test_main (int argc , const char *argv[]) { ...@@ -83,6 +83,10 @@ test_main (int argc , const char *argv[]) {
ncpus = atoi(argv[++i]); ncpus = atoi(argv[++i]);
continue; continue;
} }
if (strcmp(arg, "--limit") == 0 && i+1 < argc) {
limit = atoi(argv[++i]);
continue;
}
break; break;
} }
...@@ -103,7 +107,7 @@ test_main (int argc , const char *argv[]) { ...@@ -103,7 +107,7 @@ test_main (int argc , const char *argv[]) {
} }
#endif #endif
test_flat(); test_flat(limit);
if (verbose) printf("test ok\n"); if (verbose) printf("test ok\n");
return 0; return 0;
......
...@@ -23,7 +23,7 @@ set_uint8_at_offset(void *vp, size_t offset, uint8_t newv) { ...@@ -23,7 +23,7 @@ set_uint8_at_offset(void *vp, size_t offset, uint8_t newv) {
} }
static void static void
test_sub_block_checksum(void *buf, int total_size, int my_max_sub_blocks, int n_cores) { test_sub_block_checksum(void *buf, int total_size, int my_max_sub_blocks, int n_cores, struct toku_thread_pool *pool) {
if (verbose) if (verbose)
printf("%s:%d %d %d\n", __FUNCTION__, __LINE__, total_size, my_max_sub_blocks); printf("%s:%d %d %d\n", __FUNCTION__, __LINE__, total_size, my_max_sub_blocks);
...@@ -42,7 +42,7 @@ test_sub_block_checksum(void *buf, int total_size, int my_max_sub_blocks, int n_ ...@@ -42,7 +42,7 @@ test_sub_block_checksum(void *buf, int total_size, int my_max_sub_blocks, int n_
void *cbuf = toku_malloc(cbuf_size_bound); void *cbuf = toku_malloc(cbuf_size_bound);
assert(cbuf); assert(cbuf);
size_t cbuf_size = compress_all_sub_blocks(n_sub_blocks, sub_blocks, buf, cbuf, n_cores); size_t cbuf_size = compress_all_sub_blocks(n_sub_blocks, sub_blocks, buf, cbuf, n_cores, pool);
assert(cbuf_size <= cbuf_size_bound); assert(cbuf_size <= cbuf_size_bound);
void *ubuf = toku_malloc(total_size); void *ubuf = toku_malloc(total_size);
...@@ -52,13 +52,13 @@ test_sub_block_checksum(void *buf, int total_size, int my_max_sub_blocks, int n_ ...@@ -52,13 +52,13 @@ test_sub_block_checksum(void *buf, int total_size, int my_max_sub_blocks, int n_
// corrupt a checksum // corrupt a checksum
sub_blocks[xidx].xsum += 1; sub_blocks[xidx].xsum += 1;
r = decompress_all_sub_blocks(n_sub_blocks, sub_blocks, cbuf, ubuf, n_cores); r = decompress_all_sub_blocks(n_sub_blocks, sub_blocks, cbuf, ubuf, n_cores, pool);
assert(r != 0); assert(r != 0);
// reset the checksums // reset the checksums
sub_blocks[xidx].xsum -= 1; sub_blocks[xidx].xsum -= 1;
r = decompress_all_sub_blocks(n_sub_blocks, sub_blocks, cbuf, ubuf, n_cores); r = decompress_all_sub_blocks(n_sub_blocks, sub_blocks, cbuf, ubuf, n_cores, pool);
assert(r == 0); assert(r == 0);
assert(memcmp(buf, ubuf, total_size) == 0); assert(memcmp(buf, ubuf, total_size) == 0);
...@@ -67,13 +67,13 @@ test_sub_block_checksum(void *buf, int total_size, int my_max_sub_blocks, int n_ ...@@ -67,13 +67,13 @@ test_sub_block_checksum(void *buf, int total_size, int my_max_sub_blocks, int n_
unsigned char c = get_uint8_at_offset(cbuf, offset); unsigned char c = get_uint8_at_offset(cbuf, offset);
set_uint8_at_offset(cbuf, offset, c+1); set_uint8_at_offset(cbuf, offset, c+1);
r = decompress_all_sub_blocks(n_sub_blocks, sub_blocks, cbuf, ubuf, n_cores); r = decompress_all_sub_blocks(n_sub_blocks, sub_blocks, cbuf, ubuf, n_cores, pool);
assert(r != 0); assert(r != 0);
// reset the data // reset the data
set_uint8_at_offset(cbuf, offset, c); set_uint8_at_offset(cbuf, offset, c);
r = decompress_all_sub_blocks(n_sub_blocks, sub_blocks, cbuf, ubuf, n_cores); r = decompress_all_sub_blocks(n_sub_blocks, sub_blocks, cbuf, ubuf, n_cores, pool);
assert(r == 0); assert(r == 0);
assert(memcmp(buf, ubuf, total_size) == 0); assert(memcmp(buf, ubuf, total_size) == 0);
} }
...@@ -90,16 +90,16 @@ set_random(void *buf, int total_size) { ...@@ -90,16 +90,16 @@ set_random(void *buf, int total_size) {
} }
static void static void
run_test(int total_size, int n_cores) { run_test(int total_size, int n_cores, struct toku_thread_pool *pool) {
void *buf = toku_malloc(total_size); void *buf = toku_malloc(total_size);
assert(buf); assert(buf);
for (int my_max_sub_blocks = 1; my_max_sub_blocks <= max_sub_blocks; my_max_sub_blocks++) { for (int my_max_sub_blocks = 1; my_max_sub_blocks <= max_sub_blocks; my_max_sub_blocks++) {
memset(buf, 0, total_size); memset(buf, 0, total_size);
test_sub_block_checksum(buf, total_size, my_max_sub_blocks, n_cores); test_sub_block_checksum(buf, total_size, my_max_sub_blocks, n_cores, pool);
set_random(buf, total_size); set_random(buf, total_size);
test_sub_block_checksum(buf, total_size, my_max_sub_blocks, n_cores); test_sub_block_checksum(buf, total_size, my_max_sub_blocks, n_cores, pool);
} }
toku_free(buf); toku_free(buf);
...@@ -129,11 +129,16 @@ test_main (int argc, const char *argv[]) { ...@@ -129,11 +129,16 @@ test_main (int argc, const char *argv[]) {
} }
} }
struct toku_thread_pool *pool = NULL;
int r = toku_thread_pool_create(&pool, 8); assert(r == 0);
for (int total_size = 256*1024; total_size <= 4*1024*1024; total_size *= 2) { for (int total_size = 256*1024; total_size <= 4*1024*1024; total_size *= 2) {
for (int size = total_size - e; size <= total_size + e; size++) { for (int size = total_size - e; size <= total_size + e; size++) {
run_test(size, n_cores); run_test(size, n_cores, pool);
} }
} }
toku_thread_pool_destroy(&pool);
return 0; return 0;
} }
...@@ -30,13 +30,13 @@ test_sub_block_compression(void *buf, int total_size, int my_max_sub_blocks, int ...@@ -30,13 +30,13 @@ test_sub_block_compression(void *buf, int total_size, int my_max_sub_blocks, int
void *cbuf = toku_malloc(cbuf_size_bound); void *cbuf = toku_malloc(cbuf_size_bound);
assert(cbuf); assert(cbuf);
size_t cbuf_size = compress_all_sub_blocks(n_sub_blocks, sub_blocks, buf, cbuf, n_cores); size_t cbuf_size = compress_all_sub_blocks(n_sub_blocks, sub_blocks, buf, cbuf, n_cores, NULL);
assert(cbuf_size <= cbuf_size_bound); assert(cbuf_size <= cbuf_size_bound);
void *ubuf = toku_malloc(total_size); void *ubuf = toku_malloc(total_size);
assert(ubuf); assert(ubuf);
r = decompress_all_sub_blocks(n_sub_blocks, sub_blocks, cbuf, ubuf, n_cores); r = decompress_all_sub_blocks(n_sub_blocks, sub_blocks, cbuf, ubuf, n_cores, NULL);
assert(r == 0); assert(r == 0);
assert(memcmp(buf, ubuf, total_size) == 0); assert(memcmp(buf, ubuf, total_size) == 0);
......
...@@ -25,7 +25,7 @@ struct my_threadpool { ...@@ -25,7 +25,7 @@ struct my_threadpool {
static void static void
my_threadpool_init (struct my_threadpool *my_threadpool, int max_threads) { my_threadpool_init (struct my_threadpool *my_threadpool, int max_threads) {
int r; int r;
r = threadpool_create(&my_threadpool->threadpool, max_threads); assert(r == 0); r = toku_thread_pool_create(&my_threadpool->threadpool, max_threads); assert(r == 0);
assert(my_threadpool != 0); assert(my_threadpool != 0);
r = toku_pthread_mutex_init(&my_threadpool->mutex, 0); assert(r == 0); r = toku_pthread_mutex_init(&my_threadpool->mutex, 0); assert(r == 0);
r = toku_pthread_cond_init(&my_threadpool->wait, 0); assert(r == 0); r = toku_pthread_cond_init(&my_threadpool->wait, 0); assert(r == 0);
...@@ -41,8 +41,8 @@ my_threadpool_destroy (struct my_threadpool *my_threadpool, int max_threads) { ...@@ -41,8 +41,8 @@ my_threadpool_destroy (struct my_threadpool *my_threadpool, int max_threads) {
r = toku_pthread_cond_broadcast(&my_threadpool->wait); assert(r == 0); r = toku_pthread_cond_broadcast(&my_threadpool->wait); assert(r == 0);
r = toku_pthread_mutex_unlock(&my_threadpool->mutex); assert(r == 0); r = toku_pthread_mutex_unlock(&my_threadpool->mutex); assert(r == 0);
if (verbose) printf("current %d\n", threadpool_get_current_threads(my_threadpool->threadpool)); if (verbose) printf("current %d\n", toku_thread_pool_get_current_threads(my_threadpool->threadpool));
threadpool_destroy(&my_threadpool->threadpool); assert(my_threadpool->threadpool == 0); toku_thread_pool_destroy(&my_threadpool->threadpool); assert(my_threadpool->threadpool == 0);
assert(my_threadpool->counter == max_threads); assert(my_threadpool->counter == max_threads);
r = toku_pthread_mutex_destroy(&my_threadpool->mutex); assert(r == 0); r = toku_pthread_mutex_destroy(&my_threadpool->mutex); assert(r == 0);
r = toku_pthread_cond_destroy(&my_threadpool->wait); assert(r == 0); r = toku_pthread_cond_destroy(&my_threadpool->wait); assert(r == 0);
...@@ -117,10 +117,11 @@ test_main (int argc, const char *argv[]) { ...@@ -117,10 +117,11 @@ test_main (int argc, const char *argv[]) {
threadpool = my_threadpool.threadpool; threadpool = my_threadpool.threadpool;
if (verbose) printf("test threadpool_set_busy\n"); if (verbose) printf("test threadpool_set_busy\n");
for (i=0; i<2*max_threads; i++) { for (i=0; i<2*max_threads; i++) {
assert(threadpool_get_current_threads(threadpool) == (i >= max_threads ? max_threads : i)); assert(toku_thread_pool_get_current_threads(threadpool) == (i >= max_threads ? max_threads : i));
threadpool_maybe_add(threadpool, my_thread_f, &my_threadpool); int n = 1;
toku_thread_pool_run(threadpool, 0, &n, my_thread_f, &my_threadpool);
} }
assert(threadpool_get_current_threads(threadpool) == max_threads); assert(toku_thread_pool_get_current_threads(threadpool) == max_threads);
my_threadpool_destroy(&my_threadpool, max_threads); my_threadpool_destroy(&my_threadpool, max_threads);
#if DO_MALLOC_HOOK #if DO_MALLOC_HOOK
...@@ -133,8 +134,8 @@ test_main (int argc, const char *argv[]) { ...@@ -133,8 +134,8 @@ test_main (int argc, const char *argv[]) {
void *(*orig_malloc_hook) (size_t, const __malloc_ptr_t) = __malloc_hook; void *(*orig_malloc_hook) (size_t, const __malloc_ptr_t) = __malloc_hook;
__malloc_hook = my_malloc_always_fails; __malloc_hook = my_malloc_always_fails;
int r; int r;
r = threadpool_create(&threadpool, 0); assert(r == ENOMEM); r = toku_thread_pool_create(&threadpool, 0); assert(r == ENOMEM);
r = threadpool_create(&threadpool, 1); assert(r == ENOMEM); r = toku_thread_pool_create(&threadpool, 1); assert(r == ENOMEM);
__malloc_hook = orig_malloc_hook; __malloc_hook = orig_malloc_hook;
} }
#endif #endif
......
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include "threadpool.h"
int verbose = 0;
static int usage(int ncpus, int poolsize) {
fprintf(stderr, "[-q] [-v] [--verbose] (%d)\n", verbose);
fprintf(stderr, "[--ncpus %d]\n", ncpus);
fprintf(stderr, "[--poolsize %d]\n", poolsize);
return 1;
}
static void *f(void *arg) {
return arg;
}
static void dotest(int poolsize, int nloops) {
int r;
struct toku_thread_pool *pool = NULL;
r = toku_thread_pool_create(&pool, poolsize);
assert(r == 0 && pool != NULL);
int i;
for (i = 0; i < nloops; i++) {
int n = 1;
r = toku_thread_pool_run(pool, 1, &n, f, NULL);
assert(r == 0);
}
if (verbose)
toku_thread_pool_print(pool, stderr);
toku_thread_pool_destroy(&pool);
}
int main(int argc, char *argv[]) {
// defaults
int ncpus = 0;
int poolsize = 1;
int nloops = 100000;
// options
int i;
for (i = 1; i < argc; i++) {
char *arg = argv[i];
if (arg[0] != '-')
break;
if (strcmp(arg, "--ncpus") == 0 && i+1 < argc) {
ncpus = atoi(argv[++i]);
continue;
}
if (strcmp(arg, "--poolsize") == 0 && i+1 < argc) {
poolsize = atoi(argv[++i]);
continue;
}
if (strcmp(arg, "-v") == 0 || strcmp(arg, "--verbose") == 0) {
verbose = verbose+1;
continue;
}
if (strcmp(arg, "-q") == 0) {
verbose = verbose > 0 ? verbose-1 : 0;
continue;
}
return usage(ncpus, poolsize);
}
int starti = i;
if (ncpus > 0) {
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
for (i = 0; i < ncpus; i++)
CPU_SET(i, &cpuset);
int r;
r = sched_setaffinity(getpid(), sizeof cpuset, &cpuset);
assert(r == 0);
cpu_set_t use_cpuset;
CPU_ZERO(&use_cpuset);
r = sched_getaffinity(getpid(), sizeof use_cpuset, &use_cpuset);
assert(r == 0);
assert(memcmp(&cpuset, &use_cpuset, sizeof cpuset) == 0);
}
if (starti == argc) {
dotest(poolsize, nloops);
} else {
for (i = starti; i < argc; i++) {
nloops = atoi(argv[i]);
dotest(poolsize, nloops);
}
}
return 0;
}
...@@ -161,9 +161,10 @@ test_flow_control (int limit, int n, int maxthreads) { ...@@ -161,9 +161,10 @@ test_flow_control (int limit, int n, int maxthreads) {
THREADPOOL tp; THREADPOOL tp;
int i; int i;
rwfc_init(rwfc, limit); rwfc_init(rwfc, limit);
threadpool_create(&tp, maxthreads); toku_thread_pool_create(&tp, maxthreads);
for (i=0; i<maxthreads; i++) int T = maxthreads;
threadpool_maybe_add(tp, rwfc_worker, &rwfc->workqueue); toku_thread_pool_run(tp, 0, &T, rwfc_worker, &rwfc->workqueue);
assert(T == maxthreads);
sleep(1); // this is here to block the reader on the first deq sleep(1); // this is here to block the reader on the first deq
for (i=0; i<n; i++) { for (i=0; i<n; i++) {
WORKITEM wi = new_workitem(); WORKITEM wi = new_workitem();
...@@ -179,7 +180,7 @@ test_flow_control (int limit, int n, int maxthreads) { ...@@ -179,7 +180,7 @@ test_flow_control (int limit, int n, int maxthreads) {
// toku_os_usleep(random() % 1); // toku_os_usleep(random() % 1);
} }
workqueue_set_closed(&rwfc->workqueue, 1); workqueue_set_closed(&rwfc->workqueue, 1);
threadpool_destroy(&tp); toku_thread_pool_destroy(&tp);
rwfc_destroy(rwfc); rwfc_destroy(rwfc);
} }
......
...@@ -11,49 +11,244 @@ ...@@ -11,49 +11,244 @@
#include "toku_pthread.h" #include "toku_pthread.h"
#include "toku_assert.h" #include "toku_assert.h"
#include "memory.h" #include "memory.h"
#include "toku_list.h"
#include "threadpool.h" #include "threadpool.h"
struct threadpool { struct toku_thread {
struct toku_thread_pool *pool;
toku_pthread_t tid;
void *(*f)(void *arg);
void *arg;
int doexit;
struct toku_list free_link;
struct toku_list all_link;
toku_pthread_cond_t wait;
};
struct toku_thread_pool {
int max_threads; int max_threads;
int current_threads; int cur_threads;
toku_pthread_t tids[]; struct toku_list free_threads;
struct toku_list all_threads;
toku_pthread_mutex_t lock;
toku_pthread_cond_t wait_free;
uint64_t gets, get_blocks;
}; };
int threadpool_create(THREADPOOL *threadpoolptr, int max_threads) { static void *toku_thread_run_internal(void *arg);
size_t size = sizeof (struct threadpool) + max_threads*sizeof (toku_pthread_t); static void toku_thread_pool_lock(struct toku_thread_pool *pool);
struct threadpool *threadpool = toku_malloc(size); static void toku_thread_pool_unlock(struct toku_thread_pool *pool);
if (threadpool == 0)
return ENOMEM; static int
threadpool->max_threads = max_threads; toku_thread_create(struct toku_thread_pool *pool, struct toku_thread **toku_thread_return) {
threadpool->current_threads = 0; int r;
int i; struct toku_thread *thread = (struct toku_thread *) toku_malloc(sizeof *thread);
for (i=0; i<max_threads; i++) if (thread == NULL) {
memset(&threadpool->tids[i], 0, sizeof(threadpool->tids[i])); r = errno;
*threadpoolptr = threadpool; } else {
return 0; memset(thread, 0, sizeof *thread);
thread->pool = pool;
r = toku_pthread_cond_init(&thread->wait, NULL); invariant(r == 0);
r = toku_pthread_create(&thread->tid, NULL, toku_thread_run_internal, thread); invariant(r == 0);
*toku_thread_return = thread;
}
return r;
} }
void threadpool_destroy(THREADPOOL *threadpoolptr) { void
struct threadpool *threadpool = *threadpoolptr; toku_thread_run(struct toku_thread *thread, void *(*f)(void *arg), void *arg) {
int i; int r;
for (i=0; i<threadpool->current_threads; i++) { toku_thread_pool_lock(thread->pool);
int r; void *ret; thread->f = f;
r = toku_pthread_join(threadpool->tids[i], &ret); thread->arg = arg;
assert(r == 0); toku_thread_pool_unlock(thread->pool);
r = toku_pthread_cond_signal(&thread->wait); invariant(r == 0);
}
static void
toku_thread_destroy(struct toku_thread *thread) {
int r;
void *ret;
r = toku_pthread_join(thread->tid, &ret); invariant(r == 0 && ret == thread);
struct toku_thread_pool *pool = thread->pool;
toku_thread_pool_lock(pool);
toku_list_remove(&thread->free_link);
toku_thread_pool_unlock(pool);
r = toku_pthread_cond_destroy(&thread->wait); invariant(r == 0);
toku_free(thread);
}
static void
toku_thread_ask_exit(struct toku_thread *thread) {
thread->doexit = 1;
int r = toku_pthread_cond_signal(&thread->wait); invariant(r == 0);
}
static void *
toku_thread_run_internal(void *arg) {
struct toku_thread *thread = (struct toku_thread *) arg;
struct toku_thread_pool *pool = thread->pool;
int r;
toku_thread_pool_lock(pool);
while (1) {
r = toku_pthread_cond_signal(&pool->wait_free); invariant(r == 0);
void *(*thread_f)(void *); void *thread_arg; int doexit;
while (1) {
thread_f = thread->f; thread_arg = thread->arg; doexit = thread->doexit; // make copies of these variables to make helgrind happy
if (thread_f || doexit)
break;
r = toku_pthread_cond_wait(&thread->wait, &pool->lock); invariant(r == 0);
}
toku_thread_pool_unlock(pool);
if (thread_f)
(void) thread_f(thread_arg);
if (doexit)
break;
toku_thread_pool_lock(pool);
thread->f = NULL;
toku_list_push(&pool->free_threads, &thread->free_link);
}
return arg;
}
int
toku_thread_pool_create(struct toku_thread_pool **pool_return, int max_threads) {
int r;
struct toku_thread_pool *pool = (struct toku_thread_pool *) toku_malloc(sizeof *pool);
if (pool == NULL) {
r = errno;
} else {
memset(pool, 0, sizeof *pool);
r = toku_pthread_mutex_init(&pool->lock, NULL); invariant(r == 0);
toku_list_init(&pool->free_threads);
toku_list_init(&pool->all_threads);
r = toku_pthread_cond_init(&pool->wait_free, NULL); invariant(r == 0);
pool->cur_threads = 0;
pool->max_threads = max_threads;
*pool_return = pool;
r = 0;
}
return r;
}
static void
toku_thread_pool_lock(struct toku_thread_pool *pool) {
int r = toku_pthread_mutex_lock(&pool->lock); invariant(r == 0);
}
static void
toku_thread_pool_unlock(struct toku_thread_pool *pool) {
int r = toku_pthread_mutex_unlock(&pool->lock); invariant(r == 0);
}
void
toku_thread_pool_destroy(struct toku_thread_pool **poolptr) {
struct toku_thread_pool *pool = *poolptr;
*poolptr = NULL;
// ask the threads to exit
toku_thread_pool_lock(pool);
struct toku_list *list;
for (list = pool->all_threads.next; list != &pool->all_threads; list = list->next) {
struct toku_thread *thread = toku_list_struct(list, struct toku_thread, all_link);
toku_thread_ask_exit(thread);
}
toku_thread_pool_unlock(pool);
// wait for all of the threads to exit
while (!toku_list_empty(&pool->all_threads)) {
list = toku_list_pop_head(&pool->all_threads);
struct toku_thread *thread = toku_list_struct(list, struct toku_thread, all_link);
toku_thread_destroy(thread);
pool->cur_threads -= 1;
}
invariant(pool->cur_threads == 0);
// cleanup
int r;
r = toku_pthread_cond_destroy(&pool->wait_free); invariant(r == 0);
r = toku_pthread_mutex_destroy(&pool->lock); invariant(r == 0);
toku_free(pool);
}
static int
toku_thread_pool_add(struct toku_thread_pool *pool) {
struct toku_thread *thread = NULL;
int r = toku_thread_create(pool, &thread);
if (r == 0) {
pool->cur_threads += 1;
toku_list_push(&pool->all_threads, &thread->all_link);
toku_list_push(&pool->free_threads, &thread->free_link);
r = toku_pthread_cond_signal(&pool->wait_free); invariant(r == 0);
} }
*threadpoolptr = 0; return r;
toku_free(threadpool);
} }
void threadpool_maybe_add(THREADPOOL threadpool, void *(*f)(void *), void *arg) { // get one thread from the free pool.
if (threadpool->current_threads < threadpool->max_threads) { static int
int r = toku_pthread_create(&threadpool->tids[threadpool->current_threads], 0, f, arg); toku_thread_pool_get_one(struct toku_thread_pool *pool, int dowait, struct toku_thread **toku_thread_return) {
int r = 0;
toku_thread_pool_lock(pool);
pool->gets++;
while (1) {
if (!toku_list_empty(&pool->free_threads))
break;
if (pool->max_threads == 0 || pool->cur_threads < pool->max_threads)
(void) toku_thread_pool_add(pool);
if (toku_list_empty(&pool->free_threads) && !dowait) {
r = EWOULDBLOCK;
break;
}
pool->get_blocks++;
r = toku_pthread_cond_wait(&pool->wait_free, &pool->lock); invariant(r == 0);
}
if (r == 0) { if (r == 0) {
threadpool->current_threads++; struct toku_list *list = toku_list_pop_head(&pool->free_threads);
struct toku_thread *thread = toku_list_struct(list, struct toku_thread, free_link);
*toku_thread_return = thread;
} else
*toku_thread_return = NULL;
toku_thread_pool_unlock(pool);
return r;
}
int
toku_thread_pool_get(struct toku_thread_pool *pool, int dowait, int *nthreads, struct toku_thread **toku_thread_return) {
int r = 0;
int n = *nthreads;
int i;
for (i = 0; i < n; i++) {
r = toku_thread_pool_get_one(pool, dowait, &toku_thread_return[i]);
if (r != 0)
break;
} }
*nthreads = i;
return r;
}
int
toku_thread_pool_run(struct toku_thread_pool *pool, int dowait, int *nthreads, void *(*f)(void *arg), void *arg) {
int n = *nthreads;
struct toku_thread *tids[n];
int r = toku_thread_pool_get(pool, dowait, nthreads, tids);
if (r == 0 || r == EWOULDBLOCK) {
n = *nthreads;
for (int i = 0; i < n; i++)
toku_thread_run(tids[i], f, arg);
} }
return r;
}
void
toku_thread_pool_print(struct toku_thread_pool *pool, FILE *out) {
fprintf(out, "%s:%d %p %llu %llu\n", __FILE__, __LINE__, pool, (long long unsigned) pool->gets, (long long unsigned) pool->get_blocks);
} }
int threadpool_get_current_threads(THREADPOOL threadpool) { int
return threadpool->current_threads; toku_thread_pool_get_current_threads(struct toku_thread_pool *pool) {
return pool->cur_threads;
} }
/* -*- mode: C; c-basic-offset: 4 -*- */ /* -*- mode: C; c-basic-offset: 4 -*- */
#ifndef THREADPOOL_H #ifndef TOKU_THREADPOOL_H
#define THREADPOOL_H #define TOKU_THREADPOOL_H
#ident "$Id$" #ident "$Id$"
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved." #ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it." #ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#if defined(__cplusplus) || defined(__cilkplusplus) #include "c_dialects.h"
extern "C" {
#endif
C_BEGIN
// A threadpool is a limited set of threads that can be used to apply a // A toku_thread is toku_pthread that can be cached.
// function to work contained in a work queue. The work queue is outside struct toku_thread;
// of the scope of the threadpool; the threadpool merely provides
// mechanisms to grow the number of threads in the threadpool on demand.
typedef struct threadpool *THREADPOOL; // Run a function f on a thread
// This function setups up the thread to run function f with argument arg and then wakes up
// the thread to run it.
void toku_thread_run(struct toku_thread *thread, void *(*f)(void *arg), void *arg);
// Create a new threadpool // A toku_thread_pool is a pool of toku_threads. These threads can be allocated from the pool
// Effects: a new threadpool is allocated and initialized. the number of // and can run an arbitrary function.
// threads in the threadpool is limited to max_threads. initially, there struct toku_thread_pool;
// are no threads in the pool.
// Returns: if there are no errors, the threadpool is set and zero is returned. typedef struct toku_thread_pool *THREADPOOL;
// Otherwise, an error number is returned.
int threadpool_create(THREADPOOL *threadpoolptr, int max_threads); // Create a new threadpool
// Effects: a new threadpool is allocated and initialized. the number of threads in the threadpool is limited to max_threads.
// If max_threads == 0 then there is no limit on the number of threads in the pool.
// Initially, there are no threads in the pool. Threads are allocated by the _get or _run functions.
// Returns: if there are no errors, the threadpool is set and zero is returned. Otherwise, an error number is returned.
int toku_thread_pool_create(struct toku_thread_pool **threadpoolptr, int max_threads);
// Destroy a threadpool // Destroy a threadpool
// Effects: the calling thread joins with all of the threads in the threadpool. // Effects: the calling thread joins with all of the threads in the threadpool.
// Effects: the threadpool memory is freed. // Effects: the threadpool memory is freed.
// Returns: the threadpool is set to null. // Returns: the threadpool is set to null.
void toku_thread_pool_destroy(struct toku_thread_pool **threadpoolptr);
void threadpool_destroy(THREADPOOL *threadpoolptr); // Get the current number of threads in the thread pool
int toku_thread_pool_get_current_threads(struct toku_thread_pool *pool);
// Maybe add a thread to the threadpool.
// Effects: the number of threads in the threadpool is expanded by 1 as long
// as the current number of threads in the threadpool is less than the max
// and there are no idle threads.
// Effects: if the thread is create, it calls the function f with argument arg
// Expects: external serialization on this function; only one thread may
// execute this function
void threadpool_maybe_add(THREADPOOL theadpool, void *(*f)(void *), void *arg); // Get one or more threads from the thread pool
// dowait indicates whether or not the caller blocks waiting for threads to free up
// nthreads on input determines the number of threads that are wanted
// nthreads on output indicates the number of threads that were allocated
// toku_thread_return on input supplies an array of thread pointers (all NULL). This function returns the threads
// that were allocated in the array.
int toku_thread_pool_get(struct toku_thread_pool *pool, int dowait, int *nthreads, struct toku_thread **toku_thread_return);
// get the current number of threads // Run a function f on one or more threads allocated from the thread pool
int toku_thread_pool_run(struct toku_thread_pool *pool, int dowait, int *nthreads, void *(*f)(void *arg), void *arg);
int threadpool_get_current_threads(THREADPOOL); // Print the state of the thread pool
void toku_thread_pool_print(struct toku_thread_pool *pool, FILE *out);
#if defined(__cplusplus) || defined(__cilkplusplus) C_END
};
#endif
#endif #endif
/* -*- mode: C; c-basic-offset: 4 -*- */
#ifndef _TOKU_WORKER_H
#define _TOKU_WORKER_H
#ident "$Id$"
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
// initialize the work queue and worker
void toku_init_workers(WORKQUEUE wq, THREADPOOL *tpptr);
// destroy the work queue and worker
void toku_destroy_workers(WORKQUEUE wq, THREADPOOL *tpptr);
// this is the thread function for the worker threads in the worker thread
// pool. the arg is a pointer to the work queue that feeds work to the
// workers.
void *toku_worker(void *arg);
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
#endif
...@@ -12,7 +12,6 @@ ...@@ -12,7 +12,6 @@
#include "toku_pthread.h" #include "toku_pthread.h"
#include "workqueue.h" #include "workqueue.h"
#include "threadpool.h" #include "threadpool.h"
#include "toku_worker.h"
// Create fixed number of worker threads, all waiting on a single queue // Create fixed number of worker threads, all waiting on a single queue
// of work items (WORKQUEUE). // of work items (WORKQUEUE).
...@@ -21,15 +20,13 @@ void toku_init_workers(WORKQUEUE wq, THREADPOOL *tpptr) { ...@@ -21,15 +20,13 @@ void toku_init_workers(WORKQUEUE wq, THREADPOOL *tpptr) {
workqueue_init(wq); workqueue_init(wq);
int nprocs = toku_os_get_number_active_processors(); int nprocs = toku_os_get_number_active_processors();
int nthreads = nprocs*2; int nthreads = nprocs*2;
threadpool_create(tpptr, nthreads); toku_thread_pool_create(tpptr, nthreads);
int i; toku_thread_pool_run(*tpptr, 0, &nthreads, toku_worker, wq);
for (i=0; i<nthreads; i++)
threadpool_maybe_add(*tpptr, toku_worker, wq);
} }
void toku_destroy_workers(WORKQUEUE wq, THREADPOOL *tpptr) { void toku_destroy_workers(WORKQUEUE wq, THREADPOOL *tpptr) {
workqueue_set_closed(wq, 1); // close the work queue and [see "A" in toku_worker()] workqueue_set_closed(wq, 1); // close the work queue and [see "A" in toku_worker()]
threadpool_destroy(tpptr); // wait for all of the worker threads to exit toku_thread_pool_destroy(tpptr); // wait for all of the worker threads to exit
workqueue_destroy(wq); workqueue_destroy(wq);
} }
......
...@@ -9,10 +9,9 @@ ...@@ -9,10 +9,9 @@
#include <errno.h> #include <errno.h>
#include "toku_assert.h" #include "toku_assert.h"
#include "toku_pthread.h" #include "toku_pthread.h"
#include "c_dialects.h"
#if defined(__cplusplus) || defined(__cilkplusplus) C_BEGIN
extern "C" {
#endif
struct workitem; struct workitem;
...@@ -203,9 +202,20 @@ static int workqueue_n_in_queue (WORKQUEUE wq, int dolock) { ...@@ -203,9 +202,20 @@ static int workqueue_n_in_queue (WORKQUEUE wq, int dolock) {
return r; return r;
} }
#if defined(__cplusplus) || defined(__cilkplusplus) #include "threadpool.h"
};
#endif // initialize the work queue and worker
void toku_init_workers(WORKQUEUE wq, THREADPOOL *tpptr);
// destroy the work queue and worker
void toku_destroy_workers(WORKQUEUE wq, THREADPOOL *tpptr);
// this is the thread function for the worker threads in the worker thread
// pool. the arg is a pointer to the work queue that feeds work to the
// workers.
void *toku_worker(void *arg);
C_END
#endif #endif
...@@ -8,55 +8,67 @@ ...@@ -8,55 +8,67 @@
#include <toku_list.h> #include <toku_list.h>
#include <toku_pthread.h> #include <toku_pthread.h>
#if defined(__cplusplus) || defined(__cilkplusplus) #include "c_dialects.h"
extern "C" {
#endif
C_BEGIN
// the work struct is the base class for work to be done by some threads // The work struct is the base class for work to be done by some threads
struct work { struct work {
struct toku_list next; struct toku_list next;
}; };
// the workset struct contains the set of work to be done by some threads // The workset struct contains the set of work to be done by some threads
// the lock protects the work list
struct workset { struct workset {
toku_pthread_mutex_t lock; toku_pthread_mutex_t lock;
struct toku_list worklist; struct toku_list worklist; // a list of work
int refs; // number of workers that have a reference on the workset
toku_pthread_cond_t worker_wait; // a condition variable used to wait for all of the worker to release their reference on the workset
}; };
static inline void workset_init(struct workset *ws) { static inline void
int r = toku_pthread_mutex_init(&ws->lock, NULL); assert(r == 0); workset_init(struct workset *ws) {
int r;
r = toku_pthread_mutex_init(&ws->lock, NULL); invariant(r == 0);
toku_list_init(&ws->worklist); toku_list_init(&ws->worklist);
ws->refs = 1; // the calling thread gets a reference
r = toku_pthread_cond_init(&ws->worker_wait, NULL); invariant(r == 0);
} }
static inline void workset_destroy(struct workset *ws) { static inline void
assert(toku_list_empty(&ws->worklist)); workset_destroy(struct workset *ws) {
int r = toku_pthread_mutex_destroy(&ws->lock); assert(r == 0); invariant(toku_list_empty(&ws->worklist));
int r;
r = toku_pthread_cond_destroy(&ws->worker_wait); invariant(r == 0);
r = toku_pthread_mutex_destroy(&ws->lock); invariant(r == 0);
} }
static inline void workset_lock(struct workset *ws) { static inline void
int r = toku_pthread_mutex_lock(&ws->lock); assert(r == 0); workset_lock(struct workset *ws) {
int r = toku_pthread_mutex_lock(&ws->lock); invariant(r == 0);
} }
static inline void workset_unlock(struct workset *ws) { static inline void
int r = toku_pthread_mutex_unlock(&ws->lock); assert(r == 0); workset_unlock(struct workset *ws) {
int r = toku_pthread_mutex_unlock(&ws->lock); invariant(r == 0);
} }
// put work in the workset // Put work in the workset. Assume the workset is already locked.
static inline void workset_put(struct workset *ws, struct work *w) { static inline void
workset_lock(ws); workset_put_locked(struct workset *ws, struct work *w) {
toku_list_push(&ws->worklist, &w->next); toku_list_push(&ws->worklist, &w->next);
workset_unlock(ws);
} }
// put work in the workset. assume already locked. // Put work in the workset
static inline void workset_put_locked(struct workset *ws, struct work *w) { static inline void
toku_list_push(&ws->worklist, &w->next); workset_put(struct workset *ws, struct work *w) {
workset_lock(ws);
workset_put_locked(ws, w);
workset_unlock(ws);
} }
// get work from the workset // Get work from the workset
static inline struct work *workset_get(struct workset *ws) { static inline struct work *
workset_get(struct workset *ws) {
workset_lock(ws); workset_lock(ws);
struct work *w = NULL; struct work *w = NULL;
if (!toku_list_empty(&ws->worklist)) { if (!toku_list_empty(&ws->worklist)) {
...@@ -67,30 +79,34 @@ static inline struct work *workset_get(struct workset *ws) { ...@@ -67,30 +79,34 @@ static inline struct work *workset_get(struct workset *ws) {
return w; return w;
} }
// create a set of threads to run a given function // Add references to the workset
// tids will contain the thread id's of the created threads static inline void
// *ntids on input contains the number of threads requested, on output contains the number of threads created workset_add_ref(struct workset *ws, int refs) {
static inline void threadset_create(toku_pthread_t tids[], int *ntids, void *(*f)(void *arg), void *arg) { workset_lock(ws);
int n = *ntids; ws->refs += refs;
int i; workset_unlock(ws);
for (i = 0; i < n; i++) { }
int r = toku_pthread_create(&tids[i], NULL, f, arg);
if (r != 0) // Release a reference on the workset
break; static inline void
workset_release_ref(struct workset *ws) {
workset_lock(ws);
if (--ws->refs == 0) {
int r = toku_pthread_cond_broadcast(&ws->worker_wait); invariant(r == 0);
} }
*ntids = i; workset_unlock(ws);
} }
// join with a set of threads // Wait until all of the worker threads have released their reference on the workset
static inline void threadset_join(toku_pthread_t tids[], int ntids) { static inline void
for (int i = 0; i < ntids; i++) { workset_join(struct workset *ws) {
void *ret; workset_lock(ws);
int r = toku_pthread_join(tids[i], &ret); assert(r == 0); while (ws->refs != 0) {
int r = toku_pthread_cond_wait(&ws->worker_wait, &ws->lock); invariant(r == 0);
} }
workset_unlock(ws);
} }
#if defined(__cplusplus) || defined(__cilkplusplus) C_END
};
#endif
#endif #endif
...@@ -6,7 +6,7 @@ ...@@ -6,7 +6,7 @@
#if defined(__cplusplus) || defined(__cilkplusplus) #if defined(__cplusplus) || defined(__cilkplusplus)
#define C_BEGIN extern "C" { #define C_BEGIN extern "C" {
#define C_END }; #define C_END }
#else #else
#define C_BEGIN #define C_BEGIN
#define C_END #define C_END
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
#if defined(__cilkplusplus) #if defined(__cilkplusplus)
#define CILK_BEGIN extern "Cilk++" { #define CILK_BEGIN extern "Cilk++" {
#define CILK_END }; #define CILK_END }
#else #else
#define CILK_BEGIN #define CILK_BEGIN
#define CILK_END #define CILK_END
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment