Commit adc11042 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

{{{test_log8}}} recovers now. Fixes #544.

git-svn-id: file:///svn/tokudb@2880 c7de825b-a66e-492c-adef-691d508d4ae1
parent e18229f3
......@@ -359,8 +359,9 @@ int toku_deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode, int fl
}
} else {
int n_in_buf = rbuf_int(&rc);
int index_limit = rbuf_int(&rc);
result->u.l.n_bytes_in_buffer = 0;
r=toku_pma_create(&result->u.l.buffer, bt_compare, db, filenum, nodesize);
r=toku_pma_create(&result->u.l.buffer, bt_compare, db, filenum, nodesize, index_limit);
if (r!=0) {
if (0) { died_21: toku_pma_free(&result->u.l.buffer); }
goto died1;
......@@ -369,62 +370,29 @@ int toku_deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode, int fl
toku_pma_set_dup_compare(result->u.l.buffer, dup_compare);
//printf("%s:%d r PMA= %p\n", __FILE__, __LINE__, result->u.l.buffer);
toku_verify_counts(result);
#define BRT_USE_PMA_BULK_INSERT 1
#if BRT_USE_PMA_BULK_INSERT
int index_limit __attribute__((__unused__))= rbuf_int(&rc);
if (n_in_buf > 0) {
#define BRT_BULK_INSERT_MALLOC 1
#if BRT_BULK_INSERT_MALLOC
/* some applications run with small stacks so we malloc the
keys and vals structs */
size_t n = 2 * n_in_buf * sizeof (DBT);
DBT *keys = toku_malloc(n);
if (keys == 0) goto died_21;
DBT *vals = &keys[n_in_buf];
#else
DBT keys[n_in_buf], vals[n_in_buf];
#endif
for (i=0; i<n_in_buf; i++) {
bytevec key; ITEMLEN keylen;
bytevec val; ITEMLEN vallen;
// The counts are wrong here
int idx __attribute__((__unused__)) = rbuf_int(&rc);
rbuf_bytes(&rc, &key, &keylen); /* Returns a pointer into the rbuf. */
toku_fill_dbt(&keys[i], key, keylen);
rbuf_bytes(&rc, &val, &vallen);
toku_fill_dbt(&vals[i], val, vallen);
result->u.l.n_bytes_in_buffer += keylen + vallen + KEY_VALUE_OVERHEAD + PMA_ITEM_OVERHEAD;
}
u_int32_t actual_sum = 0;
r = toku_pma_bulk_insert((TOKULOGGER)0, (FILENUM){0}, (DISKOFF)0, result->u.l.buffer, keys, vals, n_in_buf, result->rand4fingerprint, &actual_sum, 0);
#if BRT_BULK_INSERT_MALLOC
toku_free_n(keys, n);
#endif
if (r!=0) goto died_21;
if (actual_sum!=result->local_fingerprint) {
//fprintf(stderr, "%s:%d Corrupted checksum stored=%08x rand=%08x actual=%08x height=%d n_keys=%d\n", __FILE__, __LINE__, result->rand4fingerprint, result->local_fingerprint, actual_sum, result->height, n_in_buf);
return DB_BADFORMAT;
goto died_21;
} else {
//fprintf(stderr, "%s:%d Good checksum=%08x height=%d\n", __FILE__, __LINE__, actual_sum, result->height);
}
}
#else
u_int32_t actual_sum = 0;
for (i=0; i<n_in_buf; i++) {
bytevec key; ITEMLEN keylen;
bytevec val; ITEMLEN vallen;
toku_verify_counts(result);
int idx = rbuf_int(&rc);
DBT keydbt, datadbt;
rbuf_bytes(&rc, &key, &keylen); /* Returns a pointer into the rbuf. */
rbuf_bytes(&rc, &val, &vallen);
{
DBT k,v;
r = toku_pma_insert(result->u.l.buffer, toku_fill_dbt(&k, key, keylen), toku_fill_dbt(&v, val, vallen), 0);
if (r!=0) goto died_21;
}
result->u.l.n_bytes_in_buffer += keylen + vallen + KEY_VALUE_OVERHEAD + PMA_ITEM_OVERHEAD;
r = toku_pma_set_at_index(result->u.l.buffer, idx, toku_fill_dbt(&keydbt, key, keylen), toku_fill_dbt(&datadbt, val, vallen));
actual_sum += result->rand4fingerprint*toku_calccrc32_kvpair(key, keylen, val, vallen);
}
#endif
if (r!=0) goto died_21;
if (actual_sum!=result->local_fingerprint) {
//fprintf(stderr, "%s:%d Corrupted checksum stored=%08x rand=%08x actual=%08x height=%d n_keys=%d\n", __FILE__, __LINE__, result->rand4fingerprint, result->local_fingerprint, actual_sum, result->height, n_in_buf);
return DB_BADFORMAT;
goto died_21;
} else {
//fprintf(stderr, "%s:%d Good checksum=%08x height=%d\n", __FILE__, __LINE__, actual_sum, result->height);
}
toku_verify_counts(result);
}
{
......
......@@ -267,7 +267,7 @@ static void initialize_brtnode (BRT t, BRTNODE n, DISKOFF nodename, int height)
n->u.n.childinfos=0;
n->u.n.childkeys=0;
} else {
int r = toku_pma_create(&n->u.l.buffer, t->compare_fun, t->db, toku_cachefile_filenum(t->cf), n->nodesize);
int r = toku_pma_create(&n->u.l.buffer, t->compare_fun, t->db, toku_cachefile_filenum(t->cf), n->nodesize, 0);
assert(r==0);
toku_pma_set_dup_mode(n->u.l.buffer, t->flags & (TOKU_DB_DUP+TOKU_DB_DUPSORT));
toku_pma_set_dup_compare(n->u.l.buffer, t->dup_compare);
......
......@@ -48,6 +48,7 @@ void dump_node (int f, DISKOFF off, struct brt_header *h) {
toku_default_compare_fun, toku_default_compare_fun,
(DB*)0, (FILENUM){0});
assert(r==0);
assert(n!=0);
printf("brtnode\n");
printf(" nodesize =%u\n", n->nodesize);
printf(" sizeonddisk =%d\n", toku_serialize_brtnode_size(n));
......@@ -107,6 +108,7 @@ void dump_node (int f, DISKOFF off, struct brt_header *h) {
printf(" items_in_buffer =%d\n", toku_pma_n_entries(n->u.l.buffer));
PMA_ITERATE_IDX(n->u.l.buffer, idx, key, keylen, data, datalen,
({
printf("%d: ", idx);
print_item(key, keylen);
printf(" ");
print_item(data, datalen);
......
This diff is collapsed.
......@@ -477,7 +477,7 @@ static unsigned int pma_array_size(PMA pma __attribute__((unused)), int asksize)
return n;
}
int toku_pma_create(PMA *pma, pma_compare_fun_t compare_fun, DB *db, FILENUM filenum, int maxsize) {
int toku_pma_create(PMA *pma, pma_compare_fun_t compare_fun, DB *db, FILENUM filenum, int maxsize, int initial_n_pairs /* set to zero to get default. */) {
int error;
TAGMALLOC(PMA, result);
if (result==0) return -1;
......@@ -490,7 +490,7 @@ int toku_pma_create(PMA *pma, pma_compare_fun_t compare_fun, DB *db, FILENUM fil
result->filenum = filenum;
result->skey = 0;
result->sval = 0;
result->N = PMA_MIN_ARRAY_SIZE;
result->N = initial_n_pairs ? initial_n_pairs : PMA_MIN_ARRAY_SIZE;
result->pairs = 0;
{
unsigned int n = pma_array_size(result, result->N);
......
......@@ -21,7 +21,7 @@ typedef struct pma *PMA;
/* compare 2 DBT's. return a value < 0, = 0, > 0 if a < b, a == b, a > b respectively */
typedef int (*pma_compare_fun_t)(DB *, const DBT *a, const DBT *b);
int toku_pma_create(PMA *, pma_compare_fun_t compare_fun, DB *, FILENUM filenum, int maxsize);
int toku_pma_create(PMA *, pma_compare_fun_t compare_fun, DB *, FILENUM filenum, int maxsize, int initial_n_pairs);
int toku_pma_set_compare(PMA pma, pma_compare_fun_t compare_fun);
......
......@@ -153,7 +153,7 @@ void toku_recover_newbrtnode (LSN lsn, FILENUM filenum,DISKOFF diskoff,u_int32_t
n->local_fingerprint = 0; // nothing there yet
n->dirty = 1;
if (height==0) {
r=toku_pma_create(&n->u.l.buffer, toku_dont_call_this_compare_fun, null_db, filenum, nodesize);
r=toku_pma_create(&n->u.l.buffer, toku_dont_call_this_compare_fun, null_db, filenum, nodesize, 0);
assert(r==0);
n->u.l.n_bytes_in_buffer=0;
} else {
......@@ -348,6 +348,9 @@ void toku_recover_insertinleaf (LSN lsn, TXNID UU(txnid), FILENUM filenum, DISKO
node->local_fingerprint += node->rand4fingerprint*toku_calccrc32_kvpair(keybs.data, keybs.len, databs.data, databs.len);
node->u.l.n_bytes_in_buffer += PMA_ITEM_OVERHEAD + KEY_VALUE_OVERHEAD + keybs.len + databs.len;
// PMA_ITERATE_IDX(node->u.l.buffer, idx, skey, keylen __attribute__((__unused__)), sdata, datalen __attribute__((__unused__)),
// printf("%d: %s %s\n", idx, (char*)skey, (char*)sdata));
VERIFY_COUNTS(node);
node->log_lsn = lsn;
......@@ -415,10 +418,13 @@ void toku_recover_pmadistribute (LSN lsn, FILENUM filenum, DISKOFF old_diskoff,
BRTNODE nodeb = node_vb; assert(nodeb->height==0);
{
unsigned int i;
//printf("{");
for (i=0; i<fromto.size; i++) {
//printf(" {%d %d}", fromto.array[i].a, fromto.array[i].b);
assert(fromto.array[i].a < toku_pma_index_limit(nodea->u.l.buffer));
assert(fromto.array[i].b < toku_pma_index_limit(nodeb->u.l.buffer));
}
//printf("}\n");
}
r = toku_pma_move_indices (nodea->u.l.buffer, nodeb->u.l.buffer, fromto,
nodea->rand4fingerprint, &nodea->local_fingerprint,
......@@ -427,6 +433,9 @@ void toku_recover_pmadistribute (LSN lsn, FILENUM filenum, DISKOFF old_diskoff,
);
// The bytes in buffer and fingerprint shouldn't change
// PMA_ITERATE_IDX(nodeb->u.l.buffer, idx, key, keylen __attribute__((__unused__)), data, datalen __attribute__((__unused__)),
// printf("%d: %s %s\n", idx, (char*)key, (char*)data));
VERIFY_COUNTS(nodea);
VERIFY_COUNTS(nodeb);
......@@ -514,12 +523,13 @@ int tokudb_recover(const char *data_dir, const char *log_dir) {
FILE *f = fopen(logfiles[i], "r");
struct log_entry le;
u_int32_t version;
//printf("Reading file %s\n", logfiles[i]);
r=toku_read_and_print_logmagic(f, &version);
assert(r==0 && version==0);
r=chdir(data_wd);
assert(r==0);
while ((r = toku_log_fread(f, &le))==0) {
//printf("%lld: Got cmd %c\n", le.u.commit.lsn.lsn, le.cmd);
//printf("%lld: Got cmd %c\n", (long long)le.u.commit.lsn.lsn, le.cmd);
logtype_dispatch_args(&le, toku_recover_);
entrycount++;
}
......
......@@ -35,7 +35,9 @@ static void make_db (void) {
system("rm -rf " ENVDIR);
r=mkdir(ENVDIR, 0777); assert(r==0);
r=db_env_create(&env, 0); assert(r==0);
#ifdef TOKUDB
r=env->set_lk_max_locks(env, 2*maxcount); CKERR(r);
#endif
r=env->open(env, ENVDIR, DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_MPOOL|DB_INIT_TXN|DB_CREATE|DB_PRIVATE, 0777); CKERR(r);
r=db_create(&db, env, 0); CKERR(r);
r=env->txn_begin(env, 0, &tid, 0); assert(r==0);
......@@ -58,6 +60,13 @@ static void make_db (void) {
key.data = hello; key.size=strlen(hello)+1;
data.data = there; data.size=strlen(there)+1;
r=db->put(db, tid, &key, &data, 0); assert(r==0);
#ifndef TOKUDB
// BDB cannot handle this huge transaction even with a lot of locks.
if (i%1000==599) {
r=tid->commit(tid, 0); assert(r==0);
r=env->txn_begin(env, 0, &tid, 0); assert(r==0);
}
#endif
}
r=tid->commit(tid, 0); assert(r==0);
r=db->close(db, 0); assert(r==0);
......
......@@ -82,7 +82,7 @@ static void make_db (void) {
r=db->close(db, 0); CKERR(r);
r=env->close(env, 0); CKERR(r);
for (i=0; i<2; i++)
for (i=0; i<1; i++)
insert_some(i);
while (items) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment