Commit 79e100b3 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

close[t:4015] Fix #4015. {{{svn merge -r 37828:37890 ../tokudb.4015}}}....

close[t:4015] Fix #4015.  {{{svn merge -r 37828:37890 ../tokudb.4015}}}. (Still need to do the code review. Scheduled for tomorrow.)

git-svn-id: file:///svn/toku/tokudb@37892 c7de825b-a66e-492c-adef-691d508d4ae1
parent c3763754
......@@ -6,6 +6,7 @@
#include <brt-flusher.h>
#include <brt-cachetable-wrappers.h>
#include <brt-internal.h>
#include <valgrind/drd.h>
#define ft_flush_before_applying_inbox 1
#define ft_flush_before_child_pin 2
......
......@@ -375,6 +375,8 @@ struct brt_header {
unsigned int flags;
DESCRIPTOR_S descriptor;
int free_me_count; // Descriptors are freed asynchronously, so we cannot free() them.
void **free_me; // Instead we just accumulate them in this array. These are void* that we must free() later.
BLOCK_TABLE blocktable;
// If a transaction created this BRT, which one?
......
......@@ -112,6 +112,7 @@ Split_or_merge (node, childnum) {
#include "sort.h"
#include <brt-cachetable-wrappers.h>
#include <brt-flusher.h>
#include <valgrind/drd.h>
#if defined(HAVE_CILK)
#include <cilk/cilk.h>
......@@ -927,14 +928,17 @@ int toku_brtnode_pf_callback(void* brtnode_pv, void* read_extraargs, int fd, PAI
}
#define FAKE_DB(db, desc) struct __toku_db db = {.descriptor= desc}
// Copy the descriptor into a temporary variable, and tell DRD that subsequent code happens after reading that pointer.
// In combination with the annotation in toku_update_descriptor, this seems to be enough to convince test_4015 that all is well.
// Otherwise, drd complains that the newly malloc'd descriptor string is touched later by some comparison operation.
#define FAKE_DB(db, desc_var, desc) DESCRIPTOR_S desc_var = *(desc); struct __toku_db db = {.descriptor= &desc_var}; ANNOTATE_HAPPENS_AFTER(&(desc)->dbt.data);
static int
leafval_heaviside_le (u_int32_t klen, void *kval,
struct cmd_leafval_heaviside_extra *be) {
DBT dbt;
DBT const * const key = be->key;
FAKE_DB(db, be->desc);
FAKE_DB(db, tmp_desc, be->desc);
return be->compare_fun(&db,
toku_fill_dbt(&dbt, kval, klen),
key);
......@@ -957,7 +961,7 @@ brt_compare_pivot(DESCRIPTOR desc, brt_compare_func cmp, const DBT *key, bytevec
int r;
DBT mydbt;
struct kv_pair *kv = (struct kv_pair *) ck;
FAKE_DB(db, desc);
FAKE_DB(db, tmp_desc, desc);
r = cmp(&db, key, toku_fill_dbt(&mydbt, kv_pair_key(kv), kv_pair_keylen(kv)));
return r;
}
......@@ -1030,6 +1034,10 @@ brtheader_destroy(struct brt_header *h) {
assert(h->type == BRTHEADER_CURRENT);
toku_blocktable_destroy(&h->blocktable);
if (h->descriptor.dbt.data) toku_free(h->descriptor.dbt.data);
for (int i=0; i<h->free_me_count; i++) {
toku_free(h->free_me[i]);
}
toku_free(h->free_me);
}
}
......@@ -1387,7 +1395,7 @@ static int do_update(brt_update_func update_fun, DESCRIPTOR desc, BASEMENTNODE b
struct setval_extra_s setval_extra = {setval_tag, FALSE, 0, bn, cmd->msn, cmd->xids,
keyp, idx, le_for_update, snapshot_txnids, live_list_reverse, 0, workdone};
// call handlerton's brt->update_fun(), which passes setval_extra to setval_fun()
FAKE_DB(db, desc);
FAKE_DB(db, tmp_desc, desc);
int r = update_fun(
&db,
keyp,
......@@ -1514,7 +1522,7 @@ brt_leaf_put_cmd (
DBT adbt;
u_int32_t keylen;
void *keyp = le_key_and_len(storeddata, &keylen);
FAKE_DB(db, desc);
FAKE_DB(db, tmp_desc, desc);
if (compare_fun(&db,
toku_fill_dbt(&adbt, keyp, keylen),
cmd->u.id.key) != 0)
......@@ -1625,7 +1633,7 @@ static inline int
key_msn_cmp(const DBT *a, const DBT *b, const MSN amsn, const MSN bmsn,
DESCRIPTOR descriptor, brt_compare_func key_cmp)
{
FAKE_DB(db, descriptor);
FAKE_DB(db, tmpdesc, descriptor);
int r = key_cmp(&db, a, b);
if (r == 0) {
r = (amsn.msn > bmsn.msn) - (amsn.msn < bmsn.msn);
......@@ -2946,8 +2954,22 @@ verify_builtin_comparisons_consistent(BRT t, u_int32_t flags) {
return 0;
}
int
toku_update_descriptor(struct brt_header * h, DESCRIPTOR d, int fd) {
int toku_update_descriptor(struct brt_header * h, DESCRIPTOR d, int fd)
// Effect: Change the descriptor in a tree (log the change, make sure it makes it to disk eventually).
// Updates to the descriptor must be performed while holding some sort of lock. (In the ydb layer
// there is a row lock on the directory that provides exclusion.)
// However, reads can occur concurrently.
// So the trickyness here is to update the size and data with atomic instructions.
// DRD ought to recognize if we do
// x = malloc();
// fill(x);
// atomic_set(y, x);
// and then another thread looks at
// *y
// then there's no race.
// So we tell drd that the newly mallocated memory was filled in before the assignment into dbt.data with a ANNOTATE_HAPPENS_BEFORE.
// The other half (the reads) are hacked in the FAKE_DB macro.
{
int r = 0;
DISKOFF offset;
//4 for checksum
......@@ -2956,11 +2978,27 @@ toku_update_descriptor(struct brt_header * h, DESCRIPTOR d, int fd) {
if (r) {
goto cleanup;
}
if (h->descriptor.dbt.data) {
toku_free(h->descriptor.dbt.data);
u_int32_t old_size = h->descriptor.dbt.size;
void *old_descriptor = h->descriptor.dbt.data;
void *new_descriptor = toku_memdup(d->dbt.data, d->dbt.size);
ANNOTATE_HAPPENS_BEFORE(&h->descriptor.dbt.data);
bool ok1 = __sync_bool_compare_and_swap(&h->descriptor.dbt.size, old_size, d->dbt.size);
bool ok2 = __sync_bool_compare_and_swap(&h->descriptor.dbt.data, old_descriptor, new_descriptor);
if (!ok1 || !ok2) {
// Don't quite raise an assert here, but if something goes wrong, I'd like to know.
static bool ever_wrote = false;
if (!ever_wrote) {
fprintf(stderr, "%s:%d compare_and_swap saw different values (%d %d)\n", __FILE__, __LINE__, ok1, ok2);
ever_wrote = true;
}
}
if (old_descriptor) {
// I don't need a lock here, since updates to the descriptor hold a lock.
h->free_me_count++;
XREALLOC_N(h->free_me_count, h->free_me);
h->free_me[h->free_me_count-1] = old_descriptor;
}
h->descriptor.dbt.size = d->dbt.size;
h->descriptor.dbt.data = toku_memdup(d->dbt.data, d->dbt.size);
r = 0;
cleanup:
......@@ -4104,7 +4142,7 @@ static BOOL search_pivot_is_bounded (brt_search_t *search, DESCRIPTOR desc, brt_
// If searching from right to left, if we have already searched all the vlaues greater than pivot, we don't want to search again.
{
if (!search->have_pivot_bound) return TRUE; // isn't bounded.
FAKE_DB(db, desc);
FAKE_DB(db, tmpdesc, desc);
int comp = cmp(&db, pivot, &search->pivot_bound);
if (search->direction == BRT_SEARCH_LEFT) {
// searching from left to right. If the comparison function says the pivot is <= something we already compared, don't do it again.
......@@ -4275,7 +4313,7 @@ bnc_find_iterate_bounds(
const long offset = (long) found_lb;
DBT found_lbidbt;
fill_dbt_for_fifo_entry(&found_lbidbt, toku_fifo_get_entry(buffer, offset));
FAKE_DB(db, desc);
FAKE_DB(db, tmpdesc, desc);
int c = cmp(&db, &found_lbidbt, &ubidbt_tmp);
// These DBTs really are both inclusive so we need strict inequality.
if (c > 0) {
......@@ -5036,7 +5074,7 @@ brt_cursor_search(BRT_CURSOR cursor, brt_search_t *search, BRT_GET_CALLBACK_FUNC
}
static inline int compare_k_x(BRT brt, const DBT *k, const DBT *x) {
FAKE_DB(db, &brt->h->descriptor);
FAKE_DB(db, tmpdesc, &brt->h->descriptor);
return brt->compare_fun(&db, k, x);
}
......
......@@ -31,7 +31,7 @@ typedef int(*BRT_GET_CALLBACK_FUNCTION)(ITEMLEN, bytevec, ITEMLEN, bytevec, void
int toku_open_brt (const char *fname, int is_create, BRT *, int nodesize, int basementnodesize, CACHETABLE, TOKUTXN, int(*)(DB *,const DBT*,const DBT*), DB*) __attribute__ ((warn_unused_result));
int toku_brt_change_descriptor(BRT t, const DBT* old_descriptor, const DBT* new_descriptor, BOOL do_log, TOKUTXN txn);
int toku_update_descriptor(struct brt_header * h, DESCRIPTOR d, int fd);
// Note: See the locking discussion in brt.c for toku_brt_change_descriptor and toku_update_descriptor.
int toku_dictionary_redirect (const char *dst_fname_in_env, BRT old_brt, TOKUTXN txn) __attribute__ ((warn_unused_result));
// See the brt.c file for what this toku_redirect_brt does
......
......@@ -15,23 +15,36 @@ static int my_compare (DB *db, const DBT *a, const DBT *b) {
assert(data[1]=='o');
assert(data[2]=='o');
if (verbose) printf("compare descriptor=%s\n", data);
usleep(1000);
sched_yield();
return uint_dbt_cmp(db, a, b);
}
DB_ENV *env;
DB *db;
char *env_dir = ENVDIR;
volatile int done = 0;
static void *startA (void *ignore __attribute__((__unused__))) {
for (int i=0;i<3; i++) {
for (int i=0;i<999; i++) {
DBT k,v;
int a=1;
int a = (random()<<16) + i;
dbt_init(&k, &a, sizeof(a));
dbt_init(&v, &a, sizeof(a));
IN_TXN_COMMIT(env, NULL, txn, 0,
CHK(db->put(db, txn, &k, &v, 0)));
DB_TXN *txn;
again:
CHK(env->txn_begin(env, NULL, &txn, DB_TXN_NOSYNC));
{
int r = db->put(db, txn, &k, &v, 0);
if (r==DB_LOCK_NOTGRANTED) {
if (verbose) printf("lock not granted on %d\n", i);
CHK(txn->abort(txn));
goto again;
}
assert(r==0);
}
CHK(txn->commit(txn, 0));
}
int r __attribute__((__unused__)) = __sync_fetch_and_add(&done, 1);
return NULL;
}
static void change_descriptor (DB_TXN *txn, int i) {
......@@ -47,9 +60,10 @@ static void change_descriptor (DB_TXN *txn, int i) {
if (verbose) printf("ok\n");
}
static void startB (void) {
for (int i=0; i<10; i++) {
for (int i=0; !done; i++) {
IN_TXN_COMMIT(env, NULL, txn, 0,
change_descriptor(txn, i));
sched_yield();
}
}
......@@ -97,6 +111,7 @@ int test_main(int argc, char * const argv[]) {
CHK(env->open(env, env_dir, envflags, S_IRWXU+S_IRWXG+S_IRWXO));
CHK(db_create(&db, env, 0));
CHK(db->set_pagesize(db, 1024));
CHK(db->open(db, NULL, "db", NULL, DB_BTREE, DB_CREATE, 0666));
DBT desc;
dbt_init(&desc, "foo", sizeof("foo"));
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment