Commit 407c145a authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

merge 1469 to main, the cachetable evict from the main thread change. closes #1469, #1470

git-svn-id: file:///svn/toku/tokudb@9441 c7de825b-a66e-492c-adef-691d508d4ae1
parent 4c28875d
...@@ -122,7 +122,7 @@ static void scanrace_lwc (void) { ...@@ -122,7 +122,7 @@ static void scanrace_lwc (void) {
r = db->cursor(db, tid, &dbc, 0); assert(r==0); r = db->cursor(db, tid, &dbc, 0); assert(r==0);
long rowcounter=0; long rowcounter=0;
while (0 == (r = dbc->c_getf_next(dbc, DB_PRELOCKED, counttotalbytes, &e))) { while (0 == (r = dbc->c_getf_next(dbc, DB_PRELOCKED, counttotalbytes, &e))) {
if (rowcounter%1024==0) { printf("."); fflush(stdout); } if (rowcounter%128==0) { printf("."); fflush(stdout); }
rowcounter++; rowcounter++;
if (limitcount>0 && rowcounter>=limitcount) break; if (limitcount>0 && rowcounter>=limitcount) break;
} }
......
...@@ -1688,7 +1688,7 @@ static int brt_nonleaf_cmd_once_to_child (BRT t, BRTNODE node, unsigned int chil ...@@ -1688,7 +1688,7 @@ static int brt_nonleaf_cmd_once_to_child (BRT t, BRTNODE node, unsigned int chil
if (BNC_NBYTESINBUF(node, childnum) == 0) { if (BNC_NBYTESINBUF(node, childnum) == 0) {
BLOCKNUM childblocknum = BNC_BLOCKNUM(node, childnum); BLOCKNUM childblocknum = BNC_BLOCKNUM(node, childnum);
u_int32_t childfullhash = compute_child_fullhash(t->cf, node, childnum); u_int32_t childfullhash = compute_child_fullhash(t->cf, node, childnum);
void *child_v; void *child_v = NULL;
int r = toku_cachetable_maybe_get_and_pin(t->cf, childblocknum, childfullhash, &child_v); int r = toku_cachetable_maybe_get_and_pin(t->cf, childblocknum, childfullhash, &child_v);
if (r!=0) { if (r!=0) {
// It's not in main memory, so // It's not in main memory, so
......
...@@ -14,6 +14,10 @@ ...@@ -14,6 +14,10 @@
#include "cachetable-rwlock.h" #include "cachetable-rwlock.h"
#include "toku_worker.h" #include "toku_worker.h"
#if !defined(TOKU_CACHETABLE_DO_EVICT_FROM_WRITER)
#error
#endif
// use worker threads 0->no 1->yes // use worker threads 0->no 1->yes
#define DO_WORKER_THREAD 1 #define DO_WORKER_THREAD 1
#if DO_WORKER_THREAD #if DO_WORKER_THREAD
...@@ -51,8 +55,10 @@ struct ctpair { ...@@ -51,8 +55,10 @@ struct ctpair {
long size; long size;
enum ctpair_state state; enum ctpair_state state;
enum cachetable_dirty dirty; enum cachetable_dirty dirty;
char verify_flag; // Used in verify_cachetable()
BOOL write_me; char verify_flag; // Used in verify_cachetable()
BOOL write_me; // write_pair
BOOL remove_me; // write_pair
u_int32_t fullhash; u_int32_t fullhash;
...@@ -60,7 +66,7 @@ struct ctpair { ...@@ -60,7 +66,7 @@ struct ctpair {
CACHETABLE_FETCH_CALLBACK fetch_callback; CACHETABLE_FETCH_CALLBACK fetch_callback;
void *extraargs; void *extraargs;
PAIR next,prev; // In LRU list. PAIR next,prev; // In LRU list.
PAIR hash_chain; PAIR hash_chain;
LSN modified_lsn; // What was the LSN when modified (undefined if not dirty) LSN modified_lsn; // What was the LSN when modified (undefined if not dirty)
...@@ -635,7 +641,7 @@ static void cachetable_write_pair(CACHETABLE ct, PAIR p) { ...@@ -635,7 +641,7 @@ static void cachetable_write_pair(CACHETABLE ct, PAIR p) {
if (p->cq) if (p->cq)
workqueue_enq(p->cq, &p->asyncwork, 1); workqueue_enq(p->cq, &p->asyncwork, 1);
else else
cachetable_complete_write_pair(ct, p, TRUE); cachetable_complete_write_pair(ct, p, p->remove_me);
} }
// complete the write of a pair by reseting the writing flag, adjusting the write // complete the write of a pair by reseting the writing flag, adjusting the write
...@@ -665,6 +671,7 @@ static void flush_and_maybe_remove (CACHETABLE ct, PAIR p, BOOL write_me) { ...@@ -665,6 +671,7 @@ static void flush_and_maybe_remove (CACHETABLE ct, PAIR p, BOOL write_me) {
p->state = CTPAIR_WRITING; p->state = CTPAIR_WRITING;
ct->size_writing += p->size; assert(ct->size_writing >= 0); ct->size_writing += p->size; assert(ct->size_writing >= 0);
p->write_me = write_me; p->write_me = write_me;
p->remove_me = TRUE;
#if DO_WORKER_THREAD #if DO_WORKER_THREAD
WORKITEM wi = &p->asyncwork; WORKITEM wi = &p->asyncwork;
workitem_init(wi, cachetable_writer, p); workitem_init(wi, cachetable_writer, p);
...@@ -673,6 +680,9 @@ static void flush_and_maybe_remove (CACHETABLE ct, PAIR p, BOOL write_me) { ...@@ -673,6 +680,9 @@ static void flush_and_maybe_remove (CACHETABLE ct, PAIR p, BOOL write_me) {
if (!p->write_me || (!ctpair_pinned(&p->rwlock) && !p->dirty)) { if (!p->write_me || (!ctpair_pinned(&p->rwlock) && !p->dirty)) {
cachetable_write_pair(ct, p); cachetable_write_pair(ct, p);
} else { } else {
#if !TOKU_CACHETABLE_DO_EVICT_FROM_WRITER
p->remove_me = FALSE; // run the remove on the main thread
#endif
workqueue_enq(&ct->wq, wi, 0); workqueue_enq(&ct->wq, wi, 0);
} }
#else #else
...@@ -709,6 +719,12 @@ static int maybe_flush_some (CACHETABLE ct, long size) { ...@@ -709,6 +719,12 @@ static int maybe_flush_some (CACHETABLE ct, long size) {
return r; return r;
} }
void toku_cachetable_maybe_flush_some(CACHETABLE ct) {
cachetable_lock(ct);
maybe_flush_some(ct, 0);
cachetable_unlock(ct);
}
static PAIR cachetable_insert_at(CACHETABLE ct, static PAIR cachetable_insert_at(CACHETABLE ct,
CACHEFILE cachefile, CACHEKEY key, void *value, CACHEFILE cachefile, CACHEKEY key, void *value,
enum ctpair_state state, enum ctpair_state state,
...@@ -863,6 +879,7 @@ int toku_cachetable_maybe_get_and_pin (CACHEFILE cachefile, CACHEKEY key, u_int3 ...@@ -863,6 +879,7 @@ int toku_cachetable_maybe_get_and_pin (CACHEFILE cachefile, CACHEKEY key, u_int3
CACHETABLE ct = cachefile->cachetable; CACHETABLE ct = cachefile->cachetable;
PAIR p; PAIR p;
int count = 0; int count = 0;
int r = -1;
cachetable_lock(ct); cachetable_lock(ct);
for (p=ct->table[fullhash&(ct->table_size-1)]; p; p=p->hash_chain) { for (p=ct->table[fullhash&(ct->table_size-1)]; p; p=p->hash_chain) {
count++; count++;
...@@ -870,15 +887,14 @@ int toku_cachetable_maybe_get_and_pin (CACHEFILE cachefile, CACHEKEY key, u_int3 ...@@ -870,15 +887,14 @@ int toku_cachetable_maybe_get_and_pin (CACHEFILE cachefile, CACHEKEY key, u_int3
*value = p->value; *value = p->value;
ctpair_read_lock(&p->rwlock, ct->mutex); ctpair_read_lock(&p->rwlock, ct->mutex);
lru_touch(ct,p); lru_touch(ct,p);
cachetable_unlock(ct); r = 0;
note_hash_count(count);
//printf("%s:%d cachetable_maybe_get_and_pin(%lld)--> %p\n", __FILE__, __LINE__, key, *value); //printf("%s:%d cachetable_maybe_get_and_pin(%lld)--> %p\n", __FILE__, __LINE__, key, *value);
return 0; break;
} }
} }
cachetable_unlock(ct); cachetable_unlock(ct);
note_hash_count(count); note_hash_count(count);
return -1; return r;
} }
...@@ -888,6 +904,7 @@ int toku_cachetable_unpin(CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash, ...@@ -888,6 +904,7 @@ int toku_cachetable_unpin(CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash,
WHEN_TRACE_CT(printf("%s:%d unpin(%lld)", __FILE__, __LINE__, key)); WHEN_TRACE_CT(printf("%s:%d unpin(%lld)", __FILE__, __LINE__, key));
//printf("%s:%d is dirty now=%d\n", __FILE__, __LINE__, dirty); //printf("%s:%d is dirty now=%d\n", __FILE__, __LINE__, dirty);
int count = 0; int count = 0;
int r = -1;
//assert(fullhash == toku_cachetable_hash(cachefile, key)); //assert(fullhash == toku_cachetable_hash(cachefile, key));
cachetable_lock(ct); cachetable_lock(ct);
for (p=ct->table[fullhash&(ct->table_size-1)]; p; p=p->hash_chain) { for (p=ct->table[fullhash&(ct->table_size-1)]; p; p=p->hash_chain) {
...@@ -903,20 +920,18 @@ int toku_cachetable_unpin(CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash, ...@@ -903,20 +920,18 @@ int toku_cachetable_unpin(CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash,
} }
WHEN_TRACE_CT(printf("[count=%lld]\n", p->pinned)); WHEN_TRACE_CT(printf("[count=%lld]\n", p->pinned));
{ {
int r;
if ((r=maybe_flush_some(ct, 0))) { if ((r=maybe_flush_some(ct, 0))) {
cachetable_unlock(ct); cachetable_unlock(ct);
return r; return r;
} }
} }
cachetable_unlock(ct); r = 0; // we found one
note_hash_count(count); break;
return 0;
} }
} }
cachetable_unlock(ct); cachetable_unlock(ct);
note_hash_count(count); note_hash_count(count);
return -1; return r;
} }
int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash, int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash,
......
...@@ -216,6 +216,8 @@ int toku_graceful_delete(const char *db_fname); ...@@ -216,6 +216,8 @@ int toku_graceful_delete(const char *db_fname);
void toku_graceful_lock_init(void); void toku_graceful_lock_init(void);
void toku_graceful_lock_destroy(void); void toku_graceful_lock_destroy(void);
#define TOKU_CACHETABLE_DO_EVICT_FROM_WRITER 0
void toku_cachetable_maybe_flush_some(CACHETABLE ct);
#endif #endif
...@@ -26,6 +26,12 @@ static inline void test_mutex_unlock() { ...@@ -26,6 +26,12 @@ static inline void test_mutex_unlock() {
int r = toku_pthread_mutex_unlock(&test_mutex); assert(r == 0); int r = toku_pthread_mutex_unlock(&test_mutex); assert(r == 0);
} }
static void maybe_flush(CACHETABLE t) {
#if !TOKU_CACHETABLE_DO_EVICT_FROM_WRITER
toku_cachetable_maybe_flush_some(t);
#endif
}
enum { KEYLIMIT = 4, TRIALLIMIT=256000 }; enum { KEYLIMIT = 4, TRIALLIMIT=256000 };
static CACHEKEY keys[KEYLIMIT]; static CACHEKEY keys[KEYLIMIT];
static void* vals[KEYLIMIT]; static void* vals[KEYLIMIT];
...@@ -100,7 +106,7 @@ static void test_rename (void) { ...@@ -100,7 +106,7 @@ static void test_rename (void) {
test_mutex_lock(); test_mutex_lock();
while (n_keys >= KEYLIMIT) { while (n_keys >= KEYLIMIT) {
test_mutex_unlock(); test_mutex_unlock();
toku_pthread_yield(); toku_pthread_yield(); maybe_flush(t);
test_mutex_lock(); test_mutex_lock();
} }
assert(n_keys<KEYLIMIT); assert(n_keys<KEYLIMIT);
......
...@@ -155,6 +155,12 @@ static int fetch (CACHEFILE f, CACHEKEY key, u_int32_t fullhash __attribute__((_ ...@@ -155,6 +155,12 @@ static int fetch (CACHEFILE f, CACHEKEY key, u_int32_t fullhash __attribute__((_
return 0; return 0;
} }
static void maybe_flush(CACHETABLE t) {
#if !TOKU_CACHETABLE_DO_EVICT_FROM_WRITER
toku_cachetable_maybe_flush_some(t);
#endif
}
// verify that a sequence of cachetable operations causes a particular sequence of // verify that a sequence of cachetable operations causes a particular sequence of
// callbacks // callbacks
...@@ -218,7 +224,7 @@ static void test0 (void) { ...@@ -218,7 +224,7 @@ static void test0 (void) {
assert(r==0); assert(r==0);
test_mutex_lock(); test_mutex_lock();
while (expect_n_flushes != 0) { while (expect_n_flushes != 0) {
test_mutex_unlock(); toku_pthread_yield(); test_mutex_lock(); test_mutex_unlock(); toku_pthread_yield(); maybe_flush(t); test_mutex_lock();
} }
assert(expect_n_flushes==0); assert(expect_n_flushes==0);
test_mutex_unlock(); test_mutex_unlock();
...@@ -228,7 +234,7 @@ static void test0 (void) { ...@@ -228,7 +234,7 @@ static void test0 (void) {
assert(r==0); assert(r==0);
test_mutex_lock(); test_mutex_lock();
while (expect_n_flushes != 0) { while (expect_n_flushes != 0) {
test_mutex_unlock(); toku_pthread_yield(); test_mutex_lock(); test_mutex_unlock(); toku_pthread_yield(); maybe_flush(t); test_mutex_lock();
} }
assert(expect_n_flushes==0); assert(expect_n_flushes==0);
test_mutex_unlock(); test_mutex_unlock();
...@@ -260,7 +266,7 @@ static void test0 (void) { ...@@ -260,7 +266,7 @@ static void test0 (void) {
assert(strcmp(((struct item *)item_v)->something,"something")==0); assert(strcmp(((struct item *)item_v)->something,"something")==0);
test_mutex_lock(); test_mutex_lock();
while (expect_n_flushes != 0) { while (expect_n_flushes != 0) {
test_mutex_unlock(); toku_pthread_yield(); test_mutex_lock(); test_mutex_unlock(); toku_pthread_yield(); maybe_flush(t); test_mutex_lock();
} }
assert(expect_n_flushes==0); assert(expect_n_flushes==0);
test_mutex_unlock(); test_mutex_unlock();
...@@ -565,10 +571,13 @@ static void test_size_flush_callback(CACHEFILE f, ...@@ -565,10 +571,13 @@ static void test_size_flush_callback(CACHEFILE f,
BOOL rename_p __attribute__((__unused__))) { BOOL rename_p __attribute__((__unused__))) {
if (test_size_debug && verbose) printf("test_size_flush %p %" PRId64 " %p %ld %u %u\n", f, key.b, value, size, (unsigned)do_write, (unsigned)keep); if (test_size_debug && verbose) printf("test_size_flush %p %" PRId64 " %p %ld %u %u\n", f, key.b, value, size, (unsigned)do_write, (unsigned)keep);
if (keep) { if (keep) {
assert(do_write != 0); if (do_write) {
test_mutex_lock(); test_mutex_lock();
test_size_flush_key = key; test_size_flush_key = key;
test_mutex_unlock(); test_mutex_unlock();
}
} else {
assert(!do_write);
} }
} }
...@@ -663,7 +672,7 @@ static void test_size_flush() { ...@@ -663,7 +672,7 @@ static void test_size_flush() {
int n_entries, hash_size; long size_current, size_limit; int n_entries, hash_size; long size_current, size_limit;
toku_cachetable_get_state(t, &n_entries, &hash_size, &size_current, &size_limit); toku_cachetable_get_state(t, &n_entries, &hash_size, &size_current, &size_limit);
while (n_entries != min2(i+1, n)) { while (n_entries != min2(i+1, n)) {
toku_pthread_yield(); toku_pthread_yield(); maybe_flush(t);
toku_cachetable_get_state(t, &n_entries, 0, 0, 0); toku_cachetable_get_state(t, &n_entries, 0, 0, 0);
} }
assert(n_entries == min2(i+1, n)); assert(n_entries == min2(i+1, n));
......
...@@ -26,6 +26,12 @@ static inline void test_mutex_unlock() { ...@@ -26,6 +26,12 @@ static inline void test_mutex_unlock() {
int r = toku_pthread_mutex_unlock(&test_mutex); assert(r == 0); int r = toku_pthread_mutex_unlock(&test_mutex); assert(r == 0);
} }
static void maybe_flush(CACHETABLE t) {
#if !TOKU_CACHETABLE_DO_EVICT_FROM_WRITER
toku_cachetable_maybe_flush_some(t);
#endif
}
static const int test_object_size = 1; static const int test_object_size = 1;
static CACHETABLE ct; static CACHETABLE ct;
...@@ -47,10 +53,10 @@ static void print_ints(void) { ...@@ -47,10 +53,10 @@ static void print_ints(void) {
printf("}\n"); printf("}\n");
} }
static void item_becomes_present(CACHEFILE cf, CACHEKEY key) { static void item_becomes_present(CACHETABLE thect, CACHEFILE cf, CACHEKEY key) {
test_mutex_lock(); test_mutex_lock();
while (n_present >= N_PRESENT_LIMIT) { while (n_present >= N_PRESENT_LIMIT) {
test_mutex_unlock(); toku_pthread_yield(); test_mutex_lock(); test_mutex_unlock(); toku_pthread_yield(); maybe_flush(thect); test_mutex_lock();
} }
assert(n_present<N_PRESENT_LIMIT); assert(n_present<N_PRESENT_LIMIT);
present_items[n_present].cf = cf; present_items[n_present].cf = cf;
...@@ -159,7 +165,7 @@ static void test_chaining (void) { ...@@ -159,7 +165,7 @@ static void test_chaining (void) {
u_int32_t fhash = toku_cachetable_hash(f[fnum], make_blocknum(i)); u_int32_t fhash = toku_cachetable_hash(f[fnum], make_blocknum(i));
r = toku_cachetable_put(f[fnum], make_blocknum(i), fhash, (void*)i, test_object_size, flush_forchain, fetch_forchain, (void*)i); r = toku_cachetable_put(f[fnum], make_blocknum(i), fhash, (void*)i, test_object_size, flush_forchain, fetch_forchain, (void*)i);
assert(r==0); assert(r==0);
item_becomes_present(f[fnum], make_blocknum(i)); item_becomes_present(ct, f[fnum], make_blocknum(i));
r = toku_cachetable_unpin(f[fnum], make_blocknum(i), fhash, CACHETABLE_CLEAN, test_object_size); r = toku_cachetable_unpin(f[fnum], make_blocknum(i), fhash, CACHETABLE_CLEAN, test_object_size);
assert(r==0); assert(r==0);
//print_ints(); //print_ints();
...@@ -204,7 +210,7 @@ static void test_chaining (void) { ...@@ -204,7 +210,7 @@ static void test_chaining (void) {
r = toku_cachetable_put(f[fnum], make_blocknum(i), fhash, (void*)i, test_object_size, flush_forchain, fetch_forchain, (void*)i); r = toku_cachetable_put(f[fnum], make_blocknum(i), fhash, (void*)i, test_object_size, flush_forchain, fetch_forchain, (void*)i);
assert(r==0 || r==-1); assert(r==0 || r==-1);
if (r==0) { if (r==0) {
item_becomes_present(f[fnum], make_blocknum(i)); item_becomes_present(ct, f[fnum], make_blocknum(i));
//print_ints(); //print_ints();
//cachetable_print_state(ct); //cachetable_print_state(ct);
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment