Commit 0f140383 authored by Rich Prohaska, committed by Yoni Fogel

merge tokudb.1183 to main. addresses #1183

git-svn-id: file:///svn/toku/tokudb@9061 c7de825b-a66e-492c-adef-691d508d4ae1
parent 25be31c7
...@@ -171,7 +171,7 @@ int toku_verify_brtnode (BRT brt, BLOCKNUM blocknum, bytevec lorange, ITEMLEN lo ...@@ -171,7 +171,7 @@ int toku_verify_brtnode (BRT brt, BLOCKNUM blocknum, bytevec lorange, ITEMLEN lo
struct check_increasing_arg ciarg = { brt , 0 }; struct check_increasing_arg ciarg = { brt , 0 };
toku_omt_iterate(node->u.l.buffer, check_increasing, &ciarg); toku_omt_iterate(node->u.l.buffer, check_increasing, &ciarg);
} }
if ((r = toku_cachetable_unpin(brt->cf, blocknum, fullhash, 0, 0))) return r; if ((r = toku_cachetable_unpin(brt->cf, blocknum, fullhash, CACHETABLE_CLEAN, 0))) return r;
return result; return result;
} }
......
...@@ -359,7 +359,7 @@ int toku_unpin_brtnode (BRT brt, BRTNODE node) { ...@@ -359,7 +359,7 @@ int toku_unpin_brtnode (BRT brt, BRTNODE node) {
// } // }
verify_local_fingerprint_nonleaf(node); verify_local_fingerprint_nonleaf(node);
VERIFY_NODE(brt,node); VERIFY_NODE(brt,node);
return toku_cachetable_unpin(brt->cf, node->thisnodename, node->fullhash, node->dirty, brtnode_memory_size(node)); return toku_cachetable_unpin(brt->cf, node->thisnodename, node->fullhash, (enum cachetable_dirty) node->dirty, brtnode_memory_size(node));
} }
void toku_brtnode_flush_callback (CACHEFILE cachefile, BLOCKNUM nodename, void *brtnode_v, void *extraargs, long size __attribute__((unused)), BOOL write_me, BOOL keep_me, LSN modified_lsn __attribute__((__unused__)) , BOOL rename_p __attribute__((__unused__))) { void toku_brtnode_flush_callback (CACHEFILE cachefile, BLOCKNUM nodename, void *brtnode_v, void *extraargs, long size __attribute__((unused)), BOOL write_me, BOOL keep_me, LSN modified_lsn __attribute__((__unused__)) , BOOL rename_p __attribute__((__unused__))) {
...@@ -4281,7 +4281,7 @@ toku_dump_brtnode (FILE *file, BRT brt, BLOCKNUM blocknum, int depth, bytevec lo ...@@ -4281,7 +4281,7 @@ toku_dump_brtnode (FILE *file, BRT brt, BLOCKNUM blocknum, int depth, bytevec lo
// printf(" (%d)%u ", len, *(int*)le_any_key(data))); // printf(" (%d)%u ", len, *(int*)le_any_key(data)));
fprintf(file, "\n"); fprintf(file, "\n");
} }
r = toku_cachetable_unpin(brt->cf, blocknum, fullhash, 0, 0); r = toku_cachetable_unpin(brt->cf, blocknum, fullhash, CACHETABLE_CLEAN, 0);
assert(r==0); assert(r==0);
return result; return result;
} }
......
...@@ -35,10 +35,10 @@ static void cachetable_reader(WORKITEM); ...@@ -35,10 +35,10 @@ static void cachetable_reader(WORKITEM);
#endif #endif
enum ctpair_state { enum ctpair_state {
CTPAIR_INVALID = 0, // pair is invalid CTPAIR_INVALID = 0, // invalid
CTPAIR_IDLE = 1, // pair is in memory CTPAIR_IDLE = 1, // in memory
CTPAIR_READING = 2, // pair is being fetched into memory CTPAIR_READING = 2, // being read into memory
CTPAIR_WRITING = 3, // pair is being flushed from memory CTPAIR_WRITING = 3, // being written from memory
}; };
typedef struct ctpair *PAIR; typedef struct ctpair *PAIR;
...@@ -49,9 +49,9 @@ struct ctpair { ...@@ -49,9 +49,9 @@ struct ctpair {
void *value; void *value;
long size; long size;
enum ctpair_state state; enum ctpair_state state;
char dirty; enum cachetable_dirty dirty;
char verify_flag; // Used in verify_cachetable() char verify_flag; // Used in verify_cachetable()
char write_me; BOOL write_me;
u_int32_t fullhash; u_int32_t fullhash;
...@@ -81,20 +81,20 @@ static inline void ctpair_destroy(PAIR p) { ...@@ -81,20 +81,20 @@ static inline void ctpair_destroy(PAIR p) {
// The cachetable is as close to an ENV as we get. // The cachetable is as close to an ENV as we get.
struct cachetable { struct cachetable {
enum typ_tag tag; enum typ_tag tag;
u_int32_t n_in_table; u_int32_t n_in_table; // number of pairs in the hash table
u_int32_t table_size; u_int32_t table_size; // number of buckets in the hash table
PAIR *table; // hash table PAIR *table; // hash table
PAIR head,tail; // of LRU list. head is the most recently used. tail is least recently used. PAIR head,tail; // of LRU list. head is the most recently used. tail is least recently used.
CACHEFILE cachefiles; // list of cachefiles that use this cachetable CACHEFILE cachefiles; // list of cachefiles that use this cachetable
long size_current; // the sum of the sizes of the pairs in the cachetable long size_current; // the sum of the sizes of the pairs in the cachetable
long size_limit; // the limit to the sum of the pair sizes long size_limit; // the limit to the sum of the pair sizes
long size_writing; // the sum of the sizes of the pairs being written long size_writing; // the sum of the sizes of the pairs being written
LSN lsn_of_checkpoint; // the most recent checkpoint in the log. LSN lsn_of_checkpoint; // the most recent checkpoint in the log.
TOKULOGGER logger; TOKULOGGER logger;
toku_pthread_mutex_t *mutex; // coarse lock that protects the cachetable, the cachefiles, and the pair's toku_pthread_mutex_t *mutex; // coarse lock that protects the cachetable, the cachefiles, and the pair's
struct workqueue wq; // async work queue struct workqueue wq; // async work queue
THREADPOOL threadpool; // pool of worker threads THREADPOOL threadpool; // pool of worker threads
char checkpointing; // checkpoint in progress char checkpointing; // checkpoint in progress
}; };
// Lock the cachetable // Lock the cachetable
...@@ -253,7 +253,6 @@ void toku_cachefile_get_workqueue_load (CACHEFILE cf, int *n_in_queue, int *n_th ...@@ -253,7 +253,6 @@ void toku_cachefile_get_workqueue_load (CACHEFILE cf, int *n_in_queue, int *n_th
*n_threads = threadpool_get_current_threads(ct->threadpool); *n_threads = threadpool_get_current_threads(ct->threadpool);
} }
int toku_cachefile_set_fd (CACHEFILE cf, int fd, const char *fname) { int toku_cachefile_set_fd (CACHEFILE cf, int fd, const char *fname) {
int r; int r;
struct fileid fileid; struct fileid fileid;
...@@ -299,7 +298,7 @@ static CACHEFILE remove_cf_from_list (CACHEFILE cf, CACHEFILE list) { ...@@ -299,7 +298,7 @@ static CACHEFILE remove_cf_from_list (CACHEFILE cf, CACHEFILE list) {
} }
} }
static int cachefile_write_maybe_remove (CACHETABLE, CACHEFILE cf, BOOL do_remove); static int cachetable_flush_cachefile (CACHETABLE, CACHEFILE cf);
// Increment the reference count // Increment the reference count
void toku_cachefile_refup (CACHEFILE cf) { void toku_cachefile_refup (CACHEFILE cf) {
...@@ -313,23 +312,22 @@ int toku_cachefile_close (CACHEFILE *cfp, TOKULOGGER logger, char **error_string ...@@ -313,23 +312,22 @@ int toku_cachefile_close (CACHEFILE *cfp, TOKULOGGER logger, char **error_string
assert(cf->refcount>0); assert(cf->refcount>0);
cf->refcount--; cf->refcount--;
if (cf->refcount==0) { if (cf->refcount==0) {
int r = 0; int r;
if ((r = cachefile_write_maybe_remove(ct, cf, TRUE))) { if ((r = cachetable_flush_cachefile(ct, cf))) {
cachetable_unlock(ct); cachetable_unlock(ct);
return r; return r;
} }
if (cf->close_userdata) { if (cf->close_userdata && (r = cf->close_userdata(cf, cf->userdata, error_string))) {
r = cf->close_userdata(cf, cf->userdata, error_string); cachetable_unlock(ct);
return r;
} }
cf->close_userdata = NULL; cf->close_userdata = NULL;
cf->checkpoint_userdata = NULL; cf->checkpoint_userdata = NULL;
cf->userdata = NULL; cf->userdata = NULL;
cf->cachetable->cachefiles = remove_cf_from_list(cf, cf->cachetable->cachefiles); cf->cachetable->cachefiles = remove_cf_from_list(cf, cf->cachetable->cachefiles);
cachetable_unlock(ct); cachetable_unlock(ct);
{ r = close(cf->fd);
int r2 = close(cf->fd); assert(r == 0);
assert(r2 == 0);
}
cf->fd = -1; cf->fd = -1;
if (logger) { if (logger) {
//assert(cf->fname); //assert(cf->fname);
...@@ -351,7 +349,7 @@ int toku_cachefile_close (CACHEFILE *cfp, TOKULOGGER logger, char **error_string ...@@ -351,7 +349,7 @@ int toku_cachefile_close (CACHEFILE *cfp, TOKULOGGER logger, char **error_string
int toku_cachefile_flush (CACHEFILE cf) { int toku_cachefile_flush (CACHEFILE cf) {
CACHETABLE ct = cf->cachetable; CACHETABLE ct = cf->cachetable;
cachetable_lock(ct); cachetable_lock(ct);
int r = cachefile_write_maybe_remove(ct, cf, TRUE); int r = cachetable_flush_cachefile(ct, cf);
cachetable_unlock(ct); cachetable_unlock(ct);
return r; return r;
} }
...@@ -459,6 +457,7 @@ static PAIR remove_from_hash_chain (PAIR remove_me, PAIR list) { ...@@ -459,6 +457,7 @@ static PAIR remove_from_hash_chain (PAIR remove_me, PAIR list) {
// Thus we need to rename it if it is dirty, // Thus we need to rename it if it is dirty,
// if it has been modified within the current checkpoint regime (hence non-strict inequality) // if it has been modified within the current checkpoint regime (hence non-strict inequality)
// and the last time it was written was in a previous checkpoint regime (strict inequality) // and the last time it was written was in a previous checkpoint regime (strict inequality)
static BOOL need_to_rename_p (CACHETABLE ct, PAIR p) { static BOOL need_to_rename_p (CACHETABLE ct, PAIR p) {
return (BOOL)(p->dirty return (BOOL)(p->dirty
&& p->modified_lsn.lsn>=ct->lsn_of_checkpoint.lsn // nonstrict && p->modified_lsn.lsn>=ct->lsn_of_checkpoint.lsn // nonstrict
...@@ -491,10 +490,20 @@ static void cachetable_maybe_remove_and_free_pair (CACHETABLE ct, PAIR p) { ...@@ -491,10 +490,20 @@ static void cachetable_maybe_remove_and_free_pair (CACHETABLE ct, PAIR p) {
if (ctpair_users(&p->rwlock) == 0) { if (ctpair_users(&p->rwlock) == 0) {
cachetable_remove_pair(ct, p); cachetable_remove_pair(ct, p);
// helgrind
CACHETABLE_FLUSH_CALLBACK flush_callback = p->flush_callback;
CACHEFILE cachefile = p->cachefile;
CACHEKEY key = p->key;
void *value = p->value;
void *extraargs = p->extraargs;
long size = p->size;
LSN lsn_of_checkpoint = ct->lsn_of_checkpoint;
BOOL need_to_rename = need_to_rename_p(ct, p);
cachetable_unlock(ct); cachetable_unlock(ct);
p->flush_callback(p->cachefile, p->key, p->value, p->extraargs, p->size, FALSE, FALSE, flush_callback(cachefile, key, value, extraargs, size, FALSE, FALSE,
ct->lsn_of_checkpoint, need_to_rename_p(ct, p)); lsn_of_checkpoint, need_to_rename);
cachetable_lock(ct); cachetable_lock(ct);
...@@ -502,32 +511,53 @@ static void cachetable_maybe_remove_and_free_pair (CACHETABLE ct, PAIR p) { ...@@ -502,32 +511,53 @@ static void cachetable_maybe_remove_and_free_pair (CACHETABLE ct, PAIR p) {
} }
} }
// Abandon a pair whose fetch did not complete: remove it from the cachetable,
// mark it invalid, and release the pair's write lock.
// Called from cachetable_fetch_pair when the fetch callback returns an error,
// and from cachetable_flush_cachefile for a dequeued pair still in the READING state.
static void cachetable_abort_fetch_pair(CACHETABLE ct, PAIR p) {
cachetable_remove_pair(ct, p);
p->state = CTPAIR_INVALID;
ctpair_write_unlock(&p->rwlock);
// destroy the pair only when ctpair_users reports no remaining users of its rwlock
if (ctpair_users(&p->rwlock) == 0)
ctpair_destroy(p);
}
// Read a pair from a cachefile into memory using the pair's fetch callback // Read a pair from a cachefile into memory using the pair's fetch callback
static int cachetable_fetch_pair(CACHETABLE ct, CACHEFILE cf, PAIR p) { static int cachetable_fetch_pair(CACHETABLE ct, CACHEFILE cf, PAIR p) {
// helgrind
CACHETABLE_FETCH_CALLBACK fetch_callback = p->fetch_callback;
CACHEKEY key = p->key;
u_int32_t fullhash = p->fullhash;
void *extraargs = p->extraargs;
void *toku_value = 0; void *toku_value = 0;
long size = 0; long size = 0;
LSN written_lsn = ZERO_LSN; LSN written_lsn = ZERO_LSN;
cachetable_unlock(ct);
WHEN_TRACE_CT(printf("%s:%d CT: fetch_callback(%lld...)\n", __FILE__, __LINE__, key)); WHEN_TRACE_CT(printf("%s:%d CT: fetch_callback(%lld...)\n", __FILE__, __LINE__, key));
int r = p->fetch_callback(cf, p->key, p->fullhash, &toku_value, &size, p->extraargs, &written_lsn);
cachetable_unlock(ct);
int r = fetch_callback(cf, key, fullhash, &toku_value, &size, extraargs, &written_lsn);
cachetable_lock(ct); cachetable_lock(ct);
if (r) { if (r) {
cachetable_remove_pair(ct, p); if (p->cq) {
p->state = CTPAIR_INVALID; workqueue_enq(p->cq, &p->asyncwork, 1);
ctpair_write_unlock(&p->rwlock); return r;
if (ctpair_users(&p->rwlock) == 0) }
ctpair_destroy(p); cachetable_abort_fetch_pair(ct, p);
return r; return r;
} }
lru_touch(ct, p); lru_touch(ct, p);
p->state = CTPAIR_IDLE;
p->value = toku_value; p->value = toku_value;
p->written_lsn = written_lsn; p->written_lsn = written_lsn;
p->size = size; p->size = size;
ct->size_current += size; ct->size_current += size;
if (p->cq) {
workqueue_enq(p->cq, &p->asyncwork, 1);
return 0;
}
p->state = CTPAIR_IDLE;
ctpair_write_unlock(&p->rwlock); ctpair_write_unlock(&p->rwlock);
return 0; return 0;
} }
...@@ -543,11 +573,21 @@ static void cachetable_complete_write_pair (CACHETABLE ct, PAIR p, BOOL do_remov ...@@ -543,11 +573,21 @@ static void cachetable_complete_write_pair (CACHETABLE ct, PAIR p, BOOL do_remov
static void cachetable_write_pair(CACHETABLE ct, PAIR p) { static void cachetable_write_pair(CACHETABLE ct, PAIR p) {
ctpair_write_lock(&p->rwlock, ct->mutex); ctpair_write_lock(&p->rwlock, ct->mutex);
// helgrind
CACHETABLE_FLUSH_CALLBACK flush_callback = p->flush_callback;
CACHEFILE cachefile = p->cachefile;
CACHEKEY key = p->key;
void *value = p->value;
void *extraargs = p->extraargs;
long size = p->size;
BOOL dowrite = (BOOL)(p->dirty && p->write_me);
LSN lsn_of_checkpoint = ct->lsn_of_checkpoint;
BOOL need_to_rename = need_to_rename_p(ct, p);
cachetable_unlock(ct); cachetable_unlock(ct);
// write callback // write callback
p->flush_callback(p->cachefile, p->key, p->value, p->extraargs, p->size, (BOOL)(p->dirty && p->write_me), TRUE, flush_callback(cachefile, key, value, extraargs, size, dowrite, TRUE, lsn_of_checkpoint, need_to_rename);
ct->lsn_of_checkpoint, need_to_rename_p(ct, p));
#if DO_CALLBACK_USLEEP #if DO_CALLBACK_USLEEP
usleep(DO_CALLBACK_USLEEP); usleep(DO_CALLBACK_USLEEP);
#endif #endif
...@@ -568,7 +608,7 @@ static void cachetable_write_pair(CACHETABLE ct, PAIR p) { ...@@ -568,7 +608,7 @@ static void cachetable_write_pair(CACHETABLE ct, PAIR p) {
// the pair is no longer dirty once written // the pair is no longer dirty once written
if (p->dirty && p->write_me) if (p->dirty && p->write_me)
p->dirty = FALSE; p->dirty = CACHETABLE_CLEAN;
// stuff it into a completion queue for delayed completion if a completion queue exists // stuff it into a completion queue for delayed completion if a completion queue exists
// otherwise complete the write now // otherwise complete the write now
...@@ -601,15 +641,16 @@ static void cachetable_complete_write_pair (CACHETABLE ct, PAIR p, BOOL do_remov ...@@ -601,15 +641,16 @@ static void cachetable_complete_write_pair (CACHETABLE ct, PAIR p, BOOL do_remov
// flush and remove a pair from the cachetable. the callbacks are run by a thread in // flush and remove a pair from the cachetable. the callbacks are run by a thread in
// a thread pool. // a thread pool.
static void flush_and_remove (CACHETABLE ct, PAIR p, int write_me) { static void flush_and_maybe_remove (CACHETABLE ct, PAIR p, BOOL write_me) {
p->state = CTPAIR_WRITING; p->state = CTPAIR_WRITING;
ct->size_writing += p->size; assert(ct->size_writing >= 0); ct->size_writing += p->size; assert(ct->size_writing >= 0);
p->write_me = (char)(write_me?1:0); p->write_me = write_me;
#if DO_WORKER_THREAD #if DO_WORKER_THREAD
WORKITEM wi = &p->asyncwork; WORKITEM wi = &p->asyncwork;
workitem_init(wi, cachetable_writer, p); workitem_init(wi, cachetable_writer, p);
if (!p->dirty || !p->write_me) { // evictions without a write or unpinned pairs that are clean
// evictions without a write can be run in the current thread // can be run in the current thread
if (!p->write_me || (!ctpair_pinned(&p->rwlock) && !p->dirty)) {
cachetable_write_pair(ct, p); cachetable_write_pair(ct, p);
} else { } else {
workqueue_enq(&ct->wq, wi, 0); workqueue_enq(&ct->wq, wi, 0);
...@@ -632,8 +673,8 @@ static int maybe_flush_some (CACHETABLE ct, long size) { ...@@ -632,8 +673,8 @@ static int maybe_flush_some (CACHETABLE ct, long size) {
/* Try to remove one. */ /* Try to remove one. */
PAIR remove_me; PAIR remove_me;
for (remove_me = ct->tail; remove_me; remove_me = remove_me->prev) { for (remove_me = ct->tail; remove_me; remove_me = remove_me->prev) {
if (!ctpair_users(&remove_me->rwlock) && remove_me->state == CTPAIR_IDLE) { if (remove_me->state == CTPAIR_IDLE && !ctpair_users(&remove_me->rwlock)) {
flush_and_remove(ct, remove_me, 1); flush_and_maybe_remove(ct, remove_me, TRUE);
goto again; goto again;
} }
} }
...@@ -656,7 +697,7 @@ static PAIR cachetable_insert_at(CACHETABLE ct, ...@@ -656,7 +697,7 @@ static PAIR cachetable_insert_at(CACHETABLE ct,
CACHETABLE_FLUSH_CALLBACK flush_callback, CACHETABLE_FLUSH_CALLBACK flush_callback,
CACHETABLE_FETCH_CALLBACK fetch_callback, CACHETABLE_FETCH_CALLBACK fetch_callback,
void *extraargs, void *extraargs,
int dirty, enum cachetable_dirty dirty,
LSN written_lsn) { LSN written_lsn) {
TAGMALLOC(PAIR, p); TAGMALLOC(PAIR, p);
assert(p); assert(p);
...@@ -665,7 +706,7 @@ static PAIR cachetable_insert_at(CACHETABLE ct, ...@@ -665,7 +706,7 @@ static PAIR cachetable_insert_at(CACHETABLE ct,
p->key = key; p->key = key;
p->value = value; p->value = value;
p->fullhash = fullhash; p->fullhash = fullhash;
p->dirty = (char)(dirty ? 1 : 0); //printf("%s:%d p=%p dirty=%d\n", __FILE__, __LINE__, p, p->dirty); p->dirty = dirty;
p->size = size; p->size = size;
p->state = state; p->state = state;
p->flush_callback = flush_callback; p->flush_callback = flush_callback;
...@@ -820,7 +861,7 @@ int toku_cachetable_maybe_get_and_pin (CACHEFILE cachefile, CACHEKEY key, u_int3 ...@@ -820,7 +861,7 @@ int toku_cachetable_maybe_get_and_pin (CACHEFILE cachefile, CACHEKEY key, u_int3
} }
int toku_cachetable_unpin(CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash, int dirty, long size) { int toku_cachetable_unpin(CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash, enum cachetable_dirty dirty, long size) {
CACHETABLE ct = cachefile->cachetable; CACHETABLE ct = cachefile->cachetable;
PAIR p; PAIR p;
WHEN_TRACE_CT(printf("%s:%d unpin(%lld)", __FILE__, __LINE__, key)); WHEN_TRACE_CT(printf("%s:%d unpin(%lld)", __FILE__, __LINE__, key));
...@@ -833,7 +874,7 @@ int toku_cachetable_unpin(CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash, ...@@ -833,7 +874,7 @@ int toku_cachetable_unpin(CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash,
if (p->key.b==key.b && p->cachefile==cachefile) { if (p->key.b==key.b && p->cachefile==cachefile) {
assert(p->rwlock.pinned>0); assert(p->rwlock.pinned>0);
ctpair_read_unlock(&p->rwlock); ctpair_read_unlock(&p->rwlock);
if (dirty) p->dirty = TRUE; if (dirty) p->dirty = CACHETABLE_DIRTY;
if (size != 0) { if (size != 0) {
ct->size_current -= p->size; if (p->state == CTPAIR_WRITING) ct->size_writing -= p->size; ct->size_current -= p->size; if (p->state == CTPAIR_WRITING) ct->size_writing -= p->size;
p->size = size; p->size = size;
...@@ -985,13 +1026,17 @@ static void assert_cachefile_is_flushed_and_removed (CACHETABLE ct, CACHEFILE cf ...@@ -985,13 +1026,17 @@ static void assert_cachefile_is_flushed_and_removed (CACHETABLE ct, CACHEFILE cf
} }
} }
// Write all of the pairs associated with a cachefile to storage. Maybe remove // Flush all of the pairs that belong to a cachefile (or all pairs if
// these pairs from the cachetable after they have been written. // the cachefile is NULL).
static int cachetable_flush_cachefile(CACHETABLE ct, CACHEFILE cf) {
static int cachefile_write_maybe_remove(CACHETABLE ct, CACHEFILE cf, BOOL do_remove) {
unsigned nfound = 0; unsigned nfound = 0;
struct workqueue cq; struct workqueue cq;
workqueue_init(&cq); workqueue_init(&cq);
// find all of the pairs owned by a cachefile and redirect their completion
// to a completion queue. flush and remove pairs in the IDLE state if they
// are dirty. pairs in the READING or WRITING states are already in the
// work queue.
unsigned i; unsigned i;
for (i=0; i < ct->table_size; i++) { for (i=0; i < ct->table_size; i++) {
PAIR p; PAIR p;
...@@ -1000,21 +1045,28 @@ static int cachefile_write_maybe_remove(CACHETABLE ct, CACHEFILE cf, BOOL do_rem ...@@ -1000,21 +1045,28 @@ static int cachefile_write_maybe_remove(CACHETABLE ct, CACHEFILE cf, BOOL do_rem
nfound++; nfound++;
p->cq = &cq; p->cq = &cq;
if (p->state == CTPAIR_IDLE) if (p->state == CTPAIR_IDLE)
flush_and_remove(ct, p, 1); flush_and_maybe_remove(ct, p, TRUE);
} }
} }
} }
// wait for all of the pairs in the work queue to complete
for (i=0; i<nfound; i++) { for (i=0; i<nfound; i++) {
cachetable_unlock(ct); cachetable_unlock(ct);
WORKITEM wi = 0; WORKITEM wi = 0;
int r = workqueue_deq(&cq, &wi, 1); assert(r == 0); int r = workqueue_deq(&cq, &wi, 1); assert(r == 0);
PAIR p = workitem_arg(wi);
cachetable_lock(ct); cachetable_lock(ct);
cachetable_complete_write_pair(ct, p, do_remove); PAIR p = workitem_arg(wi);
p->cq = 0;
if (p->state == CTPAIR_READING)
cachetable_abort_fetch_pair(ct, p);
else if (p->state == CTPAIR_WRITING)
cachetable_complete_write_pair(ct, p, TRUE);
else
assert(0);
} }
workqueue_destroy(&cq); workqueue_destroy(&cq);
if (do_remove) assert_cachefile_is_flushed_and_removed(ct, cf);
assert_cachefile_is_flushed_and_removed(ct, cf);
if ((4 * ct->n_in_table < ct->table_size) && (ct->table_size>4)) if ((4 * ct->n_in_table < ct->table_size) && (ct->table_size>4))
cachetable_rehash(ct, ct->table_size/2); cachetable_rehash(ct, ct->table_size/2);
...@@ -1027,7 +1079,7 @@ int toku_cachetable_close (CACHETABLE *ctp) { ...@@ -1027,7 +1079,7 @@ int toku_cachetable_close (CACHETABLE *ctp) {
CACHETABLE ct=*ctp; CACHETABLE ct=*ctp;
int r; int r;
cachetable_lock(ct); cachetable_lock(ct);
if ((r=cachefile_write_maybe_remove(ct, 0, TRUE))) { if ((r=cachetable_flush_cachefile(ct, 0))) {
cachetable_unlock(ct); cachetable_unlock(ct);
return r; return r;
} }
...@@ -1055,14 +1107,14 @@ int toku_cachetable_unpin_and_remove (CACHEFILE cachefile, CACHEKEY key) { ...@@ -1055,14 +1107,14 @@ int toku_cachetable_unpin_and_remove (CACHEFILE cachefile, CACHEKEY key) {
for (p=ct->table[fullhash&(ct->table_size-1)]; p; p=p->hash_chain) { for (p=ct->table[fullhash&(ct->table_size-1)]; p; p=p->hash_chain) {
count++; count++;
if (p->key.b==key.b && p->cachefile==cachefile) { if (p->key.b==key.b && p->cachefile==cachefile) {
p->dirty = 0; // clear the dirty bit. We're just supposed to remove it. p->dirty = CACHETABLE_CLEAN; // clear the dirty bit. We're just supposed to remove it.
assert(p->rwlock.pinned==1); assert(p->rwlock.pinned==1);
ctpair_read_unlock(&p->rwlock); ctpair_read_unlock(&p->rwlock);
struct workqueue cq; struct workqueue cq;
workqueue_init(&cq); workqueue_init(&cq);
p->cq = &cq; p->cq = &cq;
if (p->state == CTPAIR_IDLE) if (p->state == CTPAIR_IDLE)
flush_and_remove(ct, p, 0); flush_and_maybe_remove(ct, p, FALSE);
cachetable_unlock(ct); cachetable_unlock(ct);
WORKITEM wi = 0; WORKITEM wi = 0;
r = workqueue_deq(&cq, &wi, 1); r = workqueue_deq(&cq, &wi, 1);
...@@ -1081,51 +1133,6 @@ int toku_cachetable_unpin_and_remove (CACHEFILE cachefile, CACHEKEY key) { ...@@ -1081,51 +1133,6 @@ int toku_cachetable_unpin_and_remove (CACHEFILE cachefile, CACHEKEY key) {
return r; return r;
} }
#if 0
static void flush_and_keep (PAIR flush_me) {
if (flush_me->dirty) {
WHEN_TRACE_CT(printf("%s:%d CT flush_callback(%lld, %p, dirty=1, 0)\n", __FILE__, __LINE__, flush_me->key, flush_me->value));
flush_me->flush_callback(flush_me->cachefile, flush_me->key, flush_me->value, flush_me->size, 1, 1);
flush_me->dirty=0;
}
}
static int cachetable_fsync_pairs (CACHETABLE ct, PAIR p) {
if (p) {
int r = cachetable_fsync_pairs(ct, p->hash_chain);
if (r!=0) return r;
flush_and_keep(p);
}
return 0;
}
int cachetable_fsync (CACHETABLE ct) {
int i;
int r;
for (i=0; i<ct->table_size; i++) {
r=cachetable_fsync_pairs(ct, ct->table[i]);
if (r!=0) return r;
}
return 0;
}
#endif
#if 0
int cachefile_pwrite (CACHEFILE cf, const void *buf, size_t count, toku_off_t offset) {
ssize_t r = pwrite(cf->fd, buf, count, offset);
if (r==-1) return errno;
assert((size_t)r==count);
return 0;
}
int cachefile_pread (CACHEFILE cf, void *buf, size_t count, toku_off_t offset) {
ssize_t r = pread(cf->fd, buf, count, offset);
if (r==-1) return errno;
if (r==0) return -1; /* No error for EOF ??? */
assert((size_t)r==count);
return 0;
}
#endif
int toku_cachetable_checkpoint (CACHETABLE ct) { int toku_cachetable_checkpoint (CACHETABLE ct) {
// Requires: Everything is unpinned. (In the multithreaded version we have to wait for things to get unpinned and then // Requires: Everything is unpinned. (In the multithreaded version we have to wait for things to get unpinned and then
// grab them (or else the unpinner has to do something.) // grab them (or else the unpinner has to do something.)
...@@ -1153,12 +1160,17 @@ int toku_cachetable_checkpoint (CACHETABLE ct) { ...@@ -1153,12 +1160,17 @@ int toku_cachetable_checkpoint (CACHETABLE ct) {
PAIR p; PAIR p;
for (p = ct->table[i]; p; p=p->hash_chain) { for (p = ct->table[i]; p; p=p->hash_chain) {
// p->dirty && p->modified_lsn.lsn>ct->lsn_of_checkpoint.lsn // p->dirty && p->modified_lsn.lsn>ct->lsn_of_checkpoint.lsn
if (1) { if (p->state == CTPAIR_READING)
continue; // skip pairs being read as they will be clean
else if (p->state == CTPAIR_IDLE || p->state == CTPAIR_WRITING) {
nfound++; nfound++;
p->cq = &cq; p->cq = &cq;
// TODO force all IDLE pairs through the worker threads as that will
// serialize with any readers
if (p->state == CTPAIR_IDLE) if (p->state == CTPAIR_IDLE)
flush_and_remove(ct, p, 1); flush_and_maybe_remove(ct, p, TRUE);
} } else
assert(0);
} }
} }
for (i=0; i<nfound; i++) { for (i=0; i<nfound; i++) {
...@@ -1167,6 +1179,7 @@ int toku_cachetable_checkpoint (CACHETABLE ct) { ...@@ -1167,6 +1179,7 @@ int toku_cachetable_checkpoint (CACHETABLE ct) {
int r = workqueue_deq(&cq, &wi, 1); assert(r == 0); int r = workqueue_deq(&cq, &wi, 1); assert(r == 0);
cachetable_lock(ct); cachetable_lock(ct);
PAIR p = workitem_arg(wi); PAIR p = workitem_arg(wi);
assert(p->state == CTPAIR_IDLE || p->state == CTPAIR_WRITING);
cachetable_complete_write_pair(ct, p, FALSE); cachetable_complete_write_pair(ct, p, FALSE);
} }
...@@ -1213,7 +1226,9 @@ static void cachetable_reader(WORKITEM wi) { ...@@ -1213,7 +1226,9 @@ static void cachetable_reader(WORKITEM wi) {
PAIR p = workitem_arg(wi); PAIR p = workitem_arg(wi);
CACHETABLE ct = p->cachefile->cachetable; CACHETABLE ct = p->cachefile->cachetable;
cachetable_lock(ct); cachetable_lock(ct);
cachetable_fetch_pair(ct, p->cachefile, p); int r = cachetable_fetch_pair(ct, p->cachefile, p);
if (r == 0)
maybe_flush_some(ct, 0);
cachetable_unlock(ct); cachetable_unlock(ct);
} }
...@@ -1266,7 +1281,7 @@ void toku_cachetable_print_state (CACHETABLE ct) { ...@@ -1266,7 +1281,7 @@ void toku_cachetable_print_state (CACHETABLE ct) {
if (p != 0) { if (p != 0) {
printf("t[%u]=", i); printf("t[%u]=", i);
for (p=ct->table[i]; p; p=p->hash_chain) { for (p=ct->table[i]; p; p=p->hash_chain) {
printf(" {%"PRId64", %p, dirty=%d, pin=%d, size=%ld}", p->key.b, p->cachefile, p->dirty, p->rwlock.pinned, p->size); printf(" {%"PRId64", %p, dirty=%d, pin=%d, size=%ld}", p->key.b, p->cachefile, (int) p->dirty, p->rwlock.pinned, p->size);
} }
printf("\n"); printf("\n");
} }
...@@ -1317,7 +1332,7 @@ int toku_cachetable_get_key_state (CACHETABLE ct, CACHEKEY key, CACHEFILE cf, vo ...@@ -1317,7 +1332,7 @@ int toku_cachetable_get_key_state (CACHETABLE ct, CACHEKEY key, CACHEFILE cf, vo
void void
toku_cachefile_set_userdata (CACHEFILE cf, toku_cachefile_set_userdata (CACHEFILE cf,
void *userdata, void *userdata,
int (*close_userdata)(CACHEFILE, void*, char**/*error_string*/), int (*close_userdata)(CACHEFILE, void*, char**),
int (*checkpoint_userdata)(CACHEFILE, void*)) int (*checkpoint_userdata)(CACHEFILE, void*))
{ {
cf->userdata = userdata; cf->userdata = userdata;
......
...@@ -53,6 +53,7 @@ int toku_cachetable_openfd (CACHEFILE *,CACHETABLE, int /*fd*/, const char */*fn ...@@ -53,6 +53,7 @@ int toku_cachetable_openfd (CACHEFILE *,CACHETABLE, int /*fd*/, const char */*fn
// Get access to the asynchronous work queue // Get access to the asynchronous work queue
// Returns: a pointer to the work queue // Returns: a pointer to the work queue
WORKQUEUE toku_cachetable_get_workqueue (CACHETABLE); WORKQUEUE toku_cachetable_get_workqueue (CACHETABLE);
void toku_cachefile_get_workqueue_load (CACHEFILE, int *n_in_queue, int *n_threads); void toku_cachefile_get_workqueue_load (CACHEFILE, int *n_in_queue, int *n_threads);
// The flush callback is called when a key value pair is being written to storage and possibly removed from the cachetable. // The flush callback is called when a key value pair is being written to storage and possibly removed from the cachetable.
...@@ -73,7 +74,7 @@ void toku_cachefile_set_userdata(CACHEFILE cf, void *userdata, int (*close_userd ...@@ -73,7 +74,7 @@ void toku_cachefile_set_userdata(CACHEFILE cf, void *userdata, int (*close_userd
// If userdata is already non-NULL, then we simply overwrite it. // If userdata is already non-NULL, then we simply overwrite it.
void *toku_cachefile_get_userdata(CACHEFILE); void *toku_cachefile_get_userdata(CACHEFILE);
// Effect: Get the user dataa. // Effect: Get the user data.
// Put a memory object into the cachetable. // Put a memory object into the cachetable.
// Effects: Lookup the key in the cachetable. If the key is not in the cachetable, // Effects: Lookup the key in the cachetable. If the key is not in the cachetable,
...@@ -103,8 +104,8 @@ int toku_cachetable_get_and_pin(CACHEFILE, CACHEKEY, u_int32_t /*fullhash*/, ...@@ -103,8 +104,8 @@ int toku_cachetable_get_and_pin(CACHEFILE, CACHEKEY, u_int32_t /*fullhash*/,
// void**. If the item is not in memory, then return a nonzero error number. // void**. If the item is not in memory, then return a nonzero error number.
int toku_cachetable_maybe_get_and_pin (CACHEFILE, CACHEKEY, u_int32_t /*fullhash*/, void**); int toku_cachetable_maybe_get_and_pin (CACHEFILE, CACHEKEY, u_int32_t /*fullhash*/, void**);
// cachetable object state WRT external memory // cachetable pair clean or dirty WRT external memory
enum cachetable_object_state { enum cachetable_dirty {
CACHETABLE_CLEAN=0, // the cached object is clean WRT the cachefile CACHETABLE_CLEAN=0, // the cached object is clean WRT the cachefile
CACHETABLE_DIRTY=1, // the cached object is dirty WRT the cachefile CACHETABLE_DIRTY=1, // the cached object is dirty WRT the cachefile
}; };
...@@ -113,7 +114,7 @@ enum cachetable_object_state { ...@@ -113,7 +114,7 @@ enum cachetable_object_state {
// Effects: If the memory object is in the cachetable, then OR the dirty flag, // Effects: If the memory object is in the cachetable, then OR the dirty flag,
// update the size, and release the read lock on the memory object. // update the size, and release the read lock on the memory object.
// Returns: 0 if success, otherwise returns an error number. // Returns: 0 if success, otherwise returns an error number.
int toku_cachetable_unpin(CACHEFILE, CACHEKEY, u_int32_t fullhash, int dirty, long size); int toku_cachetable_unpin(CACHEFILE, CACHEKEY, u_int32_t fullhash, enum cachetable_dirty dirty, long size);
int toku_cachetable_unpin_and_remove (CACHEFILE, CACHEKEY); /* Removing something already present is OK. */ int toku_cachetable_unpin_and_remove (CACHEFILE, CACHEKEY); /* Removing something already present is OK. */
// Effect: Remove an object from the cachetable. Don't write it back. // Effect: Remove an object from the cachetable. Don't write it back.
......
...@@ -213,7 +213,7 @@ toku_recover_newbrtnode (LSN lsn, FILENUM filenum, BLOCKNUM blocknum,u_int32_t h ...@@ -213,7 +213,7 @@ toku_recover_newbrtnode (LSN lsn, FILENUM filenum, BLOCKNUM blocknum,u_int32_t h
VERIFY_COUNTS(n); VERIFY_COUNTS(n);
n->log_lsn = lsn; n->log_lsn = lsn;
r = toku_cachetable_unpin(pair->cf, blocknum, fullhash, 1, toku_serialize_brtnode_size(n)); r = toku_cachetable_unpin(pair->cf, blocknum, fullhash, CACHETABLE_DIRTY, toku_serialize_brtnode_size(n));
assert(r==0); assert(r==0);
} }
...@@ -249,7 +249,7 @@ static void toku_recover_deqrootentry (LSN lsn __attribute__((__unused__)), FILE ...@@ -249,7 +249,7 @@ static void toku_recover_deqrootentry (LSN lsn __attribute__((__unused__)), FILE
assert(r==0); assert(r==0);
r = toku_fifo_deq(h->fifo); r = toku_fifo_deq(h->fifo);
assert(r==0); assert(r==0);
//r = toku_cachetable_unpin(pair->cf, header_blocknum, fullhash, 1, 0); //r = toku_cachetable_unpin(pair->cf, header_blocknum, fullhash, CACHETABLE_DIRTY, 0);
//assert(r==0); //assert(r==0);
} }
...@@ -270,7 +270,7 @@ toku_recover_enqrootentry (LSN lsn __attribute__((__unused__)), FILENUM filenum, ...@@ -270,7 +270,7 @@ toku_recover_enqrootentry (LSN lsn __attribute__((__unused__)), FILENUM filenum,
struct brt_header *h=h_v; struct brt_header *h=h_v;
r = toku_fifo_enq(h->fifo, key.data, key.len, val.data, val.len, typ, xid); r = toku_fifo_enq(h->fifo, key.data, key.len, val.data, val.len, typ, xid);
assert(r==0); assert(r==0);
r = toku_cachetable_unpin(pair->cf, header_blocknum, fullhash, 1, 0); r = toku_cachetable_unpin(pair->cf, header_blocknum, fullhash, CACHETABLE_DIRTY, 0);
assert(r==0); assert(r==0);
toku_free(key.data); toku_free(key.data);
toku_free(val.data); toku_free(val.data);
...@@ -297,7 +297,7 @@ toku_recover_brtdeq (LSN lsn, FILENUM filenum, BLOCKNUM blocknum, u_int32_t chil ...@@ -297,7 +297,7 @@ toku_recover_brtdeq (LSN lsn, FILENUM filenum, BLOCKNUM blocknum, u_int32_t chil
node->u.n.n_bytes_in_buffers -= sizediff; node->u.n.n_bytes_in_buffers -= sizediff;
BNC_NBYTESINBUF(node, childnum) -= sizediff; BNC_NBYTESINBUF(node, childnum) -= sizediff;
r = toku_fifo_deq(BNC_BUFFER(node, childnum)); // don't deq till were' done looking at the data. r = toku_fifo_deq(BNC_BUFFER(node, childnum)); // don't deq till were' done looking at the data.
r = toku_cachetable_unpin(cf, blocknum, node->fullhash, 1, toku_serialize_brtnode_size(node)); r = toku_cachetable_unpin(cf, blocknum, node->fullhash, CACHETABLE_DIRTY, toku_serialize_brtnode_size(node));
assert(r==0); assert(r==0);
} }
...@@ -314,7 +314,7 @@ toku_recover_brtenq (LSN lsn, FILENUM filenum, BLOCKNUM blocknum, u_int32_t chil ...@@ -314,7 +314,7 @@ toku_recover_brtenq (LSN lsn, FILENUM filenum, BLOCKNUM blocknum, u_int32_t chil
node->local_fingerprint += node->rand4fingerprint * toku_calc_fingerprint_cmd(typ, xid, key.data, key.len, data.data, data.len); node->local_fingerprint += node->rand4fingerprint * toku_calc_fingerprint_cmd(typ, xid, key.data, key.len, data.data, data.len);
node->log_lsn = lsn; node->log_lsn = lsn;
u_int32_t sizediff = key.len + data.len + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD; u_int32_t sizediff = key.len + data.len + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD;
r = toku_cachetable_unpin(cf, blocknum, node->fullhash, 1, toku_serialize_brtnode_size(node)); r = toku_cachetable_unpin(cf, blocknum, node->fullhash, CACHETABLE_DIRTY, toku_serialize_brtnode_size(node));
assert(r==0); assert(r==0);
node->u.n.n_bytes_in_buffers += sizediff; node->u.n.n_bytes_in_buffers += sizediff;
BNC_NBYTESINBUF(node, childnum) += sizediff; BNC_NBYTESINBUF(node, childnum) += sizediff;
...@@ -348,7 +348,7 @@ toku_recover_addchild (LSN lsn, FILENUM filenum, BLOCKNUM blocknum, u_int32_t ch ...@@ -348,7 +348,7 @@ toku_recover_addchild (LSN lsn, FILENUM filenum, BLOCKNUM blocknum, u_int32_t ch
BNC_NBYTESINBUF(node, childnum) = 0; BNC_NBYTESINBUF(node, childnum) = 0;
node->u.n.n_children++; node->u.n.n_children++;
node->log_lsn = lsn; node->log_lsn = lsn;
r = toku_cachetable_unpin(cf, blocknum, node->fullhash, 1, toku_serialize_brtnode_size(node)); r = toku_cachetable_unpin(cf, blocknum, node->fullhash, CACHETABLE_DIRTY, toku_serialize_brtnode_size(node));
assert(r==0); assert(r==0);
} }
...@@ -385,7 +385,7 @@ toku_recover_delchild (LSN lsn, FILENUM filenum, BLOCKNUM blocknum, u_int32_t ch ...@@ -385,7 +385,7 @@ toku_recover_delchild (LSN lsn, FILENUM filenum, BLOCKNUM blocknum, u_int32_t ch
node->u.n.n_children--; node->u.n.n_children--;
node->log_lsn = lsn; node->log_lsn = lsn;
r = toku_cachetable_unpin(pair->cf, blocknum, node->fullhash, 1, toku_serialize_brtnode_size(node)); r = toku_cachetable_unpin(pair->cf, blocknum, node->fullhash, CACHETABLE_DIRTY, toku_serialize_brtnode_size(node));
assert(r==0); assert(r==0);
toku_free(pivotkey.data); toku_free(pivotkey.data);
} }
...@@ -406,7 +406,7 @@ toku_recover_setchild (LSN lsn, FILENUM filenum, BLOCKNUM blocknum, u_int32_t ch ...@@ -406,7 +406,7 @@ toku_recover_setchild (LSN lsn, FILENUM filenum, BLOCKNUM blocknum, u_int32_t ch
assert(childnum < (unsigned)node->u.n.n_children); assert(childnum < (unsigned)node->u.n.n_children);
BNC_BLOCKNUM(node, childnum) = newchild; BNC_BLOCKNUM(node, childnum) = newchild;
node->log_lsn = lsn; node->log_lsn = lsn;
r = toku_cachetable_unpin(pair->cf, blocknum, node->fullhash, 1, toku_serialize_brtnode_size(node)); r = toku_cachetable_unpin(pair->cf, blocknum, node->fullhash, CACHETABLE_DIRTY, toku_serialize_brtnode_size(node));
assert(r==0); assert(r==0);
} }
static void static void
...@@ -429,7 +429,7 @@ toku_recover_setpivot (LSN lsn, FILENUM filenum, BLOCKNUM blocknum, u_int32_t ch ...@@ -429,7 +429,7 @@ toku_recover_setpivot (LSN lsn, FILENUM filenum, BLOCKNUM blocknum, u_int32_t ch
node->u.n.totalchildkeylens += toku_brt_pivot_key_len(pair->brt, node->u.n.childkeys[childnum]); node->u.n.totalchildkeylens += toku_brt_pivot_key_len(pair->brt, node->u.n.childkeys[childnum]);
node->log_lsn = lsn; node->log_lsn = lsn;
r = toku_cachetable_unpin(pair->cf, blocknum, node->fullhash, 1, toku_serialize_brtnode_size(node)); r = toku_cachetable_unpin(pair->cf, blocknum, node->fullhash, CACHETABLE_DIRTY, toku_serialize_brtnode_size(node));
assert(r==0); assert(r==0);
toku_free(pivotkey.data); toku_free(pivotkey.data);
...@@ -452,7 +452,7 @@ toku_recover_changechildfingerprint (LSN lsn, FILENUM filenum, BLOCKNUM blocknum ...@@ -452,7 +452,7 @@ toku_recover_changechildfingerprint (LSN lsn, FILENUM filenum, BLOCKNUM blocknum
assert((signed)childnum <= node->u.n.n_children); // we allow the childnum to be one too large. assert((signed)childnum <= node->u.n.n_children); // we allow the childnum to be one too large.
BNC_SUBTREE_FINGERPRINT(node, childnum) = newfingerprint; BNC_SUBTREE_FINGERPRINT(node, childnum) = newfingerprint;
node->log_lsn = lsn; node->log_lsn = lsn;
r = toku_cachetable_unpin(pair->cf, blocknum, node->fullhash, 1, toku_serialize_brtnode_size(node)); r = toku_cachetable_unpin(pair->cf, blocknum, node->fullhash, CACHETABLE_DIRTY, toku_serialize_brtnode_size(node));
assert(r==0); assert(r==0);
} }
#endif #endif
...@@ -598,10 +598,10 @@ toku_recover_leafsplit (LSN lsn, FILENUM filenum, BLOCKNUM old_blocknum, BLOCKNU ...@@ -598,10 +598,10 @@ toku_recover_leafsplit (LSN lsn, FILENUM filenum, BLOCKNUM old_blocknum, BLOCKNU
toku_cachetable_put(pair->cf, new_blocknum, newn->fullhash, toku_cachetable_put(pair->cf, new_blocknum, newn->fullhash,
newn, toku_serialize_brtnode_size(newn), toku_brtnode_flush_callback, toku_brtnode_fetch_callback, 0); newn, toku_serialize_brtnode_size(newn), toku_brtnode_flush_callback, toku_brtnode_fetch_callback, 0);
newn->log_lsn = lsn; newn->log_lsn = lsn;
r = toku_cachetable_unpin(pair->cf, new_blocknum, newn->fullhash, 1, toku_serialize_brtnode_size(newn)); r = toku_cachetable_unpin(pair->cf, new_blocknum, newn->fullhash, CACHETABLE_DIRTY, toku_serialize_brtnode_size(newn));
assert(r==0); assert(r==0);
oldn->log_lsn = lsn; oldn->log_lsn = lsn;
r = toku_cachetable_unpin(pair->cf, old_blocknum, oldn->fullhash, 1, toku_serialize_brtnode_size(oldn)); r = toku_cachetable_unpin(pair->cf, old_blocknum, oldn->fullhash, CACHETABLE_DIRTY, toku_serialize_brtnode_size(oldn));
assert(r==0); assert(r==0);
} }
...@@ -630,7 +630,7 @@ toku_recover_insertleafentry (LSN lsn, FILENUM filenum, BLOCKNUM blocknum, u_int ...@@ -630,7 +630,7 @@ toku_recover_insertleafentry (LSN lsn, FILENUM filenum, BLOCKNUM blocknum, u_int
node->u.l.n_bytes_in_buffer += OMT_ITEM_OVERHEAD + leafentry_disksize(newleafentry); node->u.l.n_bytes_in_buffer += OMT_ITEM_OVERHEAD + leafentry_disksize(newleafentry);
node->local_fingerprint += node->rand4fingerprint * toku_le_crc(newleafentry); node->local_fingerprint += node->rand4fingerprint * toku_le_crc(newleafentry);
} }
r = toku_cachetable_unpin(pair->cf, blocknum, node->fullhash, 1, toku_serialize_brtnode_size(node)); r = toku_cachetable_unpin(pair->cf, blocknum, node->fullhash, CACHETABLE_DIRTY, toku_serialize_brtnode_size(node));
assert(r==0); assert(r==0);
toku_free_LEAFENTRY(newleafentry); toku_free_LEAFENTRY(newleafentry);
} }
...@@ -663,7 +663,7 @@ toku_recover_deleteleafentry (LSN lsn, FILENUM filenum, BLOCKNUM blocknum, u_int ...@@ -663,7 +663,7 @@ toku_recover_deleteleafentry (LSN lsn, FILENUM filenum, BLOCKNUM blocknum, u_int
r = toku_omt_delete_at(node->u.l.buffer, idx); r = toku_omt_delete_at(node->u.l.buffer, idx);
assert(r==0); assert(r==0);
} }
r = toku_cachetable_unpin(pair->cf, blocknum, node->fullhash, 1, toku_serialize_brtnode_size(node)); r = toku_cachetable_unpin(pair->cf, blocknum, node->fullhash, CACHETABLE_DIRTY, toku_serialize_brtnode_size(node));
assert(r==0); assert(r==0);
} }
......
...@@ -45,6 +45,15 @@ REGRESSION_TESTS_RAW = \ ...@@ -45,6 +45,15 @@ REGRESSION_TESTS_RAW = \
cachetable-count-pinned-test \ cachetable-count-pinned-test \
cachetable-debug-test \ cachetable-debug-test \
cachetable-debug-test \ cachetable-debug-test \
cachetable-checkpoint-test \
cachetable-prefetch-maybegetandpin-test \
cachetable-prefetch2-test \
cachetable-prefetch-close-test \
cachetable-prefetch-close-fail-test \
cachetable-prefetch-getandpin-test \
cachetable-prefetch-getandpin-fail-test \
cachetable-prefetch-checkpoint-test \
cachetable-prefetch-flowcontrol-test \
fifo-test \ fifo-test \
list-test \ list-test \
keyrange \ keyrange \
......
...@@ -5,28 +5,33 @@ ...@@ -5,28 +5,33 @@
#include "test.h" #include "test.h"
#include "cachetable.h" #include "cachetable.h"
const int item_size = 1; static const int item_size = 1;
int n_flush, n_write_me, n_keep_me, n_fetch; static int n_flush, n_write_me, n_keep_me, n_fetch;
void flush(CACHEFILE cf, CACHEKEY key, void *value, long size, BOOL write_me, BOOL keep_me, LSN modified_lsn, BOOL rename_p) { static void flush(CACHEFILE cf, CACHEKEY key, void *value, void *extraargs, long size, BOOL write_me, BOOL keep_me, LSN modified_lsn, BOOL rename_p) {
cf = cf; modified_lsn = modified_lsn; rename_p = rename_p; cf = cf; key = key; value = value; extraargs = extraargs; modified_lsn = modified_lsn; rename_p = rename_p;
assert(key == (CACHEKEY)(long)value); // assert(key == make_blocknum((long)value));
assert(size == item_size); assert(size == item_size);
n_flush++; n_flush++;
if (write_me) n_write_me++; if (write_me) n_write_me++;
if (keep_me) n_keep_me++; if (keep_me) n_keep_me++;
} }
int fetch() { static int fetch(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash, void **value, long *sizep, void *extraargs, LSN *written_lsn) {
cf = cf; key = key; fullhash = fullhash; value = value; sizep = sizep; extraargs = extraargs; written_lsn = written_lsn;
assert(0); // should not be called
n_fetch++; n_fetch++;
*value = 0;
*sizep = item_size;
return 0; return 0;
} }
// put n items into the cachetable, maybe mark them dirty, do a checkpoint, and // put n items into the cachetable, maybe mark them dirty, do a checkpoint, and
// verify that all of the items have been written and are clean. // verify that all of the items have been written and are clean.
void cachetable_checkpoint_test(int n, int dirty) { static void cachetable_checkpoint_test(int n, enum cachetable_dirty dirty) {
if (verbose) printf("%s:%d n=%d dirty=%d\n", __FUNCTION__, __LINE__, n, (int) dirty);
const int test_limit = n; const int test_limit = n;
int r; int r;
CACHETABLE ct; CACHETABLE ct;
...@@ -39,18 +44,19 @@ void cachetable_checkpoint_test(int n, int dirty) { ...@@ -39,18 +44,19 @@ void cachetable_checkpoint_test(int n, int dirty) {
// insert items into the cachetable. all should be dirty // insert items into the cachetable. all should be dirty
int i; int i;
for (i=0; i<n; i++) { for (i=0; i<n; i++) {
u_int32_t hi = toku_cachetable_hash(f1, i); CACHEKEY key = make_blocknum(i);
r = toku_cachetable_put(f1, i, hi, (void *)(long)i, 1, flush, fetch, 0); u_int32_t hi = toku_cachetable_hash(f1, key);
r = toku_cachetable_put(f1, key, hi, (void *)(long)i, 1, flush, fetch, 0);
assert(r == 0); assert(r == 0);
r = toku_cachetable_unpin(f1, i, hi, dirty, item_size); r = toku_cachetable_unpin(f1, key, hi, dirty, item_size);
assert(r == 0); assert(r == 0);
void *v; void *v;
int its_dirty; int its_dirty;
long long its_pin; long long its_pin;
long its_size; long its_size;
r = toku_cachetable_get_key_state(ct, i, f1, &v, &its_dirty, &its_pin, &its_size); r = toku_cachetable_get_key_state(ct, key, f1, &v, &its_dirty, &its_pin, &its_size);
if (r != 0) if (r != 0)
continue; continue;
assert(its_dirty == CACHETABLE_DIRTY); assert(its_dirty == CACHETABLE_DIRTY);
...@@ -67,18 +73,19 @@ void cachetable_checkpoint_test(int n, int dirty) { ...@@ -67,18 +73,19 @@ void cachetable_checkpoint_test(int n, int dirty) {
// after the checkpoint, all of the items should be clean // after the checkpoint, all of the items should be clean
for (i=0; i<n; i++) { for (i=0; i<n; i++) {
u_int32_t hi = toku_cachetable_hash(f1, i); CACHEKEY key = make_blocknum(i);
u_int32_t hi = toku_cachetable_hash(f1, key);
void *v; void *v;
r = toku_cachetable_maybe_get_and_pin(f1, i, hi, &v); r = toku_cachetable_maybe_get_and_pin(f1, key, hi, &v);
if (r != 0) if (r != 0)
continue; continue;
r = toku_cachetable_unpin(f1, i, hi, CACHETABLE_CLEAN, item_size); r = toku_cachetable_unpin(f1, key, hi, CACHETABLE_CLEAN, item_size);
assert(r == 0); assert(r == 0);
int its_dirty; int its_dirty;
long long its_pin; long long its_pin;
long its_size; long its_size;
r = toku_cachetable_get_key_state(ct, i, f1, &v, &its_dirty, &its_pin, &its_size); r = toku_cachetable_get_key_state(ct, key, f1, &v, &its_dirty, &its_pin, &its_size);
if (r != 0) if (r != 0)
continue; continue;
assert(its_dirty == CACHETABLE_CLEAN); assert(its_dirty == CACHETABLE_CLEAN);
...@@ -93,7 +100,7 @@ void cachetable_checkpoint_test(int n, int dirty) { ...@@ -93,7 +100,7 @@ void cachetable_checkpoint_test(int n, int dirty) {
assert(r == 0); assert(r == 0);
assert(n_flush == n && n_write_me == 0 && n_keep_me == n); assert(n_flush == n && n_write_me == 0 && n_keep_me == n);
r = toku_cachefile_close(&f1, NULL_LOGGER); assert(r == 0 && f1 == 0); r = toku_cachefile_close(&f1, NULL_LOGGER, 0); assert(r == 0 && f1 == 0);
r = toku_cachetable_close(&ct); assert(r == 0 && ct == 0); r = toku_cachetable_close(&ct); assert(r == 0 && ct == 0);
} }
......
/* -*- mode: C; c-basic-offset: 4 -*- */
// verify that the cache table checkpoint with prefetched blocks active works.
// the blocks in the reading state should be ignored.
#include <stdio.h>
#include <unistd.h>
#include <assert.h>
#include "test.h"
#include "cachetable.h"
// Size passed to toku_cachetable_unpin and reported by fetch for every item.
static const int item_size = 1;
// Callback tallies: flush invocations, flushes asked to write, flushes asked
// to keep the item, and fetch invocations.  Only this file uses them, so they
// get internal linkage like the counters in cachetable-checkpoint-test.
static int n_flush, n_write_me, n_keep_me, n_fetch;
// Flush callback: checks that the reported size is item_size and tallies the
// call in n_flush, plus n_write_me / n_keep_me when the matching flag is set.
static void flush(CACHEFILE cf, CACHEKEY key, void *value, void *extraargs, long size, BOOL write_me, BOOL keep_me, LSN modified_lsn, BOOL rename_p) {
    // these parameters are deliberately ignored
    (void) cf;
    (void) key;
    (void) value;
    (void) extraargs;
    (void) modified_lsn;
    (void) rename_p;
    // assert(key == make_blocknum((long)value));
    assert(size == item_size);
    n_flush += 1;
    n_write_me += write_me ? 1 : 0;
    n_keep_me += keep_me ? 1 : 0;
}
// Fetch callback: counts the call, stalls for 10 seconds, then hands back a
// null value of size item_size and reports success.
// Note: *written_lsn is not written by this callback.
static int fetch(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash, void **value, long *sizep, void *extraargs, LSN *written_lsn) {
    // these parameters are deliberately ignored
    (void) cf;
    (void) key;
    (void) fullhash;
    (void) extraargs;
    (void) written_lsn;
    n_fetch += 1;
    sleep(10);              // keep the block in flight for a while
    *sizep = item_size;
    *value = 0;
    return 0;
}
// put n items into the cachetable, maybe mark them dirty, do a checkpoint, and
// verify that all of the items have been written and are clean.
// Before the items are inserted, a prefetch of block n+1 is issued; its fetch
// callback sleeps for 10 seconds, so (per the file header) the checkpoints are
// meant to run while that block is still being read.
// n: number of items inserted; also used as the cachetable size limit.
// dirty: dirty flag passed to toku_cachetable_unpin for every inserted item.
static void cachetable_prefetch_checkpoint_test(int n, enum cachetable_dirty dirty) {
if (verbose) printf("%s:%d n=%d dirty=%d\n", __FUNCTION__, __LINE__, n, (int) dirty);
const int test_limit = n;
int r;
CACHETABLE ct;
r = toku_create_cachetable(&ct, test_limit, ZERO_LSN, NULL_LOGGER); assert(r == 0);
char fname1[] = __FILE__ "test1.dat";
unlink(fname1);
CACHEFILE f1;
r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0);
// prefetch block n+1. this will take 10 seconds.
{
CACHEKEY key = make_blocknum(n+1);
u_int32_t fullhash = toku_cachetable_hash(f1, key);
// NOTE(review): the result of the prefetch is stored in r but never checked.
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, 0);
toku_cachetable_verify(ct);
}
// insert items into the cachetable. all should be dirty
int i;
for (i=0; i<n; i++) {
CACHEKEY key = make_blocknum(i);
u_int32_t hi = toku_cachetable_hash(f1, key);
r = toku_cachetable_put(f1, key, hi, (void *)(long)i, 1, flush, fetch, 0);
assert(r == 0);
r = toku_cachetable_unpin(f1, key, hi, dirty, item_size);
assert(r == 0);
void *v;
int its_dirty;
long long its_pin;
long its_size;
r = toku_cachetable_get_key_state(ct, key, f1, &v, &its_dirty, &its_pin, &its_size);
// a key whose state cannot be read is skipped rather than treated as a failure
if (r != 0)
continue;
// NOTE(review): DIRTY is expected even when dirty == CACHETABLE_CLEAN;
// presumably toku_cachetable_put leaves a new pair dirty -- confirm.
assert(its_dirty == CACHETABLE_DIRTY);
assert(its_pin == 0);
assert(its_size == item_size);
}
// the checkpoint should cause n writes, but since n <= the cachetable size,
// all items should be kept in the cachetable
n_flush = n_write_me = n_keep_me = n_fetch = 0;
r = toku_cachetable_checkpoint(ct);
assert(r == 0);
// exactly n flushes: the prefetched block n+1 must not add one
assert(n_flush == n && n_write_me == n && n_keep_me == n);
// after the checkpoint, all of the items should be clean
for (i=0; i<n; i++) {
CACHEKEY key = make_blocknum(i);
u_int32_t hi = toku_cachetable_hash(f1, key);
void *v;
r = toku_cachetable_maybe_get_and_pin(f1, key, hi, &v);
// an item that cannot be pinned is skipped
if (r != 0)
continue;
r = toku_cachetable_unpin(f1, key, hi, CACHETABLE_CLEAN, item_size);
assert(r == 0);
int its_dirty;
long long its_pin;
long its_size;
r = toku_cachetable_get_key_state(ct, key, f1, &v, &its_dirty, &its_pin, &its_size);
if (r != 0)
continue;
assert(its_dirty == CACHETABLE_CLEAN);
assert(its_pin == 0);
assert(its_size == item_size);
}
// a subsequent checkpoint should cause n flushes, but no writes since all
// of the items are clean
n_flush = n_write_me = n_keep_me = n_fetch = 0;
r = toku_cachetable_checkpoint(ct);
assert(r == 0);
assert(n_flush == n && n_write_me == 0 && n_keep_me == n);
// both close calls are expected to succeed and to null out their handle
r = toku_cachefile_close(&f1, NULL_LOGGER, 0); assert(r == 0 && f1 == 0);
r = toku_cachetable_close(&ct); assert(r == 0 && ct == 0);
}
// Test entry point.  Each "-v" argument bumps verbose; other arguments are
// ignored.  Runs the prefetch checkpoint test for n = 0..7, first with clean
// and then with dirty unpins.
int
test_main(int argc, const char *argv[]) {
    int argi = 1;
    while (argi < argc) {
        if (strcmp(argv[argi], "-v") == 0)
            verbose += 1;
        argi++;
    }

    int n;
    for (n = 0; n < 8; n++) {
        cachetable_prefetch_checkpoint_test(n, CACHETABLE_CLEAN);
        cachetable_prefetch_checkpoint_test(n, CACHETABLE_DIRTY);
    }
    return 0;
}
/* -*- mode: C; c-basic-offset: 4 -*- */
// verify that closing the cachetable with prefetches in progress works
#include "includes.h"
#include "test.h"
// Flush callback: the only check is that the cachetable never asks for a
// write-back (w must be FALSE).  Everything else is ignored.
static void
flush (CACHEFILE f __attribute__((__unused__)),
       CACHEKEY  k __attribute__((__unused__)),
       void     *v __attribute__((__unused__)),
       void     *e __attribute__((__unused__)),
       long      s __attribute__((__unused__)),
       BOOL      w __attribute__((__unused__)),
       BOOL   keep __attribute__((__unused__)),
       LSN       m __attribute__((__unused__)),
       BOOL      r __attribute__((__unused__)))
{
    assert(w == FALSE);
}
// Number of times fetch has been entered.
static int fetch_calls = 0;

// Fetch callback that simulates a slow, failing read: it counts the call,
// sleeps 10 seconds, fills in the out-parameters (null value, size 1,
// ZERO_LSN) and then returns the error code -42.
static int
fetch (CACHEFILE f        __attribute__((__unused__)),
       CACHEKEY  k        __attribute__((__unused__)),
       u_int32_t fullhash __attribute__((__unused__)),
       void    **value    __attribute__((__unused__)),
       long     *sizep    __attribute__((__unused__)),
       void     *extraargs __attribute__((__unused__)),
       LSN      *written_lsn __attribute__((__unused__)))
{
    const int simulated_error = -42;

    fetch_calls += 1;
    sleep(10);
    *written_lsn = ZERO_LSN;
    *sizep = 1;
    *value = 0;
    return simulated_error;
}
// Issue a prefetch of block 0 (its fetch callback sleeps 10 seconds and then
// returns an error) and close the cachefile and the cachetable right after.
// Both close calls must succeed and null out their handles.
static void cachetable_prefetch_maybegetandpin_test (void) {
    const int test_limit = 1;
    CACHETABLE ct;
    int r = toku_create_cachetable(&ct, test_limit, ZERO_LSN, NULL_LOGGER);
    assert(r == 0);

    // start from a fresh backing file
    char fname1[] = __FILE__ "test1.dat";
    unlink(fname1);
    CACHEFILE f1;
    r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO);
    assert(r == 0);

    // prefetch block 0. this will take 10 seconds.
    CACHEKEY key = make_blocknum(0);
    u_int32_t fullhash = toku_cachetable_hash(f1, make_blocknum(0));
    r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, 0);
    toku_cachetable_verify(ct);

    // close with the prefetch in progress. the close should block until
    // all of the reads and writes are complete.
    r = toku_cachefile_close(&f1, NULL_LOGGER, 0);
    assert(r == 0 && f1 == 0);
    r = toku_cachetable_close(&ct);
    assert(r == 0 && ct == 0);
}
// Test entry point: parse the standard test arguments, then run the
// close-during-prefetch scenario once.
int
test_main(int argc, const char *argv[]) {
    default_parse_args(argc, argv);

    cachetable_prefetch_maybegetandpin_test();

    return 0;
}
/* -*- mode: C; c-basic-offset: 4 -*- */
// verify that closing the cachetable with prefetches in progress works
#include "includes.h"
#include "test.h"
// Flush callback: asserts that no write-back is requested (w == FALSE) and
// otherwise does nothing.
static void
flush (CACHEFILE f __attribute__((__unused__)),
       CACHEKEY  k __attribute__((__unused__)),
       void     *v __attribute__((__unused__)),
       void     *e __attribute__((__unused__)),
       long      s __attribute__((__unused__)),
       BOOL      w __attribute__((__unused__)),
       BOOL   keep __attribute__((__unused__)),
       LSN       m __attribute__((__unused__)),
       BOOL      r __attribute__((__unused__)))
{
    assert(w == FALSE);
}
// Number of times fetch has been entered.
static int fetch_calls = 0;

// Fetch callback that simulates a slow but successful read: it counts the
// call, sleeps 10 seconds, and reports a null value of size 1 with ZERO_LSN.
static int
fetch (CACHEFILE f        __attribute__((__unused__)),
       CACHEKEY  k        __attribute__((__unused__)),
       u_int32_t fullhash __attribute__((__unused__)),
       void    **value    __attribute__((__unused__)),
       long     *sizep    __attribute__((__unused__)),
       void     *extraargs __attribute__((__unused__)),
       LSN      *written_lsn __attribute__((__unused__)))
{
    fetch_calls += 1;
    sleep(10);
    *written_lsn = ZERO_LSN;
    *sizep = 1;
    *value = 0;
    return 0;
}
// Issue a prefetch of block 0 (its fetch callback sleeps 10 seconds before
// succeeding) and close the cachefile and the cachetable right after.
// Both close calls must succeed and null out their handles.
static void cachetable_prefetch_maybegetandpin_test (void) {
    const int test_limit = 1;
    CACHETABLE ct;
    int r = toku_create_cachetable(&ct, test_limit, ZERO_LSN, NULL_LOGGER);
    assert(r == 0);

    // start from a fresh backing file
    char fname1[] = __FILE__ "test1.dat";
    unlink(fname1);
    CACHEFILE f1;
    r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO);
    assert(r == 0);

    // prefetch block 0. this will take 10 seconds.
    CACHEKEY key = make_blocknum(0);
    u_int32_t fullhash = toku_cachetable_hash(f1, make_blocknum(0));
    r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, 0);
    toku_cachetable_verify(ct);

    // close with the prefetch in progress. the close should block until
    // all of the reads and writes are complete.
    r = toku_cachefile_close(&f1, NULL_LOGGER, 0);
    assert(r == 0 && f1 == 0);
    r = toku_cachetable_close(&ct);
    assert(r == 0 && ct == 0);
}
// Test entry point: parse the standard test arguments, then run the
// close-during-prefetch scenario once.
int
test_main(int argc, const char *argv[]) {
    default_parse_args(argc, argv);

    cachetable_prefetch_maybegetandpin_test();

    return 0;
}
/* -*- mode: C; c-basic-offset: 4 -*- */
// verify that cachetable prefetch multiple blocks hits the cachetable size limit
// and flushes eventually happen.
#include "includes.h"
#include "test.h"
// Tallies maintained by flush(); static storage starts them at zero.
static int flush_calls;        // every flush callback invocation
static int flush_evict_calls;  // invocations with keep == FALSE
static int evicted_keys;       // bit k.b is set for each key flushed with keep == FALSE
// Flush callback: asserts that no write-back is requested, counts the call,
// and -- when the pair is not being kept -- counts it as an eviction and
// records the key's bit in evicted_keys.
static void
flush (CACHEFILE f __attribute__((__unused__)),
       CACHEKEY  k __attribute__((__unused__)),
       void     *v __attribute__((__unused__)),
       void     *e __attribute__((__unused__)),
       long      s __attribute__((__unused__)),
       BOOL      w __attribute__((__unused__)),
       BOOL   keep __attribute__((__unused__)),
       LSN       m __attribute__((__unused__)),
       BOOL      r __attribute__((__unused__)))
{
    assert(w == FALSE);
    flush_calls += 1;

    // a kept pair is not an eviction; nothing more to record
    if (keep != FALSE)
        return;

    flush_evict_calls += 1;
    if (verbose) printf("%s:%d flush %"PRId64"\n", __FUNCTION__, __LINE__, k.b);
    evicted_keys |= 1 << k.b;
}
// Number of times fetch has been entered.
static int fetch_calls = 0;

// Fetch callback: counts the call, optionally logs the key, and immediately
// reports a null value of size 1 with ZERO_LSN.
static int
fetch (CACHEFILE f        __attribute__((__unused__)),
       CACHEKEY  k        __attribute__((__unused__)),
       u_int32_t fullhash __attribute__((__unused__)),
       void    **value    __attribute__((__unused__)),
       long     *sizep    __attribute__((__unused__)),
       void     *extraargs __attribute__((__unused__)),
       LSN      *written_lsn __attribute__((__unused__)))
{
    fetch_calls += 1;
    if (verbose) printf("%s:%d %"PRId64"\n", __FUNCTION__, __LINE__, k.b);
    *written_lsn = ZERO_LSN;
    *sizep = 1;
    *value = 0;
    return 0;
}
// Prefetch twice as many blocks as the cachetable size limit, in two batches
// separated by 10-second sleeps, and check through the flush callback's
// counters that the first batch is evicted by the second and that every key
// has been flushed out once the cachefile is closed.
// Note: cachetable_size_limit must be a power of 2
// NOTE(review): the expected masks are built with int shifts, so
// 2*cachetable_size_limit has to stay below the width of int.
static void cachetable_prefetch_flowcontrol_test (int cachetable_size_limit) {
int r;
CACHETABLE ct;
r = toku_create_cachetable(&ct, cachetable_size_limit, ZERO_LSN, NULL_LOGGER); assert(r == 0);
char fname1[] = __FILE__ "test1.dat";
unlink(fname1);
CACHEFILE f1;
r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0);
int i;
// prefetch keys 0 .. N-1. they should all fit in the cachetable
for (i=0; i<cachetable_size_limit; i++) {
CACHEKEY key = make_blocknum(i);
u_int32_t fullhash = toku_cachetable_hash(f1, key);
// NOTE(review): the result of the prefetch is stored in r but never checked.
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, 0);
toku_cachetable_verify(ct);
}
// wait for all of the blocks to be fetched
sleep(10);
// prefetch keys N .. 2*N-1. 0 .. N-1 should be evicted.
// (i=i is a no-op initializer: the loop continues from where the first one stopped)
for (i=i; i<2*cachetable_size_limit; i++) {
CACHEKEY key = make_blocknum(i);
u_int32_t fullhash = toku_cachetable_hash(f1, key);
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, 0);
toku_cachetable_verify(ct);
// sleep(1);
}
// wait for everything to finish
sleep(10);
// exactly the first N keys (bits 0 .. N-1) must have been flushed with keep == FALSE
assert(flush_evict_calls == cachetable_size_limit);
assert(evicted_keys == (1 << cachetable_size_limit)-1);
r = toku_cachefile_close(&f1, NULL_LOGGER, 0); assert(r == 0 && f1 == 0);
if (verbose) printf("%s:%d 0x%x 0x%x\n", __FUNCTION__, __LINE__,
evicted_keys, (1 << (2*cachetable_size_limit))-1);
// after the close, all 2*N keys must have been flushed with keep == FALSE
assert(evicted_keys == (1 << (2*cachetable_size_limit))-1);
r = toku_cachetable_close(&ct); assert(r == 0 && ct == 0);
}
// Test entry point: parse the standard test arguments, then run the flow
// control scenario with a cachetable size limit of 8.
int
test_main(int argc, const char *argv[]) {
    const int size_limit = 8;

    default_parse_args(argc, argv);
    cachetable_prefetch_flowcontrol_test(size_limit);
    return 0;
}
/* -*- mode: C; c-basic-offset: 4 -*- */
/* verify that get_and_pin waits while a prefetch block is pending */
#include "includes.h"
#include "test.h"
// Flush callback: asserts that no write-back is requested (w == FALSE) and
// otherwise does nothing.
static void
flush (CACHEFILE f __attribute__((__unused__)),
       CACHEKEY  k __attribute__((__unused__)),
       void     *v __attribute__((__unused__)),
       void     *e __attribute__((__unused__)),
       long      s __attribute__((__unused__)),
       BOOL      w __attribute__((__unused__)),
       BOOL   keep __attribute__((__unused__)),
       LSN       m __attribute__((__unused__)),
       BOOL      r __attribute__((__unused__)))
{
    assert(w == FALSE);
}
// Fetch callback that simulates a slow, failing read: it sleeps 10 seconds,
// fills in the out-parameters (null value, size 1, ZERO_LSN) and returns the
// error code -42.
static int
fetch (CACHEFILE f        __attribute__((__unused__)),
       CACHEKEY  k        __attribute__((__unused__)),
       u_int32_t fullhash __attribute__((__unused__)),
       void    **value    __attribute__((__unused__)),
       long     *sizep    __attribute__((__unused__)),
       void     *extraargs __attribute__((__unused__)),
       LSN      *written_lsn __attribute__((__unused__)))
{
    const int simulated_error = -42;

    sleep(10);
    *written_lsn = ZERO_LSN;
    *sizep = 1;
    *value = 0;
    return simulated_error;
}
// Return the time elapsed from *tstart to *tend in microseconds.
// tend is expected to be no earlier than tstart; otherwise the unsigned
// result wraps around.
static uint64_t tdelta_usec(struct timeval *tend, struct timeval *tstart) {
    // Widen to 64 bits before scaling: tv_sec * 1000000 evaluated in time_t
    // overflows where time_t is only 32 bits wide.
    uint64_t t = (uint64_t) tend->tv_sec * 1000000 + (uint64_t) tend->tv_usec;
    t -= (uint64_t) tstart->tv_sec * 1000000 + (uint64_t) tstart->tv_usec;
    return t;
}
// Start a prefetch of block 0 whose fetch callback sleeps 10 seconds and then
// fails, and immediately call get_and_pin on the same block.  The pin must
// fail, and at least 10 seconds must have passed since just before the
// prefetch was issued.
static void cachetable_prefetch_maybegetandpin_test (void) {
    const int test_limit = 1;
    CACHETABLE ct;
    int r = toku_create_cachetable(&ct, test_limit, ZERO_LSN, NULL_LOGGER);
    assert(r == 0);

    // start from a fresh backing file
    char fname1[] = __FILE__ "test1.dat";
    unlink(fname1);
    CACHEFILE f1;
    r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO);
    assert(r == 0);

    struct timeval t_before;
    gettimeofday(&t_before, NULL);

    // prefetch block 0. this will take 10 seconds.
    CACHEKEY key = make_blocknum(0);
    u_int32_t fullhash = toku_cachetable_hash(f1, make_blocknum(0));
    r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, 0);
    toku_cachetable_verify(ct);

    // verify that get_and_pin waits while the prefetch is in progress
    void *pinned_value = 0;
    long pinned_size = 0;
    r = toku_cachetable_get_and_pin(f1, key, fullhash, &pinned_value, &pinned_size, flush, fetch, NULL);
    assert(r != 0);

    struct timeval t_after;
    gettimeofday(&t_after, NULL);
    assert(tdelta_usec(&t_after, &t_before) >= 10000000);
    toku_cachetable_verify(ct);

    r = toku_cachefile_close(&f1, NULL_LOGGER, 0);
    assert(r == 0 && f1 == 0);
    r = toku_cachetable_close(&ct);
    assert(r == 0 && ct == 0);
}
// Test entry point: parse the standard test arguments, then run the
// get_and_pin-during-prefetch scenario once.
int
test_main(int argc, const char *argv[]) {
    default_parse_args(argc, argv);

    cachetable_prefetch_maybegetandpin_test();

    return 0;
}
/* -*- mode: C; c-basic-offset: 4 -*- */
/* verify that get_and_pin waits while a prefetch block is pending */
#include "includes.h"
#include "test.h"
// Flush callback: asserts that no write-back is requested (w == FALSE) and
// otherwise does nothing.
static void
flush (CACHEFILE f __attribute__((__unused__)),
       CACHEKEY  k __attribute__((__unused__)),
       void     *v __attribute__((__unused__)),
       void     *e __attribute__((__unused__)),
       long      s __attribute__((__unused__)),
       BOOL      w __attribute__((__unused__)),
       BOOL   keep __attribute__((__unused__)),
       LSN       m __attribute__((__unused__)),
       BOOL      r __attribute__((__unused__)))
{
    assert(w == FALSE);
}
// Fetch callback that simulates a slow but successful read: it sleeps 10
// seconds and reports a null value of size 1 with ZERO_LSN.
static int
fetch (CACHEFILE f        __attribute__((__unused__)),
       CACHEKEY  k        __attribute__((__unused__)),
       u_int32_t fullhash __attribute__((__unused__)),
       void    **value    __attribute__((__unused__)),
       long     *sizep    __attribute__((__unused__)),
       void     *extraargs __attribute__((__unused__)),
       LSN      *written_lsn __attribute__((__unused__)))
{
    sleep(10);
    *written_lsn = ZERO_LSN;
    *sizep = 1;
    *value = 0;
    return 0;
}
// Return the time elapsed from *tstart to *tend in microseconds.
// tend is expected to be no earlier than tstart; otherwise the unsigned
// result wraps around.
static uint64_t tdelta_usec(struct timeval *tend, struct timeval *tstart) {
    // Widen to 64 bits before scaling: tv_sec * 1000000 evaluated in time_t
    // overflows where time_t is only 32 bits wide.
    uint64_t t = (uint64_t) tend->tv_sec * 1000000 + (uint64_t) tend->tv_usec;
    t -= (uint64_t) tstart->tv_sec * 1000000 + (uint64_t) tstart->tv_usec;
    return t;
}
// Start a prefetch of block 0 whose fetch callback sleeps 10 seconds, and
// immediately call get_and_pin on the same block.  The pin must succeed with
// the value and size produced by fetch, and at least 10 seconds must have
// passed since just before the prefetch was issued.
static void cachetable_prefetch_maybegetandpin_test (void) {
    const int test_limit = 1;
    CACHETABLE ct;
    int r = toku_create_cachetable(&ct, test_limit, ZERO_LSN, NULL_LOGGER);
    assert(r == 0);

    // start from a fresh backing file
    char fname1[] = __FILE__ "test1.dat";
    unlink(fname1);
    CACHEFILE f1;
    r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO);
    assert(r == 0);

    struct timeval t_before;
    gettimeofday(&t_before, NULL);

    // prefetch block 0. this will take 10 seconds.
    CACHEKEY key = make_blocknum(0);
    u_int32_t fullhash = toku_cachetable_hash(f1, make_blocknum(0));
    r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, 0);
    toku_cachetable_verify(ct);

    // verify that get_and_pin waits while the prefetch is in progress
    void *pinned_value = 0;
    long pinned_size = 0;
    r = toku_cachetable_get_and_pin(f1, key, fullhash, &pinned_value, &pinned_size, flush, fetch, NULL);
    assert(r == 0 && pinned_value == 0 && pinned_size == 1);

    struct timeval t_after;
    gettimeofday(&t_after, NULL);
    assert(tdelta_usec(&t_after, &t_before) >= 10000000);
    toku_cachetable_verify(ct);

    // release the pin taken above, leaving the block clean
    r = toku_cachetable_unpin(f1, key, fullhash, CACHETABLE_CLEAN, 1);
    assert(r == 0);
    toku_cachetable_verify(ct);

    r = toku_cachefile_close(&f1, NULL_LOGGER, 0);
    assert(r == 0 && f1 == 0);
    r = toku_cachetable_close(&ct);
    assert(r == 0 && ct == 0);
}
// Test entry point: parse the standard test arguments, then run the
// get_and_pin-during-prefetch scenario once.
int
test_main(int argc, const char *argv[]) {
    default_parse_args(argc, argv);

    cachetable_prefetch_maybegetandpin_test();

    return 0;
}
/* -*- mode: C; c-basic-offset: 4 -*- */
/* verify that maybe_get_and_pin returns an error while a prefetch block is pending */
#include <limits.h>
#include "includes.h"
#include "test.h"
// Flush callback: asserts that no write-back is requested (w == FALSE) and
// otherwise does nothing.
static void
flush (CACHEFILE f __attribute__((__unused__)),
       CACHEKEY  k __attribute__((__unused__)),
       void     *v __attribute__((__unused__)),
       void     *e __attribute__((__unused__)),
       long      s __attribute__((__unused__)),
       BOOL      w __attribute__((__unused__)),
       BOOL   keep __attribute__((__unused__)),
       LSN       m __attribute__((__unused__)),
       BOOL      r __attribute__((__unused__)))
{
    assert(w == FALSE);
}
// Fetch callback that simulates a slow but successful read: it sleeps 10
// seconds and reports a null value of size 1 with ZERO_LSN.
static int
fetch (CACHEFILE f        __attribute__((__unused__)),
       CACHEKEY  k        __attribute__((__unused__)),
       u_int32_t fullhash __attribute__((__unused__)),
       void    **value    __attribute__((__unused__)),
       long     *sizep    __attribute__((__unused__)),
       void     *extraargs __attribute__((__unused__)),
       LSN      *written_lsn __attribute__((__unused__)))
{
    sleep(10);
    *written_lsn = ZERO_LSN;
    *sizep = 1;
    *value = 0;
    return 0;
}
// Start a prefetch of block 0 whose fetch callback sleeps 10 seconds, then
// poll maybe_get_and_pin on that block until it succeeds.  The first poll must
// fail (the block is still being read), so more than one attempt is required.
// Afterwards the block is unpinned clean and everything is closed.
static void cachetable_prefetch_maybegetandpin_test (void) {
    const int test_limit = 1;
    int r;
    CACHETABLE ct;
    r = toku_create_cachetable(&ct, test_limit, ZERO_LSN, NULL_LOGGER); assert(r == 0);
    char fname1[] = __FILE__ "test1.dat";
    unlink(fname1);
    CACHEFILE f1;
    r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0);

    // prefetch block 0. this will take 10 seconds.
    CACHEKEY key = make_blocknum(0);
    u_int32_t fullhash = toku_cachetable_hash(f1, make_blocknum(0));
    r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, 0);
    toku_cachetable_verify(ct);

    // verify that maybe_get_and_pin returns an error while the prefetch is in progress
    // The attempt counter stops at INT_MAX instead of being incremented past
    // it: signed overflow is undefined, so it cannot serve as the loop's
    // give-up condition.
    int i;
    for (i=1; i<INT_MAX; i++) {
        void *v;
        r = toku_cachetable_maybe_get_and_pin(f1, key, fullhash, &v);
        if (r == 0) break;
        toku_pthread_yield();
    }
    if (verbose) printf("%s:%d %d\n", __FUNCTION__, __LINE__, i);
    assert(r == 0);   // the loop ended because the pin succeeded, not because it gave up
    assert(i>1);      // at least one attempt failed while the prefetch was pending
    toku_cachetable_verify(ct);

    // release the pin taken by the successful poll, leaving the block clean
    r = toku_cachetable_unpin(f1, key, fullhash, CACHETABLE_CLEAN, 1);
    assert(r == 0);
    toku_cachetable_verify(ct);

    r = toku_cachefile_close(&f1, NULL_LOGGER, 0); assert(r == 0 && f1 == 0);
    r = toku_cachetable_close(&ct); assert(r == 0 && ct == 0);
}
/* Test entry point: passes the command line to default_parse_args(), runs
 * cachetable_prefetch_maybegetandpin_test() once, and always returns 0.
 * Failures surface through the assertions inside the test itself. */
int test_main(int argc, const char *argv[])
{
    default_parse_args(argc, argv);

    cachetable_prefetch_maybegetandpin_test();

    return 0;
}
/* -*- mode: C; c-basic-offset: 4 -*- */
// verify that cachetable prefetch of the same block multiple times only
// fetches the block once.
#include "includes.h"
#include "test.h"
/* Flush callback passed to toku_cachefile_prefetch() by this test.
 *
 * It does no work of its own: it only asserts that the flag `w` is FALSE
 * whenever the callback is invoked.  Every other argument is ignored.
 * (`w` presumably is the cachetable's "write this block" flag -- confirm
 * against the flush callback prototype.)
 *
 * All parameters keep the __unused__ attribute, including `w`, so the
 * function still compiles without warnings if assert() is compiled out. */
static void flush(CACHEFILE f    __attribute__((__unused__)),
                  CACHEKEY  k    __attribute__((__unused__)),
                  void     *v    __attribute__((__unused__)),
                  void     *e    __attribute__((__unused__)),
                  long      s    __attribute__((__unused__)),
                  BOOL      w    __attribute__((__unused__)),
                  BOOL      keep __attribute__((__unused__)),
                  LSN       m    __attribute__((__unused__)),
                  BOOL      r    __attribute__((__unused__)))
{
    assert(w == FALSE);
}
/* Number of times fetch() has been entered; the test asserts on it. */
static int fetch_calls = 0;

/* Fetch callback passed to toku_cachefile_prefetch() by this test.
 *
 * Records the invocation in fetch_calls, then sleeps for 10 seconds so the
 * caller has a window in which the prefetch is still in progress.
 *
 * Outputs (written only after the sleep):
 *   *value       - set to 0 (no payload is produced)
 *   *sizep       - set to 1
 *   *written_lsn - set to ZERO_LSN
 *
 * Returns 0 unconditionally.  The __unused__ attributes are kept on every
 * parameter even though the three output pointers are written through. */
static int fetch(CACHEFILE  f           __attribute__((__unused__)),
                 CACHEKEY   k           __attribute__((__unused__)),
                 u_int32_t  fullhash    __attribute__((__unused__)),
                 void     **value       __attribute__((__unused__)),
                 long      *sizep       __attribute__((__unused__)),
                 void      *extraargs   __attribute__((__unused__)),
                 LSN       *written_lsn __attribute__((__unused__)))
{
    /* Count the call before stalling. */
    fetch_calls += 1;

    /* Hold the fetch open long enough for the test to observe it pending. */
    sleep(10);

    /* The three outputs are independent of one another. */
    *written_lsn = ZERO_LSN;
    *sizep = 1;
    *value = 0;

    return 0;
}
/* Checks that prefetching the same block twice triggers only one call to
 * the fetch callback, and that toku_cachetable_maybe_get_and_pin() fails
 * for the block while that fetch is still running.
 *
 * Steps:
 *   1. create a cachetable and open a fresh cachefile;
 *   2. request a prefetch of block 0 twice (the fetch callback sleeps
 *      10 seconds);
 *   3. poll maybe_get_and_pin until it returns 0, yielding between tries;
 *   4. require that at least one poll failed and that fetch ran exactly once;
 *   5. unpin the block and tear everything down.
 */
static void
cachetable_prefetch_maybegetandpin_test(void)
{
    int r;

    /* Step 1: cachetable with a size limit of 1, plus a fresh backing file. */
    const int cache_limit = 1;
    CACHETABLE ct;
    r = toku_create_cachetable(&ct, cache_limit, ZERO_LSN, NULL_LOGGER);
    assert(r == 0);

    char path[] = __FILE__ "test1.dat";
    unlink(path);   /* result ignored: the file may not exist yet */

    CACHEFILE f1;
    r = toku_cachetable_openf(&f1, ct, path, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO);
    assert(r == 0);

    /* Step 2: first prefetch of block 0; fetch() takes 10 seconds.
     * NOTE(review): the value stored in r by either prefetch is never checked. */
    CACHEKEY block0 = make_blocknum(0);
    u_int32_t block0_hash = toku_cachetable_hash(f1, make_blocknum(0));
    r = toku_cachefile_prefetch(f1, block0, block0_hash, flush, fetch, 0);
    toku_cachetable_verify(ct);

    /* Second prefetch of the same block: expected to be a no-op. */
    r = toku_cachefile_prefetch(f1, block0, block0_hash, flush, fetch, 0);
    toku_cachetable_verify(ct);

    /* Step 3: poll until the block can be pinned.  i counts attempts,
     * starting at 1, and ends as the number of the successful attempt.
     * NOTE(review): the i >= 0 bound only ends the loop if i wraps negative. */
    int i = 1;
    while (i >= 0) {
        void *pinned;
        r = toku_cachetable_maybe_get_and_pin(f1, block0, block0_hash, &pinned);
        if (r == 0) {
            break;
        }
        toku_pthread_yield();
        i++;
    }
    if (verbose) {
        printf("%s:%d %d\n", __FUNCTION__, __LINE__, i);
    }

    /* Step 4: a first-try success would leave i == 1, so i > 1 proves that
     * at least one attempt was rejected while the prefetch was pending. */
    assert(i>1);
    toku_cachetable_verify(ct);

    /* Two prefetch requests must have produced exactly one fetch callback. */
    assert(fetch_calls == 1);

    /* Step 5: release the pin taken by the successful poll, then shut down. */
    r = toku_cachetable_unpin(f1, block0, block0_hash, CACHETABLE_CLEAN, 1);
    assert(r == 0);
    toku_cachetable_verify(ct);

    r = toku_cachefile_close(&f1, NULL_LOGGER, 0);
    assert(r == 0 && f1 == 0);
    r = toku_cachetable_close(&ct);
    assert(r == 0 && ct == 0);
}
/* Test entry point: passes the command line to default_parse_args(), runs
 * cachetable_prefetch_maybegetandpin_test() once, and always returns 0.
 * Failures surface through the assertions inside the test itself. */
int test_main(int argc, const char *argv[])
{
    default_parse_args(argc, argv);

    cachetable_prefetch_maybegetandpin_test();

    return 0;
}
...@@ -67,7 +67,7 @@ static void writeit (void) { ...@@ -67,7 +67,7 @@ static void writeit (void) {
int j; int j;
for (j=0; j<BLOCKSIZE; j++) ((char*)buf)[j]=(char)((i+j)%256); for (j=0; j<BLOCKSIZE; j++) ((char*)buf)[j]=(char)((i+j)%256);
r = toku_cachetable_put(f, key, fullhash, buf, BLOCKSIZE, f_flush, f_fetch, 0); assert(r==0); r = toku_cachetable_put(f, key, fullhash, buf, BLOCKSIZE, f_flush, f_fetch, 0); assert(r==0);
r = toku_cachetable_unpin(f, key, fullhash, 0, BLOCKSIZE); assert(r==0); r = toku_cachetable_unpin(f, key, fullhash, CACHETABLE_CLEAN, BLOCKSIZE); assert(r==0);
} }
gettimeofday(&end, 0); gettimeofday(&end, 0);
double diff = tdiff(&end, &start); double diff = tdiff(&end, &start);
...@@ -87,7 +87,7 @@ static void readit (void) { ...@@ -87,7 +87,7 @@ static void readit (void) {
CACHEKEY key = make_blocknum(i*BLOCKSIZE); CACHEKEY key = make_blocknum(i*BLOCKSIZE);
u_int32_t fullhash = toku_cachetable_hash(f, key); u_int32_t fullhash = toku_cachetable_hash(f, key);
r=toku_cachetable_get_and_pin(f, key, fullhash, &block, &current_size, f_flush, f_fetch, 0); assert(r==0); r=toku_cachetable_get_and_pin(f, key, fullhash, &block, &current_size, f_flush, f_fetch, 0); assert(r==0);
r=toku_cachetable_unpin(f, key, fullhash, 0, BLOCKSIZE); assert(r==0); r=toku_cachetable_unpin(f, key, fullhash, CACHETABLE_CLEAN, BLOCKSIZE); assert(r==0);
} }
r = toku_cachefile_close(&f, 0, 0); assert(r == 0); r = toku_cachefile_close(&f, 0, 0); assert(r == 0);
r = toku_cachetable_close(&t); assert(r == 0); r = toku_cachetable_close(&t); assert(r == 0);
......
...@@ -29,7 +29,7 @@ static inline void test_mutex_unlock() { ...@@ -29,7 +29,7 @@ static inline void test_mutex_unlock() {
// hook my_malloc_always_fails into malloc to control malloc and verify // hook my_malloc_always_fails into malloc to control malloc and verify
// the correct recovery from malloc failures // the correct recovery from malloc failures
#if defined(__linux__) #if defined(__linux__)
#define DO_MALLOC_HOOK 0 #define DO_MALLOC_HOOK 1
#else #else
#define DO_MALLOC_HOOK 0 #define DO_MALLOC_HOOK 0
#endif #endif
...@@ -334,19 +334,19 @@ static void test_nested_pin (void) { ...@@ -334,19 +334,19 @@ static void test_nested_pin (void) {
assert(r==0); assert(r==0);
assert(vv==&i0); assert(vv==&i0);
assert(i0==0); assert(i0==0);
r = toku_cachetable_unpin(f, make_blocknum(1), f1hash, 0, test_object_size); r = toku_cachetable_unpin(f, make_blocknum(1), f1hash, CACHETABLE_CLEAN, test_object_size);
assert(r==0); assert(r==0);
r = toku_cachetable_maybe_get_and_pin(f, make_blocknum(1), f1hash, &vv2); r = toku_cachetable_maybe_get_and_pin(f, make_blocknum(1), f1hash, &vv2);
assert(r==0); assert(r==0);
assert(vv2==vv); assert(vv2==vv);
r = toku_cachetable_unpin(f, make_blocknum(1), f1hash, 0, test_object_size); r = toku_cachetable_unpin(f, make_blocknum(1), f1hash, CACHETABLE_CLEAN, test_object_size);
assert(r==0); assert(r==0);
u_int32_t f2hash = toku_cachetable_hash(f, make_blocknum(2)); u_int32_t f2hash = toku_cachetable_hash(f, make_blocknum(2));
r = toku_cachetable_put(f, make_blocknum(2), f2hash, &i1, test_object_size, flush_n, fetch_n, f2); r = toku_cachetable_put(f, make_blocknum(2), f2hash, &i1, test_object_size, flush_n, fetch_n, f2);
assert(r==0); // The other one is pinned, but now the cachetable fails gracefully: It allows the pin to happen assert(r==0); // The other one is pinned, but now the cachetable fails gracefully: It allows the pin to happen
r = toku_cachetable_unpin(f, make_blocknum(1), f1hash, 0, test_object_size); r = toku_cachetable_unpin(f, make_blocknum(1), f1hash, CACHETABLE_CLEAN, test_object_size);
assert(r==0); assert(r==0);
r = toku_cachetable_unpin(f, make_blocknum(2), f2hash, 0, test_object_size); r = toku_cachetable_unpin(f, make_blocknum(2), f2hash, CACHETABLE_CLEAN, test_object_size);
assert(r==0); assert(r==0);
// toku_os_usleep(1*1000000); // toku_os_usleep(1*1000000);
r = toku_cachefile_close(&f, 0, 0); assert(r==0); r = toku_cachefile_close(&f, 0, 0); assert(r==0);
......
...@@ -63,7 +63,7 @@ my_thread_f (void *arg) { ...@@ -63,7 +63,7 @@ my_thread_f (void *arg) {
} }
#if defined(__linux__) #if defined(__linux__)
#define DO_MALLOC_HOOK 0 #define DO_MALLOC_HOOK 1
#else #else
#define DO_MALLOC_HOOK 0 #define DO_MALLOC_HOOK 0
#endif #endif
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment