Commit ab12ba3a authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

Implemented tests for the confused-about-provisional-deletes bug. ...

Implemented tests for the confused-about-provisional-deletes bug.  {{{test_log6a_abort}}} works now. Fixes #677.

git-svn-id: file:///svn/tokudb@3349 c7de825b-a66e-492c-adef-691d508d4ae1
parent 3879c7b7
......@@ -30,6 +30,7 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <unistd.h>
#include "toku_assert.h"
......@@ -256,6 +257,23 @@ u_int32_t mp_pool_size_for_nodesize (u_int32_t nodesize) {
}
// Simple LCG random number generator. Not high quality, but good enough.
static int r_seeded=0;
static uint32_t rstate=1;
static inline void mysrandom (int s) {
rstate=s;
r_seeded=1;
}
static inline uint32_t myrandom (void) {
if (!r_seeded) {
struct timeval tv;
gettimeofday(&tv, 0);
mysrandom(tv.tv_sec);
}
rstate = (279470275ull*(uint64_t)rstate)%4294967291ull;
return rstate;
}
static void initialize_brtnode (BRT t, BRTNODE n, DISKOFF nodename, int height) {
n->tag = TYP_BRTNODE;
n->nodesize = t->h->nodesize;
......@@ -1435,7 +1453,7 @@ static int brt_leaf_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd,
while (1) {
int vallen = le_any_vallen(storeddata);
void *save_val = toku_memdup(le_any_val(storeddata), storedlen);
void *save_val = toku_memdup(le_any_val(storeddata), vallen);
r = brt_leaf_apply_cmd_once(t, node, cmd, logger, idx, storedlen, storeddata);
if (r!=0) return r;
......@@ -2351,7 +2369,7 @@ int toku_dump_brtnode (BRT brt, DISKOFF off, int depth, bytevec lorange, ITEMLEN
printf("%*sNode %lld nodesize=%d height=%d n_bytes_in_buffer=%d keyrange=%d %d\n",
depth, "", off, node->nodesize, node->height, node->u.l.n_bytes_in_buffer, lorange ? ntohl(*(int*)lorange) : 0, hirange ? ntohl(*(int*)hirange) : 0);
//GPMA_ITERATE(node->u.l.buffer, idx, len, data,
// ( keylen=keylen, vallen=vallen, printf(" (%d)%d ", keylen, ntohl(*(int*)key))));
// printf(" (%d)%u ", len, *(int*)le_any_key(data)));
printf("\n");
}
r = toku_cachetable_unpin(brt->cf, off, 0, 0);
......@@ -2533,8 +2551,8 @@ int pair_leafval_bessel_le_both (TXNID xid __attribute__((__unused__)),
}
int pair_leafval_bessel_le_provdel (TXNID xid __attribute__((__unused__)),
u_int32_t klen, void *kval,
u_int32_t clen, void *cval,
u_int32_t klen, void *kval,
u_int32_t clen, void *cval,
brt_search_t *be) {
return pair_leafval_bessel_le_committed(klen, kval, clen, cval, be);
}
......@@ -2563,7 +2581,7 @@ static int brt_search_leaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT
ok: ;
u_int32_t len;
void * data;
u_int32_t idx; // Don't need this
u_int32_t idx;
int r = toku_gpma_lookup_bessel(node->u.l.buffer,
bessel_from_search_t,
direction,
......@@ -2574,8 +2592,26 @@ static int brt_search_leaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT
LEAFENTRY le = data;
if (le_is_provdel(le)) {
// Provisionally deleted stuff is gone.
return DB_NOTFOUND;
// So we need to scan in the direction to see if we can find something
while (1) {
switch (search->direction) {
case BRT_SEARCH_LEFT:
idx++;
if (idx>toku_gpma_index_limit(node->u.l.buffer)) return DB_NOTFOUND;
break;
case BRT_SEARCH_RIGHT:
if (idx==0) return DB_NOTFOUND;
idx--;
break;
}
if (!toku_gpma_valididx(node->u.l.buffer, idx)) continue;
r = toku_gpma_get_from_index(node->u.l.buffer, idx, &len, &data);
assert(r==0); // we just validated the index
le = data;
if (!le_is_provdel(le)) goto got_a_good_value;
}
}
got_a_good_value:
if (newkey) {
r = toku_dbt_set_value(newkey, le_latest_key(le), le_latest_keylen(le), &brt->skey);
if (r!=0) return r;
......
......@@ -50,7 +50,7 @@ DBT *dbt_init_malloc(DBT *dbt) {
return dbt;
}
// Simle LCG random number generator. Not high quality, but good enough.
// Simple LCG random number generator. Not high quality, but good enough.
static uint32_t rstate=1;
static inline void mysrandom (int s) {
rstate=s;
......
......@@ -8,14 +8,13 @@
#include <db.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include "test.h"
static DB_ENV *env;
static DB *db;
static DB_TXN *txn;
// Got a different problem when N=1000.
void insert (int i) {
char hello[30], there[30];
DBT key,data;
......@@ -50,7 +49,7 @@ void find (int i) {
CKERR(r);
}
void find_first (int i) {
void find_first_or_last (int i, int cflag) {
int r;
DBC *cursor;
DBT key, val;
......@@ -59,7 +58,7 @@ void find_first (int i) {
r = db->cursor(db, txn, &cursor, 0);
CKERR(r);
r = cursor->c_get(cursor, &key, &val, DB_FIRST);
r = cursor->c_get(cursor, &key, &val, cflag);
assert(r==0);
char hello[30], there[30];
......@@ -68,9 +67,13 @@ void find_first (int i) {
assert(strcmp(hello, key.data)==0);
assert(strcmp(there, val.data)==0);
r = cursor->c_close(cursor);
}
void do_abort_delete_first(int N) {
void do_abort_delete_first_or_last(int N,
int first // 1 for first, 0 for last
) {
int r,i;
system("rm -rf " ENVDIR);
r=mkdir(ENVDIR, 0777); assert(r==0);
......@@ -94,25 +97,35 @@ void do_abort_delete_first(int N) {
// Now delete a bunch of stuff and see if we can do DB_FIRST
r=env->txn_begin(env, 0, &txn, 0); assert(r==0);
for (i=0; i<N-1; i++) {
delete(i);
if (first) {
for (i=0; i<N-1; i++) {
delete(i);
}
find(i);
find_first_or_last(i, DB_FIRST);
} else {
for (i=1; i<N; i++) {
delete(i);
}
find_first_or_last(0, DB_LAST);
}
find(i);
find_first(i);
r=txn->commit(txn, 0); CKERR(r);
r=txn->commit(txn, 0); CKERR(r);
r=db->close(db, 0); CKERR(r);
r=env->close(env, 0); CKERR(r);
#ifdef TOKUDB
r=system("../../newbrt/brtdump " ENVDIR "/foo.db > /dev/null");
assert(WIFEXITED(r) && WEXITSTATUS(r)==0);
#endif
}
int main (int argc, const char *argv[]) {
parse_args(argc, argv);
do_abort_delete_first(10);
int r=system("../../newbrt/brtdump " ENVDIR "/foo.db");
assert(WIFEXITED(r) && WEXITSTATUS(r)==0);
do_abort_delete_first(1000);
int r=system("../../newbrt/brtdump " ENVDIR "/foo.db");
assert(WIFEXITED(r) && WEXITSTATUS(r)==0);
int f;
for (f=0; f<2; f++) {
do_abort_delete_first_or_last(10, f);
do_abort_delete_first_or_last(1000,f);
}
return 0;
}
......@@ -21,19 +21,44 @@
// ENVDIR is defined in the Makefile
// How many iterations are we going to do insertions and deletions. This is a bound to the number of distinct keys in the DB.
#define N 10000
#define N 1000
int n_keys_mentioned=0;
int random_keys_mentioned[N];
DB *pending_i, *pending_d, *committed;
// Keep track of what's in the committed database separately
struct pair {int x,y;};
void insert_in_mem (int x, int y, int *count, struct pair *pairs) {
assert(*count<N);
pairs[(*count)++]=(struct pair){x,y};
}
void delete_in_mem (int x, int *count, struct pair *pairs) {
int i;
for (i=0; i<*count; i++) {
if (pairs[i].x==x) {
pairs[i]=pairs[--(*count)];
return;
}
}
}
static int com_count=0, pend_count=0, peni_count=0;
static struct pair com_data[N], pend_data[N], peni_data[N];
void insert_pending(int key, int val, DB_TXN *bookx) {
DBT keyd,datad;
//printf("IP %u,%u\n", key,val);
insert_in_mem(key, val, &peni_count, peni_data);
pending_i->put(pending_i, bookx,
dbt_init(&keyd, &key, sizeof(key)),
dbt_init(&datad, &val, sizeof(val)),
0);
delete_in_mem(key, &pend_count, pend_data);
pending_d->del(pending_d, bookx,
dbt_init(&keyd, &key, sizeof(key)),
0);
......@@ -66,8 +91,13 @@ static void delete_a_random_item (DB *db, DB_TXN *tid, DB_TXN *bookx) {
//printf("Delete %u\n", rand);
dbt_init(&keyd, &rand, sizeof(rand));
dbt_init(&vald, &rand, sizeof(rand));
pending_i->del(pending_i, bookx, &keyd, 0);
pending_i->put(pending_d, bookx, &keyd, &vald, 0);
delete_in_mem(rand, &peni_count, peni_data);
pending_d->put(pending_d, bookx, &keyd, &vald, 0);
insert_in_mem(rand, rand, &pend_count, pend_data);
db->del(db, tid, &keyd, DB_DELETE_ANY);
}
......@@ -80,6 +110,7 @@ static void commit_items (DB_ENV *env, int i) {
DBT k,v;
memset(&k,0,sizeof(k));
memset(&v,0,sizeof(v));
//printf("%d items in peni\n", peni_count);
while (cursor->c_get(cursor, &k, &v, DB_FIRST)==0) {
assert(k.size==4);
assert(v.size==4);
......@@ -87,6 +118,7 @@ static void commit_items (DB_ENV *env, int i) {
int vi=*(int*)v.data;
//printf(" put %u %u\n", ki, vi);
r=committed->put(committed, txn, dbt_init(&k, &ki, sizeof(ki)), dbt_init(&v, &vi, sizeof(vi)), 0);
insert_in_mem(ki, vi, &com_count, com_data);
assert(r==0);
r=pending_i->del(pending_i, txn, &k, 0);
assert(r==0);
......@@ -105,6 +137,7 @@ static void commit_items (DB_ENV *env, int i) {
assert(ki==vi);
//printf(" del %u\n", ki);
committed->del(committed, txn, dbt_init(&k, &ki, sizeof(ki)), DB_AUTO_COMMIT);
delete_in_mem(ki, &com_count, com_data);
// ignore result from that del
r=pending_d->del(pending_d, txn, &k, 0);
assert(r==0);
......@@ -149,6 +182,10 @@ static void abort_items (DB_ENV *env) {
r=txn->commit(txn, 0); assert(r==0);
}
int compare_pairs (const void *a, const void *b) {
return memcmp(a,b,4);
}
static void verify_items (DB_ENV *env, DB *db) {
DB_TXN *txn;
int r=env->txn_begin(env, 0, &txn, 0); assert(r==0);
......@@ -168,9 +205,14 @@ static void verify_items (DB_ENV *env, DB *db) {
r = committed->cursor(committed, txn, &cursor, 0);
assert(r==0);
qsort(com_data, com_count, sizeof(com_data[0]), compare_pairs);
int curscount=0;
//printf(" count=%d\n", com_count);
while (cursor->c_get(cursor, &k, &v, DB_NEXT)==0) {
int kv=*(int*)k.data;
int dv=*(int*)v.data;
//printf(" sorted com_data[%d]=%d, cursor got %d\n", curscount, com_data[curscount].x, kv);
assert(com_data[curscount].x==kv);
DBT k2,v2;
memset(&k2, 0, sizeof(k2));
memset(&v2, 0, sizeof(v2));
......@@ -178,11 +220,13 @@ static void verify_items (DB_ENV *env, DB *db) {
snprintf(hello, sizeof(hello), "hello%d.%d", kv, dv);
snprintf(there, sizeof(hello), "there%d", dv);
k2.data = hello; k2.size=strlen(hello)+1;
printf("kv=%d dv=%d\n", kv, dv);
//printf("committed: %u,%u\n", kv, dv);
r=db->get(db, txn, &k2, &v2, 0);
assert(r==0);
assert(strcmp(v2.data, there)==0);
curscount++;
}
assert(curscount==com_count);
r=cursor->c_close(cursor);
assert(r==0);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment