Commit d0e64cf7 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

Log and recover for note splits works a little better. Addresses #27

git-svn-id: file:///svn/tokudb@1691 c7de825b-a66e-492c-adef-691d508d4ae1
parent 89985a31
......@@ -371,9 +371,10 @@ static int brtleaf_split (TOKUTXN txn, FILENUM filenum, BRT t, BRTNODE node, BRT
//printf("%s:%d B is at %lld nodesize=%d\n", __FILE__, __LINE__, B->thisnodename, B->nodesize);
assert(node->height>0 || node->u.l.buffer!=0);
int r;
r = toku_pma_split(txn, filenum, node->u.l.buffer, &node->u.l.n_bytes_in_buffer, splitk,
A->thisnodename, A->u.l.buffer, &A->u.l.n_bytes_in_buffer, A->rand4fingerprint, &A->local_fingerprint,
B->thisnodename, B->u.l.buffer, &B->u.l.n_bytes_in_buffer, B->rand4fingerprint, &B->local_fingerprint);
r = toku_pma_split(txn, filenum,
node->thisnodename, node->u.l.buffer, &node->u.l.n_bytes_in_buffer, splitk,
A->thisnodename, A->u.l.buffer, &A->u.l.n_bytes_in_buffer, A->rand4fingerprint, &A->local_fingerprint,
B->thisnodename, B->u.l.buffer, &B->u.l.n_bytes_in_buffer, B->rand4fingerprint, &B->local_fingerprint);
assert(r == 0);
assert(node->height>0 || node->u.l.buffer!=0);
/* Remove it from the cache table, and free its storage. */
......
......@@ -884,7 +884,7 @@ static void test_pma_split_n(int n) {
if (verbose) { printf("a:"); toku_print_pma(pmaa); }
error = toku_pma_split(null_txn, null_filenum,
pmaa, 0, 0,
null_diskoff, pmaa, 0, 0,
null_diskoff, pmab, 0, brand, &bsum,
null_diskoff, pmac, 0, crand, &csum);
assert(error == 0);
......@@ -956,9 +956,9 @@ static void test_pma_dup_split_n(int n, int dup_mode) {
DBT splitk;
error = toku_pma_split(null_txn, null_filenum,
pmaa, 0, &splitk,
(DISKOFF)0, pmab, 0, brand, &bsum,
(DISKOFF)0, pmac, 0, crand, &csum);
null_diskoff, pmaa, 0, &splitk,
null_diskoff, pmab, 0, brand, &bsum,
null_diskoff, pmac, 0, crand, &csum);
assert(error == 0);
toku_pma_verify(pmaa);
toku_pma_verify(pmab);
......@@ -1032,9 +1032,9 @@ static void test_pma_split_varkey(void) {
if (verbose) { printf("a:"); toku_print_pma(pmaa); }
error = toku_pma_split(null_txn, null_filenum,
pmaa, 0, 0,
(DISKOFF)0, pmab, 0, brand, &bsum,
(DISKOFF)0, pmac, 0, crand, &csum);
null_diskoff, pmaa, 0, 0,
null_diskoff, pmab, 0, brand, &bsum,
null_diskoff, pmac, 0, crand, &csum);
assert(error == 0);
toku_pma_verify(pmaa);
toku_pma_verify(pmab);
......@@ -1180,9 +1180,10 @@ static void test_pma_split_cursor(void) {
// print_cursor("cursorc", cursorc);
assert_cursor_val(cursorc, 16);
error = toku_pma_split(null_txn, null_filenum, pmaa, 0, 0,
(DISKOFF)0, pmab, 0, brand, &bsum,
(DISKOFF)0, pmac, 0, crand, &csum);
error = toku_pma_split(null_txn, null_filenum,
null_diskoff, pmaa, 0, 0,
null_diskoff, pmab, 0, brand, &bsum,
null_diskoff, pmac, 0, crand, &csum);
assert(error == 0);
toku_pma_verify_fingerprint(pmab, brand, bsum);
......
......@@ -1341,8 +1341,8 @@ static void __pma_relocate_kvpairs(PMA pma) {
int toku_pma_split(TOKUTXN txn, FILENUM filenum,
PMA origpma, unsigned int *origpma_size, DBT *splitk,
DISKOFF leftdiskoff, PMA leftpma, unsigned int *leftpma_size, u_int32_t leftrand4fp, u_int32_t *leftfingerprint,
DISKOFF origdiskoff, PMA origpma, unsigned int *origpma_size, DBT *splitk,
DISKOFF leftdiskoff, PMA leftpma, unsigned int *leftpma_size, u_int32_t leftrand4fp, u_int32_t *leftfingerprint,
DISKOFF rightdiskoff, PMA rightpma, unsigned int *rightpma_size, u_int32_t rightrand4fp, u_int32_t *rightfingerprint) {
int error;
int npairs;
......@@ -1429,6 +1429,8 @@ int toku_pma_split(TOKUTXN txn, FILENUM filenum,
error = pma_resize_array(txn, filenum, leftdiskoff, leftpma, n + n/4, 0);
assert(error == 0);
distribute_data(leftpma->pairs, toku_pma_index_limit(leftpma), &pairs[0], n, leftpma);
int r = pma_log_distribute(txn, filenum, origdiskoff, leftdiskoff, spliti, &pairs[0]);
if (r!=0) { toku_free(pairs); return r; }
#if PMA_USE_MEMPOOL
__pma_relocate_kvpairs(leftpma);
#endif
......@@ -1440,6 +1442,8 @@ int toku_pma_split(TOKUTXN txn, FILENUM filenum,
error = pma_resize_array(txn, filenum, rightdiskoff, rightpma, n + n/4, 0);
assert(error == 0);
distribute_data(rightpma->pairs, toku_pma_index_limit(rightpma), &pairs[spliti], n, rightpma);
r = pma_log_distribute(txn, filenum, origdiskoff, rightdiskoff, n, &pairs[spliti]);
if (r!=0) { toku_free(pairs); return r; }
#if PMA_USE_MEMPOOL
__pma_relocate_kvpairs(rightpma);
#endif
......
......@@ -79,7 +79,7 @@ enum pma_errors toku_pma_lookup (PMA, DBT*, DBT*);
* rightpma - the pma assigned keys > pivot key
*/
int toku_pma_split(TOKUTXN, FILENUM,
PMA /*origpma*/, unsigned int */*origpma_size*/, DBT */*splitk*/,
DISKOFF /*origdiskoff*/, PMA /*origpma*/, unsigned int */*origpma_size*/, DBT */*splitk*/,
DISKOFF /*leftdiskoff*/, PMA /*leftpma*/, unsigned int */*leftpma_size*/, u_int32_t /*leftrand4sum*/, u_int32_t */*leftfingerprint*/,
DISKOFF /*rightdiskoff*/, PMA /*rightpma*/, unsigned int */*rightpma_size*/, u_int32_t /*rightrand4sum*/, u_int32_t */*rightfingerprint*/);
......
......@@ -308,7 +308,7 @@ void toku_recover_pmadistribute (struct logtype_pmadistribute *c) {
VERIFY_COUNTS(nodea);
VERIFY_COUNTS(nodeb);
r = toku_cachetable_unpin(pair->cf, c->new_diskoff, 1, toku_serialize_brtnode_size(nodea));
r = toku_cachetable_unpin(pair->cf, c->old_diskoff, 1, toku_serialize_brtnode_size(nodea));
assert(r==0);
r = toku_cachetable_unpin(pair->cf, c->new_diskoff, 1, toku_serialize_brtnode_size(nodeb));
assert(r==0);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment