Commit d33726aa authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

merge the 1458 and 1465 branches to main. addresses #1458, #1465

git-svn-id: file:///svn/toku/tokudb@9363 c7de825b-a66e-492c-adef-691d508d4ae1
parent 3a93d259
......@@ -530,9 +530,7 @@ static void cachetable_maybe_remove_and_free_pair (CACHETABLE ct, PAIR p) {
}
}
static void cachetable_abort_fetch_pair(CACHETABLE ct, PAIR p) {
cachetable_remove_pair(ct, p);
p->state = CTPAIR_INVALID;
static void abort_fetch_pair(PAIR p) {
ctpair_write_unlock(&p->rwlock);
if (ctpair_users(&p->rwlock) == 0)
ctpair_destroy(p);
......@@ -559,27 +557,29 @@ static int cachetable_fetch_pair(CACHETABLE ct, CACHEFILE cf, PAIR p) {
cachetable_lock(ct);
if (r) {
cachetable_remove_pair(ct, p);
p->state = CTPAIR_INVALID;
if (p->cq) {
workqueue_enq(p->cq, &p->asyncwork, 1);
return r;
}
cachetable_abort_fetch_pair(ct, p);
abort_fetch_pair(p);
return r;
}
lru_touch(ct, p);
p->value = toku_value;
p->written_lsn = written_lsn;
p->size = size;
ct->size_current += size;
if (p->cq) {
workqueue_enq(p->cq, &p->asyncwork, 1);
} else {
lru_touch(ct, p);
p->value = toku_value;
p->written_lsn = written_lsn;
p->size = size;
ct->size_current += size;
if (p->cq) {
workqueue_enq(p->cq, &p->asyncwork, 1);
return 0;
}
p->state = CTPAIR_IDLE;
ctpair_write_unlock(&p->rwlock);
if (0) printf("%s:%d %"PRId64" complete\n", __FUNCTION__, __LINE__, key.b);
return 0;
}
p->state = CTPAIR_IDLE;
ctpair_write_unlock(&p->rwlock);
if (0) printf("%s:%d %"PRId64" complete\n", __FUNCTION__, __LINE__, key.b);
return 0;
}
static void cachetable_complete_write_pair (CACHETABLE ct, PAIR p, BOOL do_remove);
......@@ -1081,11 +1081,14 @@ static int cachetable_flush_cachefile(CACHETABLE ct, CACHEFILE cf) {
cachetable_lock(ct);
PAIR p = workitem_arg(wi);
p->cq = 0;
if (p->state == CTPAIR_READING)
cachetable_abort_fetch_pair(ct, p);
else if (p->state == CTPAIR_WRITING)
if (p->state == CTPAIR_READING) {
ctpair_write_unlock(&p->rwlock);
cachetable_maybe_remove_and_free_pair(ct, p);
} else if (p->state == CTPAIR_WRITING) {
cachetable_complete_write_pair(ct, p, TRUE);
else
} else if (p->state == CTPAIR_INVALID) {
abort_fetch_pair(p);
} else
assert(0);
}
workqueue_destroy(&cq);
......@@ -1250,8 +1253,14 @@ static void cachetable_reader(WORKITEM wi) {
CACHETABLE ct = p->cachefile->cachetable;
cachetable_lock(ct);
int r = cachetable_fetch_pair(ct, p->cachefile, p);
if (r == 0)
#define DO_FLUSH_FROM_READER 0
if (r == 0) {
#if DO_FLUSH_FROM_READER
maybe_flush_some(ct, 0);
#else
r = r;
#endif
}
cachetable_unlock(ct);
}
......
......@@ -63,10 +63,10 @@ REGRESSION_TESTS_RAW = \
cachetable-prefetch2-test \
cachetable-prefetch-close-test \
cachetable-prefetch-close-fail-test \
cachetable-prefetch-close-leak-test \
cachetable-prefetch-getandpin-test \
cachetable-prefetch-getandpin-fail-test \
cachetable-prefetch-checkpoint-test \
cachetable-prefetch-flowcontrol-test \
fifo-test \
list-test \
keyrange \
......
/* -*- mode: C; c-basic-offset: 4 -*- */
// verify that closing the cachetable with an in progress prefetch works
#include "includes.h"
#include "test.h"
static void
flush (CACHEFILE f __attribute__((__unused__)),
CACHEKEY k __attribute__((__unused__)),
void *v __attribute__((__unused__)),
void *e __attribute__((__unused__)),
long s __attribute__((__unused__)),
BOOL w __attribute__((__unused__)),
BOOL keep __attribute__((__unused__)),
LSN m __attribute__((__unused__)),
BOOL r __attribute__((__unused__))
) {
assert(w == FALSE && v != NULL);
toku_free(v);
}
static int fetch_calls = 0;
static int
fetch (CACHEFILE f __attribute__((__unused__)),
CACHEKEY k __attribute__((__unused__)),
u_int32_t fullhash __attribute__((__unused__)),
void **value __attribute__((__unused__)),
long *sizep __attribute__((__unused__)),
void *extraargs __attribute__((__unused__)),
LSN *written_lsn __attribute__((__unused__))
) {
fetch_calls++;
sleep(10);
*value = toku_malloc(1);
*sizep = 1;
*written_lsn = ZERO_LSN;
return 0;
}
static void cachetable_prefetch_close_leak_test (void) {
const int test_limit = 1;
int r;
CACHETABLE ct;
r = toku_create_cachetable(&ct, test_limit, ZERO_LSN, NULL_LOGGER); assert(r == 0);
char fname1[] = __FILE__ "test1.dat";
unlink(fname1);
CACHEFILE f1;
r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0);
// prefetch block 0. this will take 10 seconds.
CACHEKEY key = make_blocknum(0);
u_int32_t fullhash = toku_cachetable_hash(f1, make_blocknum(0));
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, 0);
toku_cachetable_verify(ct);
// close with the prefetch in progress. the close should block until
// all of the reads and writes are complete.
r = toku_cachefile_close(&f1, NULL_LOGGER, 0); assert(r == 0 && f1 == 0);
r = toku_cachetable_close(&ct); assert(r == 0 && ct == 0);
}
int
test_main(int argc, const char *argv[]) {
default_parse_args(argc, argv);
cachetable_prefetch_close_leak_test();
return 0;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment