Commit e6d021b3 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

close[t:4015] Fix #4015. {{{svn merge -r 37828:37890 ../tokudb.4015}}}....

close[t:4015] Fix #4015.  {{{svn merge -r 37828:37890 ../tokudb.4015}}}. (Still need to do the code review. Scheduled for tomorrow.)

git-svn-id: file:///svn/toku/tokudb@37892 c7de825b-a66e-492c-adef-691d508d4ae1
parent 6cc79aee
...@@ -6,6 +6,7 @@ ...@@ -6,6 +6,7 @@
#include <brt-flusher.h> #include <brt-flusher.h>
#include <brt-cachetable-wrappers.h> #include <brt-cachetable-wrappers.h>
#include <brt-internal.h> #include <brt-internal.h>
#include <valgrind/drd.h>
#define ft_flush_before_applying_inbox 1 #define ft_flush_before_applying_inbox 1
#define ft_flush_before_child_pin 2 #define ft_flush_before_child_pin 2
......
...@@ -375,6 +375,8 @@ struct brt_header { ...@@ -375,6 +375,8 @@ struct brt_header {
unsigned int flags; unsigned int flags;
DESCRIPTOR_S descriptor; DESCRIPTOR_S descriptor;
int free_me_count; // Descriptors are freed asynchronously, so we cannot free() them.
void **free_me; // Instead we just accumulate them in this array. These are void* that we must free() later.
BLOCK_TABLE blocktable; BLOCK_TABLE blocktable;
// If a transaction created this BRT, which one? // If a transaction created this BRT, which one?
......
...@@ -112,6 +112,7 @@ Lookup: ...@@ -112,6 +112,7 @@ Lookup:
#include "sort.h" #include "sort.h"
#include <brt-cachetable-wrappers.h> #include <brt-cachetable-wrappers.h>
#include <brt-flusher.h> #include <brt-flusher.h>
#include <valgrind/drd.h>
#if defined(HAVE_CILK) #if defined(HAVE_CILK)
#include <cilk/cilk.h> #include <cilk/cilk.h>
...@@ -927,14 +928,17 @@ int toku_brtnode_pf_callback(void* brtnode_pv, void* read_extraargs, int fd, PAI ...@@ -927,14 +928,17 @@ int toku_brtnode_pf_callback(void* brtnode_pv, void* read_extraargs, int fd, PAI
} }
#define FAKE_DB(db, desc) struct __toku_db db = {.descriptor= desc} // Copy the descriptor into a temporary variable, and tell DRD that subsequent code happens after reading that pointer.
// In combination with the annotation in toku_update_descriptor, this seems to be enough to convince test_4015 that all is well.
// Otherwise, drd complains that the newly malloc'd descriptor string is touched later by some comparison operation.
#define FAKE_DB(db, desc_var, desc) DESCRIPTOR_S desc_var = *(desc); struct __toku_db db = {.descriptor= &desc_var}; ANNOTATE_HAPPENS_AFTER(&(desc)->dbt.data);
static int static int
leafval_heaviside_le (u_int32_t klen, void *kval, leafval_heaviside_le (u_int32_t klen, void *kval,
struct cmd_leafval_heaviside_extra *be) { struct cmd_leafval_heaviside_extra *be) {
DBT dbt; DBT dbt;
DBT const * const key = be->key; DBT const * const key = be->key;
FAKE_DB(db, be->desc); FAKE_DB(db, tmp_desc, be->desc);
return be->compare_fun(&db, return be->compare_fun(&db,
toku_fill_dbt(&dbt, kval, klen), toku_fill_dbt(&dbt, kval, klen),
key); key);
...@@ -957,7 +961,7 @@ brt_compare_pivot(DESCRIPTOR desc, brt_compare_func cmp, const DBT *key, bytevec ...@@ -957,7 +961,7 @@ brt_compare_pivot(DESCRIPTOR desc, brt_compare_func cmp, const DBT *key, bytevec
int r; int r;
DBT mydbt; DBT mydbt;
struct kv_pair *kv = (struct kv_pair *) ck; struct kv_pair *kv = (struct kv_pair *) ck;
FAKE_DB(db, desc); FAKE_DB(db, tmp_desc, desc);
r = cmp(&db, key, toku_fill_dbt(&mydbt, kv_pair_key(kv), kv_pair_keylen(kv))); r = cmp(&db, key, toku_fill_dbt(&mydbt, kv_pair_key(kv), kv_pair_keylen(kv)));
return r; return r;
} }
...@@ -1030,6 +1034,10 @@ brtheader_destroy(struct brt_header *h) { ...@@ -1030,6 +1034,10 @@ brtheader_destroy(struct brt_header *h) {
assert(h->type == BRTHEADER_CURRENT); assert(h->type == BRTHEADER_CURRENT);
toku_blocktable_destroy(&h->blocktable); toku_blocktable_destroy(&h->blocktable);
if (h->descriptor.dbt.data) toku_free(h->descriptor.dbt.data); if (h->descriptor.dbt.data) toku_free(h->descriptor.dbt.data);
for (int i=0; i<h->free_me_count; i++) {
toku_free(h->free_me[i]);
}
toku_free(h->free_me);
} }
} }
...@@ -1387,7 +1395,7 @@ static int do_update(brt_update_func update_fun, DESCRIPTOR desc, BASEMENTNODE b ...@@ -1387,7 +1395,7 @@ static int do_update(brt_update_func update_fun, DESCRIPTOR desc, BASEMENTNODE b
struct setval_extra_s setval_extra = {setval_tag, FALSE, 0, bn, cmd->msn, cmd->xids, struct setval_extra_s setval_extra = {setval_tag, FALSE, 0, bn, cmd->msn, cmd->xids,
keyp, idx, le_for_update, snapshot_txnids, live_list_reverse, 0, workdone}; keyp, idx, le_for_update, snapshot_txnids, live_list_reverse, 0, workdone};
// call handlerton's brt->update_fun(), which passes setval_extra to setval_fun() // call handlerton's brt->update_fun(), which passes setval_extra to setval_fun()
FAKE_DB(db, desc); FAKE_DB(db, tmp_desc, desc);
int r = update_fun( int r = update_fun(
&db, &db,
keyp, keyp,
...@@ -1514,7 +1522,7 @@ brt_leaf_put_cmd ( ...@@ -1514,7 +1522,7 @@ brt_leaf_put_cmd (
DBT adbt; DBT adbt;
u_int32_t keylen; u_int32_t keylen;
void *keyp = le_key_and_len(storeddata, &keylen); void *keyp = le_key_and_len(storeddata, &keylen);
FAKE_DB(db, desc); FAKE_DB(db, tmp_desc, desc);
if (compare_fun(&db, if (compare_fun(&db,
toku_fill_dbt(&adbt, keyp, keylen), toku_fill_dbt(&adbt, keyp, keylen),
cmd->u.id.key) != 0) cmd->u.id.key) != 0)
...@@ -1625,7 +1633,7 @@ static inline int ...@@ -1625,7 +1633,7 @@ static inline int
key_msn_cmp(const DBT *a, const DBT *b, const MSN amsn, const MSN bmsn, key_msn_cmp(const DBT *a, const DBT *b, const MSN amsn, const MSN bmsn,
DESCRIPTOR descriptor, brt_compare_func key_cmp) DESCRIPTOR descriptor, brt_compare_func key_cmp)
{ {
FAKE_DB(db, descriptor); FAKE_DB(db, tmpdesc, descriptor);
int r = key_cmp(&db, a, b); int r = key_cmp(&db, a, b);
if (r == 0) { if (r == 0) {
r = (amsn.msn > bmsn.msn) - (amsn.msn < bmsn.msn); r = (amsn.msn > bmsn.msn) - (amsn.msn < bmsn.msn);
...@@ -2946,8 +2954,22 @@ verify_builtin_comparisons_consistent(BRT t, u_int32_t flags) { ...@@ -2946,8 +2954,22 @@ verify_builtin_comparisons_consistent(BRT t, u_int32_t flags) {
return 0; return 0;
} }
int int toku_update_descriptor(struct brt_header * h, DESCRIPTOR d, int fd)
toku_update_descriptor(struct brt_header * h, DESCRIPTOR d, int fd) { // Effect: Change the descriptor in a tree (log the change, make sure it makes it to disk eventually).
// Updates to the descriptor must be performed while holding some sort of lock. (In the ydb layer
// there is a row lock on the directory that provides exclusion.)
// However, reads can occur concurrently.
// So the trickyness here is to update the size and data with atomic instructions.
// DRD ought to recognize if we do
// x = malloc();
// fill(x);
// atomic_set(y, x);
// and then another thread looks at
// *y
// then there's no race.
// So we tell drd that the newly mallocated memory was filled in before the assignment into dbt.data with a ANNOTATE_HAPPENS_BEFORE.
// The other half (the reads) are hacked in the FAKE_DB macro.
{
int r = 0; int r = 0;
DISKOFF offset; DISKOFF offset;
//4 for checksum //4 for checksum
...@@ -2956,11 +2978,27 @@ toku_update_descriptor(struct brt_header * h, DESCRIPTOR d, int fd) { ...@@ -2956,11 +2978,27 @@ toku_update_descriptor(struct brt_header * h, DESCRIPTOR d, int fd) {
if (r) { if (r) {
goto cleanup; goto cleanup;
} }
if (h->descriptor.dbt.data) { u_int32_t old_size = h->descriptor.dbt.size;
toku_free(h->descriptor.dbt.data); void *old_descriptor = h->descriptor.dbt.data;
void *new_descriptor = toku_memdup(d->dbt.data, d->dbt.size);
ANNOTATE_HAPPENS_BEFORE(&h->descriptor.dbt.data);
bool ok1 = __sync_bool_compare_and_swap(&h->descriptor.dbt.size, old_size, d->dbt.size);
bool ok2 = __sync_bool_compare_and_swap(&h->descriptor.dbt.data, old_descriptor, new_descriptor);
if (!ok1 || !ok2) {
// Don't quite raise an assert here, but if something goes wrong, I'd like to know.
static bool ever_wrote = false;
if (!ever_wrote) {
fprintf(stderr, "%s:%d compare_and_swap saw different values (%d %d)\n", __FILE__, __LINE__, ok1, ok2);
ever_wrote = true;
}
}
if (old_descriptor) {
// I don't need a lock here, since updates to the descriptor hold a lock.
h->free_me_count++;
XREALLOC_N(h->free_me_count, h->free_me);
h->free_me[h->free_me_count-1] = old_descriptor;
} }
h->descriptor.dbt.size = d->dbt.size;
h->descriptor.dbt.data = toku_memdup(d->dbt.data, d->dbt.size);
r = 0; r = 0;
cleanup: cleanup:
...@@ -4104,7 +4142,7 @@ static BOOL search_pivot_is_bounded (brt_search_t *search, DESCRIPTOR desc, brt_ ...@@ -4104,7 +4142,7 @@ static BOOL search_pivot_is_bounded (brt_search_t *search, DESCRIPTOR desc, brt_
// If searching from right to left, if we have already searched all the vlaues greater than pivot, we don't want to search again. // If searching from right to left, if we have already searched all the vlaues greater than pivot, we don't want to search again.
{ {
if (!search->have_pivot_bound) return TRUE; // isn't bounded. if (!search->have_pivot_bound) return TRUE; // isn't bounded.
FAKE_DB(db, desc); FAKE_DB(db, tmpdesc, desc);
int comp = cmp(&db, pivot, &search->pivot_bound); int comp = cmp(&db, pivot, &search->pivot_bound);
if (search->direction == BRT_SEARCH_LEFT) { if (search->direction == BRT_SEARCH_LEFT) {
// searching from left to right. If the comparison function says the pivot is <= something we already compared, don't do it again. // searching from left to right. If the comparison function says the pivot is <= something we already compared, don't do it again.
...@@ -4275,7 +4313,7 @@ bnc_find_iterate_bounds( ...@@ -4275,7 +4313,7 @@ bnc_find_iterate_bounds(
const long offset = (long) found_lb; const long offset = (long) found_lb;
DBT found_lbidbt; DBT found_lbidbt;
fill_dbt_for_fifo_entry(&found_lbidbt, toku_fifo_get_entry(buffer, offset)); fill_dbt_for_fifo_entry(&found_lbidbt, toku_fifo_get_entry(buffer, offset));
FAKE_DB(db, desc); FAKE_DB(db, tmpdesc, desc);
int c = cmp(&db, &found_lbidbt, &ubidbt_tmp); int c = cmp(&db, &found_lbidbt, &ubidbt_tmp);
// These DBTs really are both inclusive so we need strict inequality. // These DBTs really are both inclusive so we need strict inequality.
if (c > 0) { if (c > 0) {
...@@ -5036,7 +5074,7 @@ brt_cursor_search(BRT_CURSOR cursor, brt_search_t *search, BRT_GET_CALLBACK_FUNC ...@@ -5036,7 +5074,7 @@ brt_cursor_search(BRT_CURSOR cursor, brt_search_t *search, BRT_GET_CALLBACK_FUNC
} }
static inline int compare_k_x(BRT brt, const DBT *k, const DBT *x) { static inline int compare_k_x(BRT brt, const DBT *k, const DBT *x) {
FAKE_DB(db, &brt->h->descriptor); FAKE_DB(db, tmpdesc, &brt->h->descriptor);
return brt->compare_fun(&db, k, x); return brt->compare_fun(&db, k, x);
} }
......
...@@ -31,7 +31,7 @@ typedef int(*BRT_GET_CALLBACK_FUNCTION)(ITEMLEN, bytevec, ITEMLEN, bytevec, void ...@@ -31,7 +31,7 @@ typedef int(*BRT_GET_CALLBACK_FUNCTION)(ITEMLEN, bytevec, ITEMLEN, bytevec, void
int toku_open_brt (const char *fname, int is_create, BRT *, int nodesize, int basementnodesize, CACHETABLE, TOKUTXN, int(*)(DB *,const DBT*,const DBT*), DB*) __attribute__ ((warn_unused_result)); int toku_open_brt (const char *fname, int is_create, BRT *, int nodesize, int basementnodesize, CACHETABLE, TOKUTXN, int(*)(DB *,const DBT*,const DBT*), DB*) __attribute__ ((warn_unused_result));
int toku_brt_change_descriptor(BRT t, const DBT* old_descriptor, const DBT* new_descriptor, BOOL do_log, TOKUTXN txn); int toku_brt_change_descriptor(BRT t, const DBT* old_descriptor, const DBT* new_descriptor, BOOL do_log, TOKUTXN txn);
int toku_update_descriptor(struct brt_header * h, DESCRIPTOR d, int fd); int toku_update_descriptor(struct brt_header * h, DESCRIPTOR d, int fd);
// Note: See the locking discussion in brt.c for toku_brt_change_descriptor and toku_update_descriptor.
int toku_dictionary_redirect (const char *dst_fname_in_env, BRT old_brt, TOKUTXN txn) __attribute__ ((warn_unused_result)); int toku_dictionary_redirect (const char *dst_fname_in_env, BRT old_brt, TOKUTXN txn) __attribute__ ((warn_unused_result));
// See the brt.c file for what this toku_redirect_brt does // See the brt.c file for what this toku_redirect_brt does
......
...@@ -15,23 +15,36 @@ static int my_compare (DB *db, const DBT *a, const DBT *b) { ...@@ -15,23 +15,36 @@ static int my_compare (DB *db, const DBT *a, const DBT *b) {
assert(data[1]=='o'); assert(data[1]=='o');
assert(data[2]=='o'); assert(data[2]=='o');
if (verbose) printf("compare descriptor=%s\n", data); if (verbose) printf("compare descriptor=%s\n", data);
usleep(1000); sched_yield();
return uint_dbt_cmp(db, a, b); return uint_dbt_cmp(db, a, b);
} }
DB_ENV *env; DB_ENV *env;
DB *db; DB *db;
char *env_dir = ENVDIR; char *env_dir = ENVDIR;
volatile int done = 0;
static void *startA (void *ignore __attribute__((__unused__))) { static void *startA (void *ignore __attribute__((__unused__))) {
for (int i=0;i<3; i++) { for (int i=0;i<999; i++) {
DBT k,v; DBT k,v;
int a=1; int a = (random()<<16) + i;
dbt_init(&k, &a, sizeof(a)); dbt_init(&k, &a, sizeof(a));
dbt_init(&v, &a, sizeof(a)); dbt_init(&v, &a, sizeof(a));
IN_TXN_COMMIT(env, NULL, txn, 0, DB_TXN *txn;
CHK(db->put(db, txn, &k, &v, 0))); again:
CHK(env->txn_begin(env, NULL, &txn, DB_TXN_NOSYNC));
{
int r = db->put(db, txn, &k, &v, 0);
if (r==DB_LOCK_NOTGRANTED) {
if (verbose) printf("lock not granted on %d\n", i);
CHK(txn->abort(txn));
goto again;
}
assert(r==0);
}
CHK(txn->commit(txn, 0));
} }
int r __attribute__((__unused__)) = __sync_fetch_and_add(&done, 1);
return NULL; return NULL;
} }
static void change_descriptor (DB_TXN *txn, int i) { static void change_descriptor (DB_TXN *txn, int i) {
...@@ -47,9 +60,10 @@ static void change_descriptor (DB_TXN *txn, int i) { ...@@ -47,9 +60,10 @@ static void change_descriptor (DB_TXN *txn, int i) {
if (verbose) printf("ok\n"); if (verbose) printf("ok\n");
} }
static void startB (void) { static void startB (void) {
for (int i=0; i<10; i++) { for (int i=0; !done; i++) {
IN_TXN_COMMIT(env, NULL, txn, 0, IN_TXN_COMMIT(env, NULL, txn, 0,
change_descriptor(txn, i)); change_descriptor(txn, i));
sched_yield();
} }
} }
...@@ -97,6 +111,7 @@ int test_main(int argc, char * const argv[]) { ...@@ -97,6 +111,7 @@ int test_main(int argc, char * const argv[]) {
CHK(env->open(env, env_dir, envflags, S_IRWXU+S_IRWXG+S_IRWXO)); CHK(env->open(env, env_dir, envflags, S_IRWXU+S_IRWXG+S_IRWXO));
CHK(db_create(&db, env, 0)); CHK(db_create(&db, env, 0));
CHK(db->set_pagesize(db, 1024));
CHK(db->open(db, NULL, "db", NULL, DB_BTREE, DB_CREATE, 0666)); CHK(db->open(db, NULL, "db", NULL, DB_BTREE, DB_CREATE, 0666));
DBT desc; DBT desc;
dbt_init(&desc, "foo", sizeof("foo")); dbt_init(&desc, "foo", sizeof("foo"));
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment