Commit fc13ce32 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

test_log5 recovers correctly now. Addresses #27.

git-svn-id: file:///svn/tokudb@1826 c7de825b-a66e-492c-adef-691d508d4ae1
parent bcf56672
......@@ -395,7 +395,7 @@ int toku_deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode, int fl
}
if (n_in_buf > 0) {
u_int32_t actual_sum = 0;
r = toku_pma_bulk_insert((TOKUTXN)0, (FILENUM){0}, (DISKOFF)0, result->u.l.buffer, keys, vals, n_in_buf, result->rand4fingerprint, &actual_sum);
r = toku_pma_bulk_insert((TOKUTXN)0, (FILENUM){0}, (DISKOFF)0, result->u.l.buffer, keys, vals, n_in_buf, result->rand4fingerprint, &actual_sum, 0);
if (r!=0) goto died_21;
if (actual_sum!=result->local_fingerprint) {
//fprintf(stderr, "%s:%d Corrupted checksum stored=%08x rand=%08x actual=%08x height=%d n_keys=%d\n", __FILE__, __LINE__, result->rand4fingerprint, result->local_fingerprint, actual_sum, result->height, n_in_buf);
......
......@@ -79,6 +79,13 @@ static void brt_update_cursors_leaf_split(BRT t, BRTNODE oldnode, BRTNODE newnod
static void brt_update_cursors_nonleaf_expand(BRT t, BRTNODE oldnode, int childnum, BRTNODE left, BRTNODE right, struct kv_pair *splitk);
static void brt_update_cursors_nonleaf_split(BRT t, BRTNODE oldnode, BRTNODE left, BRTNODE right);
static void toku_update_brtnode_lsn(BRTNODE node, TOKUTXN txn) {
if (txn) {
node->log_lsn = toku_txn_get_last_lsn(txn);
}
}
static void fixup_child_fingerprint(BRTNODE node, int childnum_of_node, BRTNODE child, BRT brt, TOKUTXN txn) {
u_int32_t old_fingerprint = BRTNODE_CHILD_SUBTREE_FINGERPRINTS(node,childnum_of_node);
u_int32_t sum = child->local_fingerprint;
......@@ -93,6 +100,7 @@ static void fixup_child_fingerprint(BRTNODE node, int childnum_of_node, BRTNODE
BRTNODE_CHILD_SUBTREE_FINGERPRINTS(node,childnum_of_node)=sum;
node->dirty=1;
toku_log_changechildfingerprint(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(brt->cf), node->thisnodename, childnum_of_node, old_fingerprint, sum);
toku_update_brtnode_lsn(node, txn);
}
static int brt_compare_pivot(BRT brt, DBT *key, DBT *data, bytevec ck) {
......@@ -191,12 +199,12 @@ int toku_unpin_brt_header (BRT brt) {
brt->h=0;
return r;
}
static int unpin_brtnode (BRT brt, BRTNODE node, TOKUTXN txn) {
if (node->dirty && txn) {
// For now just update the log_lsn. Later we'll have to deal with the checksums.
node->log_lsn = toku_txn_get_last_lsn(txn);
//if (node->log_lsn.lsn>33320) printf("%s:%d node%lld lsn=%lld\n", __FILE__, __LINE__, node->thisnodename, node->log_lsn.lsn);
}
static int unpin_brtnode (BRT brt, BRTNODE node) {
// if (node->dirty && txn) {
// // For now just update the log_lsn. Later we'll have to deal with the checksums.
// node->log_lsn = toku_txn_get_last_lsn(txn);
// //if (node->log_lsn.lsn>33320) printf("%s:%d node%lld lsn=%lld\n", __FILE__, __LINE__, node->thisnodename, node->log_lsn.lsn);
// }
return toku_cachetable_unpin(brt->cf, node->thisnodename, node->dirty, brtnode_size(node));
}
......@@ -302,6 +310,7 @@ static void create_new_brtnode (BRT t, BRTNODE *result, int height, TOKUTXN txn)
assert(r==0);
r=toku_log_newbrtnode(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(t->cf), n->thisnodename, height, n->nodesize, (t->flags&TOKU_DB_DUPSORT)!=0, n->rand4fingerprint);
assert(r==0);
toku_update_brtnode_lsn(n, txn);
}
static void delete_node (BRT t, BRTNODE node) {
......@@ -356,9 +365,9 @@ static int brtleaf_split (TOKUTXN txn, FILENUM filenum, BRT t, BRTNODE node, BRT
assert(node->height>0 || node->u.l.buffer!=0);
int r;
r = toku_pma_split(txn, filenum,
node->thisnodename, node->u.l.buffer, &node->u.l.n_bytes_in_buffer, node->rand4fingerprint, &node->local_fingerprint,
node->thisnodename, node->u.l.buffer, &node->u.l.n_bytes_in_buffer, node->rand4fingerprint, &node->local_fingerprint, &node->log_lsn,
splitk,
B->thisnodename, B->u.l.buffer, &B->u.l.n_bytes_in_buffer, B->rand4fingerprint, &B->local_fingerprint);
B->thisnodename, B->u.l.buffer, &B->u.l.n_bytes_in_buffer, B->rand4fingerprint, &B->local_fingerprint, &B->log_lsn);
assert(r == 0);
assert(node->height>0 || node->u.l.buffer!=0);
/* Remove it from the cache table, and free its storage. */
......@@ -684,9 +693,9 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
toku_verify_counts(childa);
toku_verify_counts(childb);
r=unpin_brtnode(t, childa, txn);
r=unpin_brtnode(t, childa);
assert(r==0);
r=unpin_brtnode(t, childb, txn);
r=unpin_brtnode(t, childb);
assert(r==0);
if (node->u.n.n_children>TREE_FANOUT) {
......@@ -804,7 +813,7 @@ static int push_some_brt_cmds_down (BRT t, BRTNODE node, int childnum,
if (debug) printf("%s:%d %*sdone push_some_brt_cmds_down, unpinning %lld\n", __FILE__, __LINE__, debug, "", targetchild);
assert(toku_serialize_brtnode_size(node)<=node->nodesize);
//verify_local_fingerprint_nonleaf(node);
r=unpin_brtnode(t, child, txn);
r=unpin_brtnode(t, child);
if (r!=0) return r;
*did_split=0;
return 0;
......@@ -870,7 +879,7 @@ static int brt_leaf_put_cmd (BRT t, BRTNODE node, BRT_CMD *cmd,
DBT *k = cmd->u.id.key;
DBT *v = cmd->u.id.val;
int replaced_v_size;
enum pma_errors pma_status = toku_pma_insert_or_replace(node->u.l.buffer, k, v, &replaced_v_size, txn, filenum, node->thisnodename, node->rand4fingerprint, &node->local_fingerprint);
enum pma_errors pma_status = toku_pma_insert_or_replace(node->u.l.buffer, k, v, &replaced_v_size, txn, filenum, node->thisnodename, node->rand4fingerprint, &node->local_fingerprint, &node->log_lsn);
assert(pma_status==BRT_OK);
//printf("replaced_v_size=%d\n", replaced_v_size);
if (replaced_v_size>=0) {
......@@ -959,7 +968,7 @@ static int brt_nonleaf_put_cmd_child_node (BRT t, BRTNODE node, BRT_CMD *cmd,
r = toku_cachetable_maybe_get_and_pin(t->cf, BRTNODE_CHILD_DISKOFF(node, childnum), &child_v);
else
r = toku_cachetable_get_and_pin(t->cf, BRTNODE_CHILD_DISKOFF(node, childnum), &child_v, NULL,
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, t);
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, t);
if (r != 0)
return r;
......@@ -970,7 +979,7 @@ static int brt_nonleaf_put_cmd_child_node (BRT t, BRTNODE node, BRT_CMD *cmd,
&child_did_split, &childa, &childb, &childsplitk, debug, txn);
if (r != 0) {
/* putting to the child failed for some reason, so unpin the child and return the error code */
int rr = unpin_brtnode(t, child, txn);
int rr = unpin_brtnode(t, child);
assert(rr == 0);
return r;
}
......@@ -985,7 +994,7 @@ static int brt_nonleaf_put_cmd_child_node (BRT t, BRTNODE node, BRT_CMD *cmd,
} else {
//verify_local_fingerprint_nonleaf(child);
fixup_child_fingerprint(node, childnum, child, t, txn);
int rr = unpin_brtnode(t, child, txn);
int rr = unpin_brtnode(t, child);
assert(rr == 0);
}
return r;
......@@ -1253,11 +1262,8 @@ static int setup_brt_root_node (BRT t, DISKOFF offset, TOKUTXN txn) {
toku_verify_counts(node);
// verify_local_fingerprint_nonleaf(node);
toku_log_newbrtnode(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(t->cf), offset, 0, t->h->nodesize, (t->flags&TOKU_DB_DUPSORT)!=0, node->rand4fingerprint);
if (txn) {
node->log_lsn = toku_txn_get_last_lsn(txn);
//fprintf(stderr, "%s:%d last lsn=%" PRId64 "\n", __FILE__, __LINE__, node->log_lsn.lsn);
}
r=unpin_brtnode(t, node, txn);
toku_update_brtnode_lsn(node, txn);
r=unpin_brtnode(t, node);
if (r!=0) {
toku_free(node);
return r;
......@@ -1618,10 +1624,11 @@ static int brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk,
.data = kv_pair_key(newroot->u.n.childkeys[0]) };
r=toku_log_setpivot(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(brt->cf), newroot_diskoff, 0, bs);
if (r!=0) return r;
toku_update_brtnode_lsn(newroot, txn);
}
r=unpin_brtnode(brt, nodea, txn);
r=unpin_brtnode(brt, nodea);
if (r!=0) return r;
r=unpin_brtnode(brt, nodeb, txn);
r=unpin_brtnode(brt, nodeb);
if (r!=0) return r;
//printf("%s:%d put %lld\n", __FILE__, __LINE__, brt->root);
toku_cachetable_put(brt->cf, newroot_diskoff, newroot, brtnode_size(newroot),
......@@ -1673,7 +1680,7 @@ static int brt_root_put_cmd(BRT brt, BRT_CMD *cmd, TOKUTXN txn) {
if (node->height>0)
assert(node->u.n.n_children<=TREE_FANOUT);
}
r = unpin_brtnode(brt, node, txn);
r = unpin_brtnode(brt, node);
assert(r==0);
r = toku_unpin_brt_header(brt);
assert(r == 0);
......@@ -1883,7 +1890,7 @@ static void brt_flush_child(BRT t, BRTNODE node, int childnum, BRT_CURSOR cursor
BRTNODE newnode;
r = brt_init_new_root(t, childa, childb, child_splitk, rootp, txn, &newnode);
assert(r == 0);
r = unpin_brtnode(t, newnode, txn);
r = unpin_brtnode(t, newnode);
assert(r == 0);
} else {
BRTNODE upnode;
......
......@@ -36,9 +36,9 @@ struct pma {
int toku_pmainternal_count_region (struct kv_pair *pairs[], int lo, int hi);
void toku_pmainternal_calculate_parameters (PMA pma);
int toku_pmainternal_smooth_region (TOKUTXN, FILENUM, DISKOFF, struct kv_pair */*pairs*/[], int /*n*/, int /*idx*/, int /*base*/, PMA /*pma*/, int */*new_idx*/);
int toku_pmainternal_smooth_region (TOKUTXN, FILENUM, DISKOFF, struct kv_pair */*pairs*/[], int /*n*/, int /*idx*/, int /*base*/, PMA /*pma*/, int */*new_idx*/, LSN */*node_lsn*/);
int toku_pmainternal_printpairs (struct kv_pair *pairs[], int N);
int toku_pmainternal_make_space_at (TOKUTXN, FILENUM, DISKOFF, PMA pma, int idx, unsigned int *new_index);
int toku_pmainternal_make_space_at (TOKUTXN, FILENUM, DISKOFF, PMA pma, int idx, unsigned int *new_index, LSN *node_lsn);
int toku_pmainternal_find (PMA pma, DBT *); // The DB is so the comparison fuction can be called.
void toku_print_pma (PMA pma); /* useful for debugging, so keep the name short. I.e., not pmainternal_print_pma() */
......
......@@ -44,7 +44,7 @@ static void test_make_space_at (void) {
r=toku_pma_create(&pma, toku_default_compare_fun, null_db, null_filenum, 0);
assert(r==0);
assert(toku_pma_n_entries(pma)==0);
r=toku_pmainternal_make_space_at(null_txn, null_filenum, null_diskoff, pma, 2, &newi);
r=toku_pmainternal_make_space_at(null_txn, null_filenum, null_diskoff, pma, 2, &newi, (LSN*)0);
assert(r==0);
assert(toku_pma_index_limit(pma)==4);
assert((unsigned long)pma->pairs[toku_pma_index_limit(pma)]==0xdeadbeefL);
......@@ -52,7 +52,7 @@ static void test_make_space_at (void) {
pma->pairs[2] = key_A;
pma->n_pairs_present++;
r=toku_pmainternal_make_space_at(null_txn, null_filenum, null_diskoff, pma, 2, &newi);
r=toku_pmainternal_make_space_at(null_txn, null_filenum, null_diskoff, pma, 2, &newi, (LSN*)0);
assert(r==0);
if (verbose) printf("Requested space at 2, got space at %d\n", newi);
if (verbose) toku_print_pma(pma);
......@@ -66,7 +66,7 @@ static void test_make_space_at (void) {
pma->pairs[3] = 0;
pma->n_pairs_present=2;
if (verbose) toku_print_pma(pma);
toku_pmainternal_make_space_at(null_txn, null_filenum, null_diskoff, pma, 0, &newi);
toku_pmainternal_make_space_at(null_txn, null_filenum, null_diskoff, pma, 0, &newi, (LSN*)0);
assert(r==0);
if (verbose) printf("Requested space at 0, got space at %d\n", newi);
if (verbose) toku_print_pma(pma);
......@@ -83,7 +83,7 @@ static void test_make_space_at (void) {
pma->pairs[7] = 0;
pma->n_pairs_present=2;
if (verbose) toku_print_pma(pma);
r=toku_pmainternal_make_space_at(null_txn, null_filenum, null_diskoff, pma, 5, &newi);
r=toku_pmainternal_make_space_at(null_txn, null_filenum, null_diskoff, pma, 5, &newi, (LSN*)0);
assert(r==0);
if (verbose) toku_print_pma(pma);
if (verbose) printf("r=%d\n", newi);
......@@ -177,7 +177,7 @@ static void test_smooth_region_N (int N) {
}
}
if (verbose) { toku_pmainternal_printpairs(pairs, N); printf(" at %d becomes f", insertat); }
toku_pmainternal_smooth_region(null_txn, null_filenum, null_diskoff, pairs, N, insertat, 0, 0, &r);
toku_pmainternal_smooth_region(null_txn, null_filenum, null_diskoff, pairs, N, insertat, 0, 0, &r, (LSN*)0);
if (verbose) { toku_pmainternal_printpairs(pairs, N); printf(" at %d\n", r); }
assert(0<=r); assert(r<N);
assert(pairs[r]==0);
......@@ -219,7 +219,7 @@ static void test_smooth_region6 (void) {
pairs[1] = kv_pair_malloc(key, strlen(key)+1, 0, 0);
int r;
toku_pmainternal_smooth_region(null_txn, null_filenum, null_diskoff, pairs, N, 2, 0, 0, &r);
toku_pmainternal_smooth_region(null_txn, null_filenum, null_diskoff, pairs, N, 2, 0, 0, &r, (LSN*)0);
if (verbose) {
printf("{ ");
for (i=0; i<N; i++)
......@@ -284,7 +284,7 @@ static void add_fingerprint_and_check(u_int32_t rand4fingerprint, u_int32_t actu
static void do_insert (PMA pma, const void *key, int keylen, const void *data, int datalen, u_int32_t rand4fingerprint, u_int32_t *sum, u_int32_t *expect_fingerprint) {
DBT k,v;
assert(*sum==*expect_fingerprint);
int r = toku_pma_insert(pma, toku_fill_dbt(&k, key, keylen), toku_fill_dbt(&v, data, datalen), NULL_ARGS, rand4fingerprint, sum);
int r = toku_pma_insert(pma, toku_fill_dbt(&k, key, keylen), toku_fill_dbt(&v, data, datalen), NULL_ARGS, rand4fingerprint, sum, (LSN*)0);
assert(r==BRT_OK);
add_fingerprint_and_check(rand4fingerprint, *sum, expect_fingerprint, key, keylen, data, datalen);
toku_pma_verify_fingerprint(pma, rand4fingerprint, *sum);
......@@ -880,9 +880,9 @@ static void test_pma_split_n(int n) {
if (verbose) { printf("a:"); toku_print_pma(pmaa); }
error = toku_pma_split(null_txn, null_filenum,
null_diskoff, pmaa, 0, arand, &asum,
null_diskoff, pmaa, 0, arand, &asum, (LSN*)0,
0,
null_diskoff, pmac, 0, crand, &csum);
null_diskoff, pmac, 0, crand, &csum, (LSN*)0);
assert(error == 0);
toku_pma_verify(pmaa);
toku_pma_verify(pmac);
......@@ -940,9 +940,9 @@ static void test_pma_dup_split_n(int n, int dup_mode) {
DBT splitk;
error = toku_pma_split(null_txn, null_filenum,
null_diskoff, pmaa, 0, arand, &asum,
null_diskoff, pmaa, 0, arand, &asum, (LSN*)0,
&splitk,
null_diskoff, pmac, 0, crand, &csum);
null_diskoff, pmac, 0, crand, &csum, (LSN*)0);
assert(error == 0);
toku_pma_verify(pmaa);
toku_pma_verify(pmac);
......@@ -1006,9 +1006,9 @@ static void test_pma_split_varkey(void) {
if (verbose) { printf("a:"); toku_print_pma(pmaa); }
error = toku_pma_split(null_txn, null_filenum,
null_diskoff, pmaa, 0, arand, &asum,
null_diskoff, pmaa, 0, arand, &asum, (LSN*)0,
0,
null_diskoff, pmac, 0, crand, &csum);
null_diskoff, pmac, 0, crand, &csum, (LSN*)0);
assert(error == 0);
toku_pma_verify(pmaa);
toku_pma_verify(pmac);
......@@ -1145,9 +1145,9 @@ static void test_pma_split_cursor(void) {
assert_cursor_val(cursorc, 16);
error = toku_pma_split(null_txn, null_filenum,
null_diskoff, pmaa, 0, arand, &asum,
null_diskoff, pmaa, 0, arand, &asum, (LSN*)0,
0,
null_diskoff, pmac, 0, crand, &csum);
null_diskoff, pmac, 0, crand, &csum, (LSN*)0);
assert(error == 0);
toku_pma_verify_fingerprint(pmaa, arand, asum);
......@@ -1252,7 +1252,7 @@ static void test_pma_bulk_insert_n(int n) {
}
/* bulk insert n kv pairs */
error = toku_pma_bulk_insert(null_txn, null_filenum, (DISKOFF)0, pma, keys, vals, n, rand4fingerprint, &sum);
error = toku_pma_bulk_insert(null_txn, null_filenum, (DISKOFF)0, pma, keys, vals, n, rand4fingerprint, &sum, 0);
assert(error == 0);
assert(sum==expect_fingerprint);
toku_pma_verify(pma);
......@@ -1305,14 +1305,14 @@ static void test_pma_insert_or_replace(void) {
u_int32_t expect_fingerprint = 0;
r = toku_pma_create(&pma, toku_default_compare_fun, null_db, null_filenum, 0);
assert(r==0);
r = toku_pma_insert_or_replace(pma, toku_fill_dbt(&dbtk, "aaa", 4), toku_fill_dbt(&dbtv, "zzz", 4), &n_diff, NULL_ARGS, rand4fingerprint, &sum);
r = toku_pma_insert_or_replace(pma, toku_fill_dbt(&dbtk, "aaa", 4), toku_fill_dbt(&dbtv, "zzz", 4), &n_diff, NULL_ARGS, rand4fingerprint, &sum, (LSN*)0);
assert(r==0); assert(n_diff==-1);
add_fingerprint_and_check(rand4fingerprint, sum, &expect_fingerprint, "aaa", 4, "zzz", 4);
r = toku_pma_lookup(pma, toku_fill_dbt(&dbtk, "aaa", 4), toku_init_dbt(&dbtv));
assert(r==0); assert(dbtv.size==4); assert(memcmp(dbtv.data, "zzz", 4)==0);
r = toku_pma_insert_or_replace(pma, toku_fill_dbt(&dbtk, "bbbb", 5), toku_fill_dbt(&dbtv, "ww", 3), &n_diff, NULL_ARGS, rand4fingerprint, &sum);
r = toku_pma_insert_or_replace(pma, toku_fill_dbt(&dbtk, "bbbb", 5), toku_fill_dbt(&dbtv, "ww", 3), &n_diff, NULL_ARGS, rand4fingerprint, &sum, (LSN*)0);
assert(r==0); assert(n_diff==-1);
add_fingerprint_and_check(rand4fingerprint, sum, &expect_fingerprint, "bbbb", 5, "ww", 3);
......@@ -1323,7 +1323,7 @@ static void test_pma_insert_or_replace(void) {
assert(r==0); assert(dbtv.size==3); assert(memcmp(dbtv.data, "ww", 3)==0);
// replae bbbb
r = toku_pma_insert_or_replace(pma, toku_fill_dbt(&dbtk, "bbbb", 5), toku_fill_dbt(&dbtv, "xxxx", 5), &n_diff, NULL_ARGS, rand4fingerprint, &sum);
r = toku_pma_insert_or_replace(pma, toku_fill_dbt(&dbtk, "bbbb", 5), toku_fill_dbt(&dbtv, "xxxx", 5), &n_diff, NULL_ARGS, rand4fingerprint, &sum, (LSN*)0);
assert(r==0); assert(n_diff==3);
expect_fingerprint -= rand4fingerprint*toku_calccrc32_kvpair("bbbb", 5, "ww", 3);
add_fingerprint_and_check(rand4fingerprint, sum, &expect_fingerprint, "bbbb", 5, "xxxx", 5);
......@@ -1817,10 +1817,10 @@ static void test_pma_already_there() {
k = 1; v = 1;
toku_fill_dbt(&key, &k, sizeof k);
toku_fill_dbt(&val, &v, sizeof v);
error = toku_pma_insert(pma, &key, &val, NULL_ARGS, rand4fingerprint, &sum);
error = toku_pma_insert(pma, &key, &val, NULL_ARGS, rand4fingerprint, &sum, (LSN*)0);
assert(error == 0);
u_int32_t savesum = sum;
error = toku_pma_insert(pma, &key, &val, NULL_ARGS, rand4fingerprint, &sum);
error = toku_pma_insert(pma, &key, &val, NULL_ARGS, rand4fingerprint, &sum, (LSN*)0);
assert(error == BRT_ALREADY_THERE);
assert(sum==savesum);
......@@ -1847,7 +1847,7 @@ static void test_pma_cursor_first(int n) {
int i;
for (i=0; i<n; i++) {
k = htonl(i); v = htonl(i);
r = toku_pma_insert(pma, toku_fill_dbt(&key, &k, sizeof k), toku_fill_dbt(&val, &v, sizeof v), NULL_ARGS, rand4fingerprint, &sum);
r = toku_pma_insert(pma, toku_fill_dbt(&key, &k, sizeof k), toku_fill_dbt(&val, &v, sizeof v), NULL_ARGS, rand4fingerprint, &sum, (LSN*)0);
assert(r == 0);
}
for (i=0; ; i++) {
......@@ -2217,7 +2217,7 @@ static void test_nodup_key_insert(int n) {
v = i;
toku_fill_dbt(&key, &k, sizeof k);
toku_fill_dbt(&val, &v, sizeof v);
r = toku_pma_insert(pma, &key, &val, NULL_ARGS, rand4fingerprint, &sum);
r = toku_pma_insert(pma, &key, &val, NULL_ARGS, rand4fingerprint, &sum, (LSN*)0);
if (i == 0) {
assert(r == 0);
add_fingerprint_and_check(rand4fingerprint, sum, &expect_fingerprint, &k, sizeof k, &v, sizeof v);
......
......@@ -33,9 +33,9 @@ static void __pma_delete_finish(PMA pma, int here);
/*
* resize the pma array to asksize. zero all array entries starting from startx.
*/
static int pma_resize_array(TOKUTXN, FILENUM, DISKOFF, PMA pma, int asksize, int startx);
static int pma_resize_array(TOKUTXN, FILENUM, DISKOFF, PMA pma, int asksize, int startx, LSN *node_lsn);
static int old_pma_resize_array(PMA pma, int asksize, int startx) {
return pma_resize_array((TOKUTXN)0, (FILENUM){0}, (DISKOFF)0, pma, asksize, startx);
return pma_resize_array((TOKUTXN)0, (FILENUM){0}, (DISKOFF)0, pma, asksize, startx, (LSN*)0);
}
/*
......@@ -363,7 +363,7 @@ static int distribute_data (struct kv_pair *destpairs[], int dcount,
}
}
static int pma_log_distribute (TOKUTXN txn, FILENUM filenum, DISKOFF old_diskoff, DISKOFF new_diskoff, int n_pairs, struct kv_pair_tag *pairs) {
static int pma_log_distribute (TOKUTXN txn, FILENUM filenum, DISKOFF old_diskoff, DISKOFF new_diskoff, int n_pairs, struct kv_pair_tag *pairs, LSN *oldnode_lsn, LSN*newnode_lsn) {
INTPAIRARRAY ipa;
MALLOC_N(n_pairs, ipa.array);
if (ipa.array==0) return errno;
......@@ -378,6 +378,8 @@ static int pma_log_distribute (TOKUTXN txn, FILENUM filenum, DISKOFF old_diskoff
}
ipa.size=j;
int r=toku_log_pmadistribute(txn, toku_txn_get_txnid(txn), filenum, old_diskoff, new_diskoff, ipa);
if (txn && oldnode_lsn) *oldnode_lsn = toku_txn_get_last_lsn(txn);
if (txn && newnode_lsn) *newnode_lsn = toku_txn_get_last_lsn(txn);
// if (0 && pma) {
// printf("Pma state:\n");
// PMA_ITERATE_IDX (pma, pidx, key, keylen, data, datalen,
......@@ -389,7 +391,7 @@ static int pma_log_distribute (TOKUTXN txn, FILENUM filenum, DISKOFF old_diskoff
/* spread the non-empty pairs around. There are n of them. Create an empty slot just before the IDXth
element, and return that slot's index in the smoothed array. */
int toku_pmainternal_smooth_region (TOKUTXN txn, FILENUM filenum, DISKOFF diskoff, struct kv_pair *pairs[], int n, int idx, int base, PMA pma, int *new_idx) {
int toku_pmainternal_smooth_region (TOKUTXN txn, FILENUM filenum, DISKOFF diskoff, struct kv_pair *pairs[], int n, int idx, int base, PMA pma, int *new_idx, LSN *node_lsn) {
int i;
int n_present=0;
for (i=0; i<n; i++) {
......@@ -426,7 +428,8 @@ int toku_pmainternal_smooth_region (TOKUTXN txn, FILENUM filenum, DISKOFF diskof
tmppairs, n_saved, pma);
int r = pma_log_distribute(txn, filenum, diskoff, diskoff,
n_saved,
tmppairs);
tmppairs,
node_lsn, node_lsn);
if (r!=0) goto cleanup;
if (pma && !list_empty(&pma->cursors))
__pma_update_my_cursors(pma, tmppairs, n_present);
......@@ -537,13 +540,14 @@ int toku_resize_pma_exactly (PMA pma, int oldsize, int newsize) {
return 0;
}
static int pma_resize_array(TOKUTXN txn, FILENUM filenum, DISKOFF offset, PMA pma, int asksize, int startz) {
static int pma_resize_array(TOKUTXN txn, FILENUM filenum, DISKOFF offset, PMA pma, int asksize, int startz, LSN *node_lsn) {
unsigned int oldN = pma->N;
unsigned int n = pma_array_size(pma, asksize);
int r = toku_resize_pma_exactly(pma, startz, n);
if (r!=0) return r;
toku_pmainternal_calculate_parameters(pma);
toku_log_resizepma (txn, toku_txn_get_txnid(txn), filenum, offset, oldN, n);
if (txn && node_lsn) *node_lsn = toku_txn_get_last_lsn(txn);
return 0;
}
......@@ -769,7 +773,7 @@ int toku_pma_cursor_free (PMA_CURSOR *cursp) {
/* Make some space for a key to go at idx (the thing currently at idx should end up at to the right.) */
/* (Making space may involve moving things around, including the hole at index.) */
int toku_pmainternal_make_space_at (TOKUTXN txn, FILENUM filenum, DISKOFF offset, PMA pma, int idx, unsigned int *new_index) {
int toku_pmainternal_make_space_at (TOKUTXN txn, FILENUM filenum, DISKOFF offset, PMA pma, int idx, unsigned int *new_index, LSN *node_lsn) {
/* Within a range LO to HI we have a limit of how much packing we will tolerate.
* We allow the entire array to be 50% full.
* We allow a region of size lgN to be full.
......@@ -805,7 +809,7 @@ int toku_pmainternal_make_space_at (TOKUTXN txn, FILENUM filenum, DISKOFF offset
size*=2;
// printf("pma_make_space_realloc %d to %d hi %d\n", pma->N, size, hi);
pma_resize_array(txn, filenum, offset, pma, size, hi);
pma_resize_array(txn, filenum, offset, pma, size, hi, node_lsn);
hi=size;
//printf("doubled N\n");
......@@ -818,7 +822,7 @@ int toku_pmainternal_make_space_at (TOKUTXN txn, FILENUM filenum, DISKOFF offset
//printf("%s:%d Smoothing from %d to %d to density %f\n", __FILE__, __LINE__, lo, hi, density);
{
int sub_new_index;
int r = toku_pmainternal_smooth_region(txn, filenum, offset, pma->pairs+lo, hi-lo, idx-lo, lo, pma, &sub_new_index);
int r = toku_pmainternal_smooth_region(txn, filenum, offset, pma->pairs+lo, hi-lo, idx-lo, lo, pma, &sub_new_index, node_lsn);
if (r!=0) return r;
*new_index=sub_new_index+lo;
return 0;
......@@ -870,7 +874,7 @@ int toku_pma_free (PMA *pmap) {
/* Copies keylen and datalen */
/* returns an error if the key is already present. */
int toku_pma_insert (PMA pma, DBT *k, DBT *v, TOKUTXN txn, FILENUM filenum, DISKOFF diskoff, u_int32_t rand4fingerprint, u_int32_t *fingerprint) {
int toku_pma_insert (PMA pma, DBT *k, DBT *v, TOKUTXN txn, FILENUM filenum, DISKOFF diskoff, u_int32_t rand4fingerprint, u_int32_t *fingerprint, LSN *node_lsn) {
int found;
unsigned int idx;
......@@ -885,13 +889,14 @@ int toku_pma_insert (PMA pma, DBT *k, DBT *v, TOKUTXN txn, FILENUM filenum, DISK
assert(pma->pairs[idx]);
*fingerprint += rand4fingerprint*toku_calccrc32_kvpair(k->data, k->size, v->data, v->size);
int r = toku_logger_log_phys_add_or_delete_in_leaf(pma->db, txn, diskoff, 0, pma->pairs[idx]);
if (txn && node_lsn && r==0) *node_lsn = toku_txn_get_last_lsn(txn);
return r;
} else
return BRT_ALREADY_THERE; /* It is already here. Return an error. */
}
if (kv_pair_inuse(pma->pairs[idx])) {
unsigned int newidx;
int r = toku_pmainternal_make_space_at (txn, filenum, diskoff, pma, idx, &newidx); /* returns the new idx. */
int r = toku_pmainternal_make_space_at (txn, filenum, diskoff, pma, idx, &newidx, (LSN*)0); /* returns the new idx. */
if (r!=0) return r;
idx = newidx;
}
......@@ -905,7 +910,9 @@ int toku_pma_insert (PMA pma, DBT *k, DBT *v, TOKUTXN txn, FILENUM filenum, DISK
const struct kv_pair *pair = pma->pairs[idx];
const BYTESTRING key = { pair->keylen, (char*)kv_pair_key_const(pair) };
const BYTESTRING data = { pair->vallen, (char*)kv_pair_val_const(pair) };
return toku_log_insertinleaf (txn, toku_txn_get_txnid(txn), pma->filenum, diskoff, idx, key, data);
int r = toku_log_insertinleaf (txn, toku_txn_get_txnid(txn), pma->filenum, diskoff, idx, key, data);
if (txn && node_lsn) *node_lsn = toku_txn_get_last_lsn(txn);
return r;
}
}
......@@ -1050,7 +1057,8 @@ static void __pma_delete_at(PMA pma, int here) {
int toku_pma_insert_or_replace (PMA pma, DBT *k, DBT *v,
int *replaced_v_size, /* If it is a replacement, set to the size of the old value, otherwise set to -1. */
TOKUTXN txn, FILENUM filenum, DISKOFF diskoff,
u_int32_t rand4fingerprint, u_int32_t *fingerprint) {
u_int32_t rand4fingerprint, u_int32_t *fingerprint,
LSN *node_lsn) {
//printf("%s:%d v->size=%d\n", __FILE__, __LINE__, v->size);
int r;
unsigned int idx;
......@@ -1069,6 +1077,7 @@ int toku_pma_insert_or_replace (PMA pma, DBT *k, DBT *v,
*fingerprint -= rand4fingerprint*toku_calccrc32_kvpair(kv_pair_key_const(kv), kv_pair_keylen(kv), kv_pair_val_const(kv), kv_pair_vallen(kv));
r=toku_logger_log_phys_add_or_delete_in_leaf(pma->db, txn, diskoff, 0, kv);
if (r!=0) return r;
if (txn && node_lsn) *node_lsn = toku_txn_get_last_lsn(txn);
}
if (v->size == (unsigned int) kv_pair_vallen(kv)) {
memcpy(kv_pair_val(kv), v->data, v->size);
......@@ -1078,12 +1087,13 @@ int toku_pma_insert_or_replace (PMA pma, DBT *k, DBT *v,
assert(pma->pairs[idx]);
}
r = toku_logger_log_phys_add_or_delete_in_leaf(pma->db, txn, diskoff, 0, pma->pairs[idx]);
if (txn && node_lsn && r==0) *node_lsn = toku_txn_get_last_lsn(txn);
*fingerprint += rand4fingerprint*toku_calccrc32_kvpair(k->data, k->size, v->data, v->size);
return r;
}
if (kv_pair_inuse(pma->pairs[idx])) {
unsigned int newidx;
r = toku_pmainternal_make_space_at (txn, filenum, diskoff, pma, idx, &newidx); /* returns the new idx. */
r = toku_pmainternal_make_space_at (txn, filenum, diskoff, pma, idx, &newidx, node_lsn); /* returns the new idx. */
if (r!=0) return r;
idx=newidx;
}
......@@ -1099,6 +1109,7 @@ int toku_pma_insert_or_replace (PMA pma, DBT *k, DBT *v,
const BYTESTRING key = { pair->keylen, (char*)kv_pair_key_const(pair) };
const BYTESTRING data = { pair->vallen, (char*)kv_pair_val_const(pair) };
r = toku_log_insertinleaf (txn, toku_txn_get_txnid(txn), pma->filenum, diskoff, idx, key, data);
if (txn && node_lsn) *node_lsn = toku_txn_get_last_lsn(txn);
}
*fingerprint += rand4fingerprint*toku_calccrc32_kvpair(k->data, k->size, v->data, v->size);
return r;
......@@ -1218,9 +1229,9 @@ static void __pma_relocate_kvpairs(PMA pma) {
int toku_pma_split(TOKUTXN txn, FILENUM filenum,
DISKOFF diskoff, PMA pma, unsigned int *pma_size_p, u_int32_t rand4fp, u_int32_t *fingerprint_p,
DISKOFF diskoff, PMA pma, unsigned int *pma_size_p, u_int32_t rand4fp, u_int32_t *fingerprint_p, LSN *lsn,
DBT *splitk,
DISKOFF newdiskoff, PMA newpma, unsigned int *newpma_size_p, u_int32_t newrand4fp, u_int32_t *newfingerprint_p) {
DISKOFF newdiskoff, PMA newpma, unsigned int *newpma_size_p, u_int32_t newrand4fp, u_int32_t *newfingerprint_p, LSN *newlsn) {
int error;
int npairs;
struct kv_pair_tag *pairs;
......@@ -1294,11 +1305,11 @@ int toku_pma_split(TOKUTXN txn, FILENUM filenum,
/* put the second half of pairs into the right pma */
/* Do this first, so that the logging will move the stuff out of the left pma first, and then later when we redistribute in the left PMA, we won't overwrite something. */
n = npairs - spliti;
error = pma_resize_array(txn, filenum, newdiskoff, newpma, n + n/4, 0);
error = pma_resize_array(txn, filenum, newdiskoff, newpma, n + n/4, 0, newlsn);
assert(error == 0);
distribute_data(newpma->pairs, toku_pma_index_limit(newpma), &pairs[spliti], n, newpma);
{
int r = pma_log_distribute(txn, filenum, diskoff, newdiskoff, n, &pairs[spliti]);
int r = pma_log_distribute(txn, filenum, diskoff, newdiskoff, n, &pairs[spliti], lsn, newlsn);
if (r!=0) { toku_free(pairs); return r; }
}
#if PMA_USE_MEMPOOL
......@@ -1313,11 +1324,11 @@ int toku_pma_split(TOKUTXN txn, FILENUM filenum,
/* put the first half of pairs into the left pma */
n = spliti;
error = pma_resize_array(txn, filenum, diskoff, pma, n + n/4, 0); // zeros the elements
error = pma_resize_array(txn, filenum, diskoff, pma, n + n/4, 0, lsn); // zeros the elements
assert(error == 0);
distribute_data(pma->pairs, toku_pma_index_limit(pma), &pairs[0], n, pma);
{
int r = pma_log_distribute(txn, filenum, diskoff, diskoff, spliti, &pairs[0]);
int r = pma_log_distribute(txn, filenum, diskoff, diskoff, spliti, &pairs[0], lsn, lsn);
if (r!=0) { toku_free(pairs); return r; }
}
// Don't have to relocate kvpairs, because these ones are still there.
......@@ -1344,7 +1355,7 @@ static void __pma_bulk_cleanup(struct pma *pma, struct kv_pair_tag *pairs, int n
pma_mfree_kv_pair(pma, pairs[i].pair);
}
int toku_pma_bulk_insert(TOKUTXN txn, FILENUM filenum, DISKOFF diskoff, PMA pma, DBT *keys, DBT *vals, int n_newpairs, u_int32_t rand4fp, u_int32_t *sum) {
int toku_pma_bulk_insert(TOKUTXN txn, FILENUM filenum, DISKOFF diskoff, PMA pma, DBT *keys, DBT *vals, int n_newpairs, u_int32_t rand4fp, u_int32_t *sum, LSN *node_lsn) {
struct kv_pair_tag *newpairs;
int i;
int error;
......@@ -1378,7 +1389,7 @@ int toku_pma_bulk_insert(TOKUTXN txn, FILENUM filenum, DISKOFF diskoff, PMA pma,
}
}
error = pma_resize_array(txn, filenum, diskoff, pma, n_newpairs + n_newpairs/4, 0);
error = pma_resize_array(txn, filenum, diskoff, pma, n_newpairs + n_newpairs/4, 0, node_lsn);
if (error) {
__pma_bulk_cleanup(pma, newpairs, n_newpairs);
toku_free(newpairs);
......
......@@ -47,7 +47,7 @@ int toku_pma_n_entries (PMA);
//enum pma_errors toku_pma_insert (PMA, bytevec key, ITEMLEN keylen, bytevec data, ITEMLEN datalen);
// The DB pointer is there so that the comparison function can be called.
enum pma_errors toku_pma_insert (PMA, DBT*, DBT*, TOKUTXN, FILENUM, DISKOFF, u_int32_t /*random for fingerprint */, u_int32_t */*fingerprint*/);
enum pma_errors toku_pma_insert (PMA, DBT*, DBT*, TOKUTXN, FILENUM, DISKOFF, u_int32_t /*random for fingerprint */, u_int32_t */*fingerprint*/, LSN *node_lsn);
/* This returns an error if the key is NOT present. */
int pma_replace (PMA, bytevec key, ITEMLEN keylen, bytevec data, ITEMLEN datalen);
......@@ -61,7 +61,8 @@ int toku_pma_delete (PMA, DBT */*key*/, DBT */*val*/, u_int32_t /*random for fin
int toku_pma_insert_or_replace (PMA /*pma*/, DBT */*k*/, DBT */*v*/,
int */*replaced_v_size*/, /* If it is a replacement, set to the size of the old value, otherwise set to -1. */
TOKUTXN /*txn*/, FILENUM, DISKOFF,
u_int32_t /*random for fingerprint*/, u_int32_t */*fingerprint*/);
u_int32_t /*random for fingerprint*/, u_int32_t */*fingerprint*/,
LSN */*node_lsn*/);
/* Exposes internals of the PMA by returning a pointer to the guts.
......@@ -87,9 +88,9 @@ enum pma_errors toku_pma_lookup (PMA, DBT*, DBT*);
* The NEWPMA gets keys > pivot key
*/
int toku_pma_split(TOKUTXN, FILENUM,
DISKOFF /*diskoff*/, PMA /*pma*/, unsigned int */*pma_size*/, u_int32_t /*rand4sum*/, u_int32_t */*fingerprint*/,
DISKOFF /*diskoff*/, PMA /*pma*/, unsigned int */*pma_size*/, u_int32_t /*rand4sum*/, u_int32_t */*fingerprint*/, LSN* /*lsn*/,
DBT */*splitk*/,
DISKOFF /*newdiskoff*/, PMA /*newpma*/, unsigned int */*newpma_size*/, u_int32_t /*newrand4sum*/, u_int32_t */*newfingerprint*/);
DISKOFF /*newdiskoff*/, PMA /*newpma*/, unsigned int */*newpma_size*/, u_int32_t /*newrand4sum*/, u_int32_t */*newfingerprint*/, LSN* /*newlsn*/);
/*
* Insert several key value pairs into an empty pma.
......@@ -102,7 +103,7 @@ int toku_pma_split(TOKUTXN, FILENUM,
* vals - an array of values
* n_newpairs - the number of key value pairs
*/
int toku_pma_bulk_insert(TOKUTXN, FILENUM, DISKOFF, PMA pma, DBT *keys, DBT *vals, int n_newpairs, u_int32_t rand4sem, u_int32_t *fingerprint);
int toku_pma_bulk_insert(TOKUTXN, FILENUM, DISKOFF, PMA pma, DBT *keys, DBT *vals, int n_newpairs, u_int32_t rand4sem, u_int32_t *fingerprint, LSN */*node_lsn*/);
/* Move the cursor to the beginning or the end or to a key */
int toku_pma_cursor (PMA, PMA_CURSOR *, void** /*sskey*/, void ** /*ssval*/); // the sskey and ssval point to variables that hold blocks that can be used to return values for zero'd DBTS.
......
......@@ -194,6 +194,7 @@ void toku_recover_newbrtnode (struct logtype_newbrtnode *c) {
VERIFY_COUNTS(n);
n->log_lsn = c->lsn;
r = toku_cachetable_unpin(pair->cf, c->diskoff, 1, toku_serialize_brtnode_size(n));
assert(r==0);
}
......@@ -246,6 +247,7 @@ void toku_recover_addchild (struct logtype_addchild *le) {
node->u.n.n_bytes_in_buffer[le->childnum] = 0;
node->u.n.n_cursors[le->childnum] = 0;
node->u.n.n_children++;
node->log_lsn = le->lsn;
r = toku_cachetable_unpin(cf, le->diskoff, 1, toku_serialize_brtnode_size(node));
assert(r==0);
}
......@@ -263,6 +265,7 @@ void toku_recover_setchild (struct logtype_setchild *le) {
assert(node->height>0);
assert(le->childnum < (unsigned)node->u.n.n_children);
node->u.n.children[le->childnum] = le->child;
node->log_lsn = le->lsn;
r = toku_cachetable_unpin(pair->cf, le->diskoff, 1, toku_serialize_brtnode_size(node));
assert(r==0);
}
......@@ -280,6 +283,7 @@ void toku_recover_setpivot (struct logtype_setpivot *le) {
node->u.n.childkeys[le->childnum] = kv_pair_malloc(le->pivotkey.data, le->pivotkey.len, 0, 0);
node->u.n.totalchildkeylens += toku_brt_pivot_key_len(pair->brt, node->u.n.childkeys[le->childnum]);
node->log_lsn = le->lsn;
r = toku_cachetable_unpin(pair->cf, le->diskoff, 1, toku_serialize_brtnode_size(node));
assert(r==0);
......@@ -297,6 +301,7 @@ void toku_recover_changechildfingerprint (struct logtype_changechildfingerprint
BRTNODE node = node_v;
assert(node->height>0);
BRTNODE_CHILD_SUBTREE_FINGERPRINTS(node, le->childnum) = le->newfingerprint;
node->log_lsn = le->lsn;
r = toku_cachetable_unpin(pair->cf, le->diskoff, 1, toku_serialize_brtnode_size(node));
assert(r==0);
......@@ -339,6 +344,7 @@ void toku_recover_insertinleaf (struct logtype_insertinleaf *c) {
VERIFY_COUNTS(node);
node->log_lsn = c->lsn;
r = toku_cachetable_unpin(pair->cf, c->diskoff, 1, toku_serialize_brtnode_size(node));
assert(r==0);
toku_free(c->key.data);
......@@ -359,6 +365,7 @@ int toku_rollback_insertinleaf (struct logtype_insertinleaf *c, TOKUTXN txn) {
node->local_fingerprint -= node->rand4fingerprint*toku_calccrc32_kvpair(c->key.data, c->key.len,c->data.data, c->data.len);
node->u.l.n_bytes_in_buffer -= PMA_ITEM_OVERHEAD + KEY_VALUE_OVERHEAD + c->key.len + c->data.len;
VERIFY_COUNTS(node);
node->log_lsn = c->lsn;
r = toku_cachetable_unpin(cf, c->diskoff, 1, toku_serialize_brtnode_size(node));
return r;
}
......@@ -380,6 +387,7 @@ void toku_recover_resizepma (struct logtype_resizepma *c) {
VERIFY_COUNTS(node);
node->log_lsn = c->lsn;
r = toku_cachetable_unpin(pair->cf, c->diskoff, 1, toku_serialize_brtnode_size(node));
assert(r==0);
}
......@@ -413,6 +421,8 @@ void toku_recover_pmadistribute (struct logtype_pmadistribute *c) {
VERIFY_COUNTS(nodea);
VERIFY_COUNTS(nodeb);
nodea->log_lsn = c->lsn;
nodeb->log_lsn = c->lsn;
r = toku_cachetable_unpin(pair->cf, c->old_diskoff, 1, toku_serialize_brtnode_size(nodea));
assert(r==0);
r = toku_cachetable_unpin(pair->cf, c->new_diskoff, 1, toku_serialize_brtnode_size(nodeb));
......@@ -449,6 +459,8 @@ int toku_rollback_pmadistribute (struct logtype_pmadistribute *le, TOKUTXN txn)
&nodeb->u.l.n_bytes_in_buffer, &nodea->u.l.n_bytes_in_buffer
);
if (r!=0) goto died1;
nodea->log_lsn = le->lsn;
nodeb->log_lsn = le->lsn;
r = toku_cachetable_unpin(cf, le->old_diskoff, 1, toku_serialize_brtnode_size(nodea));
r = toku_cachetable_unpin(cf, le->new_diskoff, 1, toku_serialize_brtnode_size(nodeb));
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment