Commit 305a7f19 authored by Yoni Fogel's avatar Yoni Fogel

Addresses #1765, #1730, #1733, Merge 12079:12119 from 2.0.0 into main

git-svn-id: file:///svn/toku/tokudb@12121 c7de825b-a66e-492c-adef-691d508d4ae1
parent 7c98e657
...@@ -183,6 +183,7 @@ struct cachefile { ...@@ -183,6 +183,7 @@ struct cachefile {
* because then we'd have to figure out if the transaction was already counted. If we simply use a count for * because then we'd have to figure out if the transaction was already counted. If we simply use a count for
* every record in the transaction, we'll be ok. Hence we use a 64-bit counter to make sure we don't run out. * every record in the transaction, we'll be ok. Hence we use a 64-bit counter to make sure we don't run out.
*/ */
BOOL is_closing; /* TRUE if a cachefile is being close/has been closed. */
int fd; /* Bug: If a file is opened read-only, then it is stuck in read-only. If it is opened read-write, then subsequent writers can write to it too. */ int fd; /* Bug: If a file is opened read-only, then it is stuck in read-only. If it is opened read-write, then subsequent writers can write to it too. */
CACHETABLE cachetable; CACHETABLE cachetable;
struct fileid fileid; struct fileid fileid;
...@@ -299,7 +300,7 @@ int toku_cachetable_openfd (CACHEFILE *cfptr, CACHETABLE ct, int fd, const char ...@@ -299,7 +300,7 @@ int toku_cachetable_openfd (CACHEFILE *cfptr, CACHETABLE ct, int fd, const char
if (memcmp(&extant->fileid, &fileid, sizeof(fileid))==0) { if (memcmp(&extant->fileid, &fileid, sizeof(fileid))==0) {
//File is already open (and in cachetable as extant) //File is already open (and in cachetable as extant)
cachefile_refup(extant); cachefile_refup(extant);
if (extant->refcount==1) { if (extant->is_closing) {
// if another thread is closing this file, wait until the close is fully complete // if another thread is closing this file, wait until the close is fully complete
r = toku_pthread_cond_wait(&extant->openfd_wait, ct->mutex); r = toku_pthread_cond_wait(&extant->openfd_wait, ct->mutex);
assert(r == 0); assert(r == 0);
...@@ -430,6 +431,8 @@ int toku_cachefile_close (CACHEFILE *cfp, TOKULOGGER logger, char **error_string ...@@ -430,6 +431,8 @@ int toku_cachefile_close (CACHEFILE *cfp, TOKULOGGER logger, char **error_string
//Checkpoint holds a reference, close should be impossible if still in use by a checkpoint. //Checkpoint holds a reference, close should be impossible if still in use by a checkpoint.
assert(!cf->next_in_checkpoint); assert(!cf->next_in_checkpoint);
assert(!cf->for_checkpoint); assert(!cf->for_checkpoint);
assert(!cf->is_closing);
cf->is_closing = TRUE; //Mark this cachefile so that no one will re-use it.
int r; int r;
// cachetable_flush_cachefile() may release and retake cachetable_lock, // cachetable_flush_cachefile() may release and retake cachetable_lock,
// allowing another thread to get into toku_cachetable_openfd() // allowing another thread to get into toku_cachetable_openfd()
...@@ -439,6 +442,8 @@ int toku_cachefile_close (CACHEFILE *cfp, TOKULOGGER logger, char **error_string ...@@ -439,6 +442,8 @@ int toku_cachefile_close (CACHEFILE *cfp, TOKULOGGER logger, char **error_string
if (cf->refcount > 0) { if (cf->refcount > 0) {
int rs; int rs;
assert(cf->refcount == 1); // toku_cachetable_openfd() is single-threaded assert(cf->refcount == 1); // toku_cachetable_openfd() is single-threaded
assert(!cf->next_in_checkpoint); //checkpoint cannot run on a closing file
assert(!cf->for_checkpoint); //checkpoint cannot run on a closing file
rs = toku_pthread_cond_signal(&cf->openfd_wait); assert(rs == 0); rs = toku_pthread_cond_signal(&cf->openfd_wait); assert(rs == 0);
} }
// we can destroy the condition variable because if there was another thread waiting, it was already signalled // we can destroy the condition variable because if there was another thread waiting, it was already signalled
...@@ -1556,7 +1561,8 @@ toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) { ...@@ -1556,7 +1561,8 @@ toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) {
CACHEFILE cf; CACHEFILE cf;
assert(ct->cachefiles_in_checkpoint==NULL); assert(ct->cachefiles_in_checkpoint==NULL);
for (cf = ct->cachefiles; cf; cf=cf->next) { for (cf = ct->cachefiles; cf; cf=cf->next) {
if (cf->refcount>0) { assert(!cf->is_closing); //Closing requires ydb lock (or in checkpoint). Cannot happen.
assert(cf->refcount>0); //Must have a reference if not closing.
//Incremement reference count of cachefile because we're using it for the checkpoint. //Incremement reference count of cachefile because we're using it for the checkpoint.
//This will prevent closing during the checkpoint. //This will prevent closing during the checkpoint.
cachefile_refup(cf); cachefile_refup(cf);
...@@ -1565,7 +1571,6 @@ toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) { ...@@ -1565,7 +1571,6 @@ toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) {
cf->for_checkpoint = TRUE; cf->for_checkpoint = TRUE;
} }
} }
}
if (logger) { if (logger) {
// The checkpoint must be performed after the lock is acquired. // The checkpoint must be performed after the lock is acquired.
......
...@@ -289,8 +289,12 @@ toku_rollback_rollinclude (BYTESTRING bs, ...@@ -289,8 +289,12 @@ toku_rollback_rollinclude (BYTESTRING bs,
} }
int int
toku_rollback_tablelock_on_empty_table (FILENUM filenum, TOKUTXN txn, YIELDF UU(yield), void* UU(yield_v)) toku_rollback_tablelock_on_empty_table (FILENUM filenum,
TOKUTXN txn,
YIELDF yield,
void* yield_v)
{ {
yield(toku_checkpoint_safe_client_lock, yield_v);
// on rollback we have to make the file be empty, since we locked an empty table, and then may have done things to it. // on rollback we have to make the file be empty, since we locked an empty table, and then may have done things to it.
CACHEFILE cf; CACHEFILE cf;
...@@ -307,6 +311,7 @@ toku_rollback_tablelock_on_empty_table (FILENUM filenum, TOKUTXN txn, YIELDF UU( ...@@ -307,6 +311,7 @@ toku_rollback_tablelock_on_empty_table (FILENUM filenum, TOKUTXN txn, YIELDF UU(
r = toku_brt_truncate(brt); r = toku_brt_truncate(brt);
assert(r==0); assert(r==0);
} }
toku_checkpoint_safe_client_unlock();
return r; return r;
} }
......
...@@ -5,9 +5,9 @@ ...@@ -5,9 +5,9 @@
#include <stdio.h> #include <stdio.h>
#include <errno.h> #include <errno.h>
#include <string.h>
#include <toku_portability.h> #include <toku_portability.h>
#include <string.h>
#include "toku_pthread.h" #include "toku_pthread.h"
#include "toku_assert.h" #include "toku_assert.h"
#include "memory.h" #include "memory.h"
......
...@@ -98,6 +98,7 @@ BDB_DONTRUN_TESTS = \ ...@@ -98,6 +98,7 @@ BDB_DONTRUN_TESTS = \
root_fifo_41 \ root_fifo_41 \
checkpoint_1 \ checkpoint_1 \
checkpoint_stress \ checkpoint_stress \
checkpoint_truncate_1 \
#\ ends prev line #\ ends prev line
# checkpoint tests depend on this header file, # checkpoint tests depend on this header file,
......
...@@ -112,60 +112,6 @@ thin_out(DB* db, int iter) { ...@@ -112,60 +112,6 @@ thin_out(DB* db, int iter) {
} }
// assert that correct values are in expected rows
static void
verify_sequential_rows(DB* compare_db, int64_t firstkey, int64_t numkeys) {
//This does not lock the dbs/grab table locks.
//This means that you CANNOT CALL THIS while another thread is modifying the db.
//You CAN call it while a txn is open however.
int rval = 0;
DB_TXN *compare_txn;
int r, r1;
assert(numkeys >= 1);
r = env->txn_begin(env, NULL, &compare_txn, DB_READ_UNCOMMITTED);
CKERR(r);
DBC *c1;
r = compare_db->cursor(compare_db, compare_txn, &c1, 0);
CKERR(r);
DBT key1, val1;
DBT key2, val2;
int64_t k, v;
dbt_init_realloc(&key1);
dbt_init_realloc(&val1);
dbt_init(&key2, &k, sizeof(k));
dbt_init(&val2, &v, sizeof(v));
k = firstkey;
v = generate_val(k);
r1 = c1->c_get(c1, &key2, &val2, DB_GET_BOTH);
CKERR(r1);
int64_t i;
for (i = 1; i<numkeys; i++) {
k = i + firstkey;
v = generate_val(k);
r1 = c1->c_get(c1, &key1, &val1, DB_NEXT);
assert(r1==0);
rval = verify_identical_dbts(&key1, &key2) |
verify_identical_dbts(&val1, &val2);
assert(rval == 0);
}
// now verify that there are no rows after the last expected
r1 = c1->c_get(c1, &key1, &val1, DB_NEXT);
assert(r1 == DB_NOTFOUND);
c1->c_close(c1);
if (key1.data) toku_free(key1.data);
if (val1.data) toku_free(val1.data);
compare_txn->commit(compare_txn, 0);
}
static void static void
drop_dead(void) { drop_dead(void) {
...@@ -268,7 +214,7 @@ run_test (int iter, int die) { ...@@ -268,7 +214,7 @@ run_test (int iter, int die) {
cachebytes = K256 * (iter + 1) - (128 * 1024); cachebytes = K256 * (iter + 1) - (128 * 1024);
if (cachebytes > max_windows_cachesize) if (cachebytes > max_windows_cachesize)
cachebytes = 0; cachebytes = 0;
if (iter & 1) cachebytes = 0; // every other iteration use default cachesize if (iter & 2) cachebytes = 0; // use default cachesize half the time
if (verbose) if (verbose)
printf("checkpoint_stress: iter = %d, cachesize (bytes) = 0x%08"PRIx64"\n", iter, cachebytes); printf("checkpoint_stress: iter = %d, cachesize (bytes) = 0x%08"PRIx64"\n", iter, cachebytes);
......
...@@ -246,6 +246,15 @@ db_replace(DICTIONARY d, DB_TXN *open_txn) { ...@@ -246,6 +246,15 @@ db_replace(DICTIONARY d, DB_TXN *open_txn) {
*d = temp; *d = temp;
} }
static void UU()
db_truncate(DB* db, DB_TXN *txn) {
u_int32_t row_count;
u_int32_t flags = 0;
int r = db->truncate(db, txn, &row_count, flags);
assert(row_count == 0);
CKERR(r);
}
static void UU() static void UU()
insert_random(DB *db1, DB *db2, DB_TXN *txn) { insert_random(DB *db1, DB *db2, DB_TXN *txn) {
int64_t k = random64(); int64_t k = random64();
...@@ -355,6 +364,61 @@ insert_n_fixed(DB *db1, DB *db2, DB_TXN *txn, int firstkey, int n) { ...@@ -355,6 +364,61 @@ insert_n_fixed(DB *db1, DB *db2, DB_TXN *txn, int firstkey, int n) {
} }
// assert that correct values are in expected rows
static void UU()
verify_sequential_rows(DB* compare_db, int64_t firstkey, int64_t numkeys) {
//This does not lock the dbs/grab table locks.
//This means that you CANNOT CALL THIS while another thread is modifying the db.
//You CAN call it while a txn is open however.
int rval = 0;
DB_TXN *compare_txn;
int r, r1;
assert(numkeys >= 1);
r = env->txn_begin(env, NULL, &compare_txn, DB_READ_UNCOMMITTED);
CKERR(r);
DBC *c1;
r = compare_db->cursor(compare_db, compare_txn, &c1, 0);
CKERR(r);
DBT key1, val1;
DBT key2, val2;
int64_t k, v;
dbt_init_realloc(&key1);
dbt_init_realloc(&val1);
dbt_init(&key2, &k, sizeof(k));
dbt_init(&val2, &v, sizeof(v));
k = firstkey;
v = generate_val(k);
r1 = c1->c_get(c1, &key2, &val2, DB_GET_BOTH);
CKERR(r1);
int64_t i;
for (i = 1; i<numkeys; i++) {
k = i + firstkey;
v = generate_val(k);
r1 = c1->c_get(c1, &key1, &val1, DB_NEXT);
assert(r1==0);
rval = verify_identical_dbts(&key1, &key2) |
verify_identical_dbts(&val1, &val2);
assert(rval == 0);
}
// now verify that there are no rows after the last expected
r1 = c1->c_get(c1, &key1, &val1, DB_NEXT);
assert(r1 == DB_NOTFOUND);
c1->c_close(c1);
if (key1.data) toku_free(key1.data);
if (val1.data) toku_free(val1.data);
compare_txn->commit(compare_txn, 0);
}
static void UU() static void UU()
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007 Tokutek Inc. All rights reserved."
#ident "$Id$"
#include <db.h>
#include <sys/stat.h>
#include "toku_pthread.h"
#include "test.h"
#include "checkpoint_test.h"
// Purpose of this test is to verify that truncate performed during checkpoint does not cause crash.
// Method:
// write two dictionaries, control and test
// take a checkpoint
// during the checkpoint (in the callback) trigger a second thread that will truncate test dictionary
// verify contents of control dictionary
//
// Potential improvements if necessary:
// make test more strenuous by checkpointing and truncating many dictionaries
// add random delays to alter timing per iteration
// verify that test dictionary does not exist on disk
// Only useful for single threaded testing,
// but can be accessed from checkpoint_callback.
static DICTIONARY test_dictionary = NULL;
static toku_pthread_t thread;
static int iter = 0;
static void
checkpoint_truncate_test(u_int32_t flags, u_int32_t n) {
void * ignore;
if (verbose) {
printf("%s(%s):%d, n=0x%03x, checkpoint=%01x, flags=0x%05x\n",
__FILE__, __FUNCTION__, __LINE__,
n, 1, flags);
printf("Verify that truncate done during checkpoint does not crash, iter = %d\n", iter);
fflush(stdout);
}
dir_create();
env_startup(0);
DICTIONARY_S db_control;
init_dictionary(&db_control, flags, "control");
DICTIONARY_S db_test;
init_dictionary(&db_test, flags, "test");
test_dictionary = &db_test;
db_startup(&db_test, NULL);
db_startup(&db_control, NULL);
int64_t firstkey = 0;
int64_t numkeys = n;
insert_n_fixed(db_test.db, db_control.db, NULL, firstkey, numkeys);
snapshot(&db_test, TRUE); // take checkpoint, truncate db_test during checkpoint callback
verify_sequential_rows(db_control.db, firstkey, numkeys);
pthread_join(thread, &ignore);
db_shutdown(&db_control);
env_shutdown();
}
// Purpose is to truncate test db while checkpoint is
// in progress.
void *
truncate_thread(void * extra) {
DICTIONARY d = *(DICTIONARY*) extra;
char name[MAX_NAME*2];
fill_name(d, name, sizeof(name));
// maybe insert random delay here if necessary
if (verbose) {
printf("truncating %s\n",
name);
fflush(stdout);
}
if (iter & 1)
pthread_yield(); // increase probability of collision by having some different timing
db_truncate(d->db, NULL);
return NULL;
}
void checkpoint_callback_1(void * extra) {
int r = toku_pthread_create(&thread, 0, truncate_thread, extra);
CKERR(r);
}
int
test_main (int argc, char *argv[]) {
int limit = 512;
parse_args(argc, argv);
db_env_set_checkpoint_callback(checkpoint_callback_1, &test_dictionary);
for (iter = 0; iter < limit; iter++) {
checkpoint_truncate_test(0, 16*1024+1);
}
db_env_set_checkpoint_callback(NULL, NULL);
return 0;
}
...@@ -3665,7 +3665,12 @@ static const DBT* toku_db_dbt_neg_infty(void) { ...@@ -3665,7 +3665,12 @@ static const DBT* toku_db_dbt_neg_infty(void) {
} }
static int locked_db_truncate(DB *db, DB_TXN *txn, u_int32_t *row_count, u_int32_t flags) { static int locked_db_truncate(DB *db, DB_TXN *txn, u_int32_t *row_count, u_int32_t flags) {
toku_ydb_lock(); int r = toku_db_truncate(db, txn, row_count, flags); toku_ydb_unlock(); return r; toku_checkpoint_safe_client_lock();
toku_ydb_lock();
int r = toku_db_truncate(db, txn, row_count, flags);
toku_ydb_unlock();
toku_checkpoint_safe_client_unlock();
return r;
} }
static int toku_db_create(DB ** db, DB_ENV * env, u_int32_t flags) { static int toku_db_create(DB ** db, DB_ENV * env, u_int32_t flags) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment