Commit d51bc82a authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

Turn on higher compression (#1192), increase block size (#1168), and provide a way to measure workqueue fullness (#1399).

Turn on higher compression (#1192), increase block size (#1168), and provide a way to measure workqueue fullness (#1399).

Fixes #1168, #1192, #1399.

{{{
svn merge -r8927:9039 https://svn.tokutek.com/tokudb/toku/tokudb.1399
}}}
and resolve conflicts.


git-svn-id: file:///svn/toku/tokudb@9040 c7de825b-a66e-492c-adef-691d508d4ae1
parent 3b5808f0
......@@ -26,7 +26,7 @@ enum { BRT_CMD_OVERHEAD = (1 // the type
};
enum { LE_OVERHEAD_BOUND = 9 }; // the type and xid
enum { BRT_DEFAULT_NODE_SIZE = 1 << 20 };
enum { BRT_DEFAULT_NODE_SIZE = 1 << 22 };
struct nodeheader_in_file {
int n_in_buffer;
......@@ -183,7 +183,7 @@ struct brt {
};
/* serialization code */
int toku_serialize_brtnode_to(int fd, BLOCKNUM, BRTNODE node, struct brt_header *h);
int toku_serialize_brtnode_to(int fd, BLOCKNUM, BRTNODE node, struct brt_header *h, int n_workitems, int n_threads);
int toku_deserialize_brtnode_from (int fd, BLOCKNUM off, u_int32_t /*fullhash*/, BRTNODE *brtnode, struct brt_header *h);
unsigned int toku_serialize_brtnode_size(BRTNODE node); /* How much space will it take? */
int toku_keycompare (bytevec key1, ITEMLEN key1len, bytevec key2, ITEMLEN key2len);
......
......@@ -208,7 +208,9 @@ enum { compression_header_len = (4 // compressed_len
+4 // uncompressed_len
) };
int toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct brt_header *h) {
// No-op sink for an int argument; used to suppress unused-parameter warnings
// when ADAPTIVE_COMPRESSION is disabled (UU() marks the parameter as unused).
static inline void ignore_int (int UU(ignore_me)) {}
int toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct brt_header *h, int n_workitems, int n_threads) {
struct wbuf w;
int i;
unsigned int calculated_size = toku_serialize_brtnode_size(node) - 8; // don't include the compressed or uncompressed sizes
......@@ -323,9 +325,22 @@ int toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct b
buf[uncompressed_magic_len], buf[uncompressed_magic_len+1],
buf[uncompressed_magic_len+2], buf[uncompressed_magic_len+3]);
{
#ifdef ADAPTIVE_COMPRESSION
// Marketing has expressed concern that this algorithm will make customers go crazy.
int compression_level;
if (n_workitems <= n_threads) compression_level = 5;
else if (n_workitems <= 2*n_threads) compression_level = 4;
else if (n_workitems <= 3*n_threads) compression_level = 3;
else if (n_workitems <= 4*n_threads) compression_level = 2;
else compression_level = 1;
#else
int compression_level = 5;
ignore_int(n_workitems); ignore_int(n_threads);
#endif
//printf("compress(%d) n_workitems=%d n_threads=%d\n", compression_level, n_workitems, n_threads);
int r = compress2(((Bytef*)compressed_buf)+uncompressed_magic_len + compression_header_len, &compressed_len,
((Bytef*)buf)+uncompressed_magic_len, calculated_size-uncompressed_magic_len,
1);
compression_level);
assert(r==Z_OK);
}
......
......@@ -378,7 +378,9 @@ void toku_brtnode_flush_callback (CACHEFILE cachefile, BLOCKNUM nodename, void *
//printf("%s:%d %p->mdict[0]=%p\n", __FILE__, __LINE__, brtnode, brtnode->mdicts[0]);
if (write_me) {
if (!h->panic) { // if the brt panicked, stop writing, otherwise try to write it.
int r = toku_serialize_brtnode_to(toku_cachefile_fd(cachefile), brtnode->thisnodename, brtnode, h);
int n_workitems, n_threads;
toku_cachefile_get_workqueue_load(cachefile, &n_workitems, &n_threads);
int r = toku_serialize_brtnode_to(toku_cachefile_fd(cachefile), brtnode->thisnodename, brtnode, h, n_workitems, n_threads);
if (r) {
if (h->panic==0) {
char s[200];
......
......@@ -247,6 +247,13 @@ WORKQUEUE toku_cachetable_get_workqueue(CACHETABLE ct) {
return &ct->wq;
}
// Report the load on the cachetable's shared work queue.
// On return, *n_in_queue holds the number of pending workitems (read under
// the queue lock, dolock=1) and *n_threads holds the current size of the
// cachetable's thread pool.
void toku_cachefile_get_workqueue_load (CACHEFILE cf, int *n_in_queue, int *n_threads) {
    CACHETABLE table = cf->cachetable;
    int queued = workqueue_n_in_queue(&table->wq, 1);
    int threads = threadpool_get_current_threads(table->threadpool);
    *n_in_queue = queued;
    *n_threads = threads;
}
int toku_cachefile_set_fd (CACHEFILE cf, int fd, const char *fname) {
int r;
struct fileid fileid;
......
......@@ -52,7 +52,8 @@ int toku_cachetable_openfd (CACHEFILE *,CACHETABLE, int /*fd*/, const char */*fn
// Get access to the asynchronous work queue
// Returns: a pointer to the work queue
WORKQUEUE toku_cachetable_get_workqueue(CACHETABLE);
WORKQUEUE toku_cachetable_get_workqueue (CACHETABLE);
void toku_cachefile_get_workqueue_load (CACHEFILE, int *n_in_queue, int *n_threads);
// The flush callback is called when a key value pair is being written to storage and possibly removed from the cachetable.
// When write_me is true, the value should be written to storage.
......
......@@ -64,7 +64,7 @@ static void test_serialize(void) {
assert(b==4096);
}
toku_serialize_brtnode_to(fd, make_blocknum(20), &sn, brt->h); assert(r==0);
toku_serialize_brtnode_to(fd, make_blocknum(20), &sn, brt->h, 1, 1); assert(r==0);
r = toku_deserialize_brtnode_from(fd, make_blocknum(20), 0/*pass zero for hash*/, &dn, brt_h);
assert(r==0);
......
......@@ -16,9 +16,10 @@
// Initialize the work queue and create its worker thread pool.
// The pool is sized at twice the number of active processors so that
// workers blocked on I/O do not leave CPUs idle (post-#1399 behavior).
// NOTE(review): the scraped diff contained both the pre- and post-change
// lines (pool created twice, loop bounded by nprocs); this is the resolved
// post-commit version that creates the pool exactly once.
void toku_init_workers(WORKQUEUE wq, THREADPOOL *tpptr) {
    workqueue_init(wq);
    int nprocs = toku_os_get_number_active_processors();
    int nthreads = nprocs*2;
    threadpool_create(tpptr, nthreads);
    int i;
    for (i=0; i<nthreads; i++)
        threadpool_maybe_add(*tpptr, toku_worker, wq);
}
......
......@@ -42,13 +42,14 @@ static inline void *workitem_arg(WORKITEM wi) {
// divide the workqueue into per worker thread queues.
typedef struct workqueue *WORKQUEUE;
// A bounded producer/consumer queue of WORKITEMs shared by the worker threads.
// NOTE(review): the scraped diff duplicated the head/tail member line
// (old and new comment variants), which would be a redeclaration error;
// the duplicate has been removed here.
struct workqueue {
    WORKITEM head, tail;            // singly-linked list of workitems
    toku_pthread_mutex_t lock;      // protects every field below
    toku_pthread_cond_t wait_read;  // wait for read
    int want_read;                  // number of threads waiting to read
    toku_pthread_cond_t wait_write; // wait for write
    int want_write;                 // number of threads waiting to write
    char closed;                    // kicks waiting threads off of the write queue
    int n_in_queue;                 // count of how many workitems are in the queue.
};
// Get a pointer to the workqueue lock. This is used by workqueue client software
......@@ -80,6 +81,7 @@ static void workqueue_init(WORKQUEUE wq) {
r = toku_pthread_cond_init(&wq->wait_write, 0); assert(r == 0);
wq->want_write = 0;
wq->closed = 0;
wq->n_in_queue = 0;
}
// Destroy a work queue
......@@ -120,6 +122,7 @@ static inline int workqueue_empty(WORKQUEUE wq) {
__attribute__((unused))
static void workqueue_enq(WORKQUEUE wq, WORKITEM wi, int dolock) {
if (dolock) workqueue_lock(wq);
wq->n_in_queue++;
wi->next = 0;
if (wq->tail)
wq->tail->next = wi;
......@@ -141,6 +144,7 @@ static void workqueue_enq(WORKQUEUE wq, WORKITEM wi, int dolock) {
__attribute__((unused))
static int workqueue_deq(WORKQUEUE wq, WORKITEM *wiptr, int dolock) {
if (dolock) workqueue_lock(wq);
assert(wq->n_in_queue >= 0);
while (workqueue_empty(wq)) {
if (wq->closed) {
if (dolock) workqueue_unlock(wq);
......@@ -150,6 +154,7 @@ static int workqueue_deq(WORKQUEUE wq, WORKITEM *wiptr, int dolock) {
int r = toku_pthread_cond_wait(&wq->wait_read, &wq->lock); assert(r == 0);
wq->want_read--;
}
wq->n_in_queue--;
WORKITEM wi = wq->head;
wq->head = wi->next;
if (wq->head == 0)
......@@ -182,4 +187,12 @@ static void workqueue_wakeup_write(WORKQUEUE wq, int dolock) {
}
}
__attribute__((unused))
// Return the number of workitems currently in the queue.
// When dolock is nonzero, the count is read under the queue lock;
// otherwise the caller must already hold it.
static int workqueue_n_in_queue (WORKQUEUE wq, int dolock) {
    if (dolock)
        workqueue_lock(wq);
    int count = wq->n_in_queue;
    if (dolock)
        workqueue_unlock(wq);
    return count;
}
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment